Class: Syskit::DataFlow
Overview
Class out of which the Flows::DataFlow graph object is made
see ConnectionGraph for a description of the roles of each connection graph
Defined Under Namespace
Modules: Extension Classes: ConcreteConnectionGraph, ConnectionInPath
Instance Attribute Summary collapse
-
#concrete_connection_graph ⇒ ConnectionGraph?
readonly
If non-nil, this holds the set of concrete connections for this data flow graph.
-
#modified_tasks ⇒ Object
readonly
Returns the set of tasks whose data flow has been changed that has not yet been applied.
-
#pending_changes ⇒ Object
The set of connection changes that have been applied to the DataFlow relation graph, but not yet applied on the actual components (i.e. not yet present in the ActualDataFlow graph).
Instance Method Summary collapse
-
#add_connections(source_task, sink_task, mappings) ⇒ Object
Create new connections between
source_task
andsink_task
. -
#compute_concrete_connection_graph ⇒ Object
private
Computes the concrete connection graph from the DataFlow information.
- #concrete_connection_graph_enabled? ⇒ Boolean
- #disable_concrete_connection_graph ⇒ Object
-
#each_concrete_in_connection(task, port = nil) {|source_task, source_port, sink_port, policy| ... } ⇒ Object
Yield or enumerates the connections that exist towards the input ports of self.
- #each_concrete_in_path(task, port = nil) ⇒ Object
-
#each_concrete_out_connection(task, port = nil) {|source_port, sink_port, sink_task, policy| ... } ⇒ Object
Yield or enumerates the connections that exist from the output ports of self.
- #each_concrete_out_path(task, port = nil) ⇒ Object
- #enable_concrete_connection_graph(compute: true) ⇒ Object
-
#initialize(*args, **options) ⇒ DataFlow
constructor
A new instance of DataFlow.
-
#merge_info(source, sink, current_mappings, additional_mappings) ⇒ Object
Called by the relation graph management to update the DataFlow edge information when connections are added or removed.
Constructor Details
#initialize(*args, **options) ⇒ DataFlow
Returns a new instance of DataFlow
36 37 38 39 40 |
# File 'lib/syskit/data_flow.rb', line 36 def initialize(*args, **) super @modified_tasks = Set.new @concrete_connection_graph = nil end |
Instance Attribute Details
#concrete_connection_graph ⇒ ConnectionGraph? (readonly)
If non-nil, this holds the set of concrete connections for this data flow graph. It MUST be maintained by some external entity, and as such is set only in contexts where the set of modifications to the graph is known (e.g. NetworkGeneration::MergeSolver
34 35 36 |
# File 'lib/syskit/data_flow.rb', line 34 def concrete_connection_graph @concrete_connection_graph end |
#modified_tasks ⇒ Object (readonly)
Returns the set of tasks whose data flow has been changed that has not yet been applied.
It is maintained only on executable plans, through the added/removed/updated hooks TaskContext#added_sink, TaskContext#removed_sink, TaskContext#updated_sink, Composition#added_sink, Composition#removing_sink and Composition#updated_sink
26 27 28 |
# File 'lib/syskit/data_flow.rb', line 26 def modified_tasks @modified_tasks end |
#pending_changes ⇒ Object
The set of connection changes that have been applied to the DataFlow relation graph, but not yet applied on the actual components (i.e. not yet present in the ActualDataFlow graph).
16 17 18 |
# File 'lib/syskit/data_flow.rb', line 16 def pending_changes @pending_changes end |
Instance Method Details
#add_connections(source_task, sink_task, mappings) ⇒ Object
Create new connections between source_task
and
sink_task
.
mappings
is a map from port name pairs to the connection
policy that should be used:
[output_port_name, input_port_name] => policy
Raises Roby::ModelViolation if the connection already exists with an incompatible policy
111 112 113 114 115 116 117 |
# File 'lib/syskit/data_flow.rb', line 111 def add_connections(source_task, sink_task, mappings) # :nodoc: mappings.each do |(out_port, in_port), | source_task.ensure_has_output_port(out_port) sink_task.ensure_has_input_port(in_port) end super end |
#compute_concrete_connection_graph ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Computes the concrete connection graph from the DataFlow information
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
# File 'lib/syskit/data_flow.rb', line 66 def compute_concrete_connection_graph current_graph, @concrete_connection_graph = @concrete_connection_graph, nil graph = ConcreteConnectionGraph.new each_vertex do |task| next if !task.kind_of?(Syskit::TaskContext) task_to_task = Hash.new each_concrete_in_connection(task) do |source_task, source_port, sink_port, policy| port_to_port = (task_to_task[source_task] ||= Hash.new) port_to_port[[source_port, sink_port]] = policy end task_to_task.each do |source_task, mappings| graph.add_edge(source_task, task, mappings) end end graph ensure @concrete_connection_graph = current_graph end |
#concrete_connection_graph_enabled? ⇒ Boolean
91 92 93 |
# File 'lib/syskit/data_flow.rb', line 91 def concrete_connection_graph_enabled? !!@concrete_connection_graph end |
#disable_concrete_connection_graph ⇒ Object
87 88 89 |
# File 'lib/syskit/data_flow.rb', line 87 def disable_concrete_connection_graph @concrete_connection_graph = nil end |
#each_concrete_in_connection(task, port = nil) {|source_task, source_port, sink_port, policy| ... } ⇒ Object
Yield or enumerates the connections that exist towards the input ports of self. It does not include connections to composition ports (i.e. exported ports): these connections are followed until a concrete port (a port on an actual Syskit::TaskContext) is found.
192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 |
# File 'lib/syskit/data_flow.rb', line 192 def each_concrete_in_connection(task, port = nil) return enum_for(__method__, task, port) if !block_given? if concrete_connection_graph return concrete_connection_graph.each_in_connection(task, port, &proc) else each_concrete_in_path(task, port) do |path, aggregated_policy| first_conn = path.first last_conn = path.last yield(first_conn.source_task, first_conn.source_port, last_conn.sink_port, aggregated_policy) end end self end |
#each_concrete_in_path(task, port = nil) ⇒ Object
121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 |
# File 'lib/syskit/data_flow.rb', line 121 def each_concrete_in_path(task, port = nil) return enum_for(__method__, task, port) if !block_given? each_in_connection(task, port) do |source_task, source_port, sink_port, policy| connection = ConnectionInPath.new(source_task, source_port, task, sink_port, policy) # Follow the forwardings while +sink_task+ is a composition if source_task.kind_of?(Composition) each_concrete_in_path(source_task, source_port) do |source_path, aggregated_policy| begin aggregated_policy = Syskit.update_connection_policy(policy, aggregated_policy) rescue ArgumentError => e raise SpecError, "incompatible policies in input chain for #{self}:#{sink_port}: #{e.}" end aggregated_policy.freeze yield(source_path + [connection], aggregated_policy) end else yield([connection], policy) end end self end |
#each_concrete_out_connection(task, port = nil) {|source_port, sink_port, sink_task, policy| ... } ⇒ Object
Yield or enumerates the connections that exist from the output ports of self. It does not include connections to composition ports (i.e. exported ports): these connections are followed until a concrete port (a port on an actual Syskit::TaskContext) is found.
230 231 232 233 234 235 236 237 238 239 240 241 242 243 |
# File 'lib/syskit/data_flow.rb', line 230 def each_concrete_out_connection(task, port = nil) return enum_for(__method__, task, port) if !block_given? if concrete_connection_graph return concrete_connection_graph.each_out_connection(task, port, &proc) else each_concrete_out_path(task, port) do |path, aggregated_policy| first_conn = path.first last_conn = path.last yield(first_conn.source_port, last_conn.sink_port, last_conn.sink_task, aggregated_policy) end end self end |
#each_concrete_out_path(task, port = nil) ⇒ Object
146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 |
# File 'lib/syskit/data_flow.rb', line 146 def each_concrete_out_path(task, port = nil) return enum_for(__method__, task, port) if !block_given? each_out_connection(task, port) do |source_port, sink_port, sink_task, policy| connection = ConnectionInPath.new(task, source_port, sink_task, sink_port, policy) if sink_task.kind_of?(Composition) each_concrete_out_path(sink_task, sink_port) do |sink_path, aggregated_policy| begin aggregated_policy = Syskit.update_connection_policy(policy, aggregated_policy) rescue ArgumentError => e raise SpecError, "incompatible policies in input chain for #{self}:#{sink_port}: #{e.}" end aggregated_policy.freeze yield([connection] + sink_path, aggregated_policy) end else yield([connection], policy) end end self end |
#enable_concrete_connection_graph(compute: true) ⇒ Object
54 55 56 57 58 59 60 61 |
# File 'lib/syskit/data_flow.rb', line 54 def enable_concrete_connection_graph(compute: true) @concrete_connection_graph = if compute compute_concrete_connection_graph else ConcreteConnectionGraph.new end end |
#merge_info(source, sink, current_mappings, additional_mappings) ⇒ Object
Called by the relation graph management to update the DataFlow edge information when connections are added or removed.
97 98 99 100 101 |
# File 'lib/syskit/data_flow.rb', line 97 def merge_info(source, sink, current_mappings, additional_mappings) current_mappings.merge(additional_mappings) do |_, , | Syskit.update_connection_policy(, ) end end |