Class: Pocolog::DataStream

Inherits:
Object
  • Object
show all
Defined in:
lib/pocolog/data_stream.rb

Overview

Interface for reading a stream in a Pocolog::Logfiles

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(logfile, index, name, stream_type, metadata = {}, info = StreamInfo.new) ⇒ DataStream

Returns a new instance of DataStream



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/pocolog/data_stream.rb', line 28

# Set up the stream's attributes and select the type used to decode samples.
#
# @param logfile the object stored as #logfile
# @param index converted with Integer() and stored as #index
# @param name converted with #to_str and stored as #name
# @param stream_type the stream's type; its registry is inspected and, when
#   needed, resized in place
# @param metadata the stream associated metadata
# @param info the StreamInfo structure for that stream
def initialize(logfile, index, name, stream_type,
               metadata = {}, info = StreamInfo.new)
    @logfile = logfile
    @index = Integer(index)
    @name = name.to_str
    @metadata = metadata
    @info = info

    # if we do have a registry, then adapt it to the local machine
    # if needed. Right now, this is required if containers changed
    # size.
    registry = stream_type.registry
    resize_containers = {}
    registry.each do |type|
        if type <= Typelib::ContainerType && type.size != type.natural_size
            resize_containers[type] = type.natural_size
        end
    end
    if resize_containers.empty?
        @type = stream_type
    else
        # The registry was modified, so fetch the type again by name
        registry.resize(resize_containers)
        @type = registry.get(stream_type.name)
    end

    @data = nil
    # -1 means "before the first sample"
    @sample_index = -1
end

Instance Attribute Details

#indexObject (readonly)

Returns the value of attribute index



5
6
7
# File 'lib/pocolog/data_stream.rb', line 5

# Reader for @index (the constructor stores Integer(index) there)
def index; @index end

#infoObject (readonly)

The StreamInfo structure for that stream



11
12
13
# File 'lib/pocolog/data_stream.rb', line 11

# The StreamInfo structure for that stream (reader for @info)
def info; @info end

#logfileObject (readonly)

Returns the value of attribute logfile



4
5
6
# File 'lib/pocolog/data_stream.rb', line 4

# Reader for @logfile, the logfile object given to the constructor
def logfile; @logfile end

#metadataObject

The stream associated metadata



18
19
20
# File 'lib/pocolog/data_stream.rb', line 18

# The stream associated metadata (reader for @metadata)
def metadata
  @metadata
end

#nameObject (readonly)

Returns the value of attribute name



6
7
8
# File 'lib/pocolog/data_stream.rb', line 6

# Reader for @name (the constructor stores name.to_str there)
def name; @name end

#sample_indexObject (readonly)

The index in the stream of the last read sample

It is equal to size if we are past-the-end, i.e. if one called #next until it returned nil



16
17
18
# File 'lib/pocolog/data_stream.rb', line 16

# The index in the stream of the last read sample (reader for
# @sample_index). It is -1 after #rewind and size once past the end.
def sample_index; @sample_index end

#time_getterObject

Returns the value of attribute time_getter



172
173
174
# File 'lib/pocolog/data_stream.rb', line 172

# Reader for @time_getter. When it is set, #time calls it as
# time_getter[sample] to compute the second element of its result.
def time_getter; @time_getter end

#typeObject (readonly)

Returns the value of attribute type



8
9
10
# File 'lib/pocolog/data_stream.rb', line 8

# Reader for @type, the type selected by the constructor for decoding samples
def type; @type end

Instance Method Details

#[](sample_index) ⇒ Object

Returns the sample_index sample of this stream



168
169
170
# File 'lib/pocolog/data_stream.rb', line 168

# Returns the sample_index sample of this stream.
#
# Delegates to #seek, so the stream position is moved to that sample.
def [](sample_index); seek(sample_index) end

#advanceObject

Reads the next sample in the file, and returns its header. Returns nil if the end of file has been reached. Unlike #next, it does not decode the data payload.



395
396
397
398
399
400
401
402
403
404
405
# File 'lib/pocolog/data_stream.rb', line 395

# Moves to the next sample and returns its data header, without decoding
# the payload.
#
# @return the logfile's data header for the new current sample, or nil
#   when there is no further sample (#sample_index is then set to size)
def advance
    unless sample_index < size - 1
        # Past the end: mark it so that #eof? becomes true
        @sample_index = size
        return nil
    end

    @sample_index += 1
    block_position = stream_index.file_position_by_sample_number(@sample_index)
    logfile.read_one_block(block_position)
    logfile.data_header
end

#closeObject



109
110
111
# File 'lib/pocolog/data_stream.rb', line 109

# Closes the underlying logfile (delegates to logfile.close)
def close; logfile.close end

#closed?Boolean

Returns:

  • (Boolean)


101
102
103
# File 'lib/pocolog/data_stream.rb', line 101

# Whether the underlying logfile reports itself as closed
#
# @return [Boolean] the result of logfile.closed?
def closed?; logfile.closed? end

#copy_to(start_index = 0, end_index = size, stream, &block) ⇒ Object

call-seq:

copy_to(index1,index2,stream) => number of copied samples
copy_to(time1,time2,stream) => number of copied samples

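Copies all blocks from start_index/time up to end_index/time (exclusive) to the given stream. If a code block is given, it is called before each block is copied, with the number of blocks copied so far; if it returns a true value, the copy process is canceled and the method returns false. Returns nil if the stream is empty.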

The given interval is automatically truncated if it is too big



445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
# File 'lib/pocolog/data_stream.rb', line 445

# Copies the raw samples of an index or time interval to another stream.
#
# The bounds can be either sample indexes or Time objects; they are
# truncated to the stream's own range. The end bound is exclusive.
#
# @param start_index [Integer,Time] first sample to copy
# @param end_index [Integer,Time] bound at which the copy stops
# @param stream the destination; must respond to write_raw(rt, lg, data)
# @yieldparam counter [Integer] number of samples copied so far, yielded
#   before each sample is copied. A true return value cancels the copy.
# @return [Integer,false,nil] the number of copied samples, false if the
#   block canceled the copy, nil if this stream is empty
def copy_to(start_index = 0, end_index = size, stream, &block)
    return if empty?

    time_range = interval_lg
    start_index =
        if !start_index.is_a?(Time)
            start_index < 0 ? 0 : start_index
        elsif time_range.first > start_index
            0
        else
            stream_index.sample_number_by_time(start_index)
        end
    end_index =
        if !end_index.is_a?(Time)
            end_index >= size ? size : end_index
        elsif time_range.last < end_index
            size
        else
            stream_index.sample_number_by_time(end_index)
        end

    copied = 0
    # Seek without decoding: only the header and raw payload are needed
    header = seek(start_index, false)
    while sample_index < end_index
        return false if block && block.call(copied)

        payload = logfile.data(header)
        stream.write_raw(header.rt_time, header.lg_time, payload)
        copied += 1
        header = advance
    end
    copied
end

#data(data_header = nil) ⇒ Object



307
308
309
# File 'lib/pocolog/data_stream.rb', line 307

# Returns the sample for the given data header, converted with
# Typelib.to_ruby.
#
# @param data_header the header of the sample to decode; passed as-is
#   to #raw_data (nil by default)
def data(data_header = nil)
    raw_sample = raw_data(data_header)
    Typelib.to_ruby(raw_sample)
end

#data_headerObject

The data header for the current sample. You can store a copy of this header to retrieve data later on with #data:

# Don't forget to duplicate !
stored_header = stream.data_header.dup
...
data = stream.data(stored_header)


232
# File 'lib/pocolog/data_stream.rb', line 232

# The data header for the current sample, as held by the logfile.
#
# The returned object is the logfile's own header: duplicate it before
# storing it for later use with #data.
def data_header
    logfile.data_header
end

#duration_lgFloat

Returns this stream's duration in seconds

Returns:

  • (Float)


216
217
218
219
220
221
222
223
# File 'lib/pocolog/data_stream.rb', line 216

# Returns this stream's duration in seconds, computed from the logical
# times of its first and last samples.
#
# @return [Float] the duration; 0.0 if the stream is empty
def duration_lg
    # Return a Float in the empty case too, like the non-empty path
    return 0.0 if empty?

    first_lg, last_lg = interval_lg
    last_lg - first_lg
end

#each(rewind: true) {|rt, lg, sample| ... } ⇒ Object

Enumerates the samples of this stream, converting the typelib data

Parameters:

  • rewind (Boolean)

    whether the enumeration should start from the first sample, or continue wherever the stream's current position is

Yield Parameters:

  • rt (Time)

    the sample's real time

  • lg (Time)

    the sample's logical time

  • sample (Typelib::Type, Object)

    the sample, possibly converted to a Ruby value if a conversion is available

See Also:



159
160
161
162
163
164
165
# File 'lib/pocolog/data_stream.rb', line 159

# Enumerates the samples of this stream, converting each one with
# Typelib.to_ruby.
#
# @param rewind [Boolean] forwarded to #raw_each; whether the enumeration
#   starts from the first sample or from the current position
# @yieldparam rt the sample's real time
# @yieldparam lg the sample's logical time
# @yieldparam sample the converted sample
# @return an enumerator if no block is given
def each(rewind: true)
    unless block_given?
        return enum_for(__method__, rewind: rewind)
    end

    raw_each(rewind: rewind) do |rt, lg, raw_sample|
        converted = Typelib.to_ruby(raw_sample)
        yield(rt, lg, converted)
    end
end

#each_block(rewind = true) { ... } ⇒ Object

Enumerates the blocks of this stream

Parameters:

  • rewind (Boolean) (defaults to: true)

    whether the enumeration should start from the first sample, or continue wherever the stream's current position is

Yields:

  • after advancing to each new sample. Call #data_header to get some information about the current data block

See Also:



124
125
126
127
128
129
# File 'lib/pocolog/data_stream.rb', line 124

# Enumerates the blocks of this stream.
#
# The block is called without arguments after each successful #advance;
# use #data_header to inspect the current block.
#
# @param rewind [Boolean] whether #rewind is called before iterating
# @return an enumerator if no block is given
def each_block(rewind = true)
    unless block_given?
        return enum_for(__method__, rewind)
    end

    self.rewind if rewind
    while advance
        yield
    end
end

#empty?Boolean

True if the size of this stream is zero

Returns:

  • (Boolean)


240
# File 'lib/pocolog/data_stream.rb', line 240

# True if the size of this stream is zero
#
# @return [Boolean]
def empty?
    size == 0
end

#eof?Boolean

True if we read past the last sample

Returns:

  • (Boolean)


238
# File 'lib/pocolog/data_stream.rb', line 238

# True if we read past the last sample, i.e. #sample_index equals #size
#
# @return [Boolean]
def eof?
    size == sample_index
end

#firstObject

call-seq:

first => [time_rt, time_lg, data]

Returns the first sample in the stream, or nil if the stream is empty

It differs from #rewind as it always decodes the data payload.

After a call to #first, #sample_index is 0



334
335
336
337
# File 'lib/pocolog/data_stream.rb', line 334

# Returns the first sample in the stream as [time_rt, time_lg, data], or
# nil if the stream is empty.
#
# Implemented as #rewind followed by #next, so the payload is decoded.
def first; rewind; self.next end

#from_logical_time(time) ⇒ Object

Return a new DataStream from which all samples before the given logical time are removed



59
60
61
# File 'lib/pocolog/data_stream.rb', line 59

# Returns a new DataStream built from this stream's index after
# stream_index.remove_before(time) was applied to it.
#
# @param time the logical time bound, passed to remove_before
def from_logical_time(time)
    trimmed_index = stream_index.remove_before(time)
    updated_datastream_from_index(trimmed_index)
end

#interval_lg ⇒ (Time,Time), ()

Return the logical time of the first and last samples in this stream

Returns:

  • ((Time,Time), ())

    the interval, or an empty array if the stream is empty



196
197
198
# File 'lib/pocolog/data_stream.rb', line 196

# Returns the logical time of the first and last samples in this stream.
#
# Each entry of info.interval_lg is converted with
# StreamIndex.time_from_internal(t, 0).
#
# @return [Array] the converted interval; empty if info.interval_lg is empty
def interval_lg
    info.interval_lg.map do |internal_time|
        StreamIndex.time_from_internal(internal_time, 0)
    end
end

#interval_rt ⇒ (Time,Time), ()

Return the realtime of the first and last samples in this stream

Returns:

  • ((Time,Time), ())

    the interval, or an empty array if the stream is empty



188
189
190
# File 'lib/pocolog/data_stream.rb', line 188

# Returns the realtime of the first and last samples in this stream.
#
# Each entry of info.interval_rt is converted with
# StreamIndex.time_from_internal(t, 0).
#
# @return [Array] the converted interval; empty if info.interval_rt is empty
def interval_rt
    info.interval_rt.map do |internal_time|
        StreamIndex.time_from_internal(internal_time, 0)
    end
end

#lastObject

call-seq:

last => [time_rt, time_lg, data]

Returns the last sample in the stream, or nil if the stream is empty.

After a call to #last, #sample_index is size - 1



345
346
347
348
# File 'lib/pocolog/data_stream.rb', line 345

# Returns the last sample in the stream as [time_rt, time_lg, data], or
# nil if the stream is empty.
#
# After a call to #last, #sample_index is size - 1 (or size, i.e.
# past-the-end, when the stream is empty).
def last
    # Position just before the last sample so that #next reads it. The
    # position is clamped to -1: with an empty stream, size - 2 would be
    # -2 and #advance (-2 < size - 1) would try to read sample -1.
    @sample_index = [size - 2, -1].max
    self.next
end

#nextObject

call-seq:

next => [time_rt, time_lg, data]

Reads the next sample in the file, and returns it. It differs from advance as it always decodes the data sample.



412
413
414
415
416
417
# File 'lib/pocolog/data_stream.rb', line 412

# Reads the next sample in the file and returns it decoded.
#
# @return [Array,nil] [header.rt, header.lg, data] for the new current
#   sample, or nil when #advance returned no header
def next
    header = advance
    return unless header

    [header.rt, header.lg, data]
end

#openObject



105
106
107
# File 'lib/pocolog/data_stream.rb', line 105

# Opens the underlying logfile (delegates to logfile.open)
def open; logfile.open end

#previousObject

call-seq:

previous => [time_rt, time_lg, data]

Reads the previous sample in the file, and returns it.



423
424
425
426
427
428
429
430
431
432
433
434
# File 'lib/pocolog/data_stream.rb', line 423

# Reads the previous sample in the file, and returns it.
#
# @return [Array,nil] the result of seeking to sample_index - 1, or nil
#   when the stream is at or before its first sample
def previous
    # Just rewound, never played: nothing before the current position
    return nil if sample_index < 0

    if sample_index == 0
        # Beginning of file reached: go back to the "before first" state
        rewind
        return nil
    end

    seek(sample_index - 1)
end

#raw_data(data_header = nil, sample = nil) ⇒ Object

Returns the decoded data sample associated with the given block header.

Block headers are returned by #advance, #data_header and #seek (when called with decode_data set to false)



270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
# File 'lib/pocolog/data_stream.rb', line 270

# Returns the decoded (but not Ruby-converted) data sample associated
# with the given block header.
#
# @param data_header the header of the block to decode. If nil, @data is
#   returned when it is set; otherwise the logfile's current data header
#   is used.
# @param sample an optional object to decode into; a new instance of
#   #type is created when it is not given
# @return the decoded sample (endian-swapped if logfile.endian_swap is set)
# @raise the original exception, with its message prefixed by the position
#   of the block that failed to unmarshal
def raw_data(data_header = nil, sample = nil)
    return @data if @data && !data_header

    data_header ||= logfile.data_header
    marshalled_data = logfile.data(data_header)
    data = sample || type.new
    data.from_buffer_direct(marshalled_data)
    data = data.endian_swap if logfile.endian_swap
    data
rescue Interrupt
    raise
rescue Exception => e
    # NOTE(review): rescues Exception rather than StandardError; kept so
    # that the set of errors being annotated does not change.
    #
    # data_header may still be nil here (e.g. no block read yet); guard it
    # so that the original error is not replaced by a NoMethodError.
    block_pos = data_header ? data_header.block_pos : "(no current data header)"
    raise e, "failed to unmarshal sample in block at position #{block_pos}: #{e.message}", e.backtrace
end

#raw_each(rewind: true) {|rt, lg, sample| ... } ⇒ Object

Enumerates the samples of this stream without converting the typelib data

Parameters:

  • rewind (Boolean)

    whether the enumeration should start from the first sample, or continue wherever the stream's current position is

Yield Parameters:

  • rt (Time)

    the sample's real time

  • lg (Time)

    the sample's logical time

  • sample (Typelib::Type)

    the sample

See Also:



140
141
142
143
144
145
146
147
# File 'lib/pocolog/data_stream.rb', line 140

# Enumerates the samples of this stream without converting the typelib
# data.
#
# @param rewind [Boolean] forwarded to #each_block; whether the
#   enumeration starts from the first sample or from the current position
# @yieldparam rt the sample's real time
# @yieldparam lg the sample's logical time
# @yieldparam sample the sample, as returned by #raw_data
# @return an enumerator if no block is given
def raw_each(rewind: true)
    unless block_given?
        return enum_for(__method__, rewind: rewind)
    end

    each_block(rewind) do
        header = data_header
        yield(header.rt, header.lg, raw_data(header))
    end
end

#read_one_data_sample(position) ⇒ Object



288
289
290
# File 'lib/pocolog/data_stream.rb', line 288

# Reads the sample at the given position with #read_one_raw_data_sample
# and converts it with Typelib.to_ruby.
#
# @param position the sample number, passed to #read_one_raw_data_sample
def read_one_data_sample(position)
    raw_sample = read_one_raw_data_sample(position)
    Typelib.to_ruby(raw_sample)
end

#read_one_raw_data_sample(position, sample = nil) ⇒ Object



292
293
294
295
296
297
298
299
300
301
302
303
304
305
# File 'lib/pocolog/data_stream.rb', line 292

# Reads and decodes one sample, addressed by its sample number.
#
# @param position the sample number, resolved to a file position through
#   stream_index.file_position_by_sample_number
# @param sample an optional object to decode into; a new instance of
#   #type is created when it is not given
# @return the decoded sample (endian-swapped if logfile.endian_swap is set)
# @raise the original exception, with its message prefixed by the file
#   position of the block that failed to unmarshal
def read_one_raw_data_sample(position, sample = nil)
    block_pos = stream_index.file_position_by_sample_number(position)
    payload = logfile.read_one_data_payload(block_pos)
    decoded = sample || type.new
    decoded.from_buffer_direct(payload)
    logfile.endian_swap ? decoded.endian_swap : decoded
rescue Interrupt
    raise
rescue Exception => e
    # NOTE(review): rescues Exception, not only StandardError
    raise e, "failed to unmarshal sample in block at position #{block_pos}: #{e.message}", e.backtrace
end

#registryObject

Get the Typelib::Registry object for this stream



243
244
245
# File 'lib/pocolog/data_stream.rb', line 243

# Get the Typelib::Registry object for this stream (the registry of #type)
def registry; type.registry end

#resample_by_index(samples) ⇒ Object

Return a new DataStream with only the N-th sample



70
71
72
# File 'lib/pocolog/data_stream.rb', line 70

# Returns a new DataStream built from the index returned by
# stream_index.resample_by_index(samples).
#
# @param samples passed as-is to stream_index.resample_by_index
def resample_by_index(samples)
    resampled_index = stream_index.resample_by_index(samples)
    updated_datastream_from_index(resampled_index)
end

#resample_by_time(period, start_time: nil) ⇒ Object

Return a new DataStream resampled in time with the given period

Parameters:

  • period (Number)

    period in seconds. Use a Rational if you need a precise period



78
79
80
81
82
# File 'lib/pocolog/data_stream.rb', line 78

# Returns a new DataStream built from the index returned by
# stream_index.resample_by_time.
#
# @param period [Number] period in seconds. Use a Rational if you need a
#   precise period
# @param start_time forwarded as-is to stream_index.resample_by_time
def resample_by_time(period, start_time: nil)
    resampled_index = stream_index.resample_by_time(period, start_time: start_time)
    updated_datastream_from_index(resampled_index)
end

#rewindObject

call-seq:

rewind => nil

Resets the stream position to just before the first sample, so that the next call to #advance or #next reads the first sample. It does not read anything itself and always returns nil.

It differs from #first as it does not decode the data payload.



318
319
320
321
322
323
324
# File 'lib/pocolog/data_stream.rb', line 318

# Resets the stream position to "before the first sample".
#
# @return [nil] always
def rewind
    # Sample numbering starts at 0 and #advance increments the index
    # before reading, hence the -1
    @sample_index = -1
    nil
end

#samples(read_data = true) ⇒ Object

Returns a SampleEnumerator object for this stream



114
# File 'lib/pocolog/data_stream.rb', line 114

# Returns a SampleEnumerator object for this stream
#
# @param read_data passed as second argument to SampleEnumerator.new
def samples(read_data = true)
    SampleEnumerator.new(self, read_data)
end

#samples?(start_index, end_index) ⇒ Boolean

call-seq:

samples?(pos1,pos2) => true
samples?(time1,time2) => true

Returns true if some samples of this stream lie inside the given time or position interval

Returns:

  • (Boolean)


495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
# File 'lib/pocolog/data_stream.rb', line 495

# Tests whether this stream has samples inside the given interval.
#
# Both bounds are either Time objects (compared against #interval_lg) or
# sample indexes.
#
# @param start_index [Integer,Time] lower bound of the interval
# @param end_index [Integer,Time] upper bound of the interval
# @return [Boolean]
# @raise [ArgumentError] if end_index is smaller than start_index, or if
#   start_index is a negative index
def samples?(start_index, end_index)
    if end_index < start_index
        raise ArgumentError, "end bound in sample interval smaller than start bound"
    end

    if start_index.is_a?(Time)
        # A predicate should answer false, not nil, for an empty stream
        return false if empty?

        start_t, end_t = interval_lg
        start_index <= end_t && start_t <= end_index
    elsif start_index < 0
        raise ArgumentError, "negative start index"
    else
        start_index < size
    end
end

#seek(pos, decode_data = true) ⇒ Object

Seek the stream at the given position

If pos is a Time object, seeks to the last sample whose logical time is not greater than pos

If pos is an integer, it is interpreted as an index and the stream goes to the sample that has this index.

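Returns [rt, lg, data] for the current sample (if there is one), and nil otherwise. If decode_data is false, a copy of the sample's data header is returned instead of [rt, lg, data].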



360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
# File 'lib/pocolog/data_stream.rb', line 360

# Seek the stream at the given position.
#
# @param pos [Time,Integer] if a Time, the sample number is looked up
#   with stream_index.sample_number_by_time; otherwise pos is used
#   directly as the sample index
# @param decode_data [Boolean] whether the payload should be decoded
# @return [Array,Object,nil] [rt, lg, data] if decode_data is true, a copy
#   of the data header if it is false, and nil if pos is a Time outside of
#   #interval_lg or if there is no current data header
# @raise [InternalError] if the block found at the indexed position
#   belongs to another stream
def seek(pos, decode_data = true)
    if pos.kind_of?(Time)
        interval_lg = self.interval_lg
        return nil if interval_lg.empty? || interval_lg[0] > pos || interval_lg[1] < pos

        @sample_index = stream_index.sample_number_by_time(pos)
    else
        @sample_index = pos
    end

    file_pos = stream_index.file_position_by_sample_number(@sample_index)
    block_info = logfile.read_one_block(file_pos)
    # Sanity check: the index must point to a block of this stream
    if block_info.stream_index != index
        block_stream_name =
            logfile.stream_from_index(block_info.stream_index).name
        raise InternalError,
              "index returned index=#{@sample_index} and pos=#{file_pos} as "\
              "position for seek(#{pos}) but it seems to be a sample in "\
              "stream #{block_info.stream_index} (#{block_stream_name}) while "\
              "we were expecting #{index} (#{name})"
    end

    header = data_header
    return unless header

    # The logfile's header object is duplicated so that the returned
    # value is the caller's own copy
    header = header.dup
    return header unless decode_data

    [header.rt, header.lg, data(header)]
end

#sizeObject

The size, in samples, of data in this stream



235
# File 'lib/pocolog/data_stream.rb', line 235

# The size, in samples, of data in this stream (as reported by #info)
def size
    info.size
end

#stream_indexObject



97
98
99
# File 'lib/pocolog/data_stream.rb', line 97

# The index object of this stream, as held by #info (info.index)
def stream_index; info.index end

#sub_field(fieldname, data_header = nil) ⇒ Object

Returns the decoded subfield specified by 'fieldname' for the given data header. If no header is given, the current last read data header is used



250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
# File 'lib/pocolog/data_stream.rb', line 250

# Returns the decoded subfield specified by 'fieldname' for the given data
# header. If no header is given, the current last read data header is
# used.
#
# @param fieldname the name of the field to extract
# @param data_header the header of the sample to read from, or nil
# @return the field value converted with Typelib.to_ruby; nil if the
#   sample is not compressed and #type is not a compound type having
#   that field
def sub_field(fieldname, data_header = nil)
    header = data_header || logfile.data_header
    if header.compressed
        # Compressed payload: decode the whole sample and read the field
        data(data_header).send(fieldname)
    elsif type <= Typelib::CompoundType && type.has_field?(fieldname)
        # #type is a class (it is instantiated with type.new and compared
        # with <= elsewhere in this class), so the compound check must be
        # a subclass test; is_a? on the class object could never match.
        offset = type.offset_of(fieldname)
        subtype = type[fieldname]
        raw_field = logfile.sub_field(offset, subtype.size, data_header)
        Typelib.to_ruby(subtype.wrap(raw_field))
    end
end

#timeObject

Returns the time of the current sample



175
176
177
178
179
180
181
182
# File 'lib/pocolog/data_stream.rb', line 175

# Returns the time of the current sample.
#
# @return [Array] [header.rt, header.lg] for the logfile's current data
#   header, or [header.rt, time_getter[sample]] when #time_getter is set
def time
    header = logfile.data_header
    return [header.rt, header.lg] unless time_getter

    [header.rt, time_getter[data(header)]]
end

#time_interval(rt = false) ⇒ Object

Get the logical time of first and last samples in this stream. If rt is true, returns the interval for the wall-clock time

Returns nil if the stream is empty



204
205
206
207
208
209
210
211
# File 'lib/pocolog/data_stream.rb', line 204

# Deprecated accessor for the stream's time interval.
#
# Emits a deprecation warning through Pocolog.warn_deprecated on every
# call.
#
# @param rt [Boolean] if true, return #interval_rt instead of #interval_lg
def time_interval(rt = false)
    Pocolog.warn_deprecated "Pocolog::DataStream#time_interval is deprecated, use #interval_lg or #interval_rt instead"
    rt ? interval_rt : interval_lg
end

#to_logical_time(time) ⇒ Object

Return a new DataStream from which all samples after the given logical time are removed



65
66
67
# File 'lib/pocolog/data_stream.rb', line 65

# Returns a new DataStream built from this stream's index after
# stream_index.remove_after(time) was applied to it.
#
# @param time the logical time bound, passed to remove_after
def to_logical_time(time)
    trimmed_index = stream_index.remove_after(time)
    updated_datastream_from_index(trimmed_index)
end

#type_nameObject



24
25
26
# File 'lib/pocolog/data_stream.rb', line 24

# The name of the stream's type (type.name). Same result as #typename.
def type_name; type.name end

#typenameObject



20
21
22
# File 'lib/pocolog/data_stream.rb', line 20

# The name of the stream's type (type.name). Same result as #type_name.
def typename; type.name end

#updated_datastream_from_index(stream_index) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Re-creates a “copy” of this new datastream using an updated index

This allows creating a view of the original stream by modifying the index



90
91
92
93
94
95
# File 'lib/pocolog/data_stream.rb', line 90

# @api private
#
# Builds a new stream object of the same class, sharing this stream's
# logfile, index number, name, type and metadata, but using a StreamInfo
# created from the given index.
#
# @param stream_index the updated index; its base_time and index_map are
#   given to StreamInfo.from_raw_data along with this stream's realtime
#   interval
def updated_datastream_from_index(stream_index)
    view_info = StreamInfo.from_raw_data(
        [],
        @info.interval_rt,
        stream_index.base_time,
        stream_index.index_map
    )
    self.class.new(@logfile, index, @name, @type, @metadata, view_info)
end

#write(rt, lg, data) ⇒ Object

Write a sample in this stream, with the rt and lg timestamps. data can be either a Typelib::Type object of the right type, or a String (in which case we consider that it is the raw data)



517
518
519
520
# File 'lib/pocolog/data_stream.rb', line 517

# Write a sample in this stream, with the rt and lg timestamps.
#
# @param rt the sample's real time, forwarded to #write_raw
# @param lg the sample's logical time, forwarded to #write_raw
# @param data the sample; converted with Typelib.from_ruby(data, type)
#   and then marshalled with to_byte_array
def write(rt, lg, data)
    typed_sample = Typelib.from_ruby(data, type)
    write_raw(rt, lg, typed_sample.to_byte_array)
end

#write_raw(rt, lg, data) ⇒ Object

Write an already marshalled sample. data is supposed to be a typelib-marshalled value of the stream type



524
525
526
# File 'lib/pocolog/data_stream.rb', line 524

# Write an already marshalled sample.
#
# @param rt the sample's real time
# @param lg the sample's logical time
# @param data the marshalled sample, handed to logfile.write_data_block
#   together with this stream
def write_raw(rt, lg, data); logfile.write_data_block(self, rt, lg, data) end