Skip to content

Commit

Permalink
Adding flag to use strict avro parsing
Browse files Browse the repository at this point in the history
  • Loading branch information
fabiojmendes committed May 2, 2022
1 parent 1add193 commit 6b57272
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 40 deletions.
2 changes: 1 addition & 1 deletion cmd/kaf/consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ var consumeCmd = &cobra.Command{
client := getClientFromConfig(cfg)

schemaCache = getSchemaCache()
avroDecoder = codec.NewAvroCodec(-1, schemaCache)
avroDecoder = codec.NewAvroCodec(-1, false, schemaCache)
protoDecoder = codec.NewProtoCodec(protoType, reg)
protoKeyDecoder = codec.NewProtoCodec(keyProtoType, reg)

Expand Down
6 changes: 4 additions & 2 deletions cmd/kaf/produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ var (
inputModeFlag string
avroSchemaID int
avroKeySchemaID int
avroStrictFlag bool
templateFlag bool
)

Expand All @@ -54,6 +55,7 @@ func init() {

produceCmd.Flags().IntVarP(&avroSchemaID, "avro-schema-id", "", -1, "Value schema id for avro messsage encoding")
produceCmd.Flags().IntVarP(&avroKeySchemaID, "avro-key-schema-id", "", -1, "Key schema id for avro messsage encoding")
produceCmd.Flags().BoolVar(&avroStrictFlag, "avro-strict", false, "Uses strict version of the input json to parse unions")

produceCmd.Flags().StringVarP(&inputModeFlag, "input-mode", "", "line", "Scanning input mode: [line|full]")
produceCmd.Flags().IntVarP(&bufferSizeFlag, "line-length-limit", "", 0, "line length limit in line input mode")
Expand Down Expand Up @@ -91,7 +93,7 @@ func valueEncoder() codec.Encoder {
if protoType != "" {
return codec.NewProtoCodec(protoType, reg)
} else if avroSchemaID != -1 {
return codec.NewAvroCodec(avroSchemaID, schemaCache)
return codec.NewAvroCodec(avroSchemaID, avroStrictFlag, schemaCache)
} else {
return &codec.BypassCodec{}
}
Expand All @@ -101,7 +103,7 @@ func keyEncoder() codec.Encoder {
if keyProtoType != "" {
return codec.NewProtoCodec(keyProtoType, reg)
} else if avroKeySchemaID != -1 {
return codec.NewAvroCodec(avroKeySchemaID, schemaCache)
return codec.NewAvroCodec(avroKeySchemaID, avroStrictFlag, schemaCache)
} else {
return &codec.BypassCodec{}
}
Expand Down
39 changes: 6 additions & 33 deletions pkg/avro/schema.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package avro

import (
"encoding/binary"
"sync"

schemaregistry "github.com/Landoop/schema-registry"
Expand Down Expand Up @@ -38,7 +37,7 @@ func NewSchemaCache(url string) (*SchemaCache, error) {
}

// GetCodecForSchemaID returns a goavro codec for transforming data.
func (c *SchemaCache) GetCodecForSchemaID(schemaID int) (codec *goavro.Codec, err error) {
func (c *SchemaCache) GetCodecForSchemaID(schemaID int, strict bool) (codec *goavro.Codec, err error) {
c.mu.RLock()
cc, ok := c.codecsBySchemaID[schemaID]
c.mu.RUnlock()
Expand Down Expand Up @@ -74,40 +73,14 @@ func (c *SchemaCache) GetCodecForSchemaID(schemaID int) (codec *goavro.Codec, er
return nil, err
}

codec, err = goavro.NewCodec(schema)
if strict {
codec, err = goavro.NewCodec(schema)
} else {
codec, err = goavro.NewCodecForStandardJSON(schema)
}
if err != nil {
return nil, err
}

return codec, nil
}

// DecodeMessage converts an Avro-encoded message into its textual Avro form.
//
// Input shorter than five bytes, or not starting with the 0x00 magic byte, is
// treated as non-Avro data and returned unchanged with a nil error. Otherwise
// bytes 1-4 are read as a big-endian schema ID, the matching codec is fetched
// from the cache, and the remaining bytes are decoded and re-rendered as text.
// On any failure the original bytes are returned together with the error.
func (c *SchemaCache) DecodeMessage(b []byte) (message []byte, err error) {
	// Header layout: one magic byte followed by a 4-byte schema ID.
	const headerLen = 5

	hasAvroHeader := len(b) >= headerLen && b[0] == 0x00
	if !hasAvroHeader {
		// The message does not contain Avro-encoded data.
		return b, nil
	}

	id := int(binary.BigEndian.Uint32(b[1:headerLen]))
	avroCodec, err := c.GetCodecForSchemaID(id)
	if err != nil {
		return b, err
	}

	// Binary Avro payload -> native Go value; the second return value is not used.
	decoded, _, err := avroCodec.NativeFromBinary(b[headerLen:])
	if err != nil {
		return b, err
	}

	// Native Go value -> textual Avro data.
	text, err := avroCodec.TextualFromNative(nil, decoded)
	if err != nil {
		return b, err
	}
	return text, nil
}
9 changes: 5 additions & 4 deletions pkg/codec/avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,17 @@ import (
// avro formats
type AvroCodec struct {
// encodeSchemaID is the schema ID Encode passes to the schema cache to obtain its codec.
encodeSchemaID int
// strict is forwarded to SchemaCache.GetCodecForSchemaID by both Encode and Decode.
strict bool
// schemaCache supplies the goavro codecs, looked up by schema ID.
schemaCache *avro.SchemaCache
}

func NewAvroCodec(schemaID int, cache *avro.SchemaCache) *AvroCodec {
return &AvroCodec{schemaID, cache}
// NewAvroCodec builds an AvroCodec that encodes with the schema identified by
// schemaID, forwards strict to the schema cache on every codec lookup, and
// obtains its codecs from cache.
func NewAvroCodec(schemaID int, strict bool, cache *avro.SchemaCache) *AvroCodec {
	return &AvroCodec{
		encodeSchemaID: schemaID,
		strict:         strict,
		schemaCache:    cache,
	}
}

// Encode returns a binary representation of an Avro-encoded message.
func (a *AvroCodec) Encode(in json.RawMessage) ([]byte, error) {
codec, err := a.schemaCache.GetCodecForSchemaID(a.encodeSchemaID)
codec, err := a.schemaCache.GetCodecForSchemaID(a.encodeSchemaID, a.strict)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -55,7 +56,7 @@ func (a *AvroCodec) Decode(in []byte) (json.RawMessage, error) {

// Schema ID is stored in the 4 bytes following the magic byte.
schemaID := binary.BigEndian.Uint32(in[1:5])
codec, err := a.schemaCache.GetCodecForSchemaID(int(schemaID))
codec, err := a.schemaCache.GetCodecForSchemaID(int(schemaID), a.strict)
if err != nil {
return in, err
}
Expand Down

0 comments on commit 6b57272

Please sign in to comment.