grafana/vendor/github.com/linkedin/goavro/v2/ocf_reader.go

264 lines
8.7 KiB
Go

// Copyright [2019] LinkedIn Corp. Licensed under the Apache License, Version
// 2.0 (the "License"); you may not use this file except in compliance with the
// License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
package goavro
import (
"bytes"
"compress/flate"
"encoding/binary"
"errors"
"fmt"
"hash/crc32"
"io"
"io/ioutil"
"github.com/golang/snappy"
)
// OCFReader structure is used to read Object Container Files (OCF).
type OCFReader struct {
header *ocfHeader
block []byte // buffer from which decoding takes place
rerr error // most recent error that took place while reading bytes (unrecoverable)
ior io.Reader
readReady bool // true after Scan and before Read
remainingBlockItems int64 // count of encoded data items remaining in block buffer to be decoded
}
// NewOCFReader initializes and returns a new structure used to read an Avro
// Object Container File (OCF).
//
// func example(ior io.Reader) error {
// // NOTE: Wrap provided io.Reader in a buffered reader, which improves the
// // performance of streaming file data.
// br := bufio.NewReader(ior)
// ocfr, err := goavro.NewOCFReader(br)
// if err != nil {
// return err
// }
// for ocfr.Scan() {
// datum, err := ocfr.Read()
// if err != nil {
// return err
// }
// fmt.Println(datum)
// }
// return ocfr.Err()
// }
func NewOCFReader(ior io.Reader) (*OCFReader, error) {
header, err := readOCFHeader(ior)
if err != nil {
return nil, fmt.Errorf("cannot create OCFReader: %s", err)
}
return &OCFReader{header: header, ior: ior}, nil
}
//MetaData returns the file metadata map found within the OCF file
func (ocfr *OCFReader) MetaData() map[string][]byte {
return ocfr.header.metadata
}
// Codec returns the codec found within the OCF file.
func (ocfr *OCFReader) Codec() *Codec {
return ocfr.header.codec
}
// CompressionName returns the name of the compression algorithm found within
// the OCF file.
func (ocfr *OCFReader) CompressionName() string {
switch ocfr.header.compressionID {
case compressionNull:
return CompressionNullLabel
case compressionDeflate:
return CompressionDeflateLabel
case compressionSnappy:
return CompressionSnappyLabel
default:
return "should not get here: unrecognized compression algorithm"
}
}
// Err returns the last error encountered while reading the OCF file. See
// `NewOCFReader` documentation for an example.
func (ocfr *OCFReader) Err() error {
return ocfr.rerr
}
// Read consumes one datum value from the Avro OCF stream and returns it. Read
// is designed to be called only once after each invocation of the Scan method.
// See `NewOCFReader` documentation for an example.
func (ocfr *OCFReader) Read() (interface{}, error) {
// NOTE: Test previous error before testing readReady to prevent overwriting
// previous error.
if ocfr.rerr != nil {
return nil, ocfr.rerr
}
if !ocfr.readReady {
ocfr.rerr = errors.New("Read called without successful Scan")
return nil, ocfr.rerr
}
ocfr.readReady = false
// decode one datum value from block
var datum interface{}
datum, ocfr.block, ocfr.rerr = ocfr.header.codec.NativeFromBinary(ocfr.block)
if ocfr.rerr != nil {
return false, ocfr.rerr
}
ocfr.remainingBlockItems--
return datum, nil
}
// RemainingBlockItems returns the number of items remaining in the block being
// processed.
func (ocfr *OCFReader) RemainingBlockItems() int64 {
return ocfr.remainingBlockItems
}
// Scan returns true when there is at least one more data item to be read from
// the Avro OCF. Scan ought to be called prior to calling the Read method each
// time the Read method is invoked. See `NewOCFReader` documentation for an
// example.
func (ocfr *OCFReader) Scan() bool {
ocfr.readReady = false
if ocfr.rerr != nil {
return false
}
// NOTE: If there are no more remaining data items from the existing block,
// then attempt to slurp in the next block.
if ocfr.remainingBlockItems <= 0 {
if count := len(ocfr.block); count != 0 {
ocfr.rerr = fmt.Errorf("extra bytes between final datum in previous block and block sync marker: %d", count)
return false
}
// Read the block count and update the number of remaining items for
// this block
ocfr.remainingBlockItems, ocfr.rerr = longBinaryReader(ocfr.ior)
if ocfr.rerr != nil {
if ocfr.rerr == io.EOF {
ocfr.rerr = nil // merely end of file, rather than error
} else {
ocfr.rerr = fmt.Errorf("cannot read block count: %s", ocfr.rerr)
}
return false
}
if ocfr.remainingBlockItems <= 0 {
ocfr.rerr = fmt.Errorf("cannot decode when block count is not greater than 0: %d", ocfr.remainingBlockItems)
return false
}
if ocfr.remainingBlockItems > MaxBlockCount {
ocfr.rerr = fmt.Errorf("cannot decode when block count exceeds MaxBlockCount: %d > %d", ocfr.remainingBlockItems, MaxBlockCount)
}
var blockSize int64
blockSize, ocfr.rerr = longBinaryReader(ocfr.ior)
if ocfr.rerr != nil {
ocfr.rerr = fmt.Errorf("cannot read block size: %s", ocfr.rerr)
return false
}
if blockSize <= 0 {
ocfr.rerr = fmt.Errorf("cannot decode when block size is not greater than 0: %d", blockSize)
return false
}
if blockSize > MaxBlockSize {
ocfr.rerr = fmt.Errorf("cannot decode when block size exceeds MaxBlockSize: %d > %d", blockSize, MaxBlockSize)
return false
}
// read entire block into buffer
ocfr.block = make([]byte, blockSize)
_, ocfr.rerr = io.ReadFull(ocfr.ior, ocfr.block)
if ocfr.rerr != nil {
ocfr.rerr = fmt.Errorf("cannot read block: %s", ocfr.rerr)
return false
}
switch ocfr.header.compressionID {
case compressionNull:
// no-op
case compressionDeflate:
// NOTE: flate.NewReader wraps with io.ByteReader if argument does
// not implement that interface.
rc := flate.NewReader(bytes.NewBuffer(ocfr.block))
ocfr.block, ocfr.rerr = ioutil.ReadAll(rc)
if ocfr.rerr != nil {
_ = rc.Close()
return false
}
if ocfr.rerr = rc.Close(); ocfr.rerr != nil {
return false
}
case compressionSnappy:
index := len(ocfr.block) - 4 // last 4 bytes is crc32 of decoded block
if index <= 0 {
ocfr.rerr = fmt.Errorf("cannot decompress snappy without CRC32 checksum: %d", len(ocfr.block))
return false
}
decoded, err := snappy.Decode(nil, ocfr.block[:index])
if err != nil {
ocfr.rerr = fmt.Errorf("cannot decompress: %s", err)
return false
}
actualCRC := crc32.ChecksumIEEE(decoded)
expectedCRC := binary.BigEndian.Uint32(ocfr.block[index : index+4])
if actualCRC != expectedCRC {
ocfr.rerr = fmt.Errorf("snappy CRC32 checksum mismatch: %x != %x", actualCRC, expectedCRC)
return false
}
ocfr.block = decoded
default:
ocfr.rerr = fmt.Errorf("should not get here: cannot compress block using unrecognized compression: %d", ocfr.header.compressionID)
return false
}
// read and ensure sync marker matches
sync := make([]byte, ocfSyncLength)
var n int
if n, ocfr.rerr = io.ReadFull(ocfr.ior, sync); ocfr.rerr != nil {
ocfr.rerr = fmt.Errorf("cannot read sync marker: read %d out of %d bytes: %s", n, ocfSyncLength, ocfr.rerr)
return false
}
if !bytes.Equal(sync, ocfr.header.syncMarker[:]) {
ocfr.rerr = fmt.Errorf("sync marker mismatch: %v != %v", sync, ocfr.header.syncMarker)
return false
}
}
ocfr.readReady = true
return true
}
// SkipThisBlockAndReset can be called after an error occurs while reading or
// decoding datum values from an OCF stream. OCF specifies each OCF stream
// contain one or more blocks of data. Each block consists of a block count, the
// number of bytes for the block, followed be the possibly compressed
// block. Inside each decompressed block is all of the binary encoded datum
// values concatenated together. In other words, OCF framing is at a block level
// rather than a datum level. If there is an error while reading or decoding a
// datum, the reader is not able to skip to the next datum value, because OCF
// does not have any markers for where each datum ends and the next one
// begins. Therefore, the reader is only able to skip this datum value and all
// subsequent datum values in the current block, move to the next block and
// start decoding datum values there.
func (ocfr *OCFReader) SkipThisBlockAndReset() {
// ??? is it an error to call method unless the reader has had an error
ocfr.remainingBlockItems = 0
ocfr.block = ocfr.block[:0]
ocfr.rerr = nil
}