Class: Orocos::Async::TaskContextBase

Inherits:
ObjectBase
  • Object
show all
Extended by:
ObjectBase::Periodic::ClassMethods, Utilrb::EventLoop::Forwardable
Includes:
ObjectBase::Periodic
Defined in:
lib/orocos/async/task_context_base.rb

Direct Known Subclasses

CORBA::TaskContext, ROS::Node

Instance Attribute Summary collapse

Attributes included from ObjectBase::Periodic::ClassMethods

#default_period

Attributes inherited from ObjectBase

#emitting, #event_loop, #options, #pending_adds

Class Method Summary collapse

Instance Method Summary collapse

Methods included from ObjectBase::Periodic

#default_period, #period, #period=

Methods inherited from ObjectBase

#add_listener, define_event, define_events, #disable_emitting, #event, event_names, #event_names, #invalidate_delegator!, #listener?, #listeners, #number_of_listeners, #on_event, #proxy_event, #remove_all_listeners, #remove_listener, #remove_proxy_event, #valid_delegator?, valid_event?, #valid_event?, validate_event, #validate_event, #wait

Constructor Details

#initialize(name, options = Hash.new) ⇒ TaskContextBase

Returns a new instance of TaskContextBase



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/orocos/async/task_context_base.rb', line 60

# Sets up the bookkeeping state and the watchdog timer, then calls
# #reachable! with the remaining options.
#
# @param name [String] forwarded to the superclass constructor and used
#   as the watchdog timer's +doc+ value
# @param options [Hash] the +:event_loop+ entry (default:
#   Orocos::Async.event_loop) is consumed here; every other entry is
#   handed to #reachable!
def initialize(name,options=Hash.new)
    loop_options, reachable_options =
        Kernel.filter_options(options, :event_loop => Orocos::Async.event_loop)
    super(name, loop_options[:event_loop])

    @mutex = Mutex.new
    @last_state = nil
    @port_names = []
    @property_names = []
    @attribute_names = []

    # Work executed by the timer: ping raises ComError if the connection
    # died, which lets an error handler disconnect the task; on success
    # the current states and interface names are collected.
    watchdog_proc = Proc.new do
        ping
        [states, port_names, property_names, attribute_names]
    end

    timer_options = {
        :period => default_period,
        :default => [[], [], [], []],
        :start => false,
        # no sync key: the calls to ping, states, etc. already block
        :sync_key => nil,
        :known_errors => Orocos::Async::KNOWN_ERRORS
    }

    # The callback only consumes the data; the error argument is unused.
    @watchdog_timer = @event_loop.async_every(watchdog_proc, timer_options) do |data, _error|
        process_states(data[0])
        process_port_names(data[1])
        process_property_names(data[2])
        process_attribute_names(data[3])
    end
    @watchdog_timer.doc = name
    reachable!(reachable_options)
end

Instance Attribute Details

#raise_on_access_error?Object

If set to true, #task_context will raise whenever the access to the remote task context failed. Otherwise, the exception will be returned by the method



58
# File 'lib/orocos/async/task_context_base.rb', line 58

# Declares the raise_on_access_error? predicate.
# NOTE(review): attr_predicate is not defined in this file; the trailing
# `true` presumably also generates the raise_on_access_error= writer that
# #reachable! assigns through — confirm against the helper's definition.
attr_predicate :raise_on_access_error?, true

Class Method Details

.to_ruby(task) ⇒ RubyTasks::TaskContext

Creates a RubyTasks::TaskContext on which all values passed to the given task should be mirrored



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/orocos/async/task_context_base.rb', line 21

# Creates a RubyTasks::TaskContext that mirrors the given async task.
#
# For every port that becomes reachable on +task+, an output port of the
# same name and type is created on the mirror and each sample received
# through +on_data+ is written to it. For every property that becomes
# reachable, a property of the same name and type is created on the
# mirror, initialised with a zeroed sample, and updated on each
# +on_change+ notification.
#
# @param task [#basename, #name, #on_port_reachable, #on_property_reachable, #port, #property]
#   the async task to mirror
# @return [RubyTasks::TaskContext] the configured and started mirror task
# @raise [RuntimeError] if a task with the same basename is already
#   registered on the main CORBA name service
def self.to_ruby(task)
    begin
        existing = Orocos::CORBA.name_service.get(task.basename)
        raise "Cannot create ruby task for #{task.name} "\
            "because there is already a task #{existing.name} "\
            "registered on the main CORBA name service."
    rescue Orocos::NotFound
        # expected path: the name is still free
    end

    mirror = Orocos::RubyTasks::TaskContext.new(task.basename)

    task.on_port_reachable do |port_name|
        next if mirror.has_port?(port_name)
        port = task.port(port_name)
        port.wait
        mirror_port = mirror.create_output_port(port.name,port.type)
        port.on_data do |data|
            mirror_port.write data
        end
    end

    task.on_property_reachable do |property_name|
        # The duplicate check must look at the mirror, not at the remote
        # task: the remote task always has the property that was just
        # reported as reachable.
        next if mirror.has_property?(property_name)
        property = task.property(property_name)
        property.wait
        # This is a class method, so the mirror is the local object; no
        # instance variable is available here.
        mirror_property = mirror.create_property(property.name,property.type)
        mirror_property.write mirror_property.new_sample.zero!
        property.on_change do |data|
            mirror_property.write data
        end
    end

    mirror.configure
    mirror.start
    mirror
end

Instance Method Details

#attribute(name, options = Hash.new, &block) ⇒ Object



294
295
296
# File 'lib/orocos/async/task_context_base.rb', line 294

# Accesses the attribute called +name+ through #call_with_async, using
# :orig_attribute as the underlying method.
#
# @param name the attribute name, forwarded to the underlying method
# @param options [Hash] options passed on as the to_async options
# @param block optional callback; see #call_with_async
# @return whatever #call_with_async returns
def attribute(name,options = Hash.new,&block)
    underlying_method = :orig_attribute
    call_with_async(underlying_method, block, options, name)
end

#call_with_async(method_name, user_callback, to_async_options, *args) ⇒ Object

Helper method to setup async calls

Asynchronous calls can be made either with or without a callback. In the first case, the callback gets called later and the method returns right away; otherwise the method gets called synchronously. For synchronization reasons, even the synchronous call goes through the event loop (since the event loop avoids reentrant calls using the sync key).

This method is the common setup for this scheme. It uses the fact that all non-async objects must provide a #to_async call to create a corresponding asynchronous-access object.

Parameters:

  • method_name (Symbol)

    the method that should be called

  • user_callback (Proc, nil)

    the user-provided callback if there is one

  • to_async_options (Hash)

    the options that should be passed to to_async

  • args (Array)

    the arguments that should be forwarded to the underlying method

Returns:

  • (Object)

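    in the synchronous case (no callback), the asynchronous-access object obtained by calling #to_async on the underlying method's return value. In the asynchronous case, whatever the underlying method returns when it is called with the wrapping block.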



273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
# File 'lib/orocos/async/task_context_base.rb', line 273

# Common plumbing for accessors that can run with or without a callback.
#
# The object produced by +method_name+ is converted with #to_async, using
# +{:use => self}+ merged with +to_async_options+ (the latter wins on
# conflicting keys).
#
# @param method_name [Symbol] method invoked on self through +send+
# @param user_callback [Proc, nil] called with the async object, plus the
#   error when its arity is exactly 2
# @param to_async_options [Hash] extra options for #to_async
# @param args arguments forwarded to +method_name+
# @return without a callback, the async object; with a callback, whatever
#   +method_name+ returns when it is given the wrapping block
def call_with_async(method_name,user_callback,to_async_options,*args)
    wrap = proc do |sync_object|
        sync_object.to_async({ :use => self }.merge(to_async_options))
    end

    # Synchronous flavour: call straight through and wrap the result.
    return wrap.call(send(method_name, *args)) unless user_callback

    # Asynchronous flavour: wrap whatever the underlying method hands to
    # the block before passing it on to the user's callback.
    send(method_name, *args) do |sync_object, error|
        wrapped = wrap.call(sync_object)
        if user_callback.arity == 2
            user_callback.call(wrapped, error)
        else
            user_callback.call(wrapped)
        end
    end
end

#clear_interfaceObject



232
233
234
235
236
# File 'lib/orocos/async/task_context_base.rb', line 232

# Invokes the three interface processors without arguments, in the order
# ports, attributes, properties.
# NOTE(review): presumably calling them without names marks every known
# port/attribute/property as gone — confirm in the process_* methods.
#
# @return the value returned by process_property_names
def clear_interface
    processors = [:process_port_names, :process_attribute_names, :process_property_names]
    processors.map { |processor| send(processor) }.last
end

#configure_delegation(configure_options = Hash.new) ⇒ Object

Called by #reachable! to do subclass-specific configuration

Parameters:

  • configure_options (Hash) (defaults to: Hash.new)

    all options passed to #reachable! that are not understood by #reachable!



200
201
# File 'lib/orocos/async/task_context_base.rb', line 200

# Hook invoked by #reachable! with the options it did not consume itself.
# This implementation deliberately does nothing.
#
# @param configure_options [Hash] leftover options from #reachable!
# @return [nil]
def configure_delegation(configure_options = Hash.new)
    nil
end

#each_attribute(&block) ⇒ Object

call-seq:

task.each_attribute { |a| ... } => task

Enumerates the attributes that are available on this task, as instances of Orocos::Attribute



326
327
328
329
330
331
332
333
334
# File 'lib/orocos/async/task_context_base.rb', line 326

# Yields, for each entry of #attribute_names, the object returned by
# #attribute for that name.
#
# @yield [attribute] one object per attribute name
# @return [self, Enumerator] self when a block is given, otherwise an
#   enumerator over the same sequence
def each_attribute(&block)
    return enum_for(:each_attribute) unless block_given?

    attribute_names.each { |attribute_name| yield attribute(attribute_name) }
    self
end

#each_port(&block) ⇒ Object

call-seq:

task.each_port { |p| ... } => task

Enumerates the ports that are available on this task, as instances of either Orocos::InputPort or Orocos::OutputPort



341
342
343
344
345
346
347
348
349
# File 'lib/orocos/async/task_context_base.rb', line 341

# Yields, for each entry of #port_names, the object returned by #port for
# that name.
#
# @yield [port] one object per port name
# @return [self, Enumerator] self when a block is given, otherwise an
#   enumerator over the same sequence
def each_port(&block)
    return enum_for(:each_port) unless block_given?

    port_names.each { |port_name| yield port(port_name) }
    self
end

#each_property(&block) ⇒ Object

call-seq:

task.each_property { |a| ... } => task

Enumerates the properties that are available on this task, as instances of Orocos::Property



311
312
313
314
315
316
317
318
319
# File 'lib/orocos/async/task_context_base.rb', line 311

# Yields, for each entry of #property_names, the object returned by
# #property for that name.
#
# @yield [property] one object per property name
# @return [self, Enumerator] self when a block is given, otherwise an
#   enumerator over the same sequence
def each_property(&block)
    return enum_for(:each_property) unless block_given?

    property_names.each { |property_name| yield property(property_name) }
    self
end

#nameObject



146
147
148
149
150
# File 'lib/orocos/async/task_context_base.rb', line 146

# Returns a copy of the stored name, read while holding @mutex.
#
# @return [String, nil] a duplicate of @name, or nil when no name is set
def name
    @mutex.synchronize { @name ? @name.dup : nil }
end

#port(name, verify = true, options = Hash.new, &block) ⇒ Object



302
303
304
# File 'lib/orocos/async/task_context_base.rb', line 302

# Accesses the port called +name+ through #call_with_async, using
# :orig_port as the underlying method.
#
# @param name the port name, forwarded to the underlying method
# @param verify forwarded unchanged to the underlying method after +name+
# @param options [Hash] options passed on as the to_async options
# @param block optional callback; see #call_with_async
# @return whatever #call_with_async returns
def port(name, verify = true,options=Hash.new, &block)
    underlying_method = :orig_port
    call_with_async(underlying_method, block, options, name, verify)
end

#property(name, options = Hash.new, &block) ⇒ Object



298
299
300
# File 'lib/orocos/async/task_context_base.rb', line 298

# Accesses the property called +name+ through #call_with_async, using
# :orig_property as the underlying method.
#
# @param name the property name, forwarded to the underlying method
# @param options [Hash] options passed on as the to_async options
# @param block optional callback; see #call_with_async
# @return whatever #call_with_async returns
def property(name,options = Hash.new,&block)
    underlying_method = :orig_property
    call_with_async(underlying_method, block, options, name)
end

#reachable!(options = Hash.new) ⇒ Object

Initiates the binding of the underlying synchronous access object to this async object.

It can either be directly given an object, or be asked to (asynchronously) query it.

Parameters:

  • options (Hash) (defaults to: Hash.new)

    a customizable set of options

Options Hash (options):

  • watchdog (Boolean) — default: true

    if true, start a watchdog timer that monitors the availability of the task context

  • period (Float) — default: default_period

    the period for the watchdog (if enabled)

  • wait (Boolean) — default: false

    if true, reachable! will return only when the task has successfully been found

  • use (Object) — default: nil

    if set, this is the object we will use as underlying synchronous object. Otherwise, #task_context is going to be used to find it.

  • raise (Boolean) — default: false

    if set, the #task_context method will raise if the task context cannot be accessed on first try. Otherwise, it will try to access it forever until it finds it.



170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
# File 'lib/orocos/async/task_context_base.rb', line 170

# Binds (or schedules the lookup of) the underlying synchronous object.
#
# Recognised options: :watchdog (default true), :period (default
# default_period), :wait (default false), :use (default nil) and :raise
# (default false). Options not listed here are passed on to
# #configure_delegation.
#
# @param options [Hash] see the list above
def reachable!(options = Hash.new)
    @mutex.synchronize do
        # Split the known options (with their defaults) from the rest.
        # `options` is the method parameter, so this assignment is also
        # what the `wait if options[:wait]` line below reads.
        options, configure_options = Kernel.filter_options options,
            :watchdog => true,
            :period => default_period,
            :wait => false,
            :use => nil,
            :raise => false

        self.raise_on_access_error = options[:raise]

        if options[:use]
            # An object was supplied: use it directly and label the
            # watchdog timer with that object's name.
            @delegator_obj = options[:use]
            @watchdog_timer.doc = @delegator_obj.name
        else
            invalidate_delegator!
        end

        # Subclass hook, receives the options not consumed above.
        configure_delegation(configure_options)

        @watchdog_timer.start(options[:period],false) if options[:watchdog]
        # Queue #task_context on the event loop.
        @event_loop.async(method(:task_context))
    end
    # Called after the mutex has been released.
    wait if options[:wait]
end

#reachable?(&block) ⇒ Boolean

Returns:

  • (Boolean)


238
239
240
241
242
243
244
245
246
247
248
# File 'lib/orocos/async/task_context_base.rb', line 238

# Checks reachability by calling #ping, forwarding the block if one was
# given.
#
# When #ping raises Orocos::NotFound or Orocos::ComError, #unreachable! is
# called with that exception as +:error+ and false is returned.
#
# @return [Boolean] true if #ping did not raise one of these errors
def reachable?(&block)
    begin
        # Passing a nil block with & is the same as passing no block.
        ping(&block)
    rescue Orocos::NotFound, Orocos::ComError => access_error
        unreachable!(:error => access_error)
        return false
    end
    true
end

#really_add_listener(listener) ⇒ Object



115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# File 'lib/orocos/async/task_context_base.rb', line 115

# Registers +listener+ through the superclass; for listeners that ask for
# the last value, the currently known names are replayed first.
#
# For the :port_reachable, :property_reachable and :attribute_reachable
# events, a snapshot of the matching name list is taken immediately and
# the listener is called once per name from a block handed to
# +event_loop.once+. This keeps behaviour independent of the order in
# which listeners are registered.
#
# @param listener an object responding to #use_last_value?, #event and
#   #call
# @return whatever the superclass implementation returns
def really_add_listener(listener)
    return super unless listener.use_last_value?

    name_lists = {
        :port_reachable => :@port_names,
        :property_reachable => :@property_names,
        :attribute_reachable => :@attribute_names
    }
    list_ivar = name_lists[listener.event]
    if list_ivar
        # Copy now, so later changes to the list do not affect the replay.
        known_names = instance_variable_get(list_ivar).dup
        event_loop.once do
            known_names.each { |known_name| listener.call known_name }
        end
    end
    super
end

#ruby_task_context?Boolean

Tests whether a mirroring task has been created with ruby_task_context

Returns:

  • (Boolean)


111
112
113
# File 'lib/orocos/async/task_context_base.rb', line 111

# Tells whether @ruby_task_context has been set (see #to_ruby).
#
# @return [Boolean]
def ruby_task_context?
    @ruby_task_context ? true : false
end

#to_async(options = Hash.new) ⇒ Object



89
90
91
# File 'lib/orocos/async/task_context_base.rb', line 89

# This object already is the asynchronous-access object, so it is
# returned as-is.
#
# @param options [Hash] accepted but not used
# @return [self]
def to_async(options=Hash.new)
    return self
end

#to_proxy(options = Hash.new) ⇒ Object



93
94
95
# File 'lib/orocos/async/task_context_base.rb', line 93

# Calls Orocos::Async.proxy with this task's name and the given options.
#
# @param options [Hash] forwarded unchanged to Orocos::Async.proxy
# @return whatever Orocos::Async.proxy returns
def to_proxy(options=Hash.new)
    task_name = name
    Orocos::Async.proxy(task_name, options)
end

#to_rubyRubyTasks::TaskContext

Create a ruby task on which all received data is mirrored

This task context is unique, i.e. the same object will be returned by subsequent calls to this method.



103
104
105
# File 'lib/orocos/async/task_context_base.rb', line 103

# Returns the mirroring ruby task, creating it with
# TaskContextBase.to_ruby on first use and caching it in
# @ruby_task_context.
#
# @return the cached result of TaskContextBase.to_ruby(self)
def to_ruby
    return @ruby_task_context if @ruby_task_context

    @ruby_task_context = TaskContextBase.to_ruby(self)
end

#unreachable!(options = Hash.new) ⇒ Orocos::TaskContext, ...

Disconnects self from the remote task context and returns its underlying object used to communicate with the remote task (designated object).

Returns nil if the TaskContext is not connected. Returns an EventLoop Event if not called from the event loop thread.

Parameters:

  • options (Hash) (defaults to: Hash.new)

    only the :error key (Exception) is accepted: the reason for the disconnect

Returns:



211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
# File 'lib/orocos/async/task_context_base.rb', line 211

# Detaches this object from its underlying delegator object.
#
# @param options [Hash] only the :error key is passed to
#   Kernel.validate_options; its value is stored as the access error
# @return the result of @event_loop.call; the block itself evaluates to
#   the previous delegator object, or nil when there was no valid one
def unreachable!(options = Hash.new)
    options = Kernel.validate_options options, :error
    # ensure that this is always called from the
    # event loop thread
    @event_loop.call do
        old_task = @mutex.synchronize do
            if valid_delegator?
                # Remember why the task went away; fall back to a generic
                # ArgumentError when no :error was supplied.
                @access_error = options.delete(:error) ||
                    ArgumentError.new("cannot access the remote task context for an unknown reason")
                task = @delegator_obj
                invalidate_delegator!
                @watchdog_timer.cancel if @watchdog_timer
                task
            end
        end
        # Runs outside the mutex and regardless of whether a delegator
        # was attached; the :unreachable event is emitted only when one
        # was actually detached.
        clear_interface
        event :unreachable if old_task
        old_task
    end
end