Class: Syskit::NetworkGeneration::DataFlowComputation

Inherits:
Object
  • Object
show all
Extended by:
Logger::Hierarchy
Includes:
Logger::Hierarchy
Defined in:
lib/syskit/network_generation/dataflow_computation.rb

Overview

This class embeds the basic handling of computations that need to follow the dataflow

It provides an interface that allows to propagate information along the dataflow

Requirements on the information type:

  • empty?

  • merge(value)

Subclasses need to redefine the following methods (go to the method documentation for more information)

  • initial_information(task)

  • required_information(tasks)

  • triggering_inputs(task)

  • propagate_task(task)

Direct Known Subclasses

DataFlowDynamics

Defined Under Namespace

Classes: Trigger

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeDataFlowComputation

Returns a new instance of DataFlowComputation



35
36
37
# File 'lib/syskit/network_generation/dataflow_computation.rb', line 35

def initialize
    reset
end

Instance Attribute Details

#done_portsObject (readonly)

Returns the value of attribute done_ports



30
31
32
# File 'lib/syskit/network_generation/dataflow_computation.rb', line 30

def done_ports
  @done_ports
end

#missing_portsObject (readonly)

Returns the value of attribute missing_ports



28
29
30
# File 'lib/syskit/network_generation/dataflow_computation.rb', line 28

def missing_ports
  @missing_ports
end

#resultObject (readonly)

Returns the value of attribute result



22
23
24
# File 'lib/syskit/network_generation/dataflow_computation.rb', line 22

def result
  @result
end

#triggering_connectionsObject (readonly)

Returns the value of attribute triggering_connections



24
25
26
# File 'lib/syskit/network_generation/dataflow_computation.rb', line 24

def triggering_connections
  @triggering_connections
end

#triggering_dependenciesObject (readonly)

Returns the value of attribute triggering_dependencies



26
27
28
# File 'lib/syskit/network_generation/dataflow_computation.rb', line 26

def triggering_dependencies
  @triggering_dependencies
end

Instance Method Details

#add_port_info(task, port_name, info) ⇒ Object

Register information about the given task's port.

If some information is already available, merge the new info object with what exists. Use #set_port_info to reset the current information with the new object.



306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
# File 'lib/syskit/network_generation/dataflow_computation.rb', line 306

def add_port_info(task, port_name, info)
    if done_ports[task].include?(port_name)
        done_at = @done_at[[task, port_name]] if @done_at
        raise ModifyingFinalizedPortInfo.new(task, port_name, done_at, self.class.name), "trying to change port information for #{task}.#{port_name} after done_port_info has been called"
    end

    if !has_information_for_port?(task, port_name)
        @changed = true
        @result[task][port_name] = info
    else
        begin
            @changed = @result[task][port_name].merge(info)
        rescue Exception => e
            raise e, "while adding information to port #{port_name} on #{task}, #{e.message}", e.backtrace
        end
    end
end

#apply_merges(merge_solver) ⇒ Object

Maps the tasks stored in the dataflow dynamics information to the ones that merge_solver is pointing to



441
442
443
444
445
446
447
448
449
450
451
# File 'lib/syskit/network_generation/dataflow_computation.rb', line 441

def apply_merges(merge_solver)
    @result = result.map_key do |task, _|
        merge_solver.replacement_for(task)
    end
    @missing_ports = missing_ports.map_key do |task, _|
        merge_solver.replacement_for(task)
    end
    @done_ports = done_ports.map_key do |task, _|
        merge_solver.replacement_for(task)
    end
end

#done_port_info(task, port_name) ⇒ Object

Called when all information on task.port_name has been added



337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
# File 'lib/syskit/network_generation/dataflow_computation.rb', line 337

def done_port_info(task, port_name)
    if !done_ports[task].include?(port_name)
        @changed = true

        if has_information_for_port?(task, port_name)
            if port_info(task, port_name).empty?
                remove_port_info(task, port_name)
            end
        end

        done_ports[task] << port_name
        if missing_ports.has_key?(task)
            missing_ports[task].delete(port_name)
            if missing_ports[task].empty?
                missing_ports.delete(task)
            end
        end
    end

    debug do
        debug "done computing information for #{task}.#{port_name}"
        log_nest(4) do
            if has_information_for_port?(task, port_name)
                log_pp(:debug, port_info(task, port_name))
            else
                debug "no stored information"
            end
        end
        @done_at ||= Hash.new
        @done_at[[task, port_name]] = caller
        break
    end
end

#has_final_information_for_port?(task, port_name) ⇒ Boolean

Returns:

  • (Boolean)


44
45
46
47
48
# File 'lib/syskit/network_generation/dataflow_computation.rb', line 44

def has_final_information_for_port?(task, port_name)
    done_ports.has_key?(task) &&
        done_ports[task].include?(port_name) &&
        has_information_for_port?(task, port_name)
end

#has_information_for_port?(task, port_name) ⇒ Boolean

Returns:

  • (Boolean)


39
40
41
42
# File 'lib/syskit/network_generation/dataflow_computation.rb', line 39

def has_information_for_port?(task, port_name)
    result.has_key?(task) &&
        result[task].has_key?(port_name)
end

#initial_information(task) ⇒ Object

Registers information about task that is independent of the connection graph, to seed the algorithm

The information must be added using #add_port_info

Raises:

  • (NotImplementedError)


375
376
377
# File 'lib/syskit/network_generation/dataflow_computation.rb', line 375

def initial_information(task)
    raise NotImplementedError
end

#port_info(task, port_name) ⇒ Object

Returns the port information stored for the given port

The return type is specific to the actual algorithm (i.e. the subclass of DataFlowComputation)

Raises:

  • ArgumentError if there are no information stored for the given port



66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/syskit/network_generation/dataflow_computation.rb', line 66

def port_info(task, port_name)
    if result.has_key?(task)
        if result[task].has_key?(port_name)
            return result[task][port_name]
        end
    end
    if port_name
        raise ArgumentError, "no information currently available for #{task.orocos_name}.#{port_name}"
    else
        raise ArgumentError, "no information currently available for #{task.orocos_name}"
    end
end

#propagate(tasks) ⇒ Object



164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
# File 'lib/syskit/network_generation/dataflow_computation.rb', line 164

def propagate(tasks)
    reset(tasks)

    debug do
        debug "#{self.class}: computing on #{tasks.size} tasks"
        tasks.each do |t|
            debug "  #{t}"
        end
        break
    end

    # Compute the set of ports for which information is required.
    # This is called before #initial_information, so that
    # #initial_information can add the required information if it is
    # available
    @missing_ports = required_information(tasks)
    if !@missing_ports.kind_of?(Hash)
        raise ArgumentError, "#required_information is supposed to return a Hash, but returned #{@missing_ports}"
    end

    debug ""
    debug "== Gathering Initial Information"
    tasks.each do |task|
        debug { "computing initial information for #{task}" }

        log_nest(4) do
            initial_information(task)
            if connections = triggering_port_connections(task)
                triggering_connections[task] = connections
                triggering_dependencies[task] = connections.map do |port_name, triggers|
                    triggers.ports.map(&:first)
                end

                debug do
                    debug "#{connections.size} triggering connections for #{task}"
                    connections.each do |port_name, info|
                        debug "    for #{port_name}"
                        log_nest(8) do
                            log_pp :debug, info
                        end
                    end
                    break
                end
            end
        end
    end

    debug ""
    debug "== Propagation"
    remaining_tasks = tasks.dup
    while !missing_ports.empty?
        remaining_tasks = remaining_tasks.
            sort_by { |t| triggering_dependencies[t].size }

        @changed = false
        remaining_tasks.delete_if do |task|
            triggering_connections[task].delete_if do |port_name, triggers|
                next if has_final_information_for_port?(task, port_name)

                to_propagate, complete = triggers.ports_to_propagate(self)
                debug do
                    if to_propagate.empty?
                        debug { "nothing to propagate to #{task}.#{port_name}" }
                        debug { "    complete: #{complete}" }
                    else
                        debug { "propagating information to #{task}.#{port_name}" }
                        debug { "    complete: #{complete}" }
                        to_propagate.each do |info|
                            debug "    #{info.compact.join(".")}"
                        end
                    end
                    break
                end

                to_propagate.each do |info|
                    begin
                        add_port_info(task, port_name, port_info(*info))
                    rescue Exception => e
                        raise DataflowPropagationError.new(e, task, port_name), "while propagating information from port #{info} to #{port_name} on #{task}, #{e.message}"
                    end
                end
                if complete
                    done_port_info(task, port_name)
                    true
                else
                    false
                end
            end

            propagate_task(task)
        end

        if !@changed
            break
        end
    end

    if !missing_ports.empty?
        debug do
            debug "found fixed point, breaking out of propagation loop with #{missing_ports.size} missing ports"
            debug "removing partial port information"
            break
        end
        result.delete_if do |task, port_info|
            port_info.delete_if do |port, info|
                if info.empty?
                    debug do
                        debug "  #{task}.#{port} (empty)"
                        break
                    end
                    true

                elsif !has_final_information_for_port?(task, port)
                    debug do
                        debug "  #{task}.#{port} (not finalized)"
                        break
                    end
                    true
                end
            end
            port_info.empty?
        end
    else
        debug "done computing all required port information"
    end

    result
end

#propagate_task(task) ⇒ Object

Propagate information on task. Returns true if all information that can be computed has been (i.e. if calling #propagate_task on the same task again will never add new information)

Raises:

  • (NotImplementedError)


435
436
437
# File 'lib/syskit/network_generation/dataflow_computation.rb', line 435

def propagate_task(task)
    raise NotImplementedError
end

#remove_port_info(task, port_name) ⇒ Object

Deletes all available information about the specified port



325
326
327
328
329
330
331
332
333
334
# File 'lib/syskit/network_generation/dataflow_computation.rb', line 325

def remove_port_info(task, port_name)
    if !@result.has_key?(task)
        return
    end
    task_info = @result[task]
    task_info.delete(port_name)
    if task_info.empty?
        @result.delete(task)
    end
end

#required_information(tasks) ⇒ Object

Returns the set of objects for which information is required as an output of the algorithm

The returned value is a map:

task => ports

Where ports is the set of port names that are required on task. nil can be used to denote the task itself.

Raises:

  • (NotImplementedError)


428
429
430
# File 'lib/syskit/network_generation/dataflow_computation.rb', line 428

def required_information(tasks)
    raise NotImplementedError
end

#reset(tasks = Array.new) ⇒ Object



153
154
155
156
157
158
159
160
161
162
# File 'lib/syskit/network_generation/dataflow_computation.rb', line 153

def reset(tasks = Array.new)
    @result = Hash.new { |h, k| h[k] = Hash.new }
    # Internal variable that is used to detect whether an iteration
    # added information
    @changed = false
    @done_ports = Hash.new { |h, k| h[k] = Set.new }
    @triggering_connections  = Hash.new { |h, k| h[k] = Hash.new }
    @triggering_dependencies = Hash.new { |h, k| h[k] = Set.new }
    @missing_ports = Hash.new
end

#set_port_info(task, port_name, info) ⇒ Object



293
294
295
296
297
298
299
# File 'lib/syskit/network_generation/dataflow_computation.rb', line 293

def set_port_info(task, port_name, info)
    if !has_information_for_port?(task, port_name)
        add_port_info(task, port_name, info)
    else
        @result[task][port_name] = info
    end
end

#task_info(task) ⇒ Object

Returns the task information stored for the given task

The return type is specific to the actual algorithm (i.e. the subclass of DataFlowComputation)

Raises:

  • ArgumentError if there are no information stored for the given task



56
57
58
# File 'lib/syskit/network_generation/dataflow_computation.rb', line 56

def task_info(task)
    port_info(task, nil)
end

#triggering_inputs(task) ⇒ Object

Returns the list of input ports in task that should trigger a recomputation of the information for task

Raises:

  • (NotImplementedError)


415
416
417
# File 'lib/syskit/network_generation/dataflow_computation.rb', line 415

def triggering_inputs(task)
    raise NotImplementedError
end

#triggering_port_connections(task) ⇒ Object

Returns the list of ports whose information can be propagated to a port in task

The returned value is a hash of the form

port_name => [Set([other_task, other_port_name]), boolean]

where port_name is a port in task and the set is a set of ports whose information can be propagated to add information on port_name.

If the boolean is false, the information will be propagated only if all the listed ports have information. Otherwise, it will be as soon as one has some information

The default implementation calls a method triggering_inputs that simply returns a list of ports in task whose connections are triggering.



397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
# File 'lib/syskit/network_generation/dataflow_computation.rb', line 397

def triggering_port_connections(task)
    result = Hash.new
    connections = Set.new

    triggering_inputs(task).each do |port|
        task.each_concrete_input_connection(port.name) do |from_task, from_port, to_port, _|
            connections << [from_task, from_port]
        end
        if !connections.empty?
            result[port.name] = Trigger.new(connections, Trigger::USE_ALL)
            connections = Set.new
        end
    end
    result
end