Class: Orocos::Async::TaskContextProxy
- Inherits:
-
ObjectBase
- Object
- ObjectBase
- Orocos::Async::TaskContextProxy
- Extended by:
- Forwardable
- Includes:
- Namespace
- Defined in:
- lib/orocos/async/task_context_proxy.rb
Constant Summary
Constants included from Namespace
Instance Attribute Summary collapse
-
#name_service ⇒ Object
readonly
Returns the value of attribute name_service.
Attributes inherited from ObjectBase
#emitting, #event_loop, #options, #pending_adds
Instance Method Summary collapse
- #attribute(name, options = Hash.new) ⇒ Object
- #attributes(&block) ⇒ Object
- #basename ⇒ Object
-
#each_attribute(&block) ⇒ Object
call-seq: task.each_attribute { |a| … } => task.
-
#each_port(&block) ⇒ Object
call-seq: task.each_port { |p| … } => task.
-
#each_property(&block) ⇒ Object
call-seq: task.each_property { |a| … } => task.
-
#initialize(name, options = Hash.new) ⇒ TaskContextProxy
constructor
A new instance of TaskContextProxy.
- #name ⇒ Object
- #port(name, options = Hash.new) ⇒ Object
- #ports(options = Hash.new, &block) ⇒ Object
- #properties(&block) ⇒ Object
- #property(name, options = Hash.new) ⇒ Object
-
#reachable!(task_context, options = Hash.new) ⇒ Object
must be thread safe.
- #reachable? ⇒ Boolean
-
#reconnect(wait_for_task = false) ⇒ Object
asychronsosly tries to connect to the remote task.
- #to_async(options = Hash.new) ⇒ Object
- #to_proxy(options = Hash.new) ⇒ Object
- #to_ruby ⇒ Object
- #unreachable!(options = {:reconnect => false}) ⇒ Object
Methods included from Namespace
#map_to_namespace, #namespace, #namespace=, #same_namespace?, #split_name, split_name, validate_namespace_name, #verify_same_namespace
Methods inherited from ObjectBase
#add_listener, define_event, define_events, #disable_emitting, #event, event_names, #event_names, #invalidate_delegator!, #listener?, #listeners, #number_of_listeners, #on_event, #proxy_event, #really_add_listener, #remove_all_listeners, #remove_listener, #remove_proxy_event, #valid_delegator?, valid_event?, #valid_event?, validate_event, #validate_event, #wait
Constructor Details
#initialize(name, options = Hash.new) ⇒ TaskContextProxy
Returns a new instance of TaskContextProxy
463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 |
# File 'lib/orocos/async/task_context_proxy.rb', line 463 def initialize(name,=Hash.new) @options,@task_options = Kernel. ,{:name_service => Orocos::Async.name_service, :event_loop => Orocos::Async.event_loop, :reconnect => true, :retry_period => Orocos::Async::TaskContextBase.default_period, :use => nil, :raise => false, :wait => nil } @name_service = @options[:name_service] self.namespace,name = split_name(name) self.namespace ||= @name_service.namespace super(name,@options[:event_loop]) @task_options[:event_loop] = @event_loop @mutex = Mutex.new @ports = Hash.new @attributes = Hash.new @properties = Hash.new @resolve_timer = @event_loop.async_every(@name_service.method(:get), {:period => @options[:retry_period],:start => false}, self.name,@task_options) do |task_context,error| if error case error when Orocos::NotFound, Orocos::ComError raise error if @options[:raise] :ignore_error else raise error end else @resolve_timer.stop if !task_context.respond_to?(:event_loop) raise "TaskProxy is using a name service#{@name_service} which is returning #{task_context.class} but Async::TaskContext was expected." end @event_loop.(method(:reachable!),{:sync_key => self,:known_errors => Orocos::Async::KNOWN_ERRORS},task_context) do |val,error| if error @resolve_timer.start :ignore_error end end end end on_port_reachable(false) do |name| p = @ports[name] if p && !p.reachable? error_callback = Proc.new do |error| p.emit_error(error) end @event_loop.defer :known_errors => Orocos::Async::KNOWN_ERRORS,:on_error => error_callback do connect_port(p) end end end on_property_reachable(false) do |name| p = @properties[name] if(p && !p.reachable?) error_callback = Proc.new do |error| p.emit_error(error) end @event_loop.defer :known_errors => Orocos::Async::KNOWN_ERRORS,:on_error => error_callback do connect_property(p) end end end on_attribute_reachable(false) do |name| a = @attributes[name] if(a && !a.reachable?) error_callback = Proc.new do |error| a.emit_error(error) end @event_loop.defer :known_errors => Orocos::Async::KNOWN_ERRORS,:on_error => error_callback do connect_attribute(a) end end end @resolve_timer.doc = "#{name} reconnect" if @options.has_key?(:use) reachable!(@options[:use]) else reconnect(@options[:wait]) end end |
Dynamic Method Handling
This class handles dynamic methods through the method_missing method
#method_missing(m, *args) ⇒ Object (private)
934 935 936 937 938 939 940 941 942 |
# File 'lib/orocos/async/task_context_proxy.rb', line 934 def method_missing(m,*args) if respond_to_missing?(m) event_loop.sync(@delegator_obj,args) do |args| @delegator_obj.method(m).call(*args) end else super end end |
Instance Attribute Details
#name_service ⇒ Object (readonly)
Returns the value of attribute name_service
446 447 448 |
# File 'lib/orocos/async/task_context_proxy.rb', line 446 def name_service @name_service end |
Instance Method Details
#attribute(name, options = Hash.new) ⇒ Object
604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 |
# File 'lib/orocos/async/task_context_proxy.rb', line 604 def attribute(name, = Hash.new) name = name.to_str , = Kernel. ,:wait => @options[:wait] wait if [:wait] a = @mutex.synchronize do @attributes[name] ||= AttributeProxy.new(self,name,) end if .has_key?(:type) && a.type? && [:type] == a.type .delete(:type) end if !.empty? && a. != Orocos.warn "Attribute #{a.full_name}: is already initialized with options: #{a.}" Orocos.warn "ignoring options: #{}" end return a if !reachable? || a.reachable? if [:wait] connect_attribute(a) a.wait else @event_loop.defer :known_errors => Orocos::Async::KNOWN_ERRORS do connect_attribute(a) end end a end |
#attributes(&block) ⇒ Object
706 707 708 709 710 711 712 713 714 715 |
# File 'lib/orocos/async/task_context_proxy.rb', line 706 def attributes(&block) p = proc do |names| names.map{|name| attribute(name)} end if block attribute_names(&p) else p.call(attribute_names) end end |
#basename ⇒ Object
553 554 555 |
# File 'lib/orocos/async/task_context_proxy.rb', line 553 def basename @name end |
#each_attribute(&block) ⇒ Object
call-seq:
task.each_attribute { |a| ... } => task
Enumerates the attributes that are available on this task, as instances of Orocos::Attribute
737 738 739 740 741 742 743 744 745 746 |
# File 'lib/orocos/async/task_context_proxy.rb', line 737 def each_attribute(&block) if !block_given? return enum_for(:each_attribute) end names = attribute_names names.each do |name| yield(attribute(name)) end end |
#each_port(&block) ⇒ Object
call-seq:
task.each_port { |p| ... } => task
Enumerates the ports that are available on this task, as instances of either Orocos::InputPort or Orocos::OutputPort
753 754 755 756 757 758 759 760 761 762 |
# File 'lib/orocos/async/task_context_proxy.rb', line 753 def each_port(&block) if !block_given? return enum_for(:each_port) end port_names.each do |name| yield(port(name)) end self end |
#each_property(&block) ⇒ Object
call-seq:
task.each_property { |a| ... } => task
Enumerates the properties that are available on this task, as instances of Orocos::Attribute
722 723 724 725 726 727 728 729 730 |
# File 'lib/orocos/async/task_context_proxy.rb', line 722 def each_property(&block) if !block_given? return enum_for(:each_property) end names = property_names names.each do |name| yield(property(name)) end end |
#name ⇒ Object
549 550 551 |
# File 'lib/orocos/async/task_context_proxy.rb', line 549 def name map_to_namespace(@name) end |
#port(name, options = Hash.new) ⇒ Object
633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 |
# File 'lib/orocos/async/task_context_proxy.rb', line 633 def port(name, = Hash.new) name = name.to_str , = Kernel. ,:wait => @options[:wait] wait if [:wait] # support for subports fields = name.split(".") name = if fields.empty? name elsif name[0] == "/" # special case for log ports like: logger_name.port("/task_name.port_name") fields = [] name else fields.shift end type = if !fields.empty? .delete(:type) else nil end p = @mutex.synchronize do @ports[name] ||= PortProxy.new(self,name,) end if .has_key?(:type) && p.type? && [:type] == p.type .delete(:type) end if !.empty? && p. != Orocos.warn "Port #{p.full_name}: is already initialized with options: #{p.}" Orocos.warn "ignoring options: #{}" end if reachable? && !p.reachable? if [:wait] connect_port(p) p.wait else @event_loop.defer :known_errors => KNOWN_ERRORS do connect_port(p) end end end if fields.empty? p else p.sub_port(fields) end end |
#ports(options = Hash.new, &block) ⇒ Object
684 685 686 687 688 689 690 691 692 693 |
# File 'lib/orocos/async/task_context_proxy.rb', line 684 def ports( = Hash.new,&block) p = proc do |names| names.map{|name| port(name,)} end if block port_names(&p) else p.call(port_names) end end |
#properties(&block) ⇒ Object
695 696 697 698 699 700 701 702 703 704 |
# File 'lib/orocos/async/task_context_proxy.rb', line 695 def properties(&block) p = proc do |names| names.map{|name| property(name)} end if block property_names(&p) else p.call(property_names) end end |
#property(name, options = Hash.new) ⇒ Object
575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 |
# File 'lib/orocos/async/task_context_proxy.rb', line 575 def property(name, = Hash.new) name = name.to_str , = Kernel. ,:wait => @options[:wait] wait if [:wait] p = @mutex.synchronize do @properties[name] ||= PropertyProxy.new(self,name,) end if .has_key?(:type) && p.type? && [:type] == p.type .delete(:type) end if !.empty? && p. != Orocos.warn "Property #{p.full_name}: is already initialized with options: #{p.}" Orocos.warn "ignoring options: #{}" end return p if !reachable? || p.reachable? if [:wait] connect_property(p) p.wait else @event_loop.defer :known_errors => Orocos::Async::KNOWN_ERRORS do connect_property(p) end end p end |
#reachable!(task_context, options = Hash.new) ⇒ Object
must be thread safe
765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 |
# File 'lib/orocos/async/task_context_proxy.rb', line 765 def reachable!(task_context, = Hash.new) raise ArgumentError, "task_context must not be instance of TaskContextProxy" if task_context.is_a?(TaskContextProxy) raise ArgumentError, "task_context must be an async instance but is #{task_context.class}" if !task_context.respond_to?(:event_names) @mutex.synchronize do @last_task_class ||= task_context.class if @last_task_class != task_context.class Vizkit.warn "Class missmatch: TaskContextProxy #{name} was recently connected to #{@last_task_class} and is now connected to #{task_context.class}." @last_task_class = task_context.class end remove_proxy_event(@delegator_obj,@delegator_obj.event_names) if valid_delegator? if @delegator_obj_old remove_proxy_event(@delegator_obj_old,@delegator_obj_old.event_names) @delegator_obj_old = nil end super(task_context,) # check if the requested ports are available @ports.values.each do |port| unless task_context.port_names.include? port.name Orocos.warn "task #{name} has currently no port called #{port.name} - on_data will be called when the port was added" end end @attributes.values.each do |attribute| unless task_context.attribute_names.include? attribute.name Orocos.warn "task #{name} has currently no attribute called #{attribute.name} - on_change will be called when the attribute was added" end end @properties.values.each do |property| unless task_context.property_names.include? property.name Orocos.warn "task #{name} has currently no property called #{property.name} - on_change will be called when the property was added" end end # this is emitting on_port_reachable, on_property_reachable .... proxy_event(@delegator_obj,@delegator_obj.event_names-[:reachable]) end end |
#reachable? ⇒ Boolean
804 805 806 807 808 809 810 811 |
# File 'lib/orocos/async/task_context_proxy.rb', line 804 def reachable? @mutex.synchronize do super && @delegator_obj.reachable? end rescue Orocos::NotFound => e unreachable! :error => e,:reconnect => @options[:reconnect] false end |
#reconnect(wait_for_task = false) ⇒ Object
asychronsosly tries to connect to the remote task
570 571 572 573 |
# File 'lib/orocos/async/task_context_proxy.rb', line 570 def reconnect(wait_for_task = false) @resolve_timer.start [:retry_period] wait if wait_for_task == true end |
#to_async(options = Hash.new) ⇒ Object
557 558 559 |
# File 'lib/orocos/async/task_context_proxy.rb', line 557 def to_async(=Hash.new) Orocos::Async.get(name,) end |
#to_proxy(options = Hash.new) ⇒ Object
561 562 563 |
# File 'lib/orocos/async/task_context_proxy.rb', line 561 def to_proxy(=Hash.new) self end |
#to_ruby ⇒ Object
565 566 567 |
# File 'lib/orocos/async/task_context_proxy.rb', line 565 def to_ruby TaskContextBase::to_ruby(self) end |
#unreachable!(options = {:reconnect => false}) ⇒ Object
813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 |
# File 'lib/orocos/async/task_context_proxy.rb', line 813 def unreachable!( = {:reconnect => false}) Kernel. ,:reconnect,:error @mutex.synchronize do # do not stop proxing events here (see reachable!) # otherwise unrechable event might get lost @delegator_obj_old = if valid_delegator? @delegator_obj else @delegator_obj_old end disable_emitting do super() end end disconnect_ports disconnect_attributes disconnect_properties re = if .has_key?(:reconnect) [:reconnect] else @options[:reconnect] end reconnect if re end |