From 6d308210b7ef63d8ea806b680d60fc4c937706b8 Mon Sep 17 00:00:00 2001 From: Jonathan Vuillemin Date: Mon, 8 Jul 2024 15:40:14 +0200 Subject: [PATCH] feat(fxgcppubsub): Fixed race conditions on avro binary codec --- fxgcppubsub/codec/avro.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/fxgcppubsub/codec/avro.go b/fxgcppubsub/codec/avro.go index 4e1419a..d6a5259 100644 --- a/fxgcppubsub/codec/avro.go +++ b/fxgcppubsub/codec/avro.go @@ -15,22 +15,28 @@ var ( // AvroBinaryCodec is a Codec implementation for encoding and decoding with avro schema in binary format. type AvroBinaryCodec struct { + api avro.API schema avro.Schema } // NewAvroBinaryCodec returns a new AvroBinaryCodec instance. func NewAvroBinaryCodec(schemaDefinition string) (*AvroBinaryCodec, error) { + api := avro.Config{}.Freeze() + schema, err := avro.Parse(schemaDefinition) if err != nil { return nil, fmt.Errorf("cannot parse avro schema: %w", err) } - return &AvroBinaryCodec{schema: schema}, nil + return &AvroBinaryCodec{ + api: api, + schema: schema, + }, nil } // Encode encodes in avro binary format the provided input. func (c *AvroBinaryCodec) Encode(in any) ([]byte, error) { - out, err := avro.Marshal(c.schema, in) + out, err := c.api.Marshal(c.schema, in) if err != nil { return nil, fmt.Errorf("cannot encode avro binary: %w", err) } @@ -40,7 +46,7 @@ func (c *AvroBinaryCodec) Encode(in any) ([]byte, error) { // Decode decodes from avro binary format the provided input. func (c *AvroBinaryCodec) Decode(enc []byte, out any) error { - err := avro.Unmarshal(c.schema, enc, out) + err := c.api.Unmarshal(c.schema, enc, out) if err != nil { return fmt.Errorf("cannot decode avro binary: %w", err) }