Module: Orocos::Test::Component
Includes: RubyTasks
Defined in: lib/orocos/test/component.rb
Defined Under Namespace
Modules: ClassExtension
Instance Method Summary
- #assert_has_no_new_sample(reader, timeout = 0.2) ⇒ Object
  Verifies that no sample arrives on reader within timeout seconds.
- #assert_has_one_new_sample(reader, timeout = 3, poll_period = 0.01) ⇒ Object
  Verifies that reader gets one sample within timeout seconds.
- #assert_state_change(task, timeout = 1) ⇒ Object
  call-seq: assert_state_change(task, timeout = 1) { |state| test_if_state_is_the_expected_state }
  Tests if the state of task changes to an expected value.
- #data_reader(port, policy = Hash.new) ⇒ Object
  Gets the data reader for this port.
- #data_writer(port, policy = Hash.new) ⇒ Object
  Gets the data writer for this port.
- #setup ⇒ Object
- #start(*args) ⇒ Object
  call-seq: start 'model_name', 'task_name'
            start 'deployment_name', 'task_name'[, 'prefix']
- #teardown ⇒ Object
Methods included from RubyTasks
#helpers_dir, #new_external_ruby_task_context, #new_ruby_task_context, #register_allocated_ruby_tasks
Instance Method Details
#assert_has_no_new_sample(reader, timeout = 0.2) ⇒ Object
Verifies that no sample arrives on reader within timeout seconds.
# File 'lib/orocos/test/component.rb', line 45
def assert_has_no_new_sample(reader, timeout = 0.2)
  sleep timeout
  assert(!reader.read_new, "#{reader} has one new sample, but none was expected")
end
#assert_has_one_new_sample(reader, timeout = 3, poll_period = 0.01) ⇒ Object
Verifies that reader gets one sample within timeout seconds.
# File 'lib/orocos/test/component.rb', line 51
def assert_has_one_new_sample(reader, timeout = 3, poll_period = 0.01)
  Integer(timeout / poll_period).times do
    if sample = reader.read_new
      return sample
    end
    sleep poll_period
  end
  flunk("expected to get one new sample out of #{reader.port.name}, but got none")
end
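The polling loop in this method can be tried without a running Orocos deployment. The following is a minimal sketch, not the library's code: `FakeReader` is a hypothetical stand-in whose `read_new` returns nil until a given number of polls have elapsed, and `poll_for_sample` is an illustrative name for a helper that mirrors the loop but returns nil instead of calling `flunk`.

```ruby
# Hypothetical stand-in for a port reader (not part of Orocos):
# read_new returns nil until its `ready_after`-th call, which returns the sample.
class FakeReader
  def initialize(sample, ready_after)
    @sample = sample
    @remaining = ready_after
  end

  def read_new
    @remaining -= 1
    return nil unless @remaining.zero?
    @sample
  end
end

# Illustrative helper mirroring the polling loop of assert_has_one_new_sample.
# It polls roughly timeout / poll_period times and returns nil on timeout.
def poll_for_sample(reader, timeout = 3, poll_period = 0.01)
  Integer(timeout / poll_period).times do
    sample = reader.read_new
    return sample if sample
    sleep poll_period
  end
  nil
end

# The sample becomes available on the third poll, well within the timeout.
found = poll_for_sample(FakeReader.new(42, 3), 0.1, 0.01)
# The sample would only be available after 1000 polls, so the loop gives up.
missed = poll_for_sample(FakeReader.new(42, 1000), 0.05, 0.01)
```

Because the loop relies on the truthiness of the value returned by `read_new`, a sample that is itself `false` or `nil` would be treated as "no sample yet" in this sketch.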
#assert_state_change(task, timeout = 1) ⇒ Object
call-seq:
assert_state_change(task, timeout = 1) { |state| test_if_state_is_the_expected_state }
Tests whether the state of task changes to an expected value. The block is called with each queued state change and should return true if the passed state is the expected one.
# File 'lib/orocos/test/component.rb', line 67
def assert_state_change(task, timeout = 1)
  sleep_time = Float(timeout) / 10
  10.times do
    queued_state_changes = task.peek_state
    if queued_state_changes.any? { |s| yield(s) }
      return
    end
    sleep sleep_time
  end
  flunk("could not find the expected state change for #{task.name} in #{task.peek_state.inspect}")
end
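The way the block is applied to the queued state changes can be illustrated in isolation. This is a sketch under stated assumptions: `FakeTask` is a hypothetical stand-in whose `peek_state` queues one more scripted state at each call, `state_change_seen?` is an illustrative name that returns a boolean instead of calling `flunk`, and the state symbols are placeholders chosen for the example.

```ruby
# Hypothetical stand-in for a task (not part of Orocos): each call to
# peek_state queues one more scripted state and returns all queued states.
class FakeTask
  def initialize(states)
    @pending = states.dup
    @queued = []
  end

  def peek_state
    @queued << @pending.shift unless @pending.empty?
    @queued.dup
  end
end

# Illustrative helper mirroring assert_state_change: the timeout is split
# into 10 polls, and the block decides whether a queued state is the expected one.
def state_change_seen?(task, timeout = 1)
  sleep_time = Float(timeout) / 10
  10.times do
    return true if task.peek_state.any? { |s| yield(s) }
    sleep sleep_time
  end
  false
end

task = FakeTask.new([:PRE_OPERATIONAL, :STOPPED, :RUNNING])
reached_running = state_change_seen?(task, 0.1) { |s| s == :RUNNING }

# This task never reports the state the block is looking for.
stuck = FakeTask.new([:STOPPED])
reached_error = state_change_seen?(stuck, 0.1) { |s| s == :FATAL_ERROR }
```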
#data_reader(port, policy = Hash.new) ⇒ Object
Gets the data reader for this port. The reader is disconnected on teardown.
# File 'lib/orocos/test/component.rb', line 121
def data_reader(port, policy = Hash.new)
  reader = port.reader(policy)
  data_readers << reader
  reader
end
#data_writer(port, policy = Hash.new) ⇒ Object
Gets the data writer for this port. The writer is disconnected on teardown.
# File 'lib/orocos/test/component.rb', line 129
def data_writer(port, policy = Hash.new)
  writer = port.writer(policy)
  data_writers << writer
  writer
end
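Both helpers follow the same pattern: create the connection, record it in a list, and let teardown disconnect everything in that list. The sketch below shows the pattern for readers only. `FakeConnection`, `FakePort` and `TrackingExample` are hypothetical names introduced for illustration, and the lazily initialized `data_readers` accessor is an assumption about how the list might be stored, since its definition is not shown on this page.

```ruby
# Hypothetical stand-in for a reader object (not part of Orocos).
class FakeConnection
  attr_reader :connected

  def initialize
    @connected = true
  end

  def disconnect
    @connected = false
  end
end

# Hypothetical stand-in for a port; the policy is accepted but ignored.
class FakePort
  def reader(policy = {})
    FakeConnection.new
  end
end

class TrackingExample
  # Assumption: the list of tracked readers is created on first use.
  def data_readers
    @data_readers ||= []
  end

  # Same shape as data_reader above: create, record, return.
  def data_reader(port, policy = {})
    reader = port.reader(policy)
    data_readers << reader
    reader
  end

  # Disconnect every tracked reader, then forget them.
  def teardown
    data_readers.each(&:disconnect)
    data_readers.clear
  end
end

example = TrackingExample.new
r1 = example.data_reader(FakePort.new)
# Illustrative policy hash; the stand-in port does not interpret it.
r2 = example.data_reader(FakePort.new, { type: :buffer, size: 10 })
tracked_before = example.data_readers.size
example.teardown
```

Tracking the connections in one place means a test does not have to disconnect each reader or writer itself, even when an assertion fails partway through.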
#setup ⇒ Object
# File 'lib/orocos/test/component.rb', line 13
def setup
  if !Orocos.initialized?
    Orocos.initialize
  end

  self.class.run_specs.each do |name, task_name, run_spec|
    start(run_spec)
    instance_variable_set("@#{name}", Orocos.name_service.get(task_name))
  end
  self.class.reader_specs.each do |task_name, port_name, reader_name, policy|
    reader = self.data_reader(send(task_name).port(port_name), policy)
    instance_variable_set("@#{reader_name}", reader)
  end
  self.class.writer_specs.each do |task_name, port_name, writer_name, policy|
    writer = self.data_writer(send(task_name).port(port_name), policy)
    instance_variable_set("@#{writer_name}", writer)
  end
  super if defined? super
end
#start(*args) ⇒ Object
call-seq:
start 'model_name', 'task_name'
start 'deployment_name', 'task_name'[, 'prefix']
Starts a deployment or task at the point of the call and makes sure that it is shut down during teardown. In test methods, the task object is made accessible through the 'attribute_name' attribute.
In the first form, the task is given through its model. The global task name is registered as 'task_name', which defaults to 'attribute_name'.
In the second form, the task is given through a deployment name / task name pair. If a prefix is given, task_name must include the prefix as well, i.e.:
start 'task', 'rock_logger', 'source_logger', 'source'
where 'logger' is a task of the 'rock_logger' deployment.
For instance:
describe 'xsens_imu::Task' do
  include Orocos::Test::Component

  it "should fail to configure if no device is present" do
    start 'xsens_imu::Task' => 'task'
    task = Orocos.name_service.get 'task'
    task.device = ""
    assert_raises(Orocos::StateTransitionFailed) { task.configure }
  end
end
# File 'lib/orocos/test/component.rb', line 114
def start(*args)
  processes.concat(Orocos.run(*args))
  nil
end
#teardown ⇒ Object
# File 'lib/orocos/test/component.rb', line 34
def teardown
  data_readers.each { |r| r.disconnect }
  data_readers.clear
  data_writers.each { |w| w.disconnect }
  data_writers.clear
  processes.each { |p| p.kill }
  processes.clear
  super if defined? super
end
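Both setup and teardown end with `super if defined? super`, which lets the module be mixed into a class whether or not a method of the same name exists further up the ancestor chain. The sketch below demonstrates that idiom in plain Ruby. The `Tracking`, `Base`, `WithBase` and `Standalone` names and the `log` attribute are hypothetical and exist only for this illustration.

```ruby
# Hypothetical mixin using the same chaining idiom as setup and teardown.
module Tracking
  def setup
    (@log ||= []) << :module_setup
    # Call the next setup in the ancestor chain only if there is one.
    super if defined? super
  end
end

# A parent class that defines its own setup.
class Base
  def setup
    (@log ||= []) << :base_setup
  end
end

# Lookup order is WithBase, Tracking, Base: super reaches Base#setup.
class WithBase < Base
  include Tracking
  attr_reader :log
end

# No ancestor defines setup here, so `defined? super` is nil and
# the super call is skipped instead of raising NoMethodError.
class Standalone
  include Tracking
  attr_reader :log
end

with_base = WithBase.new
with_base.setup

standalone = Standalone.new
standalone.setup
```

In this sketch the module's own work runs before the call to `super`, which matches the position of the `super` call at the end of the two methods above.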