Class: Orocos::Async::TaskContextProxy

Inherits:
ObjectBase
  • Object
show all
Extended by:
Forwardable
Includes:
Namespace
Defined in:
lib/orocos/async/task_context_proxy.rb

Constant Summary

Constants included from Namespace

Namespace::DELIMATOR

Instance Attribute Summary collapse

Attributes inherited from ObjectBase

#emitting, #event_loop, #options, #pending_adds

Instance Method Summary collapse

Methods included from Namespace

#map_to_namespace, #namespace, #namespace=, #same_namespace?, #split_name, split_name, validate_namespace_name, #verify_same_namespace

Methods inherited from ObjectBase

#add_listener, define_event, define_events, #disable_emitting, #event, event_names, #event_names, #invalidate_delegator!, #listener?, #listeners, #number_of_listeners, #on_event, #proxy_event, #really_add_listener, #remove_all_listeners, #remove_listener, #remove_proxy_event, #valid_delegator?, valid_event?, #valid_event?, validate_event, #validate_event, #wait

Constructor Details

#initialize(name, options = Hash.new) ⇒ TaskContextProxy

Returns a new instance of TaskContextProxy



463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
# File 'lib/orocos/async/task_context_proxy.rb', line 463

def initialize(name,options=Hash.new)
    @options,@task_options = Kernel.filter_options options,{:name_service => Orocos::Async.name_service,
                                               :event_loop => Orocos::Async.event_loop,
                                               :reconnect => true,
                                               :retry_period => Orocos::Async::TaskContextBase.default_period,
                                               :use => nil,
                                               :raise => false,
                                               :wait => nil }

    @name_service = @options[:name_service]
    self.namespace,name = split_name(name)
    self.namespace ||= @name_service.namespace
    super(name,@options[:event_loop])

    @task_options[:event_loop] = @event_loop
    @mutex = Mutex.new
    @ports = Hash.new
    @attributes = Hash.new
    @properties = Hash.new
    @resolve_timer = @event_loop.async_every(@name_service.method(:get),
                                             {:period => @options[:retry_period],:start => false},
                                             self.name,@task_options) do |task_context,error|
        if error
            case error
            when Orocos::NotFound, Orocos::ComError
                raise error if @options[:raise]
                :ignore_error
            else
                raise error
            end
        else
            @resolve_timer.stop
            if !task_context.respond_to?(:event_loop)
                raise "TaskProxy is using a name service#{@name_service} which is returning #{task_context.class} but Async::TaskContext was expected."
            end
            @event_loop.async_with_options(method(:reachable!),{:sync_key => self,:known_errors => Orocos::Async::KNOWN_ERRORS},task_context) do |val,error|
                if error
                    @resolve_timer.start
                    :ignore_error
                end
            end
        end
    end

    on_port_reachable(false) do |name|
        p = @ports[name]
        if p && !p.reachable?
            error_callback = Proc.new do |error|
                p.emit_error(error)
            end
            @event_loop.defer :known_errors => Orocos::Async::KNOWN_ERRORS,:on_error => error_callback do
                connect_port(p)
            end
        end
    end
    on_property_reachable(false) do |name|
        p = @properties[name]
        if(p && !p.reachable?)
            error_callback = Proc.new do |error|
                p.emit_error(error)
            end
            @event_loop.defer :known_errors => Orocos::Async::KNOWN_ERRORS,:on_error => error_callback do
                connect_property(p)
            end
        end
    end
    on_attribute_reachable(false) do |name|
        a = @attributes[name]
        if(a && !a.reachable?)
            error_callback = Proc.new do |error|
                a.emit_error(error)
            end
            @event_loop.defer :known_errors => Orocos::Async::KNOWN_ERRORS,:on_error => error_callback do
                connect_attribute(a)
            end
        end
    end

    @resolve_timer.doc = "#{name} reconnect"
    if @options.has_key?(:use)
        reachable!(@options[:use])
    else
        reconnect(@options[:wait])
    end
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(m, *args) ⇒ Object (private)



934
935
936
937
938
939
940
941
942
# File 'lib/orocos/async/task_context_proxy.rb', line 934

def method_missing(m,*args)
    if respond_to_missing?(m)
        event_loop.sync(@delegator_obj,args) do |args|
            @delegator_obj.method(m).call(*args)
        end
    else
        super
    end
end

Instance Attribute Details

#name_serviceObject (readonly)

Returns the value of attribute name_service



446
447
448
# File 'lib/orocos/async/task_context_proxy.rb', line 446

def name_service
  @name_service
end

Instance Method Details

#attribute(name, options = Hash.new) ⇒ Object



604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
# File 'lib/orocos/async/task_context_proxy.rb', line 604

def attribute(name,options = Hash.new)
    name = name.to_str
    options,other_options = Kernel.filter_options options,:wait => @options[:wait]
    wait if options[:wait]

    a = @mutex.synchronize do
        @attributes[name] ||= AttributeProxy.new(self,name,other_options)
    end

    if other_options.has_key?(:type) && a.type? && other_options[:type] == a.type
        other_options.delete(:type)
    end
    if !other_options.empty? && a.options != other_options
        Orocos.warn "Attribute #{a.full_name}: is already initialized with options: #{a.options}"
        Orocos.warn "ignoring options: #{other_options}"
    end

    return a if !reachable? || a.reachable?
    if options[:wait]
        connect_attribute(a)
        a.wait
    else
        @event_loop.defer :known_errors => Orocos::Async::KNOWN_ERRORS do
            connect_attribute(a)
        end
    end
    a
end

#attributes(&block) ⇒ Object



706
707
708
709
710
711
712
713
714
715
# File 'lib/orocos/async/task_context_proxy.rb', line 706

def attributes(&block)
   p = proc do |names|
       names.map{|name| attribute(name)}
   end
   if block
       attribute_names(&p)
   else
       p.call(attribute_names)
   end
end

#basenameObject



553
554
555
# File 'lib/orocos/async/task_context_proxy.rb', line 553

def basename
    @name
end

#each_attribute(&block) ⇒ Object

call-seq:

task.each_attribute { |a| ... } => task

Enumerates the attributes that are available on this task, as instances of Orocos::Attribute



737
738
739
740
741
742
743
744
745
746
# File 'lib/orocos/async/task_context_proxy.rb', line 737

def each_attribute(&block)
    if !block_given?
        return enum_for(:each_attribute)
    end

    names = attribute_names
    names.each do |name|
        yield(attribute(name))
    end
end

#each_port(&block) ⇒ Object

call-seq:

task.each_port { |p| ... } => task

Enumerates the ports that are available on this task, as instances of either Orocos::InputPort or Orocos::OutputPort



753
754
755
756
757
758
759
760
761
762
# File 'lib/orocos/async/task_context_proxy.rb', line 753

def each_port(&block)
    if !block_given?
        return enum_for(:each_port)
    end

    port_names.each do |name|
        yield(port(name))
    end
    self
end

#each_property(&block) ⇒ Object

call-seq:

task.each_property { |a| ... } => task

Enumerates the properties that are available on this task, as instances of Orocos::Attribute



722
723
724
725
726
727
728
729
730
# File 'lib/orocos/async/task_context_proxy.rb', line 722

def each_property(&block)
    if !block_given?
        return enum_for(:each_property)
    end
    names = property_names
    names.each do |name|
        yield(property(name))
    end
end

#nameObject



549
550
551
# File 'lib/orocos/async/task_context_proxy.rb', line 549

def name
    map_to_namespace(@name)
end

#port(name, options = Hash.new) ⇒ Object



633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
# File 'lib/orocos/async/task_context_proxy.rb', line 633

def port(name,options = Hash.new)
    name = name.to_str
    options,other_options = Kernel.filter_options options,:wait => @options[:wait]
    wait if options[:wait]

    # support for subports
    fields = name.split(".")
    name = if fields.empty?
               name
           elsif name[0] == "/"
               # special case for log ports like: logger_name.port("/task_name.port_name")
               fields = []
               name
           else
               fields.shift
           end
    type = if !fields.empty?
               other_options.delete(:type)
           else
               nil
           end

    p = @mutex.synchronize do
        @ports[name] ||= PortProxy.new(self,name,other_options)
    end

    if other_options.has_key?(:type) && p.type? && other_options[:type] == p.type
        other_options.delete(:type)
    end
    if !other_options.empty? && p.options != other_options
        Orocos.warn "Port #{p.full_name}: is already initialized with options: #{p.options}"
        Orocos.warn "ignoring options: #{other_options}"
    end

    if reachable? && !p.reachable?
        if options[:wait]
            connect_port(p)
            p.wait
        else
            @event_loop.defer :known_errors => KNOWN_ERRORS do
                connect_port(p)
            end
        end
    end
    if fields.empty?
        p
    else
        p.sub_port(fields)
    end
end

#ports(options = Hash.new, &block) ⇒ Object



684
685
686
687
688
689
690
691
692
693
# File 'lib/orocos/async/task_context_proxy.rb', line 684

def ports(options = Hash.new,&block)
   p = proc do |names|
       names.map{|name| port(name,options)}
   end
   if block
       port_names(&p)
   else
       p.call(port_names)
   end
end

#properties(&block) ⇒ Object



695
696
697
698
699
700
701
702
703
704
# File 'lib/orocos/async/task_context_proxy.rb', line 695

def properties(&block)
   p = proc do |names|
       names.map{|name| property(name)}
   end
   if block
       property_names(&p)
   else
       p.call(property_names)
   end
end

#property(name, options = Hash.new) ⇒ Object



575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
# File 'lib/orocos/async/task_context_proxy.rb', line 575

def property(name,options = Hash.new)
    name = name.to_str
    options,other_options = Kernel.filter_options options,:wait => @options[:wait]
    wait if options[:wait]

    p = @mutex.synchronize do
        @properties[name] ||= PropertyProxy.new(self,name,other_options)
    end

    if other_options.has_key?(:type) && p.type? && other_options[:type] == p.type
        other_options.delete(:type)
    end
    if !other_options.empty? && p.options != other_options
        Orocos.warn "Property #{p.full_name}: is already initialized with options: #{p.options}"
        Orocos.warn "ignoring options: #{other_options}"
    end

    return p if !reachable? || p.reachable?
    if options[:wait]
        connect_property(p)
        p.wait
    else
        @event_loop.defer :known_errors => Orocos::Async::KNOWN_ERRORS do
            connect_property(p)
        end
    end
    p
end

#reachable!(task_context, options = Hash.new) ⇒ Object

must be thread safe

Raises:

  • (ArgumentError)


765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
# File 'lib/orocos/async/task_context_proxy.rb', line 765

def reachable!(task_context,options = Hash.new)
    raise ArgumentError, "task_context must not be instance of TaskContextProxy" if task_context.is_a?(TaskContextProxy)
    raise ArgumentError, "task_context must be an async instance but is #{task_context.class}" if !task_context.respond_to?(:event_names)
    @mutex.synchronize do
        @last_task_class ||= task_context.class
        if @last_task_class != task_context.class
            Vizkit.warn "Class missmatch: TaskContextProxy #{name} was recently connected to #{@last_task_class} and is now connected to #{task_context.class}."
            @last_task_class = task_context.class
        end

        remove_proxy_event(@delegator_obj,@delegator_obj.event_names) if valid_delegator?
        if @delegator_obj_old
            remove_proxy_event(@delegator_obj_old,@delegator_obj_old.event_names)
            @delegator_obj_old = nil
        end
        super(task_context,options)

        # check if the requested ports are available
        @ports.values.each do |port|
            unless task_context.port_names.include? port.name
                Orocos.warn "task #{name} has currently no port called #{port.name} - on_data will be called when the port was added"
            end
        end
        @attributes.values.each do |attribute|
            unless task_context.attribute_names.include? attribute.name
                Orocos.warn "task #{name} has currently no attribute called #{attribute.name} - on_change will be called when the attribute was added"
            end
        end
        @properties.values.each do |property|
            unless task_context.property_names.include? property.name
                Orocos.warn "task #{name} has currently no property called #{property.name} - on_change will be called when the property was added"
            end
        end

        # this is emitting on_port_reachable, on_property_reachable ....
        proxy_event(@delegator_obj,@delegator_obj.event_names-[:reachable])
    end
end

#reachable?Boolean

Returns:

  • (Boolean)


804
805
806
807
808
809
810
811
# File 'lib/orocos/async/task_context_proxy.rb', line 804

def reachable?
    @mutex.synchronize do
        super && @delegator_obj.reachable?
    end
rescue Orocos::NotFound => e
    unreachable! :error => e,:reconnect => @options[:reconnect]
    false
end

#reconnect(wait_for_task = false) ⇒ Object

asychronsosly tries to connect to the remote task



570
571
572
573
# File 'lib/orocos/async/task_context_proxy.rb', line 570

def reconnect(wait_for_task = false)
    @resolve_timer.start options[:retry_period]
    wait if wait_for_task == true
end

#to_async(options = Hash.new) ⇒ Object



557
558
559
# File 'lib/orocos/async/task_context_proxy.rb', line 557

def to_async(options=Hash.new)
    Orocos::Async.get(name,options)
end

#to_proxy(options = Hash.new) ⇒ Object



561
562
563
# File 'lib/orocos/async/task_context_proxy.rb', line 561

def to_proxy(options=Hash.new)
    self
end

#to_rubyObject



565
566
567
# File 'lib/orocos/async/task_context_proxy.rb', line 565

def to_ruby
    TaskContextBase::to_ruby(self)
end

#unreachable!(options = {:reconnect => false}) ⇒ Object



813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
# File 'lib/orocos/async/task_context_proxy.rb', line 813

def unreachable!(options = {:reconnect => false})
    Kernel.validate_options options,:reconnect,:error
    @mutex.synchronize do
        # do not stop proxing events here (see reachable!)
        # otherwise unrechable event might get lost
        @delegator_obj_old = if valid_delegator?
                                 @delegator_obj
                             else
                                 @delegator_obj_old
                             end

        disable_emitting do
            super(options)
        end
    end
    disconnect_ports
    disconnect_attributes
    disconnect_properties
    re = if options.has_key?(:reconnect)
            options[:reconnect]
         else
            @options[:reconnect]
         end
    reconnect if re
end