Class: Syskit::NetworkGeneration::DataFlowComputation
- Extended by:
- Logger::Hierarchy
- Includes:
- Logger::Hierarchy
- Defined in:
- lib/syskit/network_generation/dataflow_computation.rb
Overview
This class embeds the basic handling of computations that need to follow the dataflow
It provides an interface that allows to propagate information along the dataflow
Requirements on the information type:
-
empty?
-
merge(value)
Subclasses need to redefine the following methods (go to the method documentation for more information)
-
initial_information(task)
-
required_information(tasks)
-
triggering_inputs(task)
-
propagate_task(task)
Direct Known Subclasses
Defined Under Namespace
Classes: Trigger
Instance Attribute Summary collapse
-
#done_ports ⇒ Object
readonly
Returns the value of attribute done_ports.
-
#missing_ports ⇒ Object
readonly
Returns the value of attribute missing_ports.
-
#result ⇒ Object
readonly
Returns the value of attribute result.
-
#triggering_connections ⇒ Object
readonly
Returns the value of attribute triggering_connections.
-
#triggering_dependencies ⇒ Object
readonly
Returns the value of attribute triggering_dependencies.
Instance Method Summary collapse
-
#add_port_info(task, port_name, info) ⇒ Object
Register information about the given task's port.
-
#apply_merges(merge_solver) ⇒ Object
Maps the tasks stored in the dataflow dynamics information to the ones that
merge_solver
is pointing to. -
#done_port_info(task, port_name) ⇒ Object
Called when all information on
task
.port_name
has been added. - #has_final_information_for_port?(task, port_name) ⇒ Boolean
- #has_information_for_port?(task, port_name) ⇒ Boolean
-
#initial_information(task) ⇒ Object
Registers information about
task
that is independent of the connection graph, to seed the algorithm. -
#initialize ⇒ DataFlowComputation
constructor
A new instance of DataFlowComputation.
-
#port_info(task, port_name) ⇒ Object
Returns the port information stored for the given port.
- #propagate(tasks) ⇒ Object
-
#propagate_task(task) ⇒ Object
Propagate information on
task
. -
#remove_port_info(task, port_name) ⇒ Object
Deletes all available information about the specified port.
-
#required_information(tasks) ⇒ Object
Returns the set of objects for which information is required as an output of the algorithm.
- #reset(tasks = Array.new) ⇒ Object
- #set_port_info(task, port_name, info) ⇒ Object
-
#task_info(task) ⇒ Object
Returns the task information stored for the given task.
-
#triggering_inputs(task) ⇒ Object
Returns the list of input ports in
task
that should trigger a recomputation of the information fortask
. -
#triggering_port_connections(task) ⇒ Object
Returns the list of ports whose information can be propagated to a port in
task
.
Constructor Details
#initialize ⇒ DataFlowComputation
Returns a new instance of DataFlowComputation
35 36 37 |
# File 'lib/syskit/network_generation/dataflow_computation.rb', line 35 def initialize reset end |
Instance Attribute Details
#done_ports ⇒ Object (readonly)
Returns the value of attribute done_ports
30 31 32 |
# File 'lib/syskit/network_generation/dataflow_computation.rb', line 30 def done_ports @done_ports end |
#missing_ports ⇒ Object (readonly)
Returns the value of attribute missing_ports
28 29 30 |
# File 'lib/syskit/network_generation/dataflow_computation.rb', line 28 def missing_ports @missing_ports end |
#result ⇒ Object (readonly)
Returns the value of attribute result
22 23 24 |
# File 'lib/syskit/network_generation/dataflow_computation.rb', line 22 def result @result end |
#triggering_connections ⇒ Object (readonly)
Returns the value of attribute triggering_connections
24 25 26 |
# File 'lib/syskit/network_generation/dataflow_computation.rb', line 24 def triggering_connections @triggering_connections end |
#triggering_dependencies ⇒ Object (readonly)
Returns the value of attribute triggering_dependencies
26 27 28 |
# File 'lib/syskit/network_generation/dataflow_computation.rb', line 26 def triggering_dependencies @triggering_dependencies end |
Instance Method Details
#add_port_info(task, port_name, info) ⇒ Object
Register information about the given task's port.
If some information is already available, merge the new info
object with what exists. Use #set_port_info to reset the current
information with the new object.
306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 |
# File 'lib/syskit/network_generation/dataflow_computation.rb', line 306 def add_port_info(task, port_name, info) if done_ports[task].include?(port_name) done_at = @done_at[[task, port_name]] if @done_at raise ModifyingFinalizedPortInfo.new(task, port_name, done_at, self.class.name), "trying to change port information for #{task}.#{port_name} after done_port_info has been called" end if !has_information_for_port?(task, port_name) @changed = true @result[task][port_name] = info else begin @changed = @result[task][port_name].merge(info) rescue Exception => e raise e, "while adding information to port #{port_name} on #{task}, #{e.}", e.backtrace end end end |
#apply_merges(merge_solver) ⇒ Object
Maps the tasks stored in the dataflow dynamics information to the ones that
merge_solver
is pointing to
441 442 443 444 445 446 447 448 449 450 451 |
# File 'lib/syskit/network_generation/dataflow_computation.rb', line 441 def apply_merges(merge_solver) @result = result.map_key do |task, _| merge_solver.replacement_for(task) end @missing_ports = missing_ports.map_key do |task, _| merge_solver.replacement_for(task) end @done_ports = done_ports.map_key do |task, _| merge_solver.replacement_for(task) end end |
#done_port_info(task, port_name) ⇒ Object
Called when all information on task
.port_name
has
been added
337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 |
# File 'lib/syskit/network_generation/dataflow_computation.rb', line 337 def done_port_info(task, port_name) if !done_ports[task].include?(port_name) @changed = true if has_information_for_port?(task, port_name) if port_info(task, port_name).empty? remove_port_info(task, port_name) end end done_ports[task] << port_name if missing_ports.has_key?(task) missing_ports[task].delete(port_name) if missing_ports[task].empty? missing_ports.delete(task) end end end debug do debug "done computing information for #{task}.#{port_name}" log_nest(4) do if has_information_for_port?(task, port_name) log_pp(:debug, port_info(task, port_name)) else debug "no stored information" end end @done_at ||= Hash.new @done_at[[task, port_name]] = caller break end end |
#has_final_information_for_port?(task, port_name) ⇒ Boolean
44 45 46 47 48 |
# File 'lib/syskit/network_generation/dataflow_computation.rb', line 44 def has_final_information_for_port?(task, port_name) done_ports.has_key?(task) && done_ports[task].include?(port_name) && has_information_for_port?(task, port_name) end |
#has_information_for_port?(task, port_name) ⇒ Boolean
39 40 41 42 |
# File 'lib/syskit/network_generation/dataflow_computation.rb', line 39 def has_information_for_port?(task, port_name) result.has_key?(task) && result[task].has_key?(port_name) end |
#initial_information(task) ⇒ Object
Registers information about task
that is independent of the
connection graph, to seed the algorithm
The information must be added using #add_port_info
375 376 377 |
# File 'lib/syskit/network_generation/dataflow_computation.rb', line 375 def initial_information(task) raise NotImplementedError end |
#port_info(task, port_name) ⇒ Object
Returns the port information stored for the given port
The return type is specific to the actual algorithm (i.e. the subclass of DataFlowComputation)
66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/syskit/network_generation/dataflow_computation.rb', line 66 def port_info(task, port_name) if result.has_key?(task) if result[task].has_key?(port_name) return result[task][port_name] end end if port_name raise ArgumentError, "no information currently available for #{task.orocos_name}.#{port_name}" else raise ArgumentError, "no information currently available for #{task.orocos_name}" end end |
#propagate(tasks) ⇒ Object
164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 |
# File 'lib/syskit/network_generation/dataflow_computation.rb', line 164 def propagate(tasks) reset(tasks) debug do debug "#{self.class}: computing on #{tasks.size} tasks" tasks.each do |t| debug " #{t}" end break end # Compute the set of ports for which information is required. # This is called before #initial_information, so that # #initial_information can add the required information if it is # available @missing_ports = required_information(tasks) if !@missing_ports.kind_of?(Hash) raise ArgumentError, "#required_information is supposed to return a Hash, but returned #{@missing_ports}" end debug "" debug "== Gathering Initial Information" tasks.each do |task| debug { "computing initial information for #{task}" } log_nest(4) do initial_information(task) if connections = triggering_port_connections(task) triggering_connections[task] = connections triggering_dependencies[task] = connections.map do |port_name, triggers| triggers.ports.map(&:first) end debug do debug "#{connections.size} triggering connections for #{task}" connections.each do |port_name, info| debug " for #{port_name}" log_nest(8) do log_pp :debug, info end end break end end end end debug "" debug "== Propagation" remaining_tasks = tasks.dup while !missing_ports.empty? remaining_tasks = remaining_tasks. sort_by { |t| triggering_dependencies[t].size } @changed = false remaining_tasks.delete_if do |task| triggering_connections[task].delete_if do |port_name, triggers| next if has_final_information_for_port?(task, port_name) to_propagate, complete = triggers.ports_to_propagate(self) debug do if to_propagate.empty? debug { "nothing to propagate to #{task}.#{port_name}" } debug { " complete: #{complete}" } else debug { "propagating information to #{task}.#{port_name}" } debug { " complete: #{complete}" } to_propagate.each do |info| debug " #{info.compact.join(".")}" end end break end to_propagate.each do |info| begin add_port_info(task, port_name, port_info(*info)) rescue Exception => e raise DataflowPropagationError.new(e, task, port_name), "while propagating information from port #{info} to #{port_name} on #{task}, #{e.}" end end if complete done_port_info(task, port_name) true else false end end propagate_task(task) end if !@changed break end end if !missing_ports.empty? debug do debug "found fixed point, breaking out of propagation loop with #{missing_ports.size} missing ports" debug "removing partial port information" break end result.delete_if do |task, port_info| port_info.delete_if do |port, info| if info.empty? debug do debug " #{task}.#{port} (empty)" break end true elsif !has_final_information_for_port?(task, port) debug do debug " #{task}.#{port} (not finalized)" break end true end end port_info.empty? end else debug "done computing all required port information" end result end |
#propagate_task(task) ⇒ Object
Propagate information on task
. Returns true if all information
that can be computed has been (i.e. if calling #propagate_task on the same
task again will never add new information)
435 436 437 |
# File 'lib/syskit/network_generation/dataflow_computation.rb', line 435 def propagate_task(task) raise NotImplementedError end |
#remove_port_info(task, port_name) ⇒ Object
Deletes all available information about the specified port
325 326 327 328 329 330 331 332 333 334 |
# File 'lib/syskit/network_generation/dataflow_computation.rb', line 325 def remove_port_info(task, port_name) if !@result.has_key?(task) return end task_info = @result[task] task_info.delete(port_name) if task_info.empty? @result.delete(task) end end |
#required_information(tasks) ⇒ Object
Returns the set of objects for which information is required as an output of the algorithm
The returned value is a map:
task => ports
Where ports
is the set of port names that are required on
task
. nil
can be used to denote the task itself.
428 429 430 |
# File 'lib/syskit/network_generation/dataflow_computation.rb', line 428 def required_information(tasks) raise NotImplementedError end |
#reset(tasks = Array.new) ⇒ Object
153 154 155 156 157 158 159 160 161 162 |
# File 'lib/syskit/network_generation/dataflow_computation.rb', line 153 def reset(tasks = Array.new) @result = Hash.new { |h, k| h[k] = Hash.new } # Internal variable that is used to detect whether an iteration # added information @changed = false @done_ports = Hash.new { |h, k| h[k] = Set.new } @triggering_connections = Hash.new { |h, k| h[k] = Hash.new } @triggering_dependencies = Hash.new { |h, k| h[k] = Set.new } @missing_ports = Hash.new end |
#set_port_info(task, port_name, info) ⇒ Object
293 294 295 296 297 298 299 |
# File 'lib/syskit/network_generation/dataflow_computation.rb', line 293 def set_port_info(task, port_name, info) if !has_information_for_port?(task, port_name) add_port_info(task, port_name, info) else @result[task][port_name] = info end end |
#task_info(task) ⇒ Object
Returns the task information stored for the given task
The return type is specific to the actual algorithm (i.e. the subclass of DataFlowComputation)
56 57 58 |
# File 'lib/syskit/network_generation/dataflow_computation.rb', line 56 def task_info(task) port_info(task, nil) end |
#triggering_inputs(task) ⇒ Object
Returns the list of input ports in task
that should trigger a
recomputation of the information for task
415 416 417 |
# File 'lib/syskit/network_generation/dataflow_computation.rb', line 415 def triggering_inputs(task) raise NotImplementedError end |
#triggering_port_connections(task) ⇒ Object
Returns the list of ports whose information can be propagated to a port in
task
The returned value is a hash of the form
port_name => [Set([other_task, other_port_name]), boolean]
where port_name
is a port in task
and the set is
a set of ports whose information can be propagated to add information on
port_name
.
If the boolean is false, the information will be propagated only if all the listed ports have information. Otherwise, it will be as soon as one has some information
The default implementation calls a method triggering_inputs
that simply returns a list of ports in task
whose connections
are triggering.
397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 |
# File 'lib/syskit/network_generation/dataflow_computation.rb', line 397 def triggering_port_connections(task) result = Hash.new connections = Set.new triggering_inputs(task).each do |port| task.each_concrete_input_connection(port.name) do |from_task, from_port, to_port, _| connections << [from_task, from_port] end if !connections.empty? result[port.name] = Trigger.new(connections, Trigger::USE_ALL) connections = Set.new end end result end |