Skip to content

Commit

Permalink
Expose public Schema and WriteChunk on ArrowWriter (#20)
Browse files Browse the repository at this point in the history
* make schema, writeChunk public on ArrowWriter

* add NewArrowIPCFileWriterWithSchema to facilitate creating a file writer from the schema of an existing Arrow Feather file
  • Loading branch information
akotlar authored Apr 6, 2024
1 parent cb7a8a5 commit a9a245f
Showing 1 changed file with 9 additions and 7 deletions.
16 changes: 9 additions & 7 deletions arrow/arrow.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
)

type ArrowWriter struct {
schema *arrow.Schema
Schema *arrow.Schema
writer *ipc.FileWriter
mu sync.Mutex
}
Expand All @@ -22,21 +22,23 @@ type ArrowWriter struct {
// The ArrowWriter will write to the file f.
// This writing operation is threadsafe.
func NewArrowIPCFileWriter(f *os.File, fieldNames []string, fieldTypes []arrow.DataType, options ...ipc.Option) (*ArrowWriter, error) {
schema := makeSchema(fieldNames, fieldTypes)
return NewArrowIPCFileWriterWithSchema(f, makeSchema(fieldNames, fieldTypes), options...)
}

func NewArrowIPCFileWriterWithSchema(f *os.File, schema *arrow.Schema, options ...ipc.Option) (*ArrowWriter, error) {
schemaOption := ipc.WithSchema(schema)
writer, err := ipc.NewFileWriter(f, append([]ipc.Option{schemaOption}, options...)...)
if err != nil {
return nil, err
}

return &ArrowWriter{
schema: schema,
Schema: schema,
writer: writer,
}, nil
}

func (aw *ArrowWriter) writeChunk(record arrow.Record) error {
func (aw *ArrowWriter) WriteChunk(record arrow.Record) error {
aw.mu.Lock()
defer aw.mu.Unlock()

Expand Down Expand Up @@ -73,7 +75,7 @@ type ArrowRowBuilder struct {
// to write a chunk.
func NewArrowRowBuilder(aw *ArrowWriter, chunkSize int) (*ArrowRowBuilder, error) {
pool := memory.NewGoAllocator()
builders, appendFuncs, err := makeBuilders(aw.schema, pool)
builders, appendFuncs, err := makeBuilders(aw.Schema, pool)

if err != nil {
return nil, err
Expand Down Expand Up @@ -124,10 +126,10 @@ func (arb *ArrowRowBuilder) writeChunk() error {
cols = append(cols, b.NewArray())
}

record := array.NewRecord(arb.arrowWriter.schema, cols, int64(arb.numRowsInChunk))
record := array.NewRecord(arb.arrowWriter.Schema, cols, int64(arb.numRowsInChunk))
defer record.Release()

if err := arb.arrowWriter.writeChunk(record); err != nil {
if err := arb.arrowWriter.WriteChunk(record); err != nil {
return err
}

Expand Down

0 comments on commit a9a245f

Please sign in to comment.