Class: Pocolog::BlockStream

Inherits:
Object
  • Object
show all
Defined in:
lib/pocolog/block_stream.rb

Overview

Enumeration of blocks in a pocolog-compatible stream

Defined Under Namespace

Classes: BlockHeader, DataBlockHeader, StreamBlock

Constant Summary

FORMAT_VERSION =
Format::Current::VERSION
BLOCK_HEADER_SIZE =

The size of the generic block header

Format::Current::BLOCK_HEADER_SIZE
TIME_SIZE =

The size of a time in a block header

Format::Current::TIME_SIZE
MAGIC =

Magic code at the beginning of the log file

Format::Current::MAGIC
DEFAULT_BUFFER_READ =

Read by 4kB chunks by default

4 * 1024

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(io, buffer_read: DEFAULT_BUFFER_READ) ⇒ BlockStream

Create a Pocolog::BlockStream object to sequentially interpret a stream of data



42
43
44
45
46
47
48
49
# File 'lib/pocolog/block_stream.rb', line 42

def initialize(io, buffer_read: DEFAULT_BUFFER_READ)
    @io = io
    @big_endian = nil
    @native_endian = nil
    @payload_size = 0
    @buffer_io = StringIO.new
    @buffer_read = buffer_read
end

Instance Attribute Details

#buffer_readObject (readonly)

The number of bytes that should be read into the internal buffer



23
24
25
# File 'lib/pocolog/block_stream.rb', line 23

# Number of bytes requested from the underlying IO when the internal
# buffer has to be refilled (a single larger read may request more)
#
# @return [Integer]
def buffer_read
  @buffer_read
end

#ioObject (readonly)

The underlying IO



20
21
22
# File 'lib/pocolog/block_stream.rb', line 20

# The underlying IO object, as given to the constructor
def io
  @io
end

Class Method Details

.open(path) ⇒ Object

Create a BlockStream object that acts on a given file



31
32
33
34
35
36
37
38
39
# File 'lib/pocolog/block_stream.rb', line 31

# Create a BlockStream object that acts on a given file
#
# The file is opened read-only in binary mode, so that no newline or
# encoding translation is ever applied to the raw log data.
#
# @param path [String] path to the file to open
# @yieldparam stream the new stream; when a block is given, the file is
#   closed when the block returns
# @return the new stream when called without a block, the block's return
#   value otherwise
def self.open(path)
    # Without a block the caller owns the file and must close the stream
    return new(File.open(path, 'rb')) unless block_given?

    File.open(path, 'rb') do |io|
        yield(new(io))
    end
end

.read_block_header(io, pos = nil) ⇒ Object



150
151
152
153
# File 'lib/pocolog/block_stream.rb', line 150

# Read and parse a generic block header from an IO
#
# @param io the IO to read from
# @param pos [Integer,nil] absolute position to seek to before reading;
#   when nil, the header is read from the IO's current position
# @return the result of BlockHeader.parse
def self.read_block_header(io, pos = nil)
    if pos
        io.seek(pos, IO::SEEK_SET)
    end

    raw_header = io.read(BLOCK_HEADER_SIZE)
    BlockHeader.parse(raw_header)
end

.read_stream_block(io, pos = nil) ⇒ Object



253
254
255
256
257
258
259
260
# File 'lib/pocolog/block_stream.rb', line 253

# Read a stream declaration block (header and payload) from an IO
#
# @param io the IO to read from
# @param pos [Integer,nil] absolute position of the block; when nil, the
#   block is read from the IO's current position
# @return the result of StreamBlock.parse on the block's payload
# @raise [InvalidFile] if the block is not a stream declaration block
def self.read_stream_block(io, pos = nil)
    # The header is read through read_block_header, the class-level reader
    # that takes the same (io, pos) arguments
    block = read_block_header(io, pos)
    if block.kind != STREAM_BLOCK
        raise InvalidFile, 'expected stream declaration block'
    end

    # The payload immediately follows the header
    StreamBlock.parse(io.read(block.payload_size))
end

Instance Method Details

#big_endian?Boolean

Whether the data in the file is stored in big endian (as opposed to little endian)

Returns:

  • (Boolean)


26
27
28
# File 'lib/pocolog/block_stream.rb', line 26

# Whether the data in the file is stored in big endian
#
# @return [Boolean,nil] nil as long as no endianness flag has been stored
#   (it is initialized to nil and set by #read_prologue)
def big_endian?
    # #read_prologue stores the raw flag and tests it with `!= 0`, i.e. it
    # is an integer. 0 is truthy in Ruby, so it must be compared explicitly
    # instead of being returned as-is.
    @big_endian && @big_endian != 0
end

#closeObject

Close the file

See Also:



90
91
92
# File 'lib/pocolog/block_stream.rb', line 90

# Close the underlying IO
def close
    @io.close
end

#closed?Boolean

Whether this stream is closed

Returns:

  • (Boolean)

See Also:



78
79
80
# File 'lib/pocolog/block_stream.rb', line 78

# Whether the underlying IO is closed
#
# @return [Boolean]
def closed?
    @io.closed?
end

#flushObject

Flush buffers to the underlying backing store



83
84
85
# File 'lib/pocolog/block_stream.rb', line 83

# Flush the underlying IO's buffers to its backing store
#
# @return whatever the underlying IO's #flush returns
def flush
    @io.flush
end

#pathString

The IO path, if the backing IO is a file

Returns:

  • (String)


71
72
73
# File 'lib/pocolog/block_stream.rb', line 71

# The path reported by the underlying IO
#
# The IO must respond to #path, which is the case when it is a file
#
# @return [String]
def path
    @io.path
end

#read(size) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Read bytes



111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/pocolog/block_stream.rb', line 111

# @api private
#
# Read bytes, going through the internal read-ahead buffer
#
# What is left in the buffer is consumed first. If that is not enough, the
# buffer is replaced by a fresh chunk read from the underlying IO and the
# missing bytes are taken from it.
#
# @param size [Integer] the number of bytes to read
# @return [String,nil] the bytes read, possibly fewer than requested if
#   the underlying IO ran out of data, or nil if nothing could be read
def read(size)
    head = @buffer_io.read(size)
    missing = head ? size - head.size : size
    return head if missing == 0

    # Refill with at least buffer_read bytes, more if the request needs it.
    # An IO at EOF returns nil, which is turned into an empty buffer.
    chunk = io.read([buffer_read, missing].max) || ''
    @buffer_io = StringIO.new(chunk)

    tail = @buffer_io.read(missing)
    return head unless tail

    (head || '') + tail
end

#read_data_block(uncompress: true) ⇒ Object

Read the marshalled version of a data block

It splits the block into its header and payload part, and optionally uncompresses the data sample



348
349
350
351
352
353
354
355
356
357
# File 'lib/pocolog/block_stream.rb', line 348

# Read the marshalled version of a data block
#
# The block is split into its raw header and its payload. The last byte of
# the header is the compression flag: when it is non-zero and uncompress
# is set, the payload is inflated with Zlib.
#
# @param uncompress [Boolean] whether a compressed payload should be
#   inflated before it is returned
# @return [(String,String)] the raw data block header and the payload
def read_data_block(uncompress: true)
    header = read_payload(Format::Current::DATA_BLOCK_HEADER_SIZE)
    payload = read_payload # everything that remains in the block
    compression_flag = header[-1, 1].unpack1('C')

    # Return the payload untouched unless it is compressed and the caller
    # asked for the uncompressed data
    return [header, payload] if !uncompress || compression_flag == 0

    [header, Zlib::Inflate.inflate(payload)]
end

#read_data_block_headerObject

Read the header of one data block

The IO is assumed to be positioned at the beginning of the block's payload



338
339
340
341
342
# File 'lib/pocolog/block_stream.rb', line 338

# Read the header of one data block
#
# The IO is assumed to be positioned at the beginning of the block's
# payload
#
# @return the result of DataBlockHeader.parse
def read_data_block_header
    raw_header = read_payload(Format::Current::DATA_BLOCK_HEADER_SIZE)
    DataBlockHeader.parse(raw_header)
end

#read_data_block_payloadObject

Read the data payload of a data block, not parsing the header

The IO is assumed to be positioned just after the block header (i.e. after read_next_block_header)



363
364
365
366
367
368
369
370
371
372
# File 'lib/pocolog/block_stream.rb', line 363

# Read the data payload of a data block, not parsing the header
#
# The IO is assumed to be positioned just after the block header (i.e.
# after read_next_block_header)
#
# @return [String] the payload, inflated with Zlib if the block's
#   compression flag is non-zero
def read_data_block_payload
    # Jump over the data block header, except for its last byte which is
    # the compression flag
    skip(Format::Current::DATA_BLOCK_HEADER_SIZE - 1)
    compression_flag = read_payload(1).unpack1('C')
    payload = read_payload
    compression_flag == 0 ? payload : Zlib::Inflate.inflate(payload)
end

#read_next_block_headerObject

Read the header of the next block



156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
# File 'lib/pocolog/block_stream.rb', line 156

# Read the header of the next block
#
# Whatever has not been consumed from the previous block's payload is
# skipped first.
#
# @return the parsed block header, or nil if no more data is available
# @raise [NotEnoughData] if fewer than BLOCK_HEADER_SIZE bytes could be
#   read
def read_next_block_header
    skip(@payload_size) unless @payload_size == 0

    raw_header = read(BLOCK_HEADER_SIZE)
    return unless raw_header

    unless raw_header.size == BLOCK_HEADER_SIZE
        message = "truncated block header (got #{raw_header.size} " \
                  "bytes, expected #{BLOCK_HEADER_SIZE})"
        raise NotEnoughData, message
    end

    # Remember how much payload follows, for #read_payload and #skip
    BlockHeader.parse(raw_header).tap do |parsed|
        @payload_size = parsed.payload_size
    end
end

#read_payload(count = @payload_size) ⇒ Object

Read the payload of the last block returned by #read_next_block_header



271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
# File 'lib/pocolog/block_stream.rb', line 271

# Read the payload of the last block returned by #read_next_block_header
#
# @param count [Integer] the number of bytes to read, by default
#   everything that remains in the current block's payload
# @return [String] exactly count bytes
# @raise [ArgumentError] if count is bigger than what remains of the
#   payload
# @raise [NotEnoughData] if fewer than count bytes could be read
def read_payload(count = @payload_size)
    if count > @payload_size
        raise ArgumentError,
              "expected read count #{count} greater than " \
              "remaining payload size #{@payload_size}"
    end

    bytes = read(count)
    unless bytes && bytes.size == count
        raise NotEnoughData,
              "expected to read #{count} bytes " \
              "but got #{bytes ? bytes.size : 'EOF'}"
    end

    # Only account for the bytes once they have actually been read
    @payload_size -= count
    bytes
end

#read_prologueObject

If the IO is a file, it starts with a prologue to describe the file format

Raises MissingPrologue if no prologue is found, or ObsoleteVersion if the file format is not up-to-date (in which case one has to run pocolog --to-new-format).



135
136
137
138
139
140
141
# File 'lib/pocolog/block_stream.rb', line 135

# Read the prologue found at the beginning of a log file
#
# The prologue is read directly from the underlying IO by
# Format::Current.read_prologue, which returns the file's endianness flag.
# Any error it raises propagates to the caller.
#
# @return [Integer] 0
def read_prologue # :nodoc:
    endian_flag = Format::Current.read_prologue(io)
    file_is_big_endian = (endian_flag != 0)

    @format_version = Format::Current::VERSION
    @big_endian = endian_flag
    # NOTE(review): this XOR is true when the file's flag and
    # Pocolog.big_endian? differ - presumably the latter is the host's
    # endianness; confirm this is the intended meaning of @native_endian
    @native_endian = file_is_big_endian ^ Pocolog.big_endian?
    # A block header has to be read before any payload can be accessed
    @payload_size = 0
end

#read_stream_blockObject

Read one stream block

The IO is assumed to be positioned at the beginning of the stream definition block's payload



265
266
267
# File 'lib/pocolog/block_stream.rb', line 265

# Read one stream block
#
# The whole remaining payload of the current block is read and handed to
# StreamBlock.parse
#
# @return the result of StreamBlock.parse
def read_stream_block
    raw_block = read_payload
    StreamBlock.parse(raw_block)
end

#rewindObject

Move to the beginning of the stream



95
96
97
# File 'lib/pocolog/block_stream.rb', line 95

# Move to the beginning of the stream
#
# Equivalent to seek(0)
def rewind
    seek(0)
end

#seek(pos) ⇒ Object

Seek to the given raw position in the IO

The new position is assumed to be at the start of a block



102
103
104
105
106
# File 'lib/pocolog/block_stream.rb', line 102

# Seek to the given raw position in the IO
#
# The new position is assumed to be at the start of a block
#
# @param pos [Integer] absolute position in the underlying IO
def seek(pos)
    io.seek(pos, IO::SEEK_SET)
    # Whatever was buffered belongs to the old position, drop it
    @buffer_io = StringIO.new
    # No block header has been read at the new position yet
    @payload_size = 0
end

#skip(count) ⇒ Object

Skip that many bytes in the stream



57
58
59
60
61
62
63
64
65
66
# File 'lib/pocolog/block_stream.rb', line 57

# Skip that many bytes in the stream
#
# Bytes that are already in the internal buffer are skipped there, the
# rest is skipped by seeking in the underlying IO. The remaining payload
# size is decreased by count.
#
# @param count [Integer] the number of bytes to skip
# @return [Integer] the updated remaining payload size
def skip(count)
    buffered = @buffer_io.size - @buffer_io.tell
    if count <= buffered
        # Everything to skip has already been read ahead
        @buffer_io.seek(count, IO::SEEK_CUR)
    else
        # Exhaust the buffer, then move the IO by what is still missing
        @buffer_io.seek(buffered, IO::SEEK_CUR)
        io.seek(count - buffered, IO::SEEK_CUR)
    end
    @payload_size -= count
end

#skip_payloadObject



289
290
291
# File 'lib/pocolog/block_stream.rb', line 289

# Skip what remains of the current block's payload
#
# Delegates to #skip with the remaining payload size
def skip_payload
    skip(@payload_size)
end

#tellObject

Current position in #io



52
53
54
# File 'lib/pocolog/block_stream.rb', line 52

# Current logical position in #io
#
# The underlying IO is read ahead into the internal buffer, so the bytes
# that are buffered but not yet consumed are subtracted from the IO's own
# position
#
# @return [Integer]
def tell
    unread = @buffer_io.size - @buffer_io.tell
    io.tell - unread
end