Class: Orocos::Port

Inherits:
Object
  • Object
show all
Includes:
PortBase
Defined in:
lib/orocos/port.rb,
lib/orocos/ros/ports.rb,
ext/rorocos/rorocos.cc

Overview

Base class for port classes.

See OutputPort and InputPort

Direct Known Subclasses

InputPort, OutputPort

Defined Under Namespace

Classes: InvalidMQTransportSetup

Constant Summary collapse

DEFAULT_CONNECTION_POLICY =
{
    :type => :data,
    :init => false,
    :pull => false,
    :data_size => 0,
    :size => 0,
    :lock => :lock_free,
    :transport => 0,
    :name_id => ""
}
CONNECTION_POLICY_OPTIONS =
DEFAULT_CONNECTION_POLICY.keys
MQ_RTT_DEFAULT_QUEUE_LENGTH =
10
@@transient_port_id_counter =
0

Constants included from PortBase

Orocos::PortBase::D_DIFFERENT_HOSTS, Orocos::PortBase::D_SAME_HOST, Orocos::PortBase::D_SAME_PROCESS, Orocos::PortBase::D_UNKNOWN

Class Attribute Summary collapse

Attributes included from PortBase

#model, #name, #orocos_type_name, #task, #type

Class Method Summary collapse

Instance Method Summary collapse

Methods included from PortBase

#==, #distance_to, #ensure_type_available, #full_name, #initialize, #log_metadata, #max_marshalling_size, #max_sizes, #new_sample, #to_s

Class Attribute Details

.transport_namesObject (readonly)

A mapping from a transport ID to its name in plain text



35
36
37
# File 'lib/orocos/port.rb', line 35

def transport_names
  @transport_names
end

Class Method Details

.prepare_policy(policy = Hash.new) ⇒ Object

fills missing policy fields with default values, checks if the generated policy is valid and returns it



125
126
127
128
# File 'lib/orocos/port.rb', line 125

def self.prepare_policy(policy = Hash.new)
    policy = DEFAULT_CONNECTION_POLICY.merge policy
    Port.validate_policy(policy)
end

.transient_local_port_name(base_name) ⇒ Object



28
29
30
# File 'lib/orocos/port.rb', line 28

def self.transient_local_port_name(base_name)
    "#{base_name}.#{@@transient_port_id_counter += 1}"
end

.transport_name(id) ⇒ Object

Returns the transport name for the given transport ID or a placeholder sentence if no name is known for this transport ID



40
41
42
# File 'lib/orocos/port.rb', line 40

def self.transport_name(id)
    transport_names[id] || "unknown transport with ID #{id}"
end

.validate_policy(policy) ⇒ Object

A connection policy is represented by a hash whose elements are each of the policy parameters. Valid policies are:

  • buffer policy. Values are stored in a FIFO of the specified size. Connecting with a buffer policy is done with:

    output_port.connect_to(input_port, :type => :buffer, :size => 10)
    
  • data policy. The input port will always read the last value pushed by the output port. It is the default policy, but can be explicitly specified with:

    output_port.connect_to(input_port, :type => :data)
    

An additional :pull option specifies if samples should be pushed by the output end (i.e. if all samples that are written on the output port are sent to the input port), or if the values are transmitted only when the input port is read. For instance:

output_port.connect_to(input_port, :type => :data, :pull => true)

Finally, the type of locking can be specified. The lock_free locking policy guarantees that a high-priority thread will not be “taken over” by a low-priority one, but requires a lot of copying – so avoid it in non-hard-realtime contexts with big data samples. The locked locking policy uses mutexes, so is not ideal in hard realtime contexts. Each policy is specified with:

output_port.connect_to(input_port, :lock => :lock_free)
output_port.connect_to(input_port, :lock => :locked)

This method raises ArgumentError if the policy is not valid.



106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/orocos/port.rb', line 106

def self.validate_policy(policy)
    policy = validate_options policy, CONNECTION_POLICY_OPTIONS
    if policy.has_key?(:type)
        policy[:type] = policy[:type].to_sym
    end
    if policy.has_key?(:lock)
        policy[:lock] = policy[:lock].to_sym
    end

    if (policy[:type] == :buffer || policy[:type] == :circular_buffer) && !policy[:size]
        raise ArgumentError, "you must provide a 'size' argument for buffer connections"
    elsif policy[:type] == :data && (policy[:size] && policy[:size] != 0)
        raise ArgumentError, "there are no 'size' argument to data connections"
    end
    policy
end

Instance Method Details

#connected?Boolean

Tests if this port is already part of a connection or not

Returns:

  • (Boolean)


387
388
389
390
391
392
393
394
# File 'ext/rorocos/rorocos.cc', line 387

static VALUE port_connected_p(VALUE self)
{
    RTaskContext* task; VALUE name;
    tie(task, tuples::ignore, name) = getPortReference(self);
    bool result = corba_blocking_fct_call_with_result(bind(&_objref_CDataFlowInterface::isConnected,(_objref_CDataFlowInterface*)task->ports,
                                                      StringValuePtr(name)));
    return result ? Qtrue : Qfalse;
}

#create_stream(transport, name_id, policy = Hash.new) ⇒ Object

Publishes or subscribes this port on a stream



150
151
152
153
154
155
156
157
158
159
# File 'lib/orocos/port.rb', line 150

def create_stream(transport, name_id, policy = Hash.new)
    policy = Port.prepare_policy(policy)
    policy[:transport] = transport
    policy[:name_id] = name_id
    do_create_stream(policy)
            
    self
rescue Orocos::ConnectionFailed => e
    raise e, "failed to create stream from #{full_name} on transport #{Port.transport_name(transport)}, name #{name_id} and policy #{policy.inspect}"
end

#default_ros_topic_nameObject



3
4
5
# File 'lib/orocos/ros/ports.rb', line 3

def default_ros_topic_name
    "#{task.name}/#{self.name}"
end

#disconnect_allObject

Removes this port from all connections it is part of



57
58
59
60
61
# File 'lib/orocos/port.rb', line 57

def disconnect_all
    refine_exceptions do
        do_disconnect_all
    end
end

#do_create_stream(_policy) ⇒ Object



474
475
476
477
478
479
480
481
482
483
484
485
# File 'ext/rorocos/rorocos.cc', line 474

static VALUE do_port_create_stream(VALUE rport, VALUE _policy)
{
    RTaskContext* task; VALUE name;
    tie(task, tuples::ignore, name) = getPortReference(rport);

    RTT::corba::CConnPolicy policy = policyFromHash(_policy);
    bool result = corba_blocking_fct_call_with_result(bind(&_objref_CDataFlowInterface::createStream,(_objref_CDataFlowInterface*)task->ports,
                StringValuePtr(name),policy));
    if(!result)
        rb_raise(eConnectionFailed, "failed to create stream");
    return Qnil;
}

#do_disconnect_allObject



454
455
456
457
458
459
460
461
# File 'ext/rorocos/rorocos.cc', line 454

static VALUE do_port_disconnect_all(VALUE port)
{
    RTaskContext* task; VALUE name;
    tie(task, tuples::ignore, name) = getPortReference(port);
    corba_blocking_fct_call(bind(&_objref_CDataFlowInterface::disconnectPort,(_objref_CDataFlowInterface*)task->ports,
                            StringValuePtr(name)));
    return Qnil;
}

#do_disconnect_from(other) ⇒ Object



463
464
465
466
467
468
469
470
471
472
# File 'ext/rorocos/rorocos.cc', line 463

static VALUE do_port_disconnect_from(VALUE self, VALUE other)
{
    RTaskContext* self_task; VALUE self_name;
    tie(self_task, tuples::ignore, self_name) = getPortReference(self);
    RTaskContext* other_task; VALUE other_name;
    tie(other_task, tuples::ignore, other_name) = getPortReference(other);
    bool result = corba_blocking_fct_call_with_result(bind(&_objref_CDataFlowInterface::removeConnection,(_objref_CDataFlowInterface*)self_task->ports,
                StringValuePtr(self_name),other_task->ports,StringValuePtr(other_name)));
    return result ? Qtrue : Qfalse;
}

#do_remove_stream(stream_name) ⇒ Object



487
488
489
490
491
492
493
494
495
# File 'ext/rorocos/rorocos.cc', line 487

static VALUE do_port_remove_stream(VALUE rport, VALUE stream_name)
{
    RTaskContext* task; VALUE name;
    tie(task, tuples::ignore, name) = getPortReference(rport);

    corba_blocking_fct_call(bind(&_objref_CDataFlowInterface::removeStream,(_objref_CDataFlowInterface*)task->ports,
                            StringValuePtr(name),StringValuePtr(stream_name)));
    return Qnil;
}

#docObject

Returns a documentation string describing the port If no documentation is available it returns nil



138
139
140
141
142
# File 'lib/orocos/port.rb', line 138

def doc
    if model
        model.doc
    end
end

#doc?Boolean

Returns true if a documentation about the port is available otherwise it retuns false

Returns:

  • (Boolean)


132
133
134
# File 'lib/orocos/port.rb', line 132

def doc?
    (doc && !doc.empty?)
end

#handle_mq_transport(input_name, policy) ⇒ Object

Helper method for #connect_to, to handle the MQ transport (in particular, the validation of the parameters)

A block must be given, that should return true if the MQ transport should be used for this particular connection and false otherwise (i.e. true if the two ports are located on the same machine, false otherwise)



192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
# File 'lib/orocos/port.rb', line 192

def handle_mq_transport(input_name, policy) # :nodoc:
    if policy[:transport] == TRANSPORT_MQ
        if !Orocos::MQueue.available?
            raise InvalidMQTransportSetup, "cannot select the MQueue transport as it is not built into the RTT"
        end
        # Go on to the validation steps
    elsif !Orocos::MQueue.available?
        return policy.dup
    elsif !Orocos::MQueue.auto?
        return policy.dup
    elsif policy[:transport] != 0
        return policy.dup # explicit transport chosen, and it is not MQ
    end

    Orocos.info do
        "#{full_name} => #{input_name}: using MQ transport"
    end
    updated_policy = Hash[size: 0, data_size: 0].
        merge(policy).
        merge(transport: TRANSPORT_MQ)

    queue_length, message_size = updated_policy.values_at(:size, :data_size)
    if queue_length == 0
        queue_length = MQ_RTT_DEFAULT_QUEUE_LENGTH
    end

    if Orocos::MQueue.auto_sizes? && message_size == 0
        size = max_marshalling_size
        if !size
            if policy[:transport] == TRANSPORT_MQ
                raise InvalidMQTransportSetup, "MQ transport explicitely selected, but the message size cannot be computed for #{self}"
            end

            if Orocos::MQueue.warn?
                Orocos.warn "the MQ transport could be selected, but the marshalling size of samples from the output port #{full_name}, of type #{type.name}, is unknown, falling back to auto-transport"
            end
            return policy.dup
        end

        Orocos.info do
            "#{full_name} => #{input_name}: MQ data_size == #{size}"
        end
        message_size = size
    end

    if Orocos::MQueue.validate_sizes?
        valid = Orocos::MQueue.valid_sizes?(queue_length, message_size) do
            "#{full_name} => #{input_name} of type #{type.name}: "
        end

        if !valid
            if policy[:transport] == TRANSPORT_MQ
                raise InvalidMQTransportSetup, "MQ transport explicitely selected, but the current system setup does not allow to create a MQ of #{queue_length} messages of size #{message_size}"
            end

            if Orocos::MQueue.warn?
                Orocos.warn "the MQ transport could be selected, but the marshalling size of samples (#{policy[:data_size]}) is invalid, falling back to auto-transport"
            end
            return policy.dup
        end
    end

    updated_policy[:data_size] = message_size
    updated_policy
end

#pretty_print(pp) ⇒ Object

:nodoc:



48
49
50
51
52
53
54
# File 'lib/orocos/port.rb', line 48

def pretty_print(pp) # :nodoc:
    if type.name != orocos_type_name
        pp.text " #{name} (#{type.name}/#{orocos_type_name})"
    else
        pp.text " #{name} (#{type.name})"
    end
end

#refine_exceptions(other = nil) ⇒ Object

:nodoc:



168
169
170
171
172
173
174
175
176
177
178
179
# File 'lib/orocos/port.rb', line 168

def refine_exceptions(other = nil) # :nodoc:
    CORBA.refine_exceptions(self, other) do
        yield
    end

rescue NotFound
    if !other || task.has_port?(name)
        raise InterfaceObjectNotFound.new(task, name), "port '#{name}' disappeared from task '#{task.name}'"
    else
        raise InterfaceObjectNotFound.new(other.task, other.name), "port '#{other.name}' disappeared from task '#{other.task.name}'"
    end
end

#remove_stream(name_id) ⇒ Object

Removes a stream publication. The name should be the same than the one given to the



163
164
165
166
# File 'lib/orocos/port.rb', line 163

def remove_stream(name_id)
    do_remove_stream(name_id)
    self
end

#to_orocos_portObject

Returns the Orocos port



145
146
147
# File 'lib/orocos/port.rb', line 145

def to_orocos_port
    self
end

#type_nameObject

Deprecated.

Returns the name of the typelib type. Use #type.name instead.



46
# File 'lib/orocos/port.rb', line 46

def type_name; type.name end