Class: Syskit::DataFlow

Inherits:
Object
Defined in:
lib/syskit/data_flow.rb

Overview

The class from which the Flows::DataFlow graph object is instantiated.

See ConnectionGraph for a description of the role of each connection graph.

Defined Under Namespace

Modules: Extension

Classes: ConcreteConnectionGraph, ConnectionInPath

Constructor Details

#initialize(*args, **options) ⇒ DataFlow

Returns a new instance of DataFlow



# File 'lib/syskit/data_flow.rb', line 36

def initialize(*args, **options)
    super
    @modified_tasks = Set.new
    @concrete_connection_graph = nil
end

Instance Attribute Details

#concrete_connection_graph ⇒ ConnectionGraph? (readonly)

If non-nil, this holds the set of concrete connections for this data flow graph. It MUST be maintained by some external entity, and as such is set only in contexts where the set of modifications to the graph is known (e.g. NetworkGeneration::MergeSolver).

Returns:

  • (ConnectionGraph, nil)


# File 'lib/syskit/data_flow.rb', line 34

def concrete_connection_graph
  @concrete_connection_graph
end

#modified_tasks ⇒ Object (readonly)

Returns the set of tasks whose data flow has changed but whose changes have not yet been applied.

It is maintained only on executable plans, through the added/removed/updated hooks TaskContext#added_sink, TaskContext#removed_sink, TaskContext#updated_sink, Composition#added_sink, Composition#removing_sink and Composition#updated_sink.



# File 'lib/syskit/data_flow.rb', line 26

def modified_tasks
  @modified_tasks
end

#pending_changes ⇒ Object

The set of connection changes that have been applied to the DataFlow relation graph, but not yet applied on the actual components (i.e. not yet present in the ActualDataFlow graph).



# File 'lib/syskit/data_flow.rb', line 16

def pending_changes
  @pending_changes
end

Instance Method Details

#add_connections(source_task, sink_task, mappings) ⇒ Object

Create new connections between source_task and sink_task.

mappings is a map from port name pairs to the connection policy that should be used:

[output_port_name, input_port_name] => policy

Raises Roby::ModelViolation if the connection already exists with an incompatible policy.



# File 'lib/syskit/data_flow.rb', line 111

def add_connections(source_task, sink_task, mappings) # :nodoc:
    # Run the port checks on both ends of every mapping, then delegate
    # to the parent implementation
    mappings.each do |(out_port, in_port), _policy|
        source_task.ensure_has_output_port(out_port)
        sink_task.ensure_has_input_port(in_port)
    end
    super
end
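
The shape of the mappings argument can be shown without Syskit. The following is a minimal sketch using plain hashes; the port names and the policy contents (type, size) are assumptions chosen for illustration and are not taken from the source.

```ruby
# Illustrative only: port names and policy contents are made up.
mappings = {
    ["pose_samples", "pose_in"] => { type: :buffer, size: 10 },
    ["status", "status_in"]     => { type: :data }
}

# Same destructuring as in #add_connections: each key is an
# [output_port_name, input_port_name] pair, each value is the policy.
described = mappings.map do |(out_port, in_port), policy|
    "#{out_port} -> #{in_port} (#{policy[:type]})"
end

puts described
```

Keying the hash by the port name pair means that a given output/input pair can carry only one policy per edge.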

#compute_concrete_connection_graph ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Computes the concrete connection graph from the DataFlow information



# File 'lib/syskit/data_flow.rb', line 66

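def compute_concrete_connection_graph
    # Unset the cached graph for the duration of the computation, so that
    # each_concrete_in_connection walks the DataFlow edges themselves
    current_graph, @concrete_connection_graph = @concrete_connection_graph, nil
    graph = ConcreteConnectionGraph.new
    each_vertex do |task|
        # Only task contexts are considered; other vertices are skipped
        next if !task.kind_of?(Syskit::TaskContext)

        # Group the incoming concrete connections by source task
        task_to_task = Hash.new
        each_concrete_in_connection(task) do |source_task, source_port, sink_port, policy|
            port_to_port = (task_to_task[source_task] ||= Hash.new)
            port_to_port[[source_port, sink_port]] = policy
        end

        # One edge per source task, carrying all its port mappings
        task_to_task.each do |source_task, mappings|
            graph.add_edge(source_task, task, mappings)
        end
    end
    graph
ensure
    # Restore the graph that was set before the computation
    @concrete_connection_graph = current_graph
end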
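
The grouping step in the listing above can be sketched on plain data. This is a hedged illustration only: the task names, port names and policies below are hypothetical, and strings stand in for task objects.

```ruby
# Hypothetical flat list of concrete connections towards one sink task,
# in the order yielded by each_concrete_in_connection:
# [source_task, source_port, sink_port, policy]
connections = [
    ["camera", "frame",   "frame_in", { type: :data }],
    ["camera", "info",    "info_in",  { type: :buffer, size: 5 }],
    ["imu",    "samples", "imu_in",   { type: :data }]
]

# Group by source task, so that each source task ends up with a single
# { [source_port, sink_port] => policy } hash
task_to_task = {}
connections.each do |source_task, source_port, sink_port, policy|
    port_to_port = (task_to_task[source_task] ||= {})
    port_to_port[[source_port, sink_port]] = policy
end

task_to_task.each do |source_task, mappings|
    puts "#{source_task}: #{mappings.keys.inspect}"
end
```

Each entry of task_to_task corresponds to one add_edge call in the listing, which is why the resulting graph has at most one edge per (source task, sink task) pair.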

#concrete_connection_graph_enabled? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/syskit/data_flow.rb', line 91

def concrete_connection_graph_enabled?
    !!@concrete_connection_graph
end

#disable_concrete_connection_graph ⇒ Object



# File 'lib/syskit/data_flow.rb', line 87

def disable_concrete_connection_graph
    @concrete_connection_graph = nil
end

#each_concrete_in_connection(task, port = nil) {|source_task, source_port, sink_port, policy| ... } ⇒ Object

Yields or enumerates the connections that exist towards the input ports of task. It does not include connections to composition ports (i.e. exported ports): these connections are followed until a concrete port (a port on an actual Syskit::TaskContext) is found.

Parameters:

  • port (#name, String, nil) (defaults to: nil)

    if non-nil, the port for which we want to enumerate the connections (in which case the sink_port yield parameter is guaranteed to be this name). Otherwise, all ports are enumerated.

Yields:

  • each connection

Yield Parameters:

  • source_task (Syskit::TaskContext)

    the source task in the connection

  • source_port (String)

    the source port name on source_task

  • sink_port (String)

    the sink port name on task. If the port argument is non-nil, it is guaranteed to be the same.

  • policy (Hash)

    the connection policy

See Also:

  • each_concrete_output_connection
  • each_output_connection


# File 'lib/syskit/data_flow.rb', line 192

def each_concrete_in_connection(task, port = nil, &block)
    return enum_for(__method__, task, port) if !block_given?

    if concrete_connection_graph
        # Kernel#proc without a block raises on Ruby 3.0+, so the
        # caller's block is forwarded explicitly
        return concrete_connection_graph.each_in_connection(task, port, &block)
    else
        # Resolve each path and report only its two ends
        each_concrete_in_path(task, port) do |path, aggregated_policy|
            first_conn = path.first
            last_conn  = path.last
            yield(first_conn.source_task, first_conn.source_port, last_conn.sink_port, aggregated_policy)
        end
    end

    self
end
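
How a path is collapsed into a single concrete connection can be sketched as follows. The Connection struct is a stand-in for ConnectionInPath: its field order follows the constructor calls shown on this page, but the policy accessor name and the task names are assumptions.

```ruby
# Stand-in for Syskit's ConnectionInPath (assumed accessor names)
Connection = Struct.new(:source_task, :source_port, :sink_task, :sink_port, :policy)

# A path that crosses one composition boundary (names are made up):
# producer.out -> composition.in_export -> consumer.in
path = [
    Connection.new("producer", "out", "composition", "in_export", {}),
    Connection.new("composition", "in_export", "consumer", "in", {})
]

# Only the two ends of the path are reported to the caller: the
# source side of the first hop and the sink port of the last hop
first_conn = path.first
last_conn  = path.last
concrete = [first_conn.source_task, first_conn.source_port, last_conn.sink_port]
p concrete
```

The intermediate composition port never appears in the result, which matches the description that exported ports are followed rather than reported.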

#each_concrete_in_path(task, port = nil) ⇒ Object



# File 'lib/syskit/data_flow.rb', line 121

def each_concrete_in_path(task, port = nil)
    return enum_for(__method__, task, port) if !block_given?

    each_in_connection(task, port) do |source_task, source_port, sink_port, policy|
        connection = ConnectionInPath.new(source_task, source_port, task, sink_port, policy)

        # Follow the forwardings while +source_task+ is a composition
        if source_task.kind_of?(Composition)
            each_concrete_in_path(source_task, source_port) do |source_path, aggregated_policy|
                begin
                    aggregated_policy = Syskit.update_connection_policy(policy, aggregated_policy)
                rescue ArgumentError => e
                    raise SpecError, "incompatible policies in input chain for #{task}:#{sink_port}: #{e.message}"
                end
                aggregated_policy.freeze

                yield(source_path + [connection], aggregated_policy)
            end
        else
            yield([connection], policy)
        end
    end
    self
end
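
The recursion in the listing above can be sketched on a toy graph. This is an illustration under stated assumptions, not Syskit's implementation: ToyGraph, its connection table and the "cmp" naming rule are hypothetical, and policy aggregation is left out to keep the traversal visible.

```ruby
# Toy graph: sink task => list of [source_task, source_port, sink_port].
# Names starting with "cmp" play the role of compositions (an assumption
# of this sketch; the real code checks kind_of?(Composition)).
class ToyGraph
    IN_CONNECTIONS = {
        "consumer"  => [["cmp_inner", "in_export", "in"]],
        "cmp_inner" => [["cmp_outer", "in_export", "in_export"]],
        "cmp_outer" => [["producer", "out", "in_export"]]
    }.freeze

    def composition?(task)
        task.start_with?("cmp")
    end

    def each_in_path(task, port = nil)
        return enum_for(__method__, task, port) unless block_given?

        IN_CONNECTIONS.fetch(task, []).each do |source_task, source_port, sink_port|
            next if port && port != sink_port

            connection = [source_task, source_port, task, sink_port]
            if composition?(source_task)
                # Keep walking upstream, then append the current hop
                each_in_path(source_task, source_port) do |source_path|
                    yield(source_path + [connection])
                end
            else
                yield([connection])
            end
        end
        self
    end
end

paths = ToyGraph.new.each_in_path("consumer").to_a
paths.each { |path| puts path.map { |c| "#{c[0]}.#{c[1]}" }.join(" -> ") }
```

Because the current hop is appended after the upstream path, the first element of each yielded path is always the hop that starts at the concrete source.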

#each_concrete_out_connection(task, port = nil) {|source_port, sink_port, sink_task, policy| ... } ⇒ Object

Yields or enumerates the connections that exist from the output ports of task. It does not include connections to composition ports (i.e. exported ports): these connections are followed until a concrete port (a port on an actual Syskit::TaskContext) is found.

Parameters:

  • port (#name, String, nil) (defaults to: nil)

    if non-nil, the port for which we want to enumerate the connections (in which case the source_port yield parameter is guaranteed to be this name). Otherwise, all ports are enumerated.

Yields:

  • each connection

Yield Parameters:

  • source_port (String)

    the source port name on task. If the port argument is non-nil, it is guaranteed to be the same.

  • sink_port (String)

    the sink port name on sink_task.

  • sink_task (Syskit::TaskContext)

    the sink task in the connection

  • policy (Hash)

    the connection policy

See Also:

  • each_input_connection
  • each_output_connection


# File 'lib/syskit/data_flow.rb', line 230

def each_concrete_out_connection(task, port = nil, &block)
    return enum_for(__method__, task, port) if !block_given?

    if concrete_connection_graph
        # Kernel#proc without a block raises on Ruby 3.0+, so the
        # caller's block is forwarded explicitly
        return concrete_connection_graph.each_out_connection(task, port, &block)
    else
        # Resolve each path and report only its two ends
        each_concrete_out_path(task, port) do |path, aggregated_policy|
            first_conn = path.first
            last_conn  = path.last
            yield(first_conn.source_port, last_conn.sink_port, last_conn.sink_task, aggregated_policy)
        end
    end
    self
end

#each_concrete_out_path(task, port = nil) ⇒ Object



# File 'lib/syskit/data_flow.rb', line 146

def each_concrete_out_path(task, port = nil)
    return enum_for(__method__, task, port) if !block_given?

    each_out_connection(task, port) do |source_port, sink_port, sink_task, policy|
        connection = ConnectionInPath.new(task, source_port, sink_task, sink_port, policy)

        if sink_task.kind_of?(Composition)
            each_concrete_out_path(sink_task, sink_port) do |sink_path, aggregated_policy|
                begin
                    aggregated_policy = Syskit.update_connection_policy(policy, aggregated_policy)
                rescue ArgumentError => e
                    raise SpecError, "incompatible policies in output chain for #{task}:#{source_port}: #{e.message}"
                end
                aggregated_policy.freeze

                yield([connection] + sink_path, aggregated_policy)
            end
        else
            yield([connection], policy)
        end
    end
    self
end

#enable_concrete_connection_graph(compute: true) ⇒ Object



# File 'lib/syskit/data_flow.rb', line 54

def enable_concrete_connection_graph(compute: true)
    @concrete_connection_graph =
        if compute
            compute_concrete_connection_graph
        else
            ConcreteConnectionGraph.new
        end
end
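
The interplay between enabling, disabling and querying the concrete connection graph can be modelled with a toy class. ToyFlow is hypothetical: it only mimics the control flow of the three methods shown on this page, with an array standing in for ConcreteConnectionGraph.

```ruby
# Toy model of the enable/disable/enabled? trio (not Syskit code)
class ToyFlow
    attr_reader :concrete_connection_graph

    def initialize(edges)
        @edges = edges
        @concrete_connection_graph = nil
    end

    # compute: true fills the cache from the current edges,
    # compute: false starts from an empty cache
    def enable_concrete_connection_graph(compute: true)
        @concrete_connection_graph = compute ? @edges.dup : []
    end

    def disable_concrete_connection_graph
        @concrete_connection_graph = nil
    end

    def concrete_connection_graph_enabled?
        !!@concrete_connection_graph
    end
end

flow = ToyFlow.new([["producer", "consumer"]])
flow.enable_concrete_connection_graph(compute: false)
puts flow.concrete_connection_graph_enabled?
flow.disable_concrete_connection_graph
puts flow.concrete_connection_graph_enabled?
```

Note that an empty cache still counts as enabled, since only nil disables it. With compute: false, the caller is therefore responsible for filling the graph.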

#merge_info(source, sink, current_mappings, additional_mappings) ⇒ Object

Called by the relation graph management to update the DataFlow edge information when connections are added or removed.



# File 'lib/syskit/data_flow.rb', line 97

def merge_info(source, sink, current_mappings, additional_mappings)
    current_mappings.merge(additional_mappings) do |_, old_options, new_options|
        Syskit.update_connection_policy(old_options, new_options)
    end
end
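
The merge performed by #merge_info relies on the block form of Hash#merge, which is called only for keys present in both hashes. The sketch below illustrates this with a hypothetical update_policy helper standing in for Syskit.update_connection_policy; its conflict rule (raise when two values for the same key differ) is an assumption, not the library's actual behavior.

```ruby
# Hypothetical stand-in for Syskit.update_connection_policy: merges two
# policies and raises when they disagree on a key.
def update_policy(old_policy, new_policy)
    old_policy.merge(new_policy) do |key, old_value, new_value|
        if old_value != new_value
            raise ArgumentError, "conflicting values for #{key}: #{old_value} != #{new_value}"
        end
        old_value
    end
end

current = { ["out", "in"] => { type: :buffer } }
additional = {
    ["out", "in"]           => { size: 10 },
    ["status", "status_in"] => { type: :data }
}

# The block runs only for ["out", "in"], the one key both hashes share;
# ["status", "status_in"] is copied over as-is
merged = current.merge(additional) do |_, old_policy, new_policy|
    update_policy(old_policy, new_policy)
end
p merged
```

Hash#merge returns a new hash, so neither current nor additional is modified by the merge.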