Class: Pocolog::DataStream
- Inherits:
- Object
- Defined in:
- lib/pocolog/data_stream.rb
Overview
Interface for reading a stream in a Pocolog::Logfiles
Instance Attribute Summary
-
#index ⇒ Object
readonly
Returns the value of attribute index.
-
#info ⇒ Object
readonly
The StreamInfo structure for that stream.
-
#logfile ⇒ Object
readonly
Returns the value of attribute logfile.
-
#metadata ⇒ Object
The stream associated metadata.
-
#name ⇒ Object
readonly
Returns the value of attribute name.
-
#sample_index ⇒ Object
readonly
The index in the stream of the last read sample.
-
#time_getter ⇒ Object
Returns the value of attribute time_getter.
-
#type ⇒ Object
readonly
Returns the value of attribute type.
Instance Method Summary
-
#[](sample_index) ⇒ Object
Returns the sample at index sample_index in this stream.
-
#advance ⇒ Object
Reads the next sample in the file, and returns its header.
- #close ⇒ Object
- #closed? ⇒ Boolean
-
#copy_to(start_index = 0, end_index = size, stream, &block) ⇒ Object
call-seq: copy_to(index1,index2,stream) => true copy_to(time1,time2,stream) => true.
- #data(data_header = nil) ⇒ Object
-
#data_header ⇒ Object
The data header for the current sample.
-
#duration_lg ⇒ Float
Returns this stream's duration in seconds.
-
#each(rewind: true) {|rt, lg, sample| ... } ⇒ Object
Enumerates the samples of this stream, converting the typelib data.
-
#each_block(rewind = true) { ... } ⇒ Object
Enumerates the blocks of this stream.
-
#empty? ⇒ Boolean
True if the size of this stream is zero.
-
#eof? ⇒ Boolean
True if we read past the last sample.
-
#first ⇒ Object
call-seq: first => [time_rt, time_lg, data].
-
#from_logical_time(time) ⇒ Object
Return a new DataStream from which all samples before the given logical time are removed.
-
#initialize(logfile, index, name, stream_type, metadata = {}, info = StreamInfo.new) ⇒ DataStream
constructor
A new instance of DataStream.
-
#interval_lg ⇒ (Time,Time), ()
Return the logical time of the first and last samples in this stream.
-
#interval_rt ⇒ (Time,Time), ()
Return the realtime of the first and last samples in this stream.
-
#last ⇒ Object
call-seq: last => [time_rt, time_lg, data].
-
#next ⇒ Object
call-seq: next => [time_rt, time_lg, data].
- #open ⇒ Object
-
#previous ⇒ Object
call-seq: previous => [time_rt, time_lg, data].
-
#raw_data(data_header = nil, sample = nil) ⇒ Object
Returns the decoded data sample associated with the given block header.
-
#raw_each(rewind: true) {|rt, lg, sample| ... } ⇒ Object
Enumerates the samples of this stream without converting the typelib data.
- #read_one_data_sample(position) ⇒ Object
- #read_one_raw_data_sample(position, sample = nil) ⇒ Object
-
#registry ⇒ Object
Get the Typelib::Registry object for this stream.
-
#resample_by_index(samples) ⇒ Object
Return a new DataStream with only the N-th sample.
-
#resample_by_time(period, start_time: nil) ⇒ Object
Return a new DataStream resampled by time, using the given period and optional start_time.
-
#rewind ⇒ Object
call-seq: rewind => data_header.
-
#samples(read_data = true) ⇒ Object
Returns a SampleEnumerator object for this stream.
-
#samples?(start_index, end_index) ⇒ Boolean
call-seq: samples?(pos1,pos2) => true samples?(time1,time2) => true.
-
#seek(pos, decode_data = true) ⇒ Object
Seek the stream at the given position.
-
#size ⇒ Object
The size, in samples, of data in this stream.
- #stream_index ⇒ Object
-
#sub_field(fieldname, data_header = nil) ⇒ Object
Returns the decoded subfield specified by 'fieldname' for the given data header.
-
#time ⇒ Object
Returns the time of the current sample.
-
#time_interval(rt = false) ⇒ Object
Get the logical time of first and last samples in this stream.
-
#to_logical_time(time) ⇒ Object
Return a new DataStream from which all samples after the given logical time are removed.
- #type_name ⇒ Object
- #typename ⇒ Object
-
#updated_datastream_from_index(stream_index) ⇒ Object
private
Re-creates a “copy” of this datastream using an updated index.
-
#write(rt, lg, data) ⇒ Object
Write a sample in this stream, with the rt and lg timestamps.
-
#write_raw(rt, lg, data) ⇒ Object
Write an already marshalled sample.
Constructor Details
#initialize(logfile, index, name, stream_type, metadata = {}, info = StreamInfo.new) ⇒ DataStream
Returns a new instance of DataStream
# File 'lib/pocolog/data_stream.rb', line 28
def initialize(logfile, index, name, stream_type, metadata = {}, info = StreamInfo.new)
  @logfile = logfile
  @index = Integer(index)
  @name = name.to_str
  @metadata = metadata
  @info = info

  # if we do have a registry, then adapt it to the local machine
  # if needed. Right now, this is required if containers changed
  # size.
  registry = stream_type.registry
  resize_containers = Hash.new
  registry.each do |type|
    if type <= Typelib::ContainerType && type.size != type.natural_size
      resize_containers[type] = type.natural_size
    end
  end
  if resize_containers.empty?
    @type = stream_type
  else
    registry.resize(resize_containers)
    @type = registry.get(stream_type.name)
  end

  @data = nil
  @sample_index = -1
end
Instance Attribute Details
#index ⇒ Object (readonly)
Returns the value of attribute index
# File 'lib/pocolog/data_stream.rb', line 5
def index
  @index
end
#info ⇒ Object (readonly)
The StreamInfo structure for that stream
# File 'lib/pocolog/data_stream.rb', line 11
def info
  @info
end
#logfile ⇒ Object (readonly)
Returns the value of attribute logfile
# File 'lib/pocolog/data_stream.rb', line 4
def logfile
  @logfile
end
#metadata ⇒ Object
The stream associated metadata
# File 'lib/pocolog/data_stream.rb', line 18
def metadata
  @metadata
end
#name ⇒ Object (readonly)
Returns the value of attribute name
# File 'lib/pocolog/data_stream.rb', line 6
def name
  @name
end
#sample_index ⇒ Object (readonly)
The index in the stream of the last read sample
It is equal to size if we are past-the-end, i.e. if one called #next until it returned nil
# File 'lib/pocolog/data_stream.rb', line 16
def sample_index
  @sample_index
end
#time_getter ⇒ Object
Returns the value of attribute time_getter
# File 'lib/pocolog/data_stream.rb', line 172
def time_getter
  @time_getter
end
#type ⇒ Object (readonly)
Returns the value of attribute type
# File 'lib/pocolog/data_stream.rb', line 8
def type
  @type
end
Instance Method Details
#[](sample_index) ⇒ Object
Returns the sample at index sample_index in this stream
# File 'lib/pocolog/data_stream.rb', line 168
def [](sample_index)
  seek(sample_index)
end
#advance ⇒ Object
Reads the next sample in the file, and returns its header. Returns nil if the end of file has been reached. Unlike next, it does not decode the data payload.
# File 'lib/pocolog/data_stream.rb', line 395
def advance
  if sample_index < size - 1
    @sample_index += 1
    file_pos = stream_index.file_position_by_sample_number(@sample_index)
    logfile.read_one_block(file_pos)
    return logfile.data_header
  else
    @sample_index = size
  end
  nil
end
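The cursor behaviour of #advance can be tried out without a log file. The following is a minimal sketch that assumes a stand-in class (CursorSketch, a hypothetical name that is not part of Pocolog) holding its samples in an Array; it only mirrors how sample_index moves from -1 to size.

```ruby
# Stand-in for illustration only; this is not Pocolog::DataStream.
class CursorSketch
  attr_reader :sample_index

  def initialize(samples)
    @samples = samples
    @sample_index = -1 # "rewound": nothing has been read yet
  end

  def size
    @samples.size
  end

  # Same branch structure as #advance: step forward while a sample remains,
  # otherwise park the cursor at size (past-the-end) and return nil.
  def advance
    if @sample_index < size - 1
      @sample_index += 1
      return @samples[@sample_index]
    else
      @sample_index = size
    end
    nil
  end

  def eof?
    size == @sample_index
  end
end

cursor = CursorSketch.new(%w[a b c])
seen = []
while (sample = cursor.advance)
  seen << sample
end
```

Once the loop ends, the cursor sits at size, which is the condition #eof? tests.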
#close ⇒ Object
# File 'lib/pocolog/data_stream.rb', line 109
def close
  logfile.close
end
#closed? ⇒ Boolean
# File 'lib/pocolog/data_stream.rb', line 101
def closed?
  logfile.closed?
end
#copy_to(start_index = 0, end_index = size, stream, &block) ⇒ Object
call-seq:
copy_to(index1,index2,stream) => true
copy_to(time1,time2,stream) => true
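Copies all blocks from start_index/time up to end_index/time into the given stream. If a code block is given, it is called before each block is copied, with the number of blocks copied so far; if it returns a true value (such as 1), the copy is canceled and the method returns false. Otherwise the method returns the number of copied blocks, or nil if this stream is empty.
The given interval is automatically truncated if it extends beyond the stream.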
# File 'lib/pocolog/data_stream.rb', line 445
def copy_to(start_index = 0, end_index = size, stream, &block)
  return if empty?
  interval = interval_lg
  start_index = if start_index.is_a? Time
                  if interval.first > start_index
                    0
                  else
                    stream_index.sample_number_by_time(start_index)
                  end
                else
                  if start_index < 0
                    0
                  else
                    start_index
                  end
                end
  end_index = if end_index.is_a? Time
                if interval.last < end_index
                  size
                else
                  stream_index.sample_number_by_time(end_index)
                end
              else
                if end_index >= size
                  size
                else
                  end_index
                end
              end

  counter = 0
  data_header = seek(start_index, false)
  while sample_index < end_index
    if block
      return false if block.call(counter)
    end
    data_buffer = logfile.data(data_header)
    stream.write_raw(data_header.rt_time, data_header.lg_time, data_buffer)
    counter += 1
    data_header = advance
  end
  counter
end
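The truncation and cancellation rules of #copy_to can be sketched on plain arrays. The helpers below (clamp_sample_range and copy_range) are hypothetical names introduced for illustration; they only cover the integer-index case and do not model the early return for empty streams.

```ruby
# Hypothetical helpers for illustration; they are not part of Pocolog.

# Clamp an integer sample range to [0, size], as the integer branches of
# #copy_to do.
def clamp_sample_range(start_index, end_index, size)
  start_index = 0 if start_index < 0
  end_index = size if end_index >= size
  [start_index, end_index]
end

# Copy source[start_index...end_index] into target. The optional block is
# called with the running count before each copy; a truthy result cancels
# the copy and makes the method return false.
def copy_range(source, start_index, end_index, target)
  start_index, end_index = clamp_sample_range(start_index, end_index, source.size)
  counter = 0
  (start_index...end_index).each do |i|
    return false if block_given? && yield(counter)

    target << source[i]
    counter += 1
  end
  counter
end

target = []
copied = copy_range(%w[a b c d], -1, 99, target)
cancelled = copy_range(%w[a b c d], 0, 4, []) { |count| count == 2 }
```

The oversized range (-1, 99) is truncated to the whole array, and the second call stops as soon as the block returns true.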
#data(data_header = nil) ⇒ Object
# File 'lib/pocolog/data_stream.rb', line 307
def data(data_header = nil)
  Typelib.to_ruby(raw_data(data_header))
end
#data_header ⇒ Object
The data header for the current sample. You can store a copy of this header to retrieve data later on with #data:
# Don't forget to duplicate !
stored_header = stream.data_header.dup
...
data = stream.data(stored_header)
# File 'lib/pocolog/data_stream.rb', line 232
def data_header; logfile.data_header end
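The reminder to duplicate the header makes sense if the reader reuses one header object between samples. That reuse is an assumption here, suggested by the dup call in #seek; the sketch below uses a hypothetical Struct (SketchDataHeader) to show the aliasing hazard in plain Ruby.

```ruby
# Hypothetical stand-in for a header object; not a Pocolog class.
SketchDataHeader = Struct.new(:lg)

# Assumption: the reader keeps one header object and overwrites it in place.
reused_header = SketchDataHeader.new(10)

aliased = reused_header     # same object, follows later reads
stored = reused_header.dup  # independent copy of the current values

reused_header.lg = 20       # the reader moves on to the next sample
```

After the update, aliased reports the new logical time while stored still holds the value captured at the time of the dup.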
#duration_lg ⇒ Float
Returns this stream's duration in seconds
# File 'lib/pocolog/data_stream.rb', line 216
def duration_lg
  if empty?
    0
  else
    interval = interval_lg
    interval[1] - interval[0]
  end
end
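The Float return value follows from Ruby's Time arithmetic: subtracting one Time from another yields the difference in seconds as a Float. A minimal illustration with made-up timestamps:

```ruby
# Made-up first and last logical times of a stream.
first_lg = Time.at(100)
last_lg = Time.at(102.5)

# Time - Time returns the elapsed seconds as a Float.
duration = last_lg - first_lg
```

Note that the empty-stream branch of #duration_lg returns the Integer 0 rather than a Float.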
#each(rewind: true) {|rt, lg, sample| ... } ⇒ Object
Enumerates the samples of this stream, converting the typelib data
# File 'lib/pocolog/data_stream.rb', line 159
def each(rewind: true)
  return enum_for(__method__, rewind: rewind) unless block_given?

  raw_each(rewind: rewind) do |rt, lg, sample|
    yield(rt, lg, Typelib.to_ruby(sample))
  end
end
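The enum_for guard at the top of #each means that calling it without a block returns an Enumerator, so the usual Enumerable methods can be chained. The sketch below assumes a stand-in class (EnumSketch, hypothetical) that yields rows from an Array instead of reading a log file.

```ruby
# Stand-in for illustration only; this is not Pocolog::DataStream.
class EnumSketch
  def initialize(rows)
    @rows = rows
  end

  # Same guard as #each: without a block, return an Enumerator that will
  # call this method again with the same keyword argument.
  # The rewind flag is accepted but unused in this sketch.
  def each(rewind: true)
    return enum_for(__method__, rewind: rewind) unless block_given?

    @rows.each { |rt, lg, sample| yield(rt, lg, sample) }
  end
end

stream = EnumSketch.new([[1, 10, "a"], [2, 20, "b"]])
enumerator = stream.each
logical_times = stream.each.map { |_rt, lg, _sample| lg }
```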
#each_block(rewind = true) { ... } ⇒ Object
Enumerates the blocks of this stream
# File 'lib/pocolog/data_stream.rb', line 124
def each_block(rewind = true)
  return enum_for(__method__, rewind) unless block_given?

  self.rewind if rewind
  yield while advance
end
#empty? ⇒ Boolean
True if the size of this stream is zero
# File 'lib/pocolog/data_stream.rb', line 240
def empty?; size == 0 end
#eof? ⇒ Boolean
True if we read past the last sample
# File 'lib/pocolog/data_stream.rb', line 238
def eof?; size == sample_index end
#first ⇒ Object
call-seq:
first => [time_rt, time_lg, data]
Returns the first sample in the stream, or nil if the stream is empty
It differs from #rewind as it always decodes the data payload.
After a call to #first, #sample_index is 0
# File 'lib/pocolog/data_stream.rb', line 334
def first
  rewind
  self.next
end
#from_logical_time(time) ⇒ Object
Return a new DataStream from which all samples before the given logical time are removed
# File 'lib/pocolog/data_stream.rb', line 59
def from_logical_time(time)
  updated_datastream_from_index(stream_index.remove_before(time))
end
#interval_lg ⇒ (Time,Time), ()
Return the logical time of the first and last samples in this stream
# File 'lib/pocolog/data_stream.rb', line 196
def interval_lg
  info.interval_lg.map { |t| StreamIndex.time_from_internal(t, 0) }
end
#interval_rt ⇒ (Time,Time), ()
Return the realtime of the first and last samples in this stream
# File 'lib/pocolog/data_stream.rb', line 188
def interval_rt
  info.interval_rt.map { |t| StreamIndex.time_from_internal(t, 0) }
end
#last ⇒ Object
call-seq:
last => [time_rt, time_lg, data]
Returns the last sample in the stream, or nil if the stream is empty.
After a call to #last, #sample_index is size - 1
# File 'lib/pocolog/data_stream.rb', line 345
def last
  @sample_index = size - 2
  self.next
end
#next ⇒ Object
call-seq:
next => [time_rt, time_lg, data]
Reads the next sample in the file, and returns it. It differs from advance as it always decodes the data sample.
# File 'lib/pocolog/data_stream.rb', line 412
def next
  header = advance
  if header
    return [header.rt, header.lg, data]
  end
end
#open ⇒ Object
# File 'lib/pocolog/data_stream.rb', line 105
def open
  logfile.open
end
#previous ⇒ Object
call-seq:
previous => [time_rt, time_lg, data]
Reads the previous sample in the file, and returns it.
# File 'lib/pocolog/data_stream.rb', line 423
def previous
  if sample_index < 0
    # Just rewind, never played
    return nil
  elsif sample_index == 0
    # Beginning of file reached
    rewind
    return nil
  else
    seek(sample_index - 1)
  end
end
#raw_data(data_header = nil, sample = nil) ⇒ Object
Returns the decoded data sample associated with the given block header.
Block headers are returned by #rewind
# File 'lib/pocolog/data_stream.rb', line 270
def raw_data(data_header = nil, sample = nil)
  if(@data && !data_header) then @data
  else
    data_header ||= logfile.data_header
    marshalled_data = logfile.data(data_header)
    data = sample || type.new
    data.from_buffer_direct(marshalled_data)
    if logfile.endian_swap
      data = data.endian_swap
    end
    data
  end
rescue Interrupt
  raise
rescue Exception => e
  raise e, "failed to unmarshal sample in block at position #{data_header.block_pos}: #{e.message}", e.backtrace
end
#raw_each(rewind: true) {|rt, lg, sample| ... } ⇒ Object
Enumerates the samples of this stream without converting the typelib data
# File 'lib/pocolog/data_stream.rb', line 140
def raw_each(rewind: true)
  return enum_for(__method__, rewind: rewind) unless block_given?

  each_block(rewind) do
    data_block = data_header
    yield(data_block.rt, data_block.lg, raw_data(data_block))
  end
end
#read_one_data_sample(position) ⇒ Object
# File 'lib/pocolog/data_stream.rb', line 288
def read_one_data_sample(position)
  Typelib.to_ruby(read_one_raw_data_sample(position))
end
#read_one_raw_data_sample(position, sample = nil) ⇒ Object
# File 'lib/pocolog/data_stream.rb', line 292
def read_one_raw_data_sample(position, sample = nil)
  block_pos = stream_index.file_position_by_sample_number(position)
  marshalled_data = logfile.read_one_data_payload(block_pos)
  data = sample || type.new
  data.from_buffer_direct(marshalled_data)
  if logfile.endian_swap
    data = data.endian_swap
  end
  data
rescue Interrupt
  raise
rescue Exception => e
  raise e, "failed to unmarshal sample in block at position #{block_pos}: #{e.message}", e.backtrace
end
#registry ⇒ Object
Get the Typelib::Registry object for this stream
# File 'lib/pocolog/data_stream.rb', line 243
def registry
  type.registry
end
#resample_by_index(samples) ⇒ Object
Return a new DataStream with only the N-th sample
# File 'lib/pocolog/data_stream.rb', line 70
def resample_by_index(samples)
  updated_datastream_from_index(stream_index.resample_by_index(samples))
end
#resample_by_time(period, start_time: nil) ⇒ Object
Return a new DataStream resampled by time, using the given period and optional start_time
# File 'lib/pocolog/data_stream.rb', line 78
def resample_by_time(period, start_time: nil)
  updated_datastream_from_index(
    stream_index.resample_by_time(period, start_time: start_time)
  )
end
#rewind ⇒ Object
call-seq:
rewind => data_header
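Goes back to the start of the stream. Note that the implementation listed for this method resets #sample_index to -1 and returns nil rather than a header; the next call to #advance or #next then reads the first sample.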
It differs from #first as it does not decode the data payload.
# File 'lib/pocolog/data_stream.rb', line 318
def rewind
  # The first sample in the file has index 0, so set sample_index to
  # -1 so that (@sample_index += 1) sets the index to 0 for the first
  # sample
  @sample_index = -1
  nil
end
#samples(read_data = true) ⇒ Object
Returns a SampleEnumerator object for this stream
# File 'lib/pocolog/data_stream.rb', line 114
def samples(read_data = true); SampleEnumerator.new(self, read_data) end
#samples?(start_index, end_index) ⇒ Boolean
call-seq:
samples?(pos1,pos2) => true
samples?(time1,time2) => true
Returns true if the stream has samples inside the given time or position interval
# File 'lib/pocolog/data_stream.rb', line 495
def samples?(start_index,end_index)
  if end_index < start_index
    raise ArgumentError, "end bound in sample interval smaller than start bound"
  elsif start_index.is_a? Time
    if start_index > end_index
      raise ArgumentError, "end bound in sample interval smaller than start bound"
    elsif empty?
      return
    end

    start_t, end_t = interval_lg
    start_index <= end_t && start_t <= end_index
  elsif start_index < 0
    raise ArgumentError, "negative start index"
  else
    start_index < size
  end
end
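For Time arguments, the test in #samples? is the standard closed-interval overlap check. The sketch below isolates that predicate in a hypothetical helper (intervals_overlap?, not part of Pocolog) with made-up timestamps.

```ruby
# Hypothetical helper: true if [query_start, query_end] and
# [stream_start, stream_end] share at least one instant.
def intervals_overlap?(query_start, query_end, stream_start, stream_end)
  query_start <= stream_end && stream_start <= query_end
end

stream_start = Time.at(100)
stream_end = Time.at(200)

overlapping = intervals_overlap?(Time.at(150), Time.at(250), stream_start, stream_end)
touching = intervals_overlap?(Time.at(200), Time.at(250), stream_start, stream_end)
disjoint = intervals_overlap?(Time.at(201), Time.at(250), stream_start, stream_end)
```

Because both comparisons are non-strict, a query that merely touches the last sample time still counts as containing samples.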
#seek(pos, decode_data = true) ⇒ Object
Seek the stream at the given position
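If pos is a Time object, seeks to the last sample whose logical time is not greater than pos. If pos lies outside the stream's logical time interval, nil is returned.
If pos is an integer, it is interpreted as an index and the stream goes to the sample that has this index.
Returns [rt, lg, data] for the current sample (if there is one), and nil otherwise. When decode_data is false, a copy of the data header is returned instead of the decoded sample.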
# File 'lib/pocolog/data_stream.rb', line 360
def seek(pos, decode_data = true)
  if pos.kind_of?(Time)
    interval_lg = self.interval_lg
    return nil if interval_lg.empty? || interval_lg[0] > pos || interval_lg[1] < pos
    @sample_index = stream_index.sample_number_by_time(pos)
  else
    @sample_index = pos
  end

  file_pos = stream_index.file_position_by_sample_number(@sample_index)
  block_info = logfile.read_one_block(file_pos)
  if block_info.stream_index != self.index
    block_stream_name = logfile.stream_from_index(block_info.stream_index).name
    raise InternalError, "index returned index=#{@sample_index} and pos=#{file_pos} as "\
      "position for seek(#{pos}) but it seems to be a sample in "\
      "stream #{block_info.stream_index} (#{block_stream_name}) while "\
      "we were expecting #{index} (#{name})"
  end

  if header = self.data_header
    header = header.dup
    if decode_data
      data = self.data(header)
      return [header.rt, header.lg, data]
    else
      header
    end
  end
end
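The rule "last sample whose logical time is not greater than pos" can be illustrated on a sorted array of times. How StreamIndex#sample_number_by_time is actually implemented is not shown on this page; the binary search below is only one possible approach, and last_index_not_after is a hypothetical helper name.

```ruby
# Hypothetical helper: index of the last entry of the ascending array
# `times` that is <= pos, or nil when pos is outside [times.first, times.last]
# (the same guard #seek applies before consulting the index).
def last_index_not_after(times, pos)
  return nil if times.empty? || times.first > pos || times.last < pos

  # Find the first entry strictly after pos, then step back by one.
  first_after = times.bsearch_index { |t| t > pos }
  first_after ? first_after - 1 : times.size - 1
end

times = [Time.at(10), Time.at(20), Time.at(30)]
between = last_index_not_after(times, Time.at(25))
exact = last_index_not_after(times, Time.at(20))
outside = last_index_not_after(times, Time.at(35))
```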
#size ⇒ Object
The size, in samples, of data in this stream
# File 'lib/pocolog/data_stream.rb', line 235
def size; info.size end
#stream_index ⇒ Object
# File 'lib/pocolog/data_stream.rb', line 97
def stream_index
  info.index
end
#sub_field(fieldname, data_header = nil) ⇒ Object
Returns the decoded subfield specified by 'fieldname' for the given data header. If no header is given, the current last read data header is used
# File 'lib/pocolog/data_stream.rb', line 250
def sub_field(fieldname, data_header = nil)
  header = data_header || logfile.data_header
  if( header.compressed )
    data(data_header).send(fieldname)
  elsif(type.is_a?(Typelib::CompoundType) and type.has_field?(fieldname))
    offset = type.offset_of(fieldname)
    subtype = type[fieldname]
    rawData = logfile.sub_field(offset, subtype.size, data_header)
    wrappedType = subtype.wrap(rawData)
    rubyType = Typelib.to_ruby(wrappedType)
    rubyType
  else
    nil
  end
end
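For uncompressed compound samples, #sub_field reads only the bytes of one field, located by its offset and size. The sketch below shows the same idea in plain Ruby with String#byteslice and String#unpack1; the field layout and the read_sub_field helper are hypothetical, and Typelib is not involved.

```ruby
# Hypothetical layout: a compound made of one 8-byte double followed by one
# 4-byte signed integer, both little-endian.
FIELD_LAYOUT = {
  "value" => { offset: 0, size: 8, format: "E" },
  "count" => { offset: 8, size: 4, format: "l<" }
}.freeze

# Decode a single field from the marshalled buffer without decoding the rest.
# Returns nil for unknown field names, like the else branch of #sub_field.
def read_sub_field(buffer, fieldname)
  field = FIELD_LAYOUT[fieldname]
  return nil unless field

  buffer.byteslice(field[:offset], field[:size]).unpack1(field[:format])
end

buffer = [1.5, 42].pack("El<")
count = read_sub_field(buffer, "count")
value = read_sub_field(buffer, "value")
```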
#time ⇒ Object
Returns the time of the current sample
# File 'lib/pocolog/data_stream.rb', line 175
def time
  header = logfile.data_header
  if !time_getter
    [header.rt, header.lg]
  else
    [header.rt, time_getter[data(header)]]
  end
end
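The listing calls time_getter with [], so any object responding to [] with the decoded sample works, for instance a lambda. The sketch below reproduces the branch with hypothetical stand-ins (SketchHeader and sample_time are not Pocolog names), and a Hash plays the role of the decoded sample.

```ruby
# Hypothetical stand-in for a data header; not a Pocolog class.
SketchHeader = Struct.new(:rt, :lg)

# Same branch as #time: use the header's logical time unless a getter
# extracts a timestamp from the decoded sample.
def sample_time(header, sample, time_getter = nil)
  if !time_getter
    [header.rt, header.lg]
  else
    [header.rt, time_getter[sample]]
  end
end

header = SketchHeader.new(Time.at(5), Time.at(6))
sample = { stamp: Time.at(7) }
getter = ->(decoded) { decoded[:stamp] }

default_time = sample_time(header, sample)
custom_time = sample_time(header, sample, getter)
```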
#time_interval(rt = false) ⇒ Object
Get the logical time of first and last samples in this stream. If rt is true, returns the interval for the wall-clock time
Returns nil if the stream is empty
# File 'lib/pocolog/data_stream.rb', line 204
def time_interval(rt = false)
  Pocolog.warn_deprecated "Pocolog::DataStream#time_interval is deprecated, use #interval_lg or #interval_rt instead"
  if rt
    interval_rt
  else
    interval_lg
  end
end
#to_logical_time(time) ⇒ Object
Return a new DataStream from which all samples after the given logical time are removed
# File 'lib/pocolog/data_stream.rb', line 65
def to_logical_time(time)
  updated_datastream_from_index(stream_index.remove_after(time))
end
#type_name ⇒ Object
# File 'lib/pocolog/data_stream.rb', line 24
def type_name
  type.name
end
#typename ⇒ Object
# File 'lib/pocolog/data_stream.rb', line 20
def typename
  type.name
end
#updated_datastream_from_index(stream_index) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Re-creates a “copy” of this datastream using an updated index
This makes it possible to create a view of the original stream by modifying the index
# File 'lib/pocolog/data_stream.rb', line 90
def updated_datastream_from_index(stream_index)
  info = StreamInfo.from_raw_data(
    [], @info.interval_rt, stream_index.base_time, stream_index.index_map
  )
  self.class.new(@logfile, index, @name, @type, @metadata, info)
end
#write(rt, lg, data) ⇒ Object
Write a sample in this stream, with the rt and lg timestamps. data can be either a Typelib::Type object of the right type, or a String (in which case we consider that it is the raw data)
# File 'lib/pocolog/data_stream.rb', line 517
def write(rt, lg, data)
  data = Typelib.from_ruby(data, type)
  write_raw(rt, lg, data.to_byte_array)
end
#write_raw(rt, lg, data) ⇒ Object
Write an already marshalled sample. data is supposed to be a typelib-marshalled value of the stream type
# File 'lib/pocolog/data_stream.rb', line 524
def write_raw(rt, lg, data)
  logfile.write_data_block(self, rt, lg, data)
end