-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbigquery.go
96 lines (78 loc) · 2.31 KB
/
bigquery.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
package pkg
import (
"context"
"fmt"
"log"
"time"
"cloud.google.com/go/bigquery"
)
// LoadToBigQuery creates a BigQuery load job for source, waits for it to
// finish, and logs the start of the job and the elapsed time on success.
//
// name is used only to label the log lines and error messages. On failure the
// returned error wraps the underlying cause, so callers can inspect it with
// errors.Is and errors.As.
func LoadToBigQuery(
	ctx context.Context,
	client *bigquery.Client,
	name string,
	source DataSource,
) error {
	log.Printf("starting load job for %s\n", name)
	start := time.Now()

	job, err := createLoadJob(ctx, client, source)
	if err != nil {
		// %w keeps the cause in the error chain; the rendered text is the
		// same as it was with %v.
		return fmt.Errorf("error creating load job for %s: %w", name, err)
	}

	if err := waitUntilComplete(ctx, job); err != nil {
		return fmt.Errorf("error running load job for %s: %w", name, err)
	}

	elapsed := time.Since(start).Seconds()
	log.Printf("completed load job for %s after %f seconds\n", name, elapsed)
	return nil
}
// createLoadJob configures a loader for the dataset and table identified by
// source, reading from the GCS reference derived from source, applies the
// write and create dispositions carried by source, and returns the result of
// running the loader.
func createLoadJob(
	ctx context.Context,
	client *bigquery.Client,
	source DataSource,
) (*bigquery.Job, error) {
	// Build the reference first, matching the original evaluation order.
	ref := generateGCSReference(source)

	dataset := client.Dataset(source.DatasetId)
	table := dataset.Table(source.TableId)

	ldr := table.LoaderFrom(ref)
	ldr.WriteDisposition = source.WriteDisposition
	ldr.CreateDisposition = source.CreateDisposition

	return ldr.Run(ctx)
}
// generateGCSReference builds a GCS reference for source.SourceUri and copies
// the source format and auto-detect flag onto it. For Avro, CSV and Parquet
// sources it also copies the matching format-specific options from source;
// any other format gets no format-specific options.
func generateGCSReference(source DataSource) *bigquery.GCSReference {
	ref := bigquery.NewGCSReference(source.SourceUri)
	ref.SourceFormat = source.SourceFormat
	ref.AutoDetect = source.AutoDetect

	switch source.SourceFormat {
	case bigquery.Avro:
		ref.AvroOptions = &bigquery.AvroOptions{
			UseAvroLogicalTypes: source.AvroOptions.UseAvroLogicalTypes,
		}
	case bigquery.CSV:
		ref.CSVOptions = bigquery.CSVOptions{
			AllowJaggedRows:     source.CSVOptions.AllowJaggedRows,
			AllowQuotedNewlines: source.CSVOptions.AllowQuotedNewlines,
			Encoding:            source.CSVOptions.Encoding,
			FieldDelimiter:      source.CSVOptions.FieldDelimiter,
			Quote:               source.CSVOptions.Quote,
			ForceZeroQuote:      source.CSVOptions.ForceZeroQuote,
			SkipLeadingRows:     source.CSVOptions.SkipLeadingRows,
			NullMarker:          source.CSVOptions.NullMarker,
		}
	case bigquery.Parquet:
		ref.ParquetOptions = &bigquery.ParquetOptions{
			EnumAsString:        source.ParquetOptions.EnumAsString,
			EnableListInference: source.ParquetOptions.EnableListInference,
		}
	}

	return ref
}
// waitUntilComplete calls job.Wait with ctx. If Wait itself fails, that error
// is returned; otherwise the error reported by the returned status is
// returned.
func waitUntilComplete(ctx context.Context, job *bigquery.Job) error {
	finalStatus, waitErr := job.Wait(ctx)
	if waitErr == nil {
		return finalStatus.Err()
	}
	return waitErr
}