diff --git a/cmd/kaf/consume.go b/cmd/kaf/consume.go index 589d404..5934e5d 100644 --- a/cmd/kaf/consume.go +++ b/cmd/kaf/consume.go @@ -99,7 +99,7 @@ var consumeCmd = &cobra.Command{ client := getClientFromConfig(cfg) schemaCache = getSchemaCache() - avroDecoder = codec.NewAvroCodec(-1, schemaCache) + avroDecoder = codec.NewAvroCodec(-1, false, schemaCache) protoDecoder = codec.NewProtoCodec(protoType, reg) protoKeyDecoder = codec.NewProtoCodec(keyProtoType, reg) diff --git a/cmd/kaf/produce.go b/cmd/kaf/produce.go index c22546a..66ee2bb 100644 --- a/cmd/kaf/produce.go +++ b/cmd/kaf/produce.go @@ -32,6 +32,7 @@ var ( inputModeFlag string avroSchemaID int avroKeySchemaID int + avroStrictFlag bool templateFlag bool ) @@ -54,6 +55,7 @@ func init() { produceCmd.Flags().IntVarP(&avroSchemaID, "avro-schema-id", "", -1, "Value schema id for avro messsage encoding") produceCmd.Flags().IntVarP(&avroKeySchemaID, "avro-key-schema-id", "", -1, "Key schema id for avro messsage encoding") + produceCmd.Flags().BoolVar(&avroStrictFlag, "avro-strict", false, "Uses strict version of the input json to parse unions") produceCmd.Flags().StringVarP(&inputModeFlag, "input-mode", "", "line", "Scanning input mode: [line|full]") produceCmd.Flags().IntVarP(&bufferSizeFlag, "line-length-limit", "", 0, "line length limit in line input mode") @@ -91,7 +93,7 @@ func valueEncoder() codec.Encoder { if protoType != "" { return codec.NewProtoCodec(protoType, reg) } else if avroSchemaID != -1 { - return codec.NewAvroCodec(avroSchemaID, schemaCache) + return codec.NewAvroCodec(avroSchemaID, avroStrictFlag, schemaCache) } else { return &codec.BypassCodec{} } @@ -101,7 +103,7 @@ func keyEncoder() codec.Encoder { if keyProtoType != "" { return codec.NewProtoCodec(keyProtoType, reg) } else if avroKeySchemaID != -1 { - return codec.NewAvroCodec(avroKeySchemaID, schemaCache) + return codec.NewAvroCodec(avroKeySchemaID, avroStrictFlag, schemaCache) } else { return &codec.BypassCodec{} } diff --git a/pkg/avro/schema.go b/pkg/avro/schema.go index ba396bd..00e93f3 100644 --- a/pkg/avro/schema.go +++ b/pkg/avro/schema.go @@ -1,7 +1,6 @@ package avro import ( - "encoding/binary" "sync" schemaregistry "github.com/Landoop/schema-registry" @@ -38,7 +37,7 @@ func NewSchemaCache(url string) (*SchemaCache, error) { } // getCodecForSchemaID returns a goavro codec for transforming data. -func (c *SchemaCache) GetCodecForSchemaID(schemaID int) (codec *goavro.Codec, err error) { +func (c *SchemaCache) GetCodecForSchemaID(schemaID int, strict bool) (codec *goavro.Codec, err error) { c.mu.RLock() cc, ok := c.codecsBySchemaID[schemaID] c.mu.RUnlock() @@ -74,40 +73,14 @@ func (c *SchemaCache) GetCodecForSchemaID(schemaID int) (codec *goavro.Codec, er return nil, err } - codec, err = goavro.NewCodec(schema) + if strict { + codec, err = goavro.NewCodec(schema) + } else { + codec, err = goavro.NewCodecForStandardJSON(schema) + } if err != nil { return nil, err } return codec, nil } - -// DecodeMessage returns a text representation of an Avro-encoded message. -func (c *SchemaCache) DecodeMessage(b []byte) (message []byte, err error) { - // Ensure avro header is present with the magic start-byte. - if len(b) < 5 || b[0] != 0x00 { - // The message does not contain Avro-encoded data - return b, nil - } - - // Schema ID is stored in the 4 bytes following the magic byte. - schemaID := binary.BigEndian.Uint32(b[1:5]) - codec, err := c.GetCodecForSchemaID(int(schemaID)) - if err != nil { - return b, err - } - - // Convert binary Avro data back to native Go form - native, _, err := codec.NativeFromBinary(b[5:]) - if err != nil { - return b, err - } - - // Convert native Go form to textual Avro data - message, err = codec.TextualFromNative(nil, native) - if err != nil { - return b, err - } - - return message, nil -} diff --git a/pkg/codec/avro.go b/pkg/codec/avro.go index 8ecf475..f2ddfde 100644 --- a/pkg/codec/avro.go +++ b/pkg/codec/avro.go @@ -11,16 +11,17 @@ import ( // avro formats type AvroCodec struct { encodeSchemaID int + strict bool schemaCache *avro.SchemaCache } -func NewAvroCodec(schemaID int, cache *avro.SchemaCache) *AvroCodec { - return &AvroCodec{schemaID, cache} +func NewAvroCodec(schemaID int, strict bool, cache *avro.SchemaCache) *AvroCodec { + return &AvroCodec{schemaID, strict, cache} } // Encode returns a binary representation of an Avro-encoded message. func (a *AvroCodec) Encode(in json.RawMessage) ([]byte, error) { - codec, err := a.schemaCache.GetCodecForSchemaID(a.encodeSchemaID) + codec, err := a.schemaCache.GetCodecForSchemaID(a.encodeSchemaID, a.strict) if err != nil { return nil, err } @@ -55,7 +56,7 @@ func (a *AvroCodec) Decode(in []byte) (json.RawMessage, error) { // Schema ID is stored in the 4 bytes following the magic byte. schemaID := binary.BigEndian.Uint32(in[1:5]) - codec, err := a.schemaCache.GetCodecForSchemaID(int(schemaID)) + codec, err := a.schemaCache.GetCodecForSchemaID(int(schemaID), a.strict) if err != nil { return in, err }