Skip to content

Commit

Permalink
Make it possible to not write standard annotation output (#15)
Browse files Browse the repository at this point in the history
* Add support for noOut parameter

* don't build label arrays if not needed

* make more idiomatic use of variadic args

* write test showing noOut behavior

* use the needsLabels variable instead of config.noOut

* fix noOut + no dosageMatrixOutPath error message

* cleanup test files, pass *os.File instead of filePath

* fix compression not tested; switch to Uint16

* apply Thomas' suggestions, switch to t.TempDir for automatic cleanup

* cleanup
  • Loading branch information
akotlar authored Jan 28, 2024
1 parent 49dfb87 commit 5cfa10a
Show file tree
Hide file tree
Showing 4 changed files with 355 additions and 202 deletions.
21 changes: 8 additions & 13 deletions arrow/arrow.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,32 +12,27 @@ import (
)

type ArrowWriter struct {
filePath string
schema *arrow.Schema
writer *ipc.FileWriter
mu sync.Mutex
schema *arrow.Schema
writer *ipc.FileWriter
mu sync.Mutex
}

// Create a new ArrowWriter. The number of fields in fieldNames and fieldTypes
// must match. The number of rows in each chunk is determined by chunkSize.
// The ArrowWriter will write to the provided file f.
// This writing operation is threadsafe.
func NewArrowWriter(filePath string, fieldNames []string, fieldTypes []arrow.DataType, options []ipc.Option) (*ArrowWriter, error) {
func NewArrowIPCFileWriter(f *os.File, fieldNames []string, fieldTypes []arrow.DataType, options ...ipc.Option) (*ArrowWriter, error) {
schema := makeSchema(fieldNames, fieldTypes)
file, err := os.Create(filePath)
if err != nil {
return nil, err
}

writer, err := ipc.NewFileWriter(file, append([]ipc.Option{ipc.WithSchema(schema)}, options...)...)
schemaOption := ipc.WithSchema(schema)
writer, err := ipc.NewFileWriter(f, append([]ipc.Option{schemaOption}, options...)...)
if err != nil {
return nil, err
}

return &ArrowWriter{
filePath: filePath,
schema: schema,
writer: writer,
schema: schema,
writer: writer,
}, nil
}

Expand Down
79 changes: 58 additions & 21 deletions arrow/arrow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package arrow

import (
"fmt"
"log"
"os"
"path/filepath"
"reflect"
"sort"
"sync"
Expand Down Expand Up @@ -137,11 +137,11 @@ func readArrowRows(filePath string) ([][]any, error) {
return readRows, nil
}

func readAndVerifyArrowFile(filePath string, expectedRows [][]any, sort bool) {
func checkArrowFile(t *testing.T, filePath string, expectedRows [][]any, sort bool) {
readRows, err := readArrowRows(filePath)

if err != nil {
log.Fatal(err)
t.Fatal(err)
}

if sort {
Expand All @@ -151,14 +151,18 @@ func readAndVerifyArrowFile(filePath string, expectedRows [][]any, sort bool) {

for i, row := range readRows {
if !reflect.DeepEqual(row, expectedRows[i]) {
log.Fatalf("Mismatch at row %d: got %v, want %v", i, row, expectedRows[i])
t.Fatalf("Mismatch at row %d: got %v, want %v", i, row, expectedRows[i])
}
}
}

func TestArrowWriteRead(t *testing.T) {
batchSize := 5
filePath := "test_matrix.feather"
filePath := filepath.Join(t.TempDir(), "test.arrow")
file, err := os.Create(filePath)
if err != nil {
t.Fatal(err)
}

// Define the data types for the fields
fieldTypes := []arrow.DataType{arrow.PrimitiveTypes.Uint16, arrow.PrimitiveTypes.Uint16, arrow.PrimitiveTypes.Uint16}
Expand All @@ -168,7 +172,7 @@ func TestArrowWriteRead(t *testing.T) {
rows[i] = []any{uint16(i), uint16(i + 1), uint16(i + 2)}
}

writer, err := NewArrowWriter(filePath, fieldNames, fieldTypes, nil)
writer, err := NewArrowIPCFileWriter(file, fieldNames, fieldTypes)
if err != nil {
t.Fatal(err)
}
Expand All @@ -192,11 +196,16 @@ func TestArrowWriteRead(t *testing.T) {
t.Fatal(err)
}

readAndVerifyArrowFile(filePath, rows, false)
checkArrowFile(t, filePath, rows, false)
}

func TestArrowWriterHandlesNullValues(t *testing.T) {
filePath := "null_values.feather"
filePath := filepath.Join(t.TempDir(), "null_test.arrow")
file, err := os.Create(filePath)
if err != nil {
t.Fatal(err)
}

fieldTypes := []arrow.DataType{arrow.PrimitiveTypes.Uint8, arrow.PrimitiveTypes.Uint16, arrow.PrimitiveTypes.Uint32, arrow.PrimitiveTypes.Uint64,
arrow.PrimitiveTypes.Int8, arrow.PrimitiveTypes.Int16, arrow.PrimitiveTypes.Int32, arrow.PrimitiveTypes.Int64,
arrow.PrimitiveTypes.Float32, arrow.PrimitiveTypes.Float64, arrow.BinaryTypes.String, arrow.FixedWidthTypes.Boolean}
Expand Down Expand Up @@ -235,7 +244,7 @@ func TestArrowWriterHandlesNullValues(t *testing.T) {
fieldNames[i] = fmt.Sprintf("Field %d", i)
}

writer, err := NewArrowWriter(filePath, fieldNames, fieldTypes, nil)
writer, err := NewArrowIPCFileWriter(file, fieldNames, fieldTypes)
if err != nil {
t.Fatal(err)
}
Expand All @@ -259,17 +268,29 @@ func TestArrowWriterHandlesNullValues(t *testing.T) {
t.Fatal(err)
}

readAndVerifyArrowFile(filePath, rows, false)
checkArrowFile(t, filePath, rows, false)
}

func runConcurrentTest(compress bool) {
filePath := "concurrent_output.feather"
func conncurentTestMaybeCompress(t *testing.T, compress bool) {
filePath := filepath.Join(t.TempDir(), "concurrent_output.feather")
file, err := os.Create(filePath)
if err != nil {
t.Fatal(err)
}
defer file.Close()

fieldNames := []string{"Field1", "Field2"}
fieldTypes := []arrow.DataType{arrow.PrimitiveTypes.Uint16, arrow.PrimitiveTypes.Uint16}

writer, err := NewArrowWriter(filePath, fieldNames, fieldTypes, nil)
var writer *ArrowWriter
if compress {
writer, err = NewArrowIPCFileWriter(file, fieldNames, fieldTypes, ipc.WithZstd())
} else {
writer, err = NewArrowIPCFileWriter(file, fieldNames, fieldTypes)
}

if err != nil {
log.Fatal(err)
t.Fatal(err)
}

var wg sync.WaitGroup
Expand All @@ -290,37 +311,53 @@ func runConcurrentTest(compress bool) {
batchSize := 1 + routineID%10
builder, err := NewArrowRowBuilder(writer, batchSize)
if err != nil {
log.Fatal(err)
t.Fatal(err)
}

defer wg.Done()
for j := 0; j < numWritesPerRoutine; j++ {
rowToWrite := rows[routineID*numWritesPerRoutine+j]

if err := builder.WriteRow(rowToWrite); err != nil {
log.Fatal(err)
t.Fatal(err)
}
}

if err := builder.Release(); err != nil {
log.Fatal(err)
t.Fatal(err)
}
}(writer, i)
}

wg.Wait()

if err := writer.Close(); err != nil {
log.Fatal(err)
t.Fatal(err)
}

readAndVerifyArrowFile(filePath, rows, true)
checkArrowFile(t, filePath, rows, true)
}

func TestArrowWriterConcurrency(t *testing.T) {
runConcurrentTest(false)
conncurentTestMaybeCompress(t, false)
}

func TestArrowWriterConcurrencyWithCompression(t *testing.T) {
runConcurrentTest(true)
conncurentTestMaybeCompress(t, true)
}

// TestNewArrowIPCFileWriterWithZstdOption checks that NewArrowIPCFileWriter
// accepts ipc.WithZstd() as one of its variadic options without returning an
// error. It only constructs the writer; no rows are written or read back.
func TestNewArrowIPCFileWriterWithZstdOption(t *testing.T) {
	filePath := filepath.Join(t.TempDir(), "test.arrow")
	file, err := os.Create(filePath)
	if err != nil {
		// Fatalf rather than Errorf: continuing would hand a nil *os.File
		// to the writer constructor below.
		t.Fatalf("Unexpected error when creating file: %v", err)
	}
	// Release the handle when the test returns, matching the other tests in
	// this file that defer file.Close() on their temp files.
	defer file.Close()

	fieldNames := []string{"field1", "field2"}
	fieldTypes := []arrow.DataType{arrow.PrimitiveTypes.Int32, arrow.PrimitiveTypes.Float64}

	_, err = NewArrowIPCFileWriter(file, fieldNames, fieldTypes, ipc.WithZstd())
	if err != nil {
		t.Errorf("Unexpected error when passing WithZstd as an option: %v", err)
	}
}
Loading

0 comments on commit 5cfa10a

Please sign in to comment.