Class: Pocolog::BlockStream
- Inherits: Object
  - Object
    - Pocolog::BlockStream
- Defined in: lib/pocolog/block_stream.rb
Overview
Enumeration of blocks in a pocolog-compatible stream
Defined Under Namespace
Classes: BlockHeader, DataBlockHeader, StreamBlock
Constant Summary
- FORMAT_VERSION = Format::Current::VERSION
- BLOCK_HEADER_SIZE = Format::Current::BLOCK_HEADER_SIZE
  The size of the generic block header
- TIME_SIZE = Format::Current::TIME_SIZE
  The size of a time in a block header
- MAGIC = Format::Current::MAGIC
  Magic code at the beginning of the log file
- DEFAULT_BUFFER_READ = 4 * 1024
  Read by 4kB chunks by default
Instance Attribute Summary
- #buffer_read ⇒ Object (readonly)
  The number of bytes that should be read into the internal buffer.
- #io ⇒ Object (readonly)
  The underlying IO.
Class Method Summary
- .open(path) ⇒ Object
  Create a BlockStream object that acts on a given file.
- .read_block_header(io, pos = nil) ⇒ Object
- .read_stream_block(io, pos = nil) ⇒ Object
Instance Method Summary
- #big_endian? ⇒ Boolean
  Whether the data in the file is stored in big-endian byte order.
- #close ⇒ Object
  Close the file.
- #closed? ⇒ Boolean
  Whether this stream is closed.
- #flush ⇒ Object
  Flush buffers to the underlying backing store.
- #initialize(io, buffer_read: DEFAULT_BUFFER_READ) ⇒ BlockStream (constructor)
  Create a BlockStream object to sequentially interpret a stream of data.
- #path ⇒ String
  The IO path, if the backing IO is a file.
- #read(size) ⇒ Object (private)
  Read bytes.
- #read_data_block(uncompress: true) ⇒ Object
  Read the marshalled version of a data block.
- #read_data_block_header ⇒ Object
  Read the header of one data block.
- #read_data_block_payload ⇒ Object
  Read the data payload of a data block, not parsing the header.
- #read_next_block_header ⇒ Object
  Read the header of the next block.
- #read_payload(count = @payload_size) ⇒ Object
  Read the payload of the last block returned by #read_next_block_header.
- #read_prologue ⇒ Object
  If the IO is a file, it starts with a prologue to describe the file format.
- #read_stream_block ⇒ Object
  Read one stream block.
- #rewind ⇒ Object
  Move to the beginning of the stream.
- #seek(pos) ⇒ Object
  Seek to the given raw position in the IO.
- #skip(count) ⇒ Object
  Skip that many bytes in the stream.
- #skip_payload ⇒ Object
- #tell ⇒ Object
  Current position in #io.
Constructor Details
#initialize(io, buffer_read: DEFAULT_BUFFER_READ) ⇒ BlockStream
Create a Pocolog::BlockStream object to sequentially interpret a stream of data
    # File 'lib/pocolog/block_stream.rb', line 42
    def initialize(io, buffer_read: DEFAULT_BUFFER_READ)
      @io = io
      @big_endian = nil
      @native_endian = nil
      @payload_size = 0
      @buffer_io = StringIO.new
      @buffer_read = buffer_read
    end
Instance Attribute Details
#buffer_read ⇒ Object (readonly)
The number of bytes that should be read into the internal buffer

    # File 'lib/pocolog/block_stream.rb', line 23
    def buffer_read
      @buffer_read
    end
#io ⇒ Object (readonly)
The underlying IO
    # File 'lib/pocolog/block_stream.rb', line 20
    def io
      @io
    end
Class Method Details
.open(path) ⇒ Object
Create a BlockStream object that acts on a given file
    # File 'lib/pocolog/block_stream.rb', line 31
    def self.open(path)
      if block_given?
        File.open(path) do |io|
          yield(new(io))
        end
      else
        new(File.open(path))
      end
    end
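The two calling conventions of .open follow the usual File.open idiom. The sketch below reproduces only that idiom on a hypothetical stand-in class (`IOWrapper` is not part of pocolog), to show that the block form closes the file on exit while the non-block form leaves closing to the caller.

```ruby
require 'tempfile'

# Hypothetical stand-in for BlockStream; only the shape of .open is mirrored.
class IOWrapper
  attr_reader :io

  def initialize(io)
    @io = io
  end

  # With a block, File.open closes the file when the block returns.
  # Without one, the caller owns the IO and must close it.
  def self.open(path)
    if block_given?
      File.open(path) { |io| yield(new(io)) }
    else
      new(File.open(path))
    end
  end
end

file = Tempfile.new('open-sketch')
file.write('abc')
file.flush

# Block form: the wrapper is only valid inside the block.
leaked = nil
result = IOWrapper.open(file.path) do |wrapper|
  leaked = wrapper
  wrapper.io.read
end

# Non-block form: explicit close required.
wrapper = IOWrapper.open(file.path)
still_open = !wrapper.io.closed?
wrapper.io.close

file.close!
```

With the real class, the non-block form would presumably be paired with #close once the stream is no longer needed.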
.read_block_header(io, pos = nil) ⇒ Object
    # File 'lib/pocolog/block_stream.rb', line 150
    def self.read_block_header(io, pos = nil)
      io.seek(pos, IO::SEEK_SET) if pos
      BlockHeader.parse(io.read(BLOCK_HEADER_SIZE))
    end
.read_stream_block(io, pos = nil) ⇒ Object
    # File 'lib/pocolog/block_stream.rb', line 253
    def self.read_stream_block(io, pos = nil)
      block = read_block(io, pos)
      if block.kind != STREAM_BLOCK
        raise InvalidFile, 'expected stream declaration block'
      end

      StreamBlock.parse(io.read(block.payload_size))
    end
Instance Method Details
#big_endian? ⇒ Boolean
Whether the data in the file is stored in big-endian byte order. The value is nil until #read_prologue has been called.

    # File 'lib/pocolog/block_stream.rb', line 26
    def big_endian?
      @big_endian
    end
#close ⇒ Object
Close the file
    # File 'lib/pocolog/block_stream.rb', line 90
    def close
      io.close
    end
#closed? ⇒ Boolean
Whether this stream is closed
    # File 'lib/pocolog/block_stream.rb', line 78
    def closed?
      io.closed?
    end
#flush ⇒ Object
Flush buffers to the underlying backing store
    # File 'lib/pocolog/block_stream.rb', line 83
    def flush
      io.flush
    end
#path ⇒ String
The IO path, if the backing IO is a file
    # File 'lib/pocolog/block_stream.rb', line 71
    def path
      io.path
    end
#read(size) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Read bytes
    # File 'lib/pocolog/block_stream.rb', line 111
    def read(size)
      remaining =
        if (data = @buffer_io.read(size))
          (size - data.size)
        else
          size
        end
      return data if remaining == 0

      @buffer_io = StringIO.new(io.read([buffer_read, remaining].max) || '')
      if (buffer_data = @buffer_io.read(remaining))
        (data || '') + buffer_data
      else
        data
      end
    end
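The buffering in #read can be hard to follow from the listing alone. The following is a minimal standalone sketch of the same logic, using a hypothetical `ChunkedReader` class and a deliberately tiny 4-byte buffer (the `refills` counter is added for illustration only). It shows a read served across a buffer boundary, a short read near the end of the data, and nil at end of file.

```ruby
require 'stringio'

# Hypothetical class that reproduces only the buffering logic of #read.
class ChunkedReader
  attr_reader :refills

  def initialize(io, buffer_read: 4)
    @io = io
    @buffer_read = buffer_read
    @buffer_io = StringIO.new
    @refills = 0 # illustration only: counts reads on the underlying IO
  end

  def read(size)
    # Serve as much as possible from the in-memory buffer first.
    data = @buffer_io.read(size)
    remaining = data ? size - data.size : size
    return data if remaining == 0

    # Refill with at least buffer_read bytes, more if the request is larger.
    @refills += 1
    @buffer_io = StringIO.new(@io.read([@buffer_read, remaining].max) || '')
    if (buffer_data = @buffer_io.read(remaining))
      (data || '') + buffer_data
    else
      data
    end
  end
end

reader = ChunkedReader.new(StringIO.new('abcdefghij'), buffer_read: 4)
first = reader.read(3)   # buffer empty: refill, then serve from it
second = reader.read(3)  # one byte left in the buffer, rest after a refill
third = reader.read(10)  # more than what is left: short read
fourth = reader.read(1)  # end of file
```

Note that a request larger than the remaining data returns a shorter string rather than raising; in the class documented here, the length checks are done by the callers (#read_next_block_header and #read_payload).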
#read_data_block(uncompress: true) ⇒ Object
Read the marshalled version of a data block
It splits the block into its header and payload part, and optionally uncompresses the data sample
    # File 'lib/pocolog/block_stream.rb', line 348
    def read_data_block(uncompress: true)
      raw_header = read_payload(Format::Current::DATA_BLOCK_HEADER_SIZE)
      raw_data = read_payload
      compressed = raw_header[-1, 1].unpack('C').first
      if uncompress && (compressed != 0)
        # Payload is compressed
        raw_data = Zlib::Inflate.inflate(raw_data)
      end
      [raw_header, raw_data]
    end
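The header/payload split and the optional decompression can be sketched independently of the file format. In the example below, `HEADER_SIZE` is a hypothetical value chosen for illustration (the real one is Format::Current::DATA_BLOCK_HEADER_SIZE) and the header bytes other than the last are dummies; only the fact that the last header byte is the compression flag comes from the listing above.

```ruby
require 'zlib'

# Hypothetical header size, for illustration only.
HEADER_SIZE = 4

# Split a raw data block into header and payload, inflating the payload
# when the last header byte is non-zero and uncompress is requested.
def split_data_block(raw, uncompress: true)
  raw_header = raw[0, HEADER_SIZE]
  raw_data = raw[HEADER_SIZE..-1]
  compressed = raw_header[-1, 1].unpack('C').first
  raw_data = Zlib::Inflate.inflate(raw_data) if uncompress && compressed != 0
  [raw_header, raw_data]
end

sample = 'sample payload'
deflated = Zlib::Deflate.deflate(sample)
plain_block = [0, 0, 0, 0].pack('C4') + sample
packed_block = [0, 0, 0, 1].pack('C4') + deflated

_, plain = split_data_block(plain_block)
_, inflated = split_data_block(packed_block)
_, untouched = split_data_block(packed_block, uncompress: false)
```

Passing `uncompress: false` keeps the payload exactly as stored, which is useful when blocks are copied from one file to another without being interpreted.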
#read_data_block_header ⇒ Object
Read the header of one data block
The IO is assumed to be positioned at the beginning of the block's payload
    # File 'lib/pocolog/block_stream.rb', line 338
    def read_data_block_header
      DataBlockHeader.parse(
        read_payload(Format::Current::DATA_BLOCK_HEADER_SIZE)
      )
    end
#read_data_block_payload ⇒ Object
Read the data payload of a data block, not parsing the header
The IO is assumed to be positioned just after the block header (i.e. after #read_next_block_header). Unlike #read_data_block, the payload is always uncompressed when the block is flagged as compressed.

    # File 'lib/pocolog/block_stream.rb', line 363
    def read_data_block_payload
      skip(Format::Current::DATA_BLOCK_HEADER_SIZE - 1)
      compressed = read_payload(1).unpack('C').first
      data = read_payload
      if compressed != 0
        # Payload is compressed
        data = Zlib::Inflate.inflate(data)
      end
      data
    end
#read_next_block_header ⇒ Object
Read the header of the next block. Any unread payload of the previous block is skipped first. Returns nil at the end of the stream, and raises NotEnoughData if the header is truncated.

    # File 'lib/pocolog/block_stream.rb', line 156
    def read_next_block_header
      skip(@payload_size) if @payload_size != 0

      header = read(BLOCK_HEADER_SIZE)
      return unless header

      if header.size != BLOCK_HEADER_SIZE
        raise NotEnoughData,
              "truncated block header (got #{header.size} bytes, "\
              "expected #{BLOCK_HEADER_SIZE})"
      end

      block = BlockHeader.parse(header)
      @payload_size = block.payload_size
      block
    end
#read_payload(count = @payload_size) ⇒ Object
Read the payload of the last block returned by #read_next_block_header. Raises ArgumentError if count is greater than the remaining payload size, and NotEnoughData if fewer than count bytes could be read.

    # File 'lib/pocolog/block_stream.rb', line 271
    def read_payload(count = @payload_size)
      if count > @payload_size
        raise ArgumentError,
              "expected read count #{count} greater than remaining "\
              "payload size #{@payload_size}"
      end

      result = read(count)
      if !result || result.size != count
        raise NotEnoughData,
              "expected to read #{count} bytes but got "\
              "#{result ? result.size : 'EOF'}"
      end

      @payload_size -= count
      result
    end
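Because #read_payload decrements the remaining payload size, a payload can be consumed in several calls. The sketch below reproduces only that bookkeeping on a hypothetical `PayloadCursor` class; `EOFError` stands in for pocolog's NotEnoughData, and the error messages are simplified.

```ruby
require 'stringio'

# Hypothetical minimal reader: only the remaining-payload bookkeeping of
# #read_payload is reproduced.
class PayloadCursor
  attr_reader :payload_size

  def initialize(io, payload_size)
    @io = io
    @payload_size = payload_size
  end

  def read_payload(count = @payload_size)
    if count > @payload_size
      raise ArgumentError, "count #{count} exceeds remaining #{@payload_size}"
    end

    result = @io.read(count)
    # EOFError is a stand-in for NotEnoughData in this sketch.
    raise EOFError, 'truncated payload' if !result || result.size != count

    @payload_size -= count
    result
  end
end

cursor = PayloadCursor.new(StringIO.new('HHHHpayload'), 11)
head = cursor.read_payload(4)   # partial read, 7 bytes remain
remaining_after_head = cursor.payload_size
rest = cursor.read_payload      # default count: everything that is left
overrun =
  begin
    cursor.read_payload(1)      # nothing left in this block
  rescue ArgumentError => e
    e.class
  end
```

This is the pattern #read_data_block relies on: one call for the fixed-size data block header, then a second call with the default count for the rest.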
#read_prologue ⇒ Object
If the IO is a file, it starts with a prologue to describe the file format
Raises MissingPrologue if no prologue is found, or ObsoleteVersion if the file format is not up-to-date (in which case one has to run pocolog --to-new-format).

    # File 'lib/pocolog/block_stream.rb', line 135
    def read_prologue # :nodoc:
      big_endian = Format::Current.read_prologue(io)
      @format_version = Format::Current::VERSION
      @big_endian = big_endian
      @native_endian = ((big_endian != 0) ^ Pocolog.big_endian?)
      @payload_size = 0
    end
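The XOR in #read_prologue compares the byte order recorded in the file with another boolean. The sketch below assumes that Pocolog.big_endian? reports the byte order of the host, which the listing does not confirm; `host_big_endian?` and `endian_flag` are hypothetical helpers. Under that assumption, the expression is true exactly when the file and the host disagree.

```ruby
# Detect the host byte order: 'S' packs a 16-bit integer in native order,
# 'n' packs it in big-endian (network) order.
def host_big_endian?
  [1].pack('S') == [1].pack('n')
end

# Same expression as in #read_prologue, with the host check passed in so
# that both cases can be exercised on any machine.
def endian_flag(file_big_endian, host_big)
  (file_big_endian != 0) ^ host_big
end

same_big = endian_flag(1, true)        # big-endian file, big-endian host
same_little = endian_flag(0, false)    # little-endian file and host
file_little_only = endian_flag(0, true)
file_big_only = endian_flag(1, false)
on_this_host = endian_flag(0, host_big_endian?)
```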
#read_stream_block ⇒ Object
Read one stream block
The IO is assumed to be positioned at the start of the stream definition block's payload

    # File 'lib/pocolog/block_stream.rb', line 265
    def read_stream_block
      StreamBlock.parse(read_payload)
    end
#rewind ⇒ Object
Move to the beginning of the stream
    # File 'lib/pocolog/block_stream.rb', line 95
    def rewind
      seek(0)
    end
#seek(pos) ⇒ Object
Seek to the given raw position in the IO
The new position is assumed to be at the start of a block. The internal buffer is discarded and the remaining payload size is reset to zero.

    # File 'lib/pocolog/block_stream.rb', line 102
    def seek(pos)
      io.seek(pos)
      @buffer_io = StringIO.new
      @payload_size = 0
    end
#skip(count) ⇒ Object
Skip that many bytes in the stream
    # File 'lib/pocolog/block_stream.rb', line 57
    def skip(count)
      buffer_remaining = (@buffer_io.size - @buffer_io.tell)
      if buffer_remaining < count
        @buffer_io.seek(buffer_remaining, IO::SEEK_CUR)
        io.seek(count - buffer_remaining, IO::SEEK_CUR)
      else
        @buffer_io.seek(count, IO::SEEK_CUR)
      end
      @payload_size -= count
    end
#skip_payload ⇒ Object
    # File 'lib/pocolog/block_stream.rb', line 289
    def skip_payload
      skip(@payload_size)
    end
#tell ⇒ Object
Current position in #io, corrected for the bytes that have been read into the internal buffer but not consumed yet

    # File 'lib/pocolog/block_stream.rb', line 52
    def tell
      io.tell - (@buffer_io.size - @buffer_io.tell)
    end
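The buffer arithmetic shared by #skip and #tell can be traced by hand on a small example. The sketch below uses plain StringIO objects and local variables in place of the instance state (all names are illustrative), with a 4-byte buffer over a 10-byte IO.

```ruby
require 'stringio'

io = StringIO.new('0123456789')
buffer_io = StringIO.new(io.read(4)) # buffer holds "0123", io.tell is 4
buffer_io.read(1)                    # the consumer has seen one byte

# Same formula as #tell: raw position minus unread buffered bytes.
tell = -> { io.tell - (buffer_io.size - buffer_io.tell) }
position_before = tell.call

# Same branches as #skip, for a skip larger than what the buffer holds:
# exhaust the buffer, then move the underlying IO by the difference.
count = 5
buffer_remaining = buffer_io.size - buffer_io.tell
if buffer_remaining < count
  buffer_io.seek(buffer_remaining, IO::SEEK_CUR)
  io.seek(count - buffer_remaining, IO::SEEK_CUR)
else
  buffer_io.seek(count, IO::SEEK_CUR)
end

position_after = tell.call
next_byte = io.read(1)
```

The logical position advances by exactly `count` in both branches, whether the skipped bytes were still buffered or not.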