From 5cfa10a29ae28ebdf3f1f4a062f5cb6abf183db4 Mon Sep 17 00:00:00 2001 From: "Alex V. Kotlar" Date: Sun, 28 Jan 2024 14:16:20 -0500 Subject: [PATCH] Make it possible to not write standard annotation output (#15) * Add support for noOut parameter * don't build label arrays if not needed * make more idiomatic use of variadic args * write test showing noOut behavior * use the needsLabels variable instead of config.noOut * fix noOut + no dosageMatrixOutPath error message * cleanup test files, pass *os.File instead of filePath * fix compression not tested; switch to Uint16 * apply Thomas' suggestions, switch to t.TempDir for automatic cleanup * cleanup --- arrow/arrow.go | 21 ++- arrow/arrow_test.go | 79 +++++++--- main.go | 341 ++++++++++++++++++++++++++------------------ main_test.go | 116 +++++++++++---- 4 files changed, 355 insertions(+), 202 deletions(-) diff --git a/arrow/arrow.go b/arrow/arrow.go index 054faf1..2bc3559 100644 --- a/arrow/arrow.go +++ b/arrow/arrow.go @@ -12,32 +12,27 @@ import ( ) type ArrowWriter struct { - filePath string - schema *arrow.Schema - writer *ipc.FileWriter - mu sync.Mutex + schema *arrow.Schema + writer *ipc.FileWriter + mu sync.Mutex } // Create a new ArrowWriter. The number of fields in fieldNames and fieldTypes // must match. The number of rows in each chunk is determined by chunkSize. // The ArrowWriter will write to filePath. // This writing operation is threadsafe. -func NewArrowWriter(filePath string, fieldNames []string, fieldTypes []arrow.DataType, options []ipc.Option) (*ArrowWriter, error) { +func NewArrowIPCFileWriter(f *os.File, fieldNames []string, fieldTypes []arrow.DataType, options ...ipc.Option) (*ArrowWriter, error) { schema := makeSchema(fieldNames, fieldTypes) - file, err := os.Create(filePath) - if err != nil { - return nil, err - } - writer, err := ipc.NewFileWriter(file, append([]ipc.Option{ipc.WithSchema(schema)}, options...)...) + schemaOption := ipc.WithSchema(schema) + writer, err := ipc.NewFileWriter(f, append([]ipc.Option{schemaOption}, options...)...) if err != nil { return nil, err } return &ArrowWriter{ - filePath: filePath, - schema: schema, - writer: writer, + schema: schema, + writer: writer, }, nil } diff --git a/arrow/arrow_test.go b/arrow/arrow_test.go index eeda6a8..b95b310 100644 --- a/arrow/arrow_test.go +++ b/arrow/arrow_test.go @@ -2,8 +2,8 @@ package arrow import ( "fmt" - "log" "os" + "path/filepath" "reflect" "sort" "sync" @@ -137,11 +137,11 @@ func readArrowRows(filePath string) ([][]any, error) { return readRows, nil } -func readAndVerifyArrowFile(filePath string, expectedRows [][]any, sort bool) { +func checkArrowFile(t *testing.T, filePath string, expectedRows [][]any, sort bool) { readRows, err := readArrowRows(filePath) if err != nil { - log.Fatal(err) + t.Fatal(err) } if sort { @@ -151,14 +151,18 @@ func readAndVerifyArrowFile(filePath string, expectedRows [][]any, sort bool) { for i, row := range readRows { if !reflect.DeepEqual(row, expectedRows[i]) { - log.Fatalf("Mismatch at row %d: got %v, want %v", i, row, expectedRows[i]) + t.Fatalf("Mismatch at row %d: got %v, want %v", i, row, expectedRows[i]) } } } func TestArrowWriteRead(t *testing.T) { batchSize := 5 - filePath := "test_matrix.feather" + filePath := filepath.Join(t.TempDir(), "test.arrow") + file, err := os.Create(filePath) + if err != nil { + t.Fatal(err) + } // Define the data types for the fields fieldTypes := []arrow.DataType{arrow.PrimitiveTypes.Uint16, arrow.PrimitiveTypes.Uint16, arrow.PrimitiveTypes.Uint16} @@ -168,7 +172,7 @@ func TestArrowWriteRead(t *testing.T) { rows[i] = []any{uint16(i), uint16(i + 1), uint16(i + 2)} } - writer, err := NewArrowWriter(filePath, fieldNames, fieldTypes, nil) + writer, err := NewArrowIPCFileWriter(file, fieldNames, fieldTypes) if err != nil { t.Fatal(err) } @@ -192,11 +196,16 @@ func TestArrowWriteRead(t *testing.T) { t.Fatal(err) } - readAndVerifyArrowFile(filePath, rows, false) + checkArrowFile(t, filePath, rows, false) } func TestArrowWriterHandlesNullValues(t *testing.T) { - filePath := "null_values.feather" + filePath := filepath.Join(t.TempDir(), "null_test.arrow") + file, err := os.Create(filePath) + if err != nil { + t.Fatal(err) + } + fieldTypes := []arrow.DataType{arrow.PrimitiveTypes.Uint8, arrow.PrimitiveTypes.Uint16, arrow.PrimitiveTypes.Uint32, arrow.PrimitiveTypes.Uint64, arrow.PrimitiveTypes.Int8, arrow.PrimitiveTypes.Int16, arrow.PrimitiveTypes.Int32, arrow.PrimitiveTypes.Int64, arrow.PrimitiveTypes.Float32, arrow.PrimitiveTypes.Float64, arrow.BinaryTypes.String, arrow.FixedWidthTypes.Boolean} @@ -235,7 +244,7 @@ func TestArrowWriterHandlesNullValues(t *testing.T) { fieldNames[i] = fmt.Sprintf("Field %d", i) } - writer, err := NewArrowWriter(filePath, fieldNames, fieldTypes, nil) + writer, err := NewArrowIPCFileWriter(file, fieldNames, fieldTypes) if err != nil { t.Fatal(err) } @@ -259,17 +268,29 @@ func TestArrowWriterHandlesNullValues(t *testing.T) { t.Fatal(err) } - readAndVerifyArrowFile(filePath, rows, false) + checkArrowFile(t, filePath, rows, false) } -func runConcurrentTest(compress bool) { - filePath := "concurrent_output.feather" +func conncurentTestMaybeCompress(t *testing.T, compress bool) { + filePath := filepath.Join(t.TempDir(), "concurrent_output.feather") + file, err := os.Create(filePath) + if err != nil { + t.Fatal(err) + } + defer file.Close() + fieldNames := []string{"Field1", "Field2"} fieldTypes := []arrow.DataType{arrow.PrimitiveTypes.Uint16, arrow.PrimitiveTypes.Uint16} - writer, err := NewArrowWriter(filePath, fieldNames, fieldTypes, nil) + var writer *ArrowWriter + if compress { + writer, err = NewArrowIPCFileWriter(file, fieldNames, fieldTypes, ipc.WithZstd()) + } else { + writer, err = NewArrowIPCFileWriter(file, fieldNames, fieldTypes) + } + if err != nil { - log.Fatal(err) + t.Fatal(err) } var wg sync.WaitGroup @@ -290,7 +311,7 @@ func runConcurrentTest(compress bool) { batchSize := 1 + routineID%10 builder, err := NewArrowRowBuilder(writer, batchSize) if err != nil { - log.Fatal(err) + t.Fatal(err) } defer wg.Done() @@ -298,12 +319,12 @@ func runConcurrentTest(compress bool) { rowToWrite := rows[routineID*numWritesPerRoutine+j] if err := builder.WriteRow(rowToWrite); err != nil { - log.Fatal(err) + t.Fatal(err) } } if err := builder.Release(); err != nil { - log.Fatal(err) + t.Fatal(err) } }(writer, i) } @@ -311,16 +332,32 @@ func runConcurrentTest(compress bool) { wg.Wait() if err := writer.Close(); err != nil { - log.Fatal(err) + t.Fatal(err) } - readAndVerifyArrowFile(filePath, rows, true) + checkArrowFile(t, filePath, rows, true) } func TestArrowWriterConcurrency(t *testing.T) { - runConcurrentTest(false) + conncurentTestMaybeCompress(t, false) } func TestArrowWriterConcurrencyWithCompression(t *testing.T) { - runConcurrentTest(true) + conncurentTestMaybeCompress(t, true) +} + +func TestNewArrowIPCFileWriterWithZstdOption(t *testing.T) { + filePath := filepath.Join(t.TempDir(), "test.arrow") + file, err := os.Create(filePath) + if err != nil { + t.Errorf("Unexpected error when creating file: %v", err) + } + + fieldNames := []string{"field1", "field2"} + fieldTypes := []arrow.DataType{arrow.PrimitiveTypes.Int32, arrow.PrimitiveTypes.Float64} + + _, err = NewArrowIPCFileWriter(file, fieldNames, fieldTypes, ipc.WithZstd()) + if err != nil { + t.Errorf("Unexpected error when passing WithZstd as an option: %v", err) + } } diff --git a/main.go b/main.go index 7e7688f..32bed49 100644 --- a/main.go +++ b/main.go @@ -63,6 +63,7 @@ const precision = 3 type Config struct { inPath string outPath string + noOut bool dosageMatrixOutPath string sampleListPath string famPath string @@ -83,9 +84,10 @@ func setup(args []string) *Config { flag.StringVar(&config.inPath, "in", "", "The input file path (optional: default stdin)") flag.StringVar(&config.famPath, "fam", "", "The fam file path (optional)") flag.StringVar(&config.errPath, "err", "", "The log path (optional: default stderr)") - flag.StringVar(&config.outPath, "out", "", "The output path (optional: default stdout") + flag.StringVar(&config.outPath, "out", "", "The output path (optional: default stdout)") + flag.BoolVar(&config.noOut, "noOut", false, "Skip writing output (useful in conjunction with dosageOutput)") flag.StringVar(&config.dosageMatrixOutPath, "dosageOutput", "", "The output path for the dosage matrix (optional). If not provided, dosage matrix will not be output.") - flag.StringVar(&config.sampleListPath, "sample", "", "The output path of the sample list (optional: default stdout") + flag.StringVar(&config.sampleListPath, "sample", "", "The output path of the sample list (optional: default stdout)") flag.StringVar(&config.emptyField, "emptyField", "!", "The output path for the JSON output (optional)") flag.StringVar(&config.fieldDelimiter, "fieldDelimiter", ";", "The output path for the JSON output (optional)") flag.BoolVar(&config.keepID, "keepId", false, "Retain the ID field in output") @@ -154,18 +156,29 @@ func main() { } outFh := (*os.File)(nil) - if config.outPath != "" { - var err error - outFh, err = os.OpenFile(config.outPath, os.O_WRONLY|os.O_CREATE, 0644) - if err != nil { - log.Fatal(err) - } - } else { - outFh = os.Stdout + if config.noOut && config.outPath != "" { + log.Fatal("Cannot specify --noOut and --out") + } + + if config.noOut && config.dosageMatrixOutPath == "" { + log.Fatal("When specifying --noOut, must specify --dosageOutput") } - defer outFh.Close() + if !config.noOut { + if config.outPath != "" { + var err error + + outFh, err = os.OpenFile(config.outPath, os.O_WRONLY|os.O_CREATE, 0644) + if err != nil { + log.Fatal(err) + } + } else { + outFh = os.Stdout + } + + defer outFh.Close() + } if config.cpuProfile != "" { f, err := os.Create(config.cpuProfile) @@ -178,22 +191,28 @@ func main() { reader := bufio.NewReaderSize(inFh, 48*1024*1024) - writer := bufio.NewWriterSize(outFh, 48*1024*1024) + var writer *bufio.Writer + + if !config.noOut { + writer = bufio.NewWriterSize(outFh, 48*1024*1024) - fmt.Fprintln(writer, stringHeader(config)) + fmt.Fprintln(writer, stringHeader(config)) + } readVcf(config, reader, writer) - err := writer.Flush() + if !config.noOut { + err := writer.Flush() - if err != nil { - log.Fatal(err) - } + if err != nil { + log.Fatal(err) + } - err = outFh.Close() + err = outFh.Close() - if err != nil { - log.Print(err) + if err != nil { + log.Print(err) + } } } @@ -276,10 +295,12 @@ func readVcf(config *Config, reader *bufio.Reader, writer *bufio.Writer) { parse.NormalizeHeader(header) - err = writeSampleListIfWanted(config, header) + if !config.noOut { + err = writeSampleListIfWanted(config, header) - if err != nil { - log.Fatal("Couldn't write sample list file") + if err != nil { + log.Fatal("Couldn't write sample list file") + } } var arrowWriter *bystroArrow.ArrowWriter @@ -290,13 +311,16 @@ func readVcf(config *Config, reader *bufio.Reader, writer *bufio.Writer) { fieldTypes := make([]arrow.DataType, len(fieldNames)) fieldTypes[0] = arrow.BinaryTypes.String for i := 1; i < len(fieldNames); i++ { - fieldTypes[i] = arrow.PrimitiveTypes.Int16 + fieldTypes[i] = arrow.PrimitiveTypes.Uint16 } - options := []ipc.Option{ipc.WithZstd()} + file, err := os.Create(config.dosageMatrixOutPath) + if err != nil { + log.Fatal(err) + } + defer file.Close() - var err error - arrowWriter, err = bystroArrow.NewArrowWriter(config.dosageMatrixOutPath, fieldNames, fieldTypes, options) + arrowWriter, err = bystroArrow.NewArrowIPCFileWriter(file, fieldNames, fieldTypes, ipc.WithZstd()) if err != nil { log.Fatal(err) } @@ -349,9 +373,11 @@ func readVcf(config *Config, reader *bufio.Reader, writer *bufio.Writer) { <-complete } - err = arrowWriter.Close() - if err != nil { - log.Fatal(err) + if arrowWriter != nil { + err = arrowWriter.Close() + if err != nil { + log.Fatal(err) + } } } @@ -459,6 +485,9 @@ func processLines(header []string, numChars int, config *Config, queue chan [][] excludedFilters := config.excludedFilters keepPos := config.keepPos + needsLabels := !config.noOut + needsDosages := config.dosageMatrixOutPath != "" + if len(header) > sampleIdx { numSamples = float64(len(header) - sampleIdx) } else if len(header) == sampleIdx { @@ -478,7 +507,7 @@ func processLines(header []string, numChars int, config *Config, queue chan [][] } for lines := range queue { - if output.Len() >= 2e6 { + if !config.noOut && output.Len() >= 2e6 { fileMutex.Lock() writer.Write(output.Bytes()) @@ -510,9 +539,9 @@ func processLines(header []string, numChars int, config *Config, queue chan [][] // If no samples are provided, annotate what we can, skipping hets and homs // If samples are provided, but only missing genotypes, skip the site altogether if numSamples > 0 { - homs, hets, missing, dosages, ac, an = makeHetHomozygotes(record, header, strAlt) + homs, hets, missing, dosages, ac, an = makeHetHomozygotes(record, header, strAlt, needsLabels, needsDosages) - if len(homs) == 0 && len(hets) == 0 { + if ac == 0 { continue } @@ -530,21 +559,6 @@ func processLines(header []string, numChars int, config *Config, queue chan [][] chrom = record[chromIdx] } - output.WriteString(chrom) - output.WriteByte(tabByte) - - output.WriteString(positions[i]) - output.WriteByte(tabByte) - - output.WriteString(siteType) - output.WriteByte(tabByte) - - output.WriteByte(refs[i]) - output.WriteByte(tabByte) - - output.WriteString(alts[i]) - output.WriteByte(tabByte) - if arrowBuilder != nil { arrowRow = append(arrowRow, fmt.Sprintf("%s:%s:%s:%s", chrom, positions[i], string(refs[i]), alts[i])) @@ -555,99 +569,116 @@ func processLines(header []string, numChars int, config *Config, queue chan [][] arrowBuilder.WriteRow(arrowRow) } - if multiallelic { - output.WriteString(parse.NotTrTv) - } else { - output.WriteString(parse.GetTrTv(string(refs[i]), alts[i])) - } + if needsLabels { + output.WriteString(chrom) + output.WriteByte(tabByte) - output.WriteByte(tabByte) + output.WriteString(positions[i]) + output.WriteByte(tabByte) - // Write missing samples - // heterozygotes \t heterozygosity - if len(hets) == 0 { - output.WriteString(emptyField) + output.WriteString(siteType) output.WriteByte(tabByte) - output.WriteByte(zeroByte) - } else { - output.WriteString(strings.Join(hets, fieldDelim)) + + output.WriteByte(refs[i]) output.WriteByte(tabByte) - // This gives plenty precision; we are mostly interested in - // the first or maybe 2-3 significant digits - // https://play.golang.org/p/Ux-QmClaJG - // Also, gnomAD seems to use 6 bits of precision - // the bitSize == 64 allows us to round properly past 6 s.f - // Note: 'G' requires these numbers to be < 0 for proper precision - // (elase only 6 s.f total, rather than after decimal) - output.WriteString(strconv.FormatFloat(float64(len(hets))/effectiveSamples, 'G', precision, 64)) - } + output.WriteString(alts[i]) + output.WriteByte(tabByte) - output.WriteByte(tabByte) + if multiallelic { + output.WriteString(parse.NotTrTv) + } else { + output.WriteString(parse.GetTrTv(string(refs[i]), alts[i])) + } - // Write missing samples - // homozygotes \t homozygosity - if len(homs) == 0 { - output.WriteString(emptyField) - output.WriteByte(tabByte) - output.WriteByte(zeroByte) - } else { - output.WriteString(strings.Join(homs, fieldDelim)) output.WriteByte(tabByte) - output.WriteString(strconv.FormatFloat(float64(len(homs))/effectiveSamples, 'G', precision, 64)) - } - output.WriteByte(tabByte) + // Write missing samples + // heterozygotes \t heterozygosity + if len(hets) == 0 { + output.WriteString(emptyField) + output.WriteByte(tabByte) + output.WriteByte(zeroByte) + } else { + output.WriteString(strings.Join(hets, fieldDelim)) + output.WriteByte(tabByte) + + // This gives plenty precision; we are mostly interested in + // the first or maybe 2-3 significant digits + // https://play.golang.org/p/Ux-QmClaJG + // Also, gnomAD seems to use 6 bits of precision + // the bitSize == 64 allows us to round properly past 6 s.f + // Note: 'G' requires these numbers to be < 0 for proper precision + // (elase only 6 s.f total, rather than after decimal) + output.WriteString(strconv.FormatFloat(float64(len(hets))/effectiveSamples, 'G', precision, 64)) + } - // Write missing samples - // missingGenos \t missingness - if len(missing) == 0 { - output.WriteString(emptyField) - output.WriteByte(tabByte) - output.WriteByte(zeroByte) - } else { - output.WriteString(strings.Join(missing, fieldDelim)) output.WriteByte(tabByte) - output.WriteString(strconv.FormatFloat(float64(len(missing))/numSamples, 'G', precision, 64)) - } - // Write the sample minor allele frequency - output.WriteByte(tabByte) + // Write missing samples + // homozygotes \t homozygosity + if len(homs) == 0 { + output.WriteString(emptyField) + output.WriteByte(tabByte) + output.WriteByte(zeroByte) + } else { + output.WriteString(strings.Join(homs, fieldDelim)) + output.WriteByte(tabByte) + output.WriteString(strconv.FormatFloat(float64(len(homs))/effectiveSamples, 'G', precision, 64)) + } - output.WriteString(strconv.Itoa(ac)) - output.WriteByte(tabByte) - output.WriteString(strconv.Itoa(an)) - output.WriteByte(tabByte) + output.WriteByte(tabByte) - // TODO: can ac == 0 && (len(het) > 0 || len(hom) > 0) occur? - if ac == 0 { - output.WriteByte(zeroByte) - } else { - output.WriteString(strconv.FormatFloat(float64(ac)/float64(an), 'G', precision, 64)) - } + // Write missing samples + // missingGenos \t missingness + if len(missing) == 0 { + output.WriteString(emptyField) + output.WriteByte(tabByte) + output.WriteByte(zeroByte) + } else { + output.WriteString(strings.Join(missing, fieldDelim)) + output.WriteByte(tabByte) + output.WriteString(strconv.FormatFloat(float64(len(missing))/numSamples, 'G', precision, 64)) + } - /******************* Optional Fields ***********************/ - if keepPos == true { + // Write the sample minor allele frequency output.WriteByte(tabByte) - output.WriteString(record[posIdx]) - } - if keepID == true { + output.WriteString(strconv.Itoa(ac)) output.WriteByte(tabByte) - output.WriteString(record[idIdx]) - } - - if keepInfo == true { - // Write the index of the allele, to allow users to segregate data in the INFO field + output.WriteString(strconv.Itoa(an)) output.WriteByte(tabByte) - output.WriteString(strconv.Itoa(altIndices[i])) - // Write info for all indices - output.WriteByte(tabByte) - output.WriteString(record[infoIdx]) - } + // TODO: can ac == 0 && (len(het) > 0 || len(hom) > 0) occur? + if ac == 0 { + output.WriteByte(zeroByte) + } else { + output.WriteString(strconv.FormatFloat(float64(ac)/float64(an), 'G', precision, 64)) + } + + /******************* Optional Fields ***********************/ + if keepPos == true { + output.WriteByte(tabByte) + output.WriteString(record[posIdx]) + } - output.WriteByte(clByte) + if keepID == true { + output.WriteByte(tabByte) + output.WriteString(record[idIdx]) + } + + if keepInfo == true { + // Write the index of the allele, to allow users to segregate data in the INFO field + output.WriteByte(tabByte) + output.WriteString(strconv.Itoa(altIndices[i])) + + // Write info for all indices + output.WriteByte(tabByte) + output.WriteString(record[infoIdx]) + } + + output.WriteByte(clByte) + } } } @@ -657,7 +688,7 @@ func processLines(header []string, numChars int, config *Config, queue chan [][] } } - if output.Len() > 0 { + if !config.noOut && output.Len() > 0 { fileMutex.Lock() writer.Write(output.Bytes()) @@ -665,9 +696,11 @@ func processLines(header []string, numChars int, config *Config, queue chan [][] fileMutex.Unlock() } - err = arrowBuilder.Release() - if err != nil { - log.Fatal(err) + if arrowBuilder != nil { + err = arrowBuilder.Release() + if err != nil { + log.Fatal(err) + } } complete <- true @@ -992,7 +1025,7 @@ func getAlleles(chrom string, pos string, ref string, alt string) (string, []str // makeHetHomozygotes process all sample genotype fields, and for a single alleleNum, which is the allele index (1 based) // returns the homozygotes, heterozygotes, missing samples, total alt counts, genotype counts, and missing counts -func makeHetHomozygotes(fields []string, header []string, alleleNum string) ([]string, []string, []string, []any, int, int) { +func makeHetHomozygotes(fields []string, header []string, alleleNum string, needsLabels bool, needsDosages bool) ([]string, []string, []string, []any, int, int) { var homs []string var hets []string var missing []string @@ -1018,7 +1051,11 @@ SAMPLES: // Reference is the most common case, e.g. 0|0, 0/0 if sampleGenotypeField[0] == '0' && sampleGenotypeField[2] == '0' { totalGtCount += 2 - dosages = append(dosages, int16(0)) + + if needsDosages { + dosages = append(dosages, uint16(0)) + } + continue SAMPLES } @@ -1029,8 +1066,14 @@ SAMPLES: if (sampleGenotypeField[0] == '0' && sampleGenotypeField[2] == alleleNum[0]) || (sampleGenotypeField[0] == alleleNum[0] && sampleGenotypeField[2] == '0') { totalGtCount += 2 totalAltCount += 1 - hets = append(hets, header[i]) - dosages = append(dosages, int16(1)) + + if needsLabels { + hets = append(hets, header[i]) + } + + if needsDosages { + dosages = append(dosages, uint16(1)) + } continue SAMPLES } @@ -1039,8 +1082,14 @@ SAMPLES: if sampleGenotypeField[0] == alleleNum[0] && sampleGenotypeField[2] == alleleNum[0] { totalGtCount += 2 totalAltCount += 2 - homs = append(homs, header[i]) - dosages = append(dosages, int16(2)) + + if needsLabels { + homs = append(homs, header[i]) + } + + if needsDosages { + dosages = append(dosages, uint16(2)) + } continue SAMPLES } @@ -1048,8 +1097,13 @@ SAMPLES: // N|., .|N, .|., N/., ./N, ./. are all considered missing samples, because if one site is missing, the other is likely unreliable if sampleGenotypeField[0] == '.' || sampleGenotypeField[2] == '.' { - missing = append(missing, header[i]) - dosages = append(dosages, nil) + if needsLabels { + missing = append(missing, header[i]) + } + + if needsDosages { + dosages = append(dosages, nil) + } continue SAMPLES } @@ -1080,7 +1134,14 @@ SAMPLES: for _, allele := range alleles { if allele == "." { - missing = append(missing, header[i]) + if needsLabels { + missing = append(missing, header[i]) + } + + if needsDosages { + dosages = append(dosages, nil) + } + continue SAMPLES } @@ -1094,16 +1155,20 @@ SAMPLES: totalGtCount += gtCount totalAltCount += altCount - dosages = append(dosages, int16(altCount)) + if needsDosages { + dosages = append(dosages, uint16(altCount)) + } if altCount == 0 { continue } - if int(altCount) == gtCount { - homs = append(homs, header[i]) - } else { - hets = append(hets, header[i]) + if needsLabels { + if int(altCount) == gtCount { + homs = append(homs, header[i]) + } else { + hets = append(hets, header[i]) + } } } diff --git a/main_test.go b/main_test.go index 6b56c5d..04b3ece 100644 --- a/main_test.go +++ b/main_test.go @@ -6,6 +6,8 @@ import ( "fmt" "log" "os" + "path" + "path/filepath" "strconv" "strings" "testing" @@ -167,7 +169,9 @@ func TestHeader(t *testing.T) { } func TestWriteSampleListIfWanted(t *testing.T) { - config := Config{sampleListPath: "./test.sample_list"} + filePath := path.Join(t.TempDir(), "./test.sample_list") + + config := Config{sampleListPath: filePath} versionLine := "##fileformat=VCFv4.x" header := strings.Join([]string{"#CHROM", "POS", "ID", "REF", "ALT", "QUAL", @@ -187,7 +191,7 @@ func TestWriteSampleListIfWanted(t *testing.T) { readVcf(&config, reader, w) - inFh, err := os.Open("./test.sample_list") + inFh, err := os.Open(filePath) if err != nil { log.Fatal(err) } @@ -224,7 +228,8 @@ func TestWriteSampleListIfWanted(t *testing.T) { } func TestWriteSampleListWhenNoSamples(t *testing.T) { - config := Config{sampleListPath: "./test.sample_list_2"} + filePath := path.Join(t.TempDir(), "./test.sample_list_2") + config := Config{sampleListPath: filePath} versionLine := "##fileformat=VCFv4.x" header := strings.Join([]string{"#CHROM", "POS", "ID", "REF", "ALT", "QUAL", @@ -244,7 +249,7 @@ func TestWriteSampleListWhenNoSamples(t *testing.T) { readVcf(&config, reader, w) - inFh, err := os.Open("./test.sample_list_2") + inFh, err := os.Open(filePath) if err != nil { log.Fatal(err) } @@ -655,7 +660,7 @@ func TestMakeHetHomozygotes(t *testing.T) { fields := append(sharedFieldsGT, "0|0", "0|0", "0|0", "0|0") // The allele index we want to test is always 1...unless it's a multiallelic site - actualHoms, actualHets, missing, _, ac, an := makeHetHomozygotes(fields, header, "1") + actualHoms, actualHets, missing, _, ac, an := makeHetHomozygotes(fields, header, "1", true, true) sampleMaf := float64(ac) / float64(an) @@ -674,7 +679,7 @@ func TestMakeHetHomozygotes(t *testing.T) { fields = append(sharedFieldsGT, "0|1", "0|1", "0|1", "0|1") // The allele index we want to test is always 1...unless it's a multiallelic site - actualHoms, actualHets, missing, _, ac, an = makeHetHomozygotes(fields, header, "1") + actualHoms, actualHets, missing, _, ac, an = makeHetHomozygotes(fields, header, "1", true, true) sampleMaf = float64(ac) / float64(an) @@ -694,7 +699,7 @@ func TestMakeHetHomozygotes(t *testing.T) { fields = append(sharedFieldsGT, ".|.", ".|.", ".|1", "1|.") // The allele index we want to test is always 1...unless it's a multiallelic site - actualHoms, actualHets, missing, _, ac, an = makeHetHomozygotes(fields, header, "1") + actualHoms, actualHets, missing, _, ac, an = makeHetHomozygotes(fields, header, "1", true, true) sampleMaf = 0 @@ -713,7 +718,7 @@ func TestMakeHetHomozygotes(t *testing.T) { fields = append(sharedFieldsGT, ".|1", "0|1", "0|1", "0|1", strconv.FormatFloat(sampleMaf, 'G', 3, 64)) // The allele index we want to test is always 1...unless it's a multiallelic site - actualHoms, actualHets, missing, _, ac, an = makeHetHomozygotes(fields, header, "1") + actualHoms, actualHets, missing, _, ac, an = makeHetHomozygotes(fields, header, "1", true, true) sampleMaf = float64(ac) / float64(an) @@ -734,7 +739,7 @@ func TestMakeHetHomozygotes(t *testing.T) { fields = append(sharedFieldsGT, "1|.", "0|1", "0|1", "0|1", strconv.FormatFloat(sampleMaf, 'G', 3, 64)) // The allele index we want to test is always 1...unless it's a multiallelic site - actualHoms, actualHets, missing, _, ac, an = makeHetHomozygotes(fields, header, "1") + actualHoms, actualHets, missing, _, ac, an = makeHetHomozygotes(fields, header, "1", true, true) sampleMaf = float64(ac) / float64(an) @@ -753,7 +758,7 @@ func TestMakeHetHomozygotes(t *testing.T) { fields = append(sharedFieldsGT, "1|1", "1|1", "0|1", "0|1", strconv.FormatFloat(sampleMaf, 'G', 3, 64)) // The allele index we want to test is always 1...unless it's a multiallelic site - actualHoms, actualHets, missing, _, ac, an = makeHetHomozygotes(fields, header, "1") + actualHoms, actualHets, missing, _, ac, an = makeHetHomozygotes(fields, header, "1", true, true) sampleMaf = float64(ac) / float64(an) @@ -772,7 +777,7 @@ func TestMakeHetHomozygotes(t *testing.T) { fields = append(sharedFieldsGT, "1|2", "1|1", "0|1", "0|1") // The allele index we want to test is always 1...unless it's a multiallelic site - actualHoms, actualHets, missing, _, ac, an = makeHetHomozygotes(fields, header, "1") + actualHoms, actualHets, missing, _, ac, an = makeHetHomozygotes(fields, header, "1", true, true) sampleMaf = float64(ac) / float64(an) @@ -794,7 +799,7 @@ func TestMakeHetHomozygotes(t *testing.T) { fields = append(sharedFieldsGT, "1|2", "1|1", "0|1", "0|1") // The allele index we want to test is always 1...unless it's a multiallelic site - actualHoms, actualHets, missing, _, ac, an = makeHetHomozygotes(fields, header, "2") + actualHoms, actualHets, missing, _, ac, an = makeHetHomozygotes(fields, header, "2", true, true) sampleMaf = float64(ac) / float64(an) if len(actualHoms) == 0 && len(actualHets) == 1 { @@ -812,7 +817,7 @@ func TestMakeHetHomozygotes(t *testing.T) { fields = append(sharedFieldsGTcomplex, "1|2:-0.03,-1.12,-5.00", "1|1:-0.03,-1.12,-5.00", "0|1:-0.03,-1.12,-5.00", "0|1:-0.03,-1.12,-5.00") // The allele index we want to test is always 1...unless it's a multiallelic site - actualHoms, actualHets, missing, _, ac, an = makeHetHomozygotes(fields, header, "2") + actualHoms, actualHets, missing, _, ac, an = makeHetHomozygotes(fields, header, "2", true, true) sampleMaf = float64(ac) / float64(an) if len(actualHoms) == 0 && len(actualHets) == 1 { @@ -830,7 +835,7 @@ func TestMakeHetHomozygotes(t *testing.T) { fields = append(sharedFieldsGTcomplex, "1|2|1:-0.03,-1.12,-5.00", "1|1:-0.03,-1.12,-5.00", "0|1:-0.03,-1.12,-5.00", "0|1:-0.03,-1.12,-5.00") // The allele index we want to test is always 1...unless it's a multiallelic site - actualHoms, actualHets, missing, _, ac, an = makeHetHomozygotes(fields, header, "2") + actualHoms, actualHets, missing, _, ac, an = makeHetHomozygotes(fields, header, "2", true, true) sampleMaf = float64(ac) / float64(an) if len(actualHoms) == 0 && len(actualHets) == 1 { @@ -848,7 +853,7 @@ func TestMakeHetHomozygotes(t *testing.T) { fields = append(sharedFieldsGT, "1|2|1", "1|1", "0|1", "0|1") // The allele index we want to test is always 1...unless it's a multiallelic site - actualHoms, actualHets, missing, _, ac, an = makeHetHomozygotes(fields, header, "2") + actualHoms, actualHets, missing, _, ac, an = makeHetHomozygotes(fields, header, "2", true, true) sampleMaf = float64(ac) / float64(an) if len(actualHoms) == 0 && len(actualHets) == 1 { @@ -866,7 +871,7 @@ func TestMakeHetHomozygotes(t *testing.T) { fields = append(sharedFieldsGTcomplex, "2|2|2:-0.03,-1.12,-5.00", "1|1:-0.03,-1.12,-5.00", "0|1:-0.03,-1.12,-5.00", "0|1:-0.03,-1.12,-5.00") // The allele index we want to test is always 1...unless it's a multiallelic site - actualHoms, actualHets, missing, _, ac, an = makeHetHomozygotes(fields, header, "2") + actualHoms, actualHets, missing, _, ac, an = makeHetHomozygotes(fields, header, "2", true, true) sampleMaf = float64(ac) / float64(an) if len(actualHoms) == 1 && len(actualHets) == 0 { @@ -884,7 +889,7 @@ func TestMakeHetHomozygotes(t *testing.T) { fields = append(sharedFieldsGT, "2|2|2", "1|1", "0|1", "0|1") // The allele index we want to test is always 1...unless it's a multiallelic site - actualHoms, actualHets, missing, _, ac, an = makeHetHomozygotes(fields, header, "2") + actualHoms, actualHets, missing, _, ac, an = makeHetHomozygotes(fields, header, "2", true, true) sampleMaf = float64(ac) / float64(an) if len(actualHoms) == 1 && len(actualHets) == 0 { @@ -911,7 +916,7 @@ func TestMakeHetHomozygotesHaploid(t *testing.T) { fields := append(sharedFieldsGT, "0", ".", "1", "0") // The allele index we want to test is always 1...unless it's a multiallelic site - actualHoms, actualHets, missing, _, ac, an := makeHetHomozygotes(fields, header, "1") + actualHoms, actualHets, missing, _, ac, an := makeHetHomozygotes(fields, header, "1", true, true) sampleMaf := float64(ac) / float64(an) if len(actualHoms) == 1 && len(actualHets) == 0 && len(missing) == 1 { @@ -929,7 +934,7 @@ func TestMakeHetHomozygotesHaploid(t *testing.T) { fields = append(sharedFieldsGTcomplex, "0:1", ".:1", "1:1", "0:1") // The allele index we want to test is always 1...unless it's a multiallelic site - actualHoms, actualHets, missing, _, ac, an = makeHetHomozygotes(fields, header, "1") + actualHoms, actualHets, missing, _, ac, an = makeHetHomozygotes(fields, header, "1", true, true) sampleMaf = float64(ac) / float64(an) if len(actualHoms) == 1 && len(actualHets) == 0 && len(missing) == 1 { @@ -2904,6 +2909,8 @@ func TestManyAlleles(t *testing.T) { // test that we can write a genotype matrix func TestGenotypeMatrix(t *testing.T) { + filePath := filepath.Join(t.TempDir(), "./test_genotype_matrix.feather") + versionLine := "##fileformat=VCFv4.x" header := strings.Join([]string{"#CHROM", "POS", "ID", "REF", "ALT", "QUAL", "FILTER", "INFO", "FORMAT", "S1", "S2", "S3"}, "\t") row1 := strings.Join([]string{"1", "1000", "rs1", "A", "T", ".", "PASS", "DP=100", "GT", "1|1", "0|1", "0|0"}, "\t") @@ -2916,7 +2923,7 @@ func TestGenotypeMatrix(t *testing.T) { reader := bufio.NewReader(strings.NewReader(lines)) config := Config{emptyField: "!", fieldDelimiter: ";", allowedFilters: allowedFilters, - dosageMatrixOutPath: "./test_genotype_matrix.feather"} + dosageMatrixOutPath: filePath} byteBuf := new(bytes.Buffer) w := bufio.NewWriter(byteBuf) @@ -2924,7 +2931,7 @@ func TestGenotypeMatrix(t *testing.T) { readVcf(&config, reader, w) // Read dosage matrix - file, err := os.Open("./test_genotype_matrix.feather") + file, err := os.Open(filePath) if err != nil { t.Fatal(err) } @@ -2947,17 +2954,17 @@ func TestGenotypeMatrix(t *testing.T) { for rowIdx := 0; rowIdx < int(record.NumRows()); rowIdx++ { switch record.Column(0).(*array.String).Value(rowIdx) { case "chr1:1000:A:T": - genotypeS1 := record.Column(1).(*array.Int16).Value(rowIdx) - genotypeS2 := record.Column(2).(*array.Int16).Value(rowIdx) - genotypeS3 := record.Column(3).(*array.Int16).Value(rowIdx) + genotypeS1 := record.Column(1).(*array.Uint16).Value(rowIdx) + genotypeS2 := record.Column(2).(*array.Uint16).Value(rowIdx) + genotypeS3 := record.Column(3).(*array.Uint16).Value(rowIdx) if genotypeS1 != 2 || genotypeS2 != 1 || genotypeS3 != 0 { t.Error("NOT OK: Expected 2,1,0, got", genotypeS1, genotypeS2, genotypeS3) } case "chr2:200:C:G": - genotypeS1 := record.Column(1).(*array.Int16).Value(rowIdx) - genotypeS2 := record.Column(2).(*array.Int16).Value(rowIdx) - genotypeS3 := record.Column(3).(*array.Int16).Value(rowIdx) + genotypeS1 := record.Column(1).(*array.Uint16).Value(rowIdx) + genotypeS2 := record.Column(2).(*array.Uint16).Value(rowIdx) + genotypeS3 := record.Column(3).(*array.Uint16).Value(rowIdx) if genotypeS1 != 1 || genotypeS2 != 0 || genotypeS3 != 2 { t.Error("NOT OK: Expected 1,0,2, got", genotypeS1, genotypeS2, genotypeS3) @@ -2971,7 +2978,7 @@ func TestGenotypeMatrix(t *testing.T) { t.Error("NOT OK: Expected null value for S2") } - genotypeS3 := record.Column(3).(*array.Int16).Value(rowIdx) + genotypeS3 := record.Column(3).(*array.Uint16).Value(rowIdx) if genotypeS3 != 2 { t.Error("NOT OK: Expected dosage of 2 for S3 got ", genotypeS3) @@ -2980,8 +2987,57 @@ func TestGenotypeMatrix(t *testing.T) { t.Fatal("Unknown row") } } + } +} + +// test that we can write a genotype matrix +func TestNoOut(t *testing.T) { + filePath := filepath.Join(t.TempDir(), "./test_genotype_matrix.feather") + + versionLine := "##fileformat=VCFv4.x" + header := strings.Join([]string{"#CHROM", "POS", "ID", "REF", "ALT", "QUAL", "FILTER", "INFO", "FORMAT", "S1", "S2", "S3"}, "\t") + row1 := strings.Join([]string{"1", "1000", "rs1", "A", "T", ".", "PASS", "DP=100", "GT", "1|1", "0|1", "0|0"}, "\t") + + allowedFilters := map[string]bool{"PASS": true, ".": true} + + lines := versionLine + "\n" + header + "\n" + row1 + "\n" + reader := bufio.NewReader(strings.NewReader(lines)) + + config := Config{emptyField: "!", fieldDelimiter: ";", allowedFilters: allowedFilters, + dosageMatrixOutPath: filePath, noOut: true} + + byteBuf := new(bytes.Buffer) + w := bufio.NewWriter(byteBuf) + + results := bufio.NewScanner(byteBuf) + + readVcf(&config, reader, w) + w.Flush() + + index := -1 + for results.Scan() { + index++ + } + + if index != -1 { + t.Error("NOT OK: Expected no output") + } + + // Read dosage matrix + file, err := os.Open(filePath) + if err != nil { + t.Fatal(err) + } + defer file.Close() + + // Create a new IPC reader + arrowReader, err := ipc.NewFileReader(file) + if err != nil { + t.Fatal(err) + } + defer arrowReader.Close() - //Clean up arrow file - os.Remove("./test_genotype_matrix.feather") + if arrowReader.NumRecords() == 0 { + t.Error("NOT OK: Expected to write dosage matrix") } }