From a9a245f1ffb07b055d5386c45e0ac0c59575c7cb Mon Sep 17 00:00:00 2001 From: "Alex V. Kotlar" Date: Fri, 5 Apr 2024 20:29:05 -0400 Subject: [PATCH] Expose public Schema and WriteChunk on ArrowWriter (#20) * Make schema and writeChunk public on ArrowWriter (as Schema and WriteChunk) * Add NewArrowIPCFileWriterWithSchema to facilitate creating a file writer from an existing Arrow Feather file --- arrow/arrow.go | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/arrow/arrow.go b/arrow/arrow.go index 2bc3559..ac33ba3 100644 --- a/arrow/arrow.go +++ b/arrow/arrow.go @@ -12,7 +12,7 @@ import ( ) type ArrowWriter struct { - schema *arrow.Schema + Schema *arrow.Schema writer *ipc.FileWriter mu sync.Mutex } @@ -22,8 +22,10 @@ type ArrowWriter struct { // The ArrowWriter will write to filePath. // This writing operation is threadsafe. func NewArrowIPCFileWriter(f *os.File, fieldNames []string, fieldTypes []arrow.DataType, options ...ipc.Option) (*ArrowWriter, error) { - schema := makeSchema(fieldNames, fieldTypes) + return NewArrowIPCFileWriterWithSchema(f, makeSchema(fieldNames, fieldTypes), options...) +} +func NewArrowIPCFileWriterWithSchema(f *os.File, schema *arrow.Schema, options ...ipc.Option) (*ArrowWriter, error) { schemaOption := ipc.WithSchema(schema) writer, err := ipc.NewFileWriter(f, append([]ipc.Option{schemaOption}, options...)...) if err != nil { @@ -31,12 +33,12 @@ func NewArrowIPCFileWriter(f *os.File, fieldNames []string, fieldTypes []arrow.D } return &ArrowWriter{ - schema: schema, + Schema: schema, writer: writer, }, nil } -func (aw *ArrowWriter) writeChunk(record arrow.Record) error { +func (aw *ArrowWriter) WriteChunk(record arrow.Record) error { aw.mu.Lock() defer aw.mu.Unlock() @@ -73,7 +75,7 @@ type ArrowRowBuilder struct { // to write a chunk. 
func NewArrowRowBuilder(aw *ArrowWriter, chunkSize int) (*ArrowRowBuilder, error) { pool := memory.NewGoAllocator() - builders, appendFuncs, err := makeBuilders(aw.schema, pool) + builders, appendFuncs, err := makeBuilders(aw.Schema, pool) if err != nil { return nil, err @@ -124,10 +126,10 @@ func (arb *ArrowRowBuilder) writeChunk() error { cols = append(cols, b.NewArray()) } - record := array.NewRecord(arb.arrowWriter.schema, cols, int64(arb.numRowsInChunk)) + record := array.NewRecord(arb.arrowWriter.Schema, cols, int64(arb.numRowsInChunk)) defer record.Release() - if err := arb.arrowWriter.writeChunk(record); err != nil { + if err := arb.arrowWriter.WriteChunk(record); err != nil { return err }