Class: Orocos::Port
- Inherits:
-
Object
- Object
- Orocos::Port
- Includes:
- PortBase
- Defined in:
- lib/orocos/port.rb,
lib/orocos/ros/ports.rb,
ext/rorocos/rorocos.cc
Overview
Base class for port classes.
See OutputPort and InputPort
Direct Known Subclasses
Defined Under Namespace
Classes: InvalidMQTransportSetup
Constant Summary collapse
- DEFAULT_CONNECTION_POLICY =
{ :type => :data, :init => false, :pull => false, :data_size => 0, :size => 0, :lock => :lock_free, :transport => 0, :name_id => "" }
- CONNECTION_POLICY_OPTIONS =
DEFAULT_CONNECTION_POLICY.keys
- MQ_RTT_DEFAULT_QUEUE_LENGTH =
10
- @@transient_port_id_counter =
0
Constants included from PortBase
Orocos::PortBase::D_DIFFERENT_HOSTS, Orocos::PortBase::D_SAME_HOST, Orocos::PortBase::D_SAME_PROCESS, Orocos::PortBase::D_UNKNOWN
Class Attribute Summary collapse
-
.transport_names ⇒ Object
readonly
A mapping from a transport ID to its name in plain text.
Attributes included from PortBase
#model, #name, #orocos_type_name, #task, #type
Class Method Summary collapse
-
.prepare_policy(policy = Hash.new) ⇒ Object
fills missing policy fields with default values, checks if the generated policy is valid and returns it.
- .transient_local_port_name(base_name) ⇒ Object
-
.transport_name(id) ⇒ Object
Returns the transport name for the given transport ID or a placeholder sentence if no name is known for this transport ID.
-
.validate_policy(policy) ⇒ Object
A connection policy is represented by a hash whose elements are each of the policy parameters.
Instance Method Summary collapse
-
#connected? ⇒ Boolean
Tests if this port is already part of a connection or not.
-
#create_stream(transport, name_id, policy = Hash.new) ⇒ Object
Publishes or subscribes this port on a stream.
- #default_ros_topic_name ⇒ Object
-
#disconnect_all ⇒ Object
Removes this port from all connections it is part of.
- #do_create_stream(_policy) ⇒ Object
- #do_disconnect_all ⇒ Object
- #do_disconnect_from(other) ⇒ Object
- #do_remove_stream(stream_name) ⇒ Object
-
#doc ⇒ Object
Returns a documentation string describing the port If no documentation is available it returns nil.
-
#doc? ⇒ Boolean
Returns true if a documentation about the port is available otherwise it retuns false.
-
#handle_mq_transport(input_name, policy) ⇒ Object
Helper method for #connect_to, to handle the MQ transport (in particular, the validation of the parameters).
-
#pretty_print(pp) ⇒ Object
:nodoc:.
-
#refine_exceptions(other = nil) ⇒ Object
:nodoc:.
-
#remove_stream(name_id) ⇒ Object
Removes a stream publication.
-
#to_orocos_port ⇒ Object
Returns the Orocos port.
- #type_name ⇒ Object deprecated Deprecated.
Methods included from PortBase
#==, #distance_to, #ensure_type_available, #full_name, #initialize, #log_metadata, #max_marshalling_size, #max_sizes, #new_sample, #to_s
Class Attribute Details
.transport_names ⇒ Object (readonly)
A mapping from a transport ID to its name in plain text
35 36 37 |
# File 'lib/orocos/port.rb', line 35 def transport_names @transport_names end |
Class Method Details
.prepare_policy(policy = Hash.new) ⇒ Object
fills missing policy fields with default values, checks if the generated policy is valid and returns it
125 126 127 128 |
# File 'lib/orocos/port.rb', line 125 def self.prepare_policy(policy = Hash.new) policy = DEFAULT_CONNECTION_POLICY.merge policy Port.validate_policy(policy) end |
.transient_local_port_name(base_name) ⇒ Object
28 29 30 |
# File 'lib/orocos/port.rb', line 28 def self.transient_local_port_name(base_name) "#{base_name}.#{@@transient_port_id_counter += 1}" end |
.transport_name(id) ⇒ Object
Returns the transport name for the given transport ID or a placeholder sentence if no name is known for this transport ID
40 41 42 |
# File 'lib/orocos/port.rb', line 40 def self.transport_name(id) transport_names[id] || "unknown transport with ID #{id}" end |
.validate_policy(policy) ⇒ Object
A connection policy is represented by a hash whose elements are each of the policy parameters. Valid policies are:
-
buffer policy. Values are stored in a FIFO of the specified size. Connecting with a buffer policy is done with:
output_port.connect_to(input_port, :type => :buffer, :size => 10)
-
data policy. The input port will always read the last value pushed by the output port. It is the default policy, but can be explicitly specified with:
output_port.connect_to(input_port, :type => :data)
An additional :pull
option specifies if samples should be
pushed by the output end (i.e. if all samples that are written on the
output port are sent to the input port), or if the values are transmitted
only when the input port is read. For instance:
output_port.connect_to(input_port, :type => :data, :pull => true)
Finally, the type of locking can be specified. The lock_free
locking policy guarantees that a high-priority thread will not be “taken
over” by a low-priority one, but requires a lot of copying – so avoid it in
non-hard-realtime contexts with big data samples. The locked
locking policy uses mutexes, so is not ideal in hard realtime contexts.
Each policy is specified with:
output_port.connect_to(input_port, :lock => :lock_free)
output_port.connect_to(input_port, :lock => :locked)
This method raises ArgumentError if the policy is not valid.
106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 |
# File 'lib/orocos/port.rb', line 106 def self.validate_policy(policy) policy = policy, CONNECTION_POLICY_OPTIONS if policy.has_key?(:type) policy[:type] = policy[:type].to_sym end if policy.has_key?(:lock) policy[:lock] = policy[:lock].to_sym end if (policy[:type] == :buffer || policy[:type] == :circular_buffer) && !policy[:size] raise ArgumentError, "you must provide a 'size' argument for buffer connections" elsif policy[:type] == :data && (policy[:size] && policy[:size] != 0) raise ArgumentError, "there are no 'size' argument to data connections" end policy end |
Instance Method Details
#connected? ⇒ Boolean
Tests if this port is already part of a connection or not
387 388 389 390 391 392 393 394 |
# File 'ext/rorocos/rorocos.cc', line 387
static VALUE port_connected_p(VALUE self)
{
RTaskContext* task; VALUE name;
tie(task, tuples::ignore, name) = getPortReference(self);
bool result = corba_blocking_fct_call_with_result(bind(&_objref_CDataFlowInterface::isConnected,(_objref_CDataFlowInterface*)task->ports,
StringValuePtr(name)));
return result ? Qtrue : Qfalse;
}
|
#create_stream(transport, name_id, policy = Hash.new) ⇒ Object
Publishes or subscribes this port on a stream
150 151 152 153 154 155 156 157 158 159 |
# File 'lib/orocos/port.rb', line 150 def create_stream(transport, name_id, policy = Hash.new) policy = Port.prepare_policy(policy) policy[:transport] = transport policy[:name_id] = name_id do_create_stream(policy) self rescue Orocos::ConnectionFailed => e raise e, "failed to create stream from #{full_name} on transport #{Port.transport_name(transport)}, name #{name_id} and policy #{policy.inspect}" end |
#default_ros_topic_name ⇒ Object
3 4 5 |
# File 'lib/orocos/ros/ports.rb', line 3 def default_ros_topic_name "#{task.name}/#{self.name}" end |
#disconnect_all ⇒ Object
Removes this port from all connections it is part of
57 58 59 60 61 |
# File 'lib/orocos/port.rb', line 57 def disconnect_all refine_exceptions do do_disconnect_all end end |
#do_create_stream(_policy) ⇒ Object
474 475 476 477 478 479 480 481 482 483 484 485 |
# File 'ext/rorocos/rorocos.cc', line 474
static VALUE do_port_create_stream(VALUE rport, VALUE _policy)
{
RTaskContext* task; VALUE name;
tie(task, tuples::ignore, name) = getPortReference(rport);
RTT::corba::CConnPolicy policy = policyFromHash(_policy);
bool result = corba_blocking_fct_call_with_result(bind(&_objref_CDataFlowInterface::createStream,(_objref_CDataFlowInterface*)task->ports,
StringValuePtr(name),policy));
if(!result)
rb_raise(eConnectionFailed, "failed to create stream");
return Qnil;
}
|
#do_disconnect_all ⇒ Object
454 455 456 457 458 459 460 461 |
# File 'ext/rorocos/rorocos.cc', line 454
static VALUE do_port_disconnect_all(VALUE port)
{
RTaskContext* task; VALUE name;
tie(task, tuples::ignore, name) = getPortReference(port);
corba_blocking_fct_call(bind(&_objref_CDataFlowInterface::disconnectPort,(_objref_CDataFlowInterface*)task->ports,
StringValuePtr(name)));
return Qnil;
}
|
#do_disconnect_from(other) ⇒ Object
463 464 465 466 467 468 469 470 471 472 |
# File 'ext/rorocos/rorocos.cc', line 463
static VALUE do_port_disconnect_from(VALUE self, VALUE other)
{
RTaskContext* self_task; VALUE self_name;
tie(self_task, tuples::ignore, self_name) = getPortReference(self);
RTaskContext* other_task; VALUE other_name;
tie(other_task, tuples::ignore, other_name) = getPortReference(other);
bool result = corba_blocking_fct_call_with_result(bind(&_objref_CDataFlowInterface::removeConnection,(_objref_CDataFlowInterface*)self_task->ports,
StringValuePtr(self_name),other_task->ports,StringValuePtr(other_name)));
return result ? Qtrue : Qfalse;
}
|
#do_remove_stream(stream_name) ⇒ Object
487 488 489 490 491 492 493 494 495 |
# File 'ext/rorocos/rorocos.cc', line 487
static VALUE do_port_remove_stream(VALUE rport, VALUE stream_name)
{
RTaskContext* task; VALUE name;
tie(task, tuples::ignore, name) = getPortReference(rport);
corba_blocking_fct_call(bind(&_objref_CDataFlowInterface::removeStream,(_objref_CDataFlowInterface*)task->ports,
StringValuePtr(name),StringValuePtr(stream_name)));
return Qnil;
}
|
#doc ⇒ Object
Returns a documentation string describing the port If no documentation is available it returns nil
138 139 140 141 142 |
# File 'lib/orocos/port.rb', line 138 def doc if model model.doc end end |
#doc? ⇒ Boolean
Returns true if a documentation about the port is available otherwise it retuns false
132 133 134 |
# File 'lib/orocos/port.rb', line 132 def doc? (doc && !doc.empty?) end |
#handle_mq_transport(input_name, policy) ⇒ Object
Helper method for #connect_to, to handle the MQ transport (in particular, the validation of the parameters)
A block must be given, that should return true if the MQ transport should be used for this particular connection and false otherwise (i.e. true if the two ports are located on the same machine, false otherwise)
192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 |
# File 'lib/orocos/port.rb', line 192 def handle_mq_transport(input_name, policy) # :nodoc: if policy[:transport] == TRANSPORT_MQ if !Orocos::MQueue.available? raise InvalidMQTransportSetup, "cannot select the MQueue transport as it is not built into the RTT" end # Go on to the validation steps elsif !Orocos::MQueue.available? return policy.dup elsif !Orocos::MQueue.auto? return policy.dup elsif policy[:transport] != 0 return policy.dup # explicit transport chosen, and it is not MQ end Orocos.info do "#{full_name} => #{input_name}: using MQ transport" end updated_policy = Hash[size: 0, data_size: 0]. merge(policy). merge(transport: TRANSPORT_MQ) queue_length, = updated_policy.values_at(:size, :data_size) if queue_length == 0 queue_length = MQ_RTT_DEFAULT_QUEUE_LENGTH end if Orocos::MQueue.auto_sizes? && == 0 size = max_marshalling_size if !size if policy[:transport] == TRANSPORT_MQ raise InvalidMQTransportSetup, "MQ transport explicitely selected, but the message size cannot be computed for #{self}" end if Orocos::MQueue.warn? Orocos.warn "the MQ transport could be selected, but the marshalling size of samples from the output port #{full_name}, of type #{type.name}, is unknown, falling back to auto-transport" end return policy.dup end Orocos.info do "#{full_name} => #{input_name}: MQ data_size == #{size}" end = size end if Orocos::MQueue.validate_sizes? valid = Orocos::MQueue.valid_sizes?(queue_length, ) do "#{full_name} => #{input_name} of type #{type.name}: " end if !valid if policy[:transport] == TRANSPORT_MQ raise InvalidMQTransportSetup, "MQ transport explicitely selected, but the current system setup does not allow to create a MQ of #{queue_length} messages of size #{}" end if Orocos::MQueue.warn? Orocos.warn "the MQ transport could be selected, but the marshalling size of samples (#{policy[:data_size]}) is invalid, falling back to auto-transport" end return policy.dup end end updated_policy[:data_size] = updated_policy end |
#pretty_print(pp) ⇒ Object
:nodoc:
48 49 50 51 52 53 54 |
# File 'lib/orocos/port.rb', line 48 def pretty_print(pp) # :nodoc: if type.name != orocos_type_name pp.text " #{name} (#{type.name}/#{orocos_type_name})" else pp.text " #{name} (#{type.name})" end end |
#refine_exceptions(other = nil) ⇒ Object
:nodoc:
168 169 170 171 172 173 174 175 176 177 178 179 |
# File 'lib/orocos/port.rb', line 168 def refine_exceptions(other = nil) # :nodoc: CORBA.refine_exceptions(self, other) do yield end rescue NotFound if !other || task.has_port?(name) raise InterfaceObjectNotFound.new(task, name), "port '#{name}' disappeared from task '#{task.name}'" else raise InterfaceObjectNotFound.new(other.task, other.name), "port '#{other.name}' disappeared from task '#{other.task.name}'" end end |
#remove_stream(name_id) ⇒ Object
Removes a stream publication. The name should be the same than the one given to the
163 164 165 166 |
# File 'lib/orocos/port.rb', line 163 def remove_stream(name_id) do_remove_stream(name_id) self end |
#to_orocos_port ⇒ Object
Returns the Orocos port
145 146 147 |
# File 'lib/orocos/port.rb', line 145 def to_orocos_port self end |
#type_name ⇒ Object
Returns the name of the typelib type. Use #type.name instead.
46 |
# File 'lib/orocos/port.rb', line 46 def type_name; type.name end |