Class: Orocos::Async::TaskContextBase
- Inherits:
- ObjectBase
  - Object
    - ObjectBase
      - Orocos::Async::TaskContextBase
- Extended by:
- ObjectBase::Periodic::ClassMethods, Utilrb::EventLoop::Forwardable
- Includes:
- ObjectBase::Periodic
- Defined in:
- lib/orocos/async/task_context_base.rb
Direct Known Subclasses
Instance Attribute Summary
-
#raise_on_access_error? ⇒ Object
If set to true, #task_context will raise whenever access to the remote task context fails.
Attributes included from ObjectBase::Periodic::ClassMethods
Attributes inherited from ObjectBase
#emitting, #event_loop, #options, #pending_adds
Class Method Summary
-
.to_ruby(task) ⇒ RubyTasks::TaskContext
Creates a RubyTasks::TaskContext on which all values passed to the given task should be mirrored.
Instance Method Summary
- #attribute(name, options = Hash.new, &block) ⇒ Object
-
#call_with_async(method_name, user_callback, to_async_options, *args) ⇒ Object
Helper method to set up async calls.
- #clear_interface ⇒ Object
-
#configure_delegation(configure_options = Hash.new) ⇒ Object
Called by #reachable! to do subclass-specific configuration.
-
#each_attribute(&block) ⇒ Object
call-seq: task.each_attribute { |a| … } => task.
-
#each_port(&block) ⇒ Object
call-seq: task.each_port { |p| … } => task.
-
#each_property(&block) ⇒ Object
call-seq: task.each_property { |a| … } => task.
-
#initialize(name, options = Hash.new) ⇒ TaskContextBase
constructor
A new instance of TaskContextBase.
- #name ⇒ Object
- #port(name, verify = true, options = Hash.new, &block) ⇒ Object
- #property(name, options = Hash.new, &block) ⇒ Object
-
#reachable!(options = Hash.new) ⇒ Object
Initiates the binding of the underlying synchronous access object to this async object.
- #reachable?(&block) ⇒ Boolean
- #really_add_listener(listener) ⇒ Object
-
#ruby_task_context? ⇒ Boolean
Tests whether a mirroring task has been created with ruby_task_context.
- #to_async(options = Hash.new) ⇒ Object
- #to_proxy(options = Hash.new) ⇒ Object
-
#to_ruby ⇒ RubyTasks::TaskContext
Create a ruby task on which all received data is mirrored.
-
#unreachable!(options = Hash.new) ⇒ Orocos::TaskContext, ...
Disconnects self from the remote task context and returns the underlying object used to communicate with the remote task (designated object).
Methods included from ObjectBase::Periodic
#default_period, #period, #period=
Methods inherited from ObjectBase
#add_listener, define_event, define_events, #disable_emitting, #event, event_names, #event_names, #invalidate_delegator!, #listener?, #listeners, #number_of_listeners, #on_event, #proxy_event, #remove_all_listeners, #remove_listener, #remove_proxy_event, #valid_delegator?, valid_event?, #valid_event?, validate_event, #validate_event, #wait
Constructor Details
#initialize(name, options = Hash.new) ⇒ TaskContextBase
Returns a new instance of TaskContextBase
# File 'lib/orocos/async/task_context_base.rb', line 60
def initialize(name, options = Hash.new)
    event_loop, = Kernel. ,:event_loop => Orocos::Async.event_loop
    super(name,event_loop[:event_loop])
    @mutex = Mutex.new
    @last_state = nil
    @port_names = Array.new
    @property_names = Array.new
    @attribute_names = Array.new

    watchdog_proc = Proc.new do
        ping # call a method which raises ComError if the connection died
        # this is used to disconnect the task by an error handler
        [states,port_names,property_names,attribute_names]
    end
    @watchdog_timer = @event_loop.async_every(watchdog_proc,{:period => default_period,
                                              :default => [[],[],[],[]],
                                              :start => false,
                                              :sync_key => nil, #is blocked by the methods call ping, states, etc
                                              :known_errors => Orocos::Async::KNOWN_ERRORS}) do |data,error|
        process_states(data[0])
        process_port_names(data[1])
        process_property_names(data[2])
        process_attribute_names(data[3])
    end
    @watchdog_timer.doc = name
    reachable!()
end
Instance Attribute Details
#raise_on_access_error? ⇒ Object
If set to true, #task_context will raise whenever access to the remote task context fails. Otherwise, the exception is returned by the method.
# File 'lib/orocos/async/task_context_base.rb', line 58
attr_predicate :raise_on_access_error?, true
Class Method Details
.to_ruby(task) ⇒ RubyTasks::TaskContext
Creates a RubyTasks::TaskContext on which all values passed to the given task should be mirrored
# File 'lib/orocos/async/task_context_base.rb', line 21
def self.to_ruby(task)
    begin
        t = Orocos::CORBA.name_service.get(task.basename)
        raise "Cannot create ruby task for #{task.name} "\
            "because there is already a task #{t.name} "\
            "registered on the main CORBA name service."
    rescue Orocos::NotFound
    end
    t = Orocos::RubyTasks::TaskContext.new(task.basename)
    task.on_port_reachable do |port|
        next if t.has_port?(port)
        port = task.port(port)
        port.wait
        p = t.create_output_port(port.name,port.type)
        port.on_data do |data|
            p.write data
        end
    end
    task.on_property_reachable do |prop|
        next if task.has_property?(prop)
        prop = task.property(prop)
        prop.wait
        p = @ruby_task_context.create_property(prop.name,prop.type)
        p.write p.new_sample.zero!
        prop.on_change do |data|
            p.write data
        end
    end
    t.configure
    t.start
    t
end
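The mirroring done by .to_ruby amounts to subscribing to each source and writing every received sample to a local output. The following is only a minimal sketch of that forwarding idea in plain Ruby. FakeSource and FakeOutput are hypothetical stand-ins invented for illustration and are not orocos.rb classes.

```ruby
# Hypothetical stand-in for an async port that emits samples.
class FakeSource
  def initialize
    @handlers = []
  end

  # Registers a callback, in the spirit of port.on_data above.
  def on_data(&block)
    @handlers << block
  end

  # Delivers one sample to every registered callback.
  def emit(sample)
    @handlers.each { |handler| handler.call(sample) }
  end
end

# Hypothetical stand-in for the output port created on the mirror task.
class FakeOutput
  attr_reader :written

  def initialize
    @written = []
  end

  def write(sample)
    @written << sample
  end
end

source = FakeSource.new
mirror = FakeOutput.new

# Forward every sample from the source to the mirror.
source.on_data { |data| mirror.write(data) }

source.emit(1)
source.emit(2)
```

After the two emit calls, the mirror holds both samples in arrival order.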
Instance Method Details
#attribute(name, options = Hash.new, &block) ⇒ Object
# File 'lib/orocos/async/task_context_base.rb', line 294
def attribute(name, options = Hash.new, &block)
    call_with_async(:orig_attribute,block,options,name)
end
#call_with_async(method_name, user_callback, to_async_options, *args) ⇒ Object
Helper method to set up async calls
Asynchronous calls can be made either with a callback or without one. In the first case, the callback is called later and the method returns right away; otherwise the method is called synchronously. For synchronization reasons, even the synchronous call goes through the event loop (since the event loop avoids reentrant calls using the sync key).
This method is the common setup for this scheme. It uses the fact that all non-async objects must provide a #to_async call to create a corresponding asynchronous-access object.
# File 'lib/orocos/async/task_context_base.rb', line 273
def call_with_async(method_name,user_callback,to_async_options,*args)
    p = proc do |object,error|
        async_object = object.to_async(Hash[:use => self].merge(to_async_options))
        if user_callback
            if user_callback.arity == 2
                user_callback.call(async_object,error)
            else
                user_callback.call(async_object)
            end
        else
            async_object
        end
    end
    if user_callback
        send(method_name,*args,&p)
    else
        async_object = send(method_name,*args)
        p.call async_object,nil
    end
end
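To make the callback-or-synchronous scheme concrete, here is a rough sketch in plain Ruby that follows the same shape as the listing above. FakeSyncPort, FakeTask, and the string returned by #to_async are assumptions made up for the example. No event loop is involved, so the "asynchronous" path simply yields immediately.

```ruby
# Hypothetical synchronous object; like all non-async objects in the
# scheme described above, it offers #to_async.
class FakeSyncPort
  attr_reader :name

  def initialize(name)
    @name = name
  end

  def to_async(options = {})
    "async(#{name})"
  end
end

class FakeTask
  # Hypothetical underlying accessor: returns the port when called
  # without a block, otherwise passes (port, error) to the block.
  def orig_port(name)
    port = FakeSyncPort.new(name)
    return port unless block_given?
    yield(port, nil)
    nil
  end

  # Same structure as call_with_async above.
  def call_with_async(method_name, user_callback, to_async_options, *args)
    wrap = proc do |object, error|
      async_object = object.to_async({ use: self }.merge(to_async_options))
      if user_callback
        # Pass the error only to callbacks that declare two parameters.
        if user_callback.arity == 2
          user_callback.call(async_object, error)
        else
          user_callback.call(async_object)
        end
      else
        async_object
      end
    end

    if user_callback
      send(method_name, *args, &wrap)
    else
      wrap.call(send(method_name, *args), nil)
    end
  end

  def port(name, options = {}, &block)
    call_with_async(:orig_port, block, options, name)
  end
end

task = FakeTask.new
sync_result = task.port("out")          # no callback: value is returned

received = []
task.port("out") { |port| received << port }
task.port("out") { |port, error| received << [port, error] }
```

The arity check lets callers ignore the error argument when they do not need it, which keeps simple callbacks short.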
#clear_interface ⇒ Object
# File 'lib/orocos/async/task_context_base.rb', line 232
def clear_interface
    process_port_names
    process_attribute_names
    process_property_names
end
#configure_delegation(configure_options = Hash.new) ⇒ Object
Called by #reachable! to do subclass-specific configuration
# File 'lib/orocos/async/task_context_base.rb', line 200
def configure_delegation(configure_options = Hash.new)
end
#each_attribute(&block) ⇒ Object
call-seq:
task.each_attribute { |a| ... } => task
Enumerates the attributes that are available on this task, as instances of Orocos::Attribute
# File 'lib/orocos/async/task_context_base.rb', line 326
def each_attribute(&block)
    if !block_given?
        return enum_for(:each_attribute)
    end
    attribute_names.each do |name|
        yield(attribute(name))
    end
    self
end
#each_port(&block) ⇒ Object
call-seq:
task.each_port { |p| ... } => task
Enumerates the ports that are available on this task, as instances of either Orocos::InputPort or Orocos::OutputPort
# File 'lib/orocos/async/task_context_base.rb', line 341
def each_port(&block)
    if !block_given?
        return enum_for(:each_port)
    end
    port_names.each do |name|
        yield(port(name))
    end
    self
end
#each_property(&block) ⇒ Object
call-seq:
task.each_property { |a| ... } => task
Enumerates the properties that are available on this task, as instances of Orocos::Property
# File 'lib/orocos/async/task_context_base.rb', line 311
def each_property(&block)
    if !block_given?
        return enum_for(:each_property)
    end
    property_names.each do |name|
        yield(property(name))
    end
    self
end
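The three enumerators above share one idiom: yield each element when a block is given, return an Enumerator otherwise, and return self after iterating. A small stand-alone sketch of that idiom follows. FakeInterface and its string "attributes" are hypothetical and exist only for the example.

```ruby
# Hypothetical class illustrating the block-or-enumerator idiom.
class FakeInterface
  def initialize(names)
    @attribute_names = names
  end

  # Stand-in for the real accessor; returns a plain string here.
  def attribute(name)
    "attribute:#{name}"
  end

  def each_attribute
    # Without a block, hand back an Enumerator over this same method.
    return enum_for(:each_attribute) unless block_given?

    @attribute_names.each { |name| yield(attribute(name)) }
    self
  end
end

iface = FakeInterface.new(%w[gain offset])

collected = []
returned = iface.each_attribute { |a| collected << a }

# No block: the Enumerator supports the usual Enumerable methods.
as_array = iface.each_attribute.to_a
```

Returning self from the block form allows call chaining, while the Enumerator form gives access to map, select, and similar methods.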
#name ⇒ Object
# File 'lib/orocos/async/task_context_base.rb', line 146
def name
    @mutex.synchronize do
        @name.dup if @name
    end
end
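The accessor above reads the name under a mutex and hands out a copy, so callers cannot modify the stored string. A minimal sketch of the same pattern is shown here. NamedThing is a hypothetical class used only for illustration.

```ruby
# Hypothetical class showing a mutex-guarded, copy-returning reader.
class NamedThing
  def initialize(name)
    @mutex = Mutex.new
    @name = name
  end

  def name
    # Return a duplicate so that mutations by the caller stay local;
    # a nil name is passed through unchanged.
    @mutex.synchronize { @name.dup if @name }
  end
end

thing = NamedThing.new(String.new("camera"))
copy = thing.name
copy << "_front"            # modifies only the copy

unnamed = NamedThing.new(nil)
```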
#port(name, verify = true, options = Hash.new, &block) ⇒ Object
# File 'lib/orocos/async/task_context_base.rb', line 302
def port(name, verify = true, options = Hash.new, &block)
    call_with_async(:orig_port,block,options,name,verify)
end
#property(name, options = Hash.new, &block) ⇒ Object
# File 'lib/orocos/async/task_context_base.rb', line 298
def property(name, options = Hash.new, &block)
    call_with_async(:orig_property,block,options,name)
end
#reachable!(options = Hash.new) ⇒ Object
Initiates the binding of the underlying synchronous access object to this async object.
It can either be directly given an object, or be asked to (asynchronously) query it.
# File 'lib/orocos/async/task_context_base.rb', line 170
def reachable!(options = Hash.new)
    @mutex.synchronize do
        , = Kernel. ,
            :watchdog => true,
            :period => default_period,
            :wait => false,
            :use => nil,
            :raise => false
        self.raise_on_access_error = [:raise]
        if [:use]
            @delegator_obj = [:use]
            @watchdog_timer.doc = @delegator_obj.name
        else
            invalidate_delegator!
        end
        configure_delegation()
        @watchdog_timer.start([:period],false) if [:watchdog]
        @event_loop.async(method(:task_context))
    end
    wait if [:wait]
end
#reachable?(&block) ⇒ Boolean
# File 'lib/orocos/async/task_context_base.rb', line 238
def reachable?(&block)
    if block
        ping(&block)
    else
        ping
    end
    true
rescue Orocos::NotFound,Orocos::ComError => e
    unreachable!(:error => e)
    false
end
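The listing above turns a failing ping into a false return value and records the error through #unreachable!. The sketch below reproduces that control flow in isolation. FakeComError and FakeRemote are hypothetical names, and the real method additionally rescues Orocos::NotFound.

```ruby
# Hypothetical error type standing in for a communication failure.
class FakeComError < StandardError; end

# Hypothetical remote handle whose ping either succeeds or raises.
class FakeRemote
  attr_reader :last_error

  def initialize(alive)
    @alive = alive
    @last_error = nil
  end

  def ping
    raise FakeComError, "connection lost" unless @alive
  end

  # Remembers why the remote became unreachable.
  def unreachable!(options = {})
    @last_error = options[:error]
  end

  def reachable?
    ping
    true
  rescue FakeComError => e
    # A failed ping marks the object unreachable instead of raising.
    unreachable!(error: e)
    false
  end
end

alive = FakeRemote.new(true)
dead = FakeRemote.new(false)

alive_result = alive.reachable?
dead_result = dead.reachable?
```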
#really_add_listener(listener) ⇒ Object
# File 'lib/orocos/async/task_context_base.rb', line 115
def really_add_listener(listener)
    return super unless listener.use_last_value?

    # call new listeners with the current value
    # to prevent different behaviors depending on
    # the calling order
    if listener.event == :port_reachable
        names = @port_names.dup
        event_loop.once do
            names.each do |name|
                listener.call name
            end
        end
    elsif listener.event == :property_reachable
        names = @property_names.dup
        event_loop.once do
            names.each do |name|
                listener.call name
            end
        end
    elsif listener.event == :attribute_reachable
        names = @attribute_names.dup
        event_loop.once do
            names.each do |name|
                listener.call name
            end
        end
    end
    super
end
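The listing above replays the names that are already known to a newly added listener, so that late subscribers see the same events as early ones. A simplified sketch of that replay idea follows. ReplayingEmitter is hypothetical, and its @pending queue with #step is an assumed stand-in for event_loop.once, not the real event loop.

```ruby
# Hypothetical miniature emitter that replays known names to new listeners.
class ReplayingEmitter
  def initialize
    @port_names = []
    @listeners = []
    @pending = []   # deferred work, in place of event_loop.once
  end

  # Records a new name and notifies the current listeners.
  def add_port(name)
    @port_names << name
    @listeners.each { |listener| listener.call(name) }
  end

  def on_port_reachable(&listener)
    # Snapshot the names now; deliver them on the next step.
    names = @port_names.dup
    @pending << proc { names.each { |name| listener.call(name) } }
    @listeners << listener
  end

  # Runs all deferred work, as one event loop iteration would.
  def step
    @pending.shift.call until @pending.empty?
  end
end

emitter = ReplayingEmitter.new
emitter.add_port("pose")                    # known before anyone listens

seen = []
emitter.on_port_reachable { |name| seen << name }
emitter.step                                # replays "pose"
emitter.add_port("status")                  # delivered directly
```

Taking the snapshot with dup at registration time keeps the replay from delivering names that the listener will also receive through the normal path.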
#ruby_task_context? ⇒ Boolean
Tests whether a mirroring task has been created with ruby_task_context
# File 'lib/orocos/async/task_context_base.rb', line 111
def ruby_task_context?
    !!@ruby_task_context
end
#to_async(options = Hash.new) ⇒ Object
# File 'lib/orocos/async/task_context_base.rb', line 89
def to_async(options = Hash.new)
    self
end
#to_proxy(options = Hash.new) ⇒ Object
# File 'lib/orocos/async/task_context_base.rb', line 93
def to_proxy(options = Hash.new)
    Orocos::Async.proxy(name,options)
end
#to_ruby ⇒ RubyTasks::TaskContext
Create a ruby task on which all received data is mirrored
This task context is unique, i.e. the same object will be returned by subsequent calls to this method.
# File 'lib/orocos/async/task_context_base.rb', line 103
def to_ruby
    @ruby_task_context ||= TaskContextBase.to_ruby(self)
end
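The uniqueness described above comes from memoizing the result with `||=`. The sketch below shows that behavior together with a predicate in the style of #ruby_task_context?. Mirrored and build_mirror are hypothetical names introduced for the example.

```ruby
# Hypothetical class demonstrating a memoized, unique mirror object.
class Mirrored
  attr_reader :builds

  def initialize
    @ruby_task_context = nil
    @builds = 0
  end

  # True once the mirror has been created.
  def ruby_task_context?
    !!@ruby_task_context
  end

  # Builds the mirror on first use, then returns the cached object.
  def to_ruby
    @ruby_task_context ||= build_mirror
  end

  private

  # Stand-in for the expensive creation step.
  def build_mirror
    @builds += 1
    Object.new
  end
end

mirrored = Mirrored.new
before = mirrored.ruby_task_context?
first = mirrored.to_ruby
second = mirrored.to_ruby
after = mirrored.ruby_task_context?
```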
#unreachable!(options = Hash.new) ⇒ Orocos::TaskContext, ...
Disconnects self from the remote task context and returns the underlying object used to communicate with the remote task (designated object).
Returns nil if the TaskContext is not connected. Returns an EventLoop Event if not called from the event loop thread.
# File 'lib/orocos/async/task_context_base.rb', line 211
def unreachable!(options = Hash.new)
    = Kernel. , :error

    # ensure that this is always called from the
    # event loop thread
    @event_loop.call do
        old_task = @mutex.synchronize do
            if valid_delegator?
                @access_error = .delete(:error) ||
                    ArgumentError.new("cannot access the remote task context for an unknown reason")
                task = @delegator_obj
                invalidate_delegator!
                @watchdog_timer.cancel if @watchdog_timer
                task
            end
        end
        clear_interface
        event :unreachable if old_task
        old_task
    end
end
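The return contract described above (the old object when connected, nil otherwise) can be illustrated with a small sketch that detaches a delegate under a mutex. Connection and #disconnect are hypothetical names. The event loop dispatch, the watchdog, and the emitted :unreachable event are left out.

```ruby
# Hypothetical class that detaches its delegate exactly once.
class Connection
  attr_reader :access_error

  def initialize(delegate)
    @mutex = Mutex.new
    @delegate = delegate
    @access_error = nil
  end

  # Returns the previous delegate, or nil when already disconnected.
  def disconnect(error = nil)
    @mutex.synchronize do
      if @delegate
        # Record why access is no longer possible.
        @access_error = error || ArgumentError.new("unknown reason")
        old = @delegate
        @delegate = nil
        old
      end
    end
  end
end

conn = Connection.new(:remote_task)
first = conn.disconnect     # connected: returns the old delegate
second = conn.disconnect    # already disconnected: returns nil
```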