Skip to content

Commit

Permalink
Use protobuf reflection for serialization
Browse files Browse the repository at this point in the history
This is used instead of regular Go relection because it is harder
to map it to the table schema. The protobuf is the canonical
representation making converison easier.
  • Loading branch information
ficoos committed Oct 30, 2023
1 parent c683a8c commit e311d5e
Show file tree
Hide file tree
Showing 6 changed files with 1,538 additions and 1,626 deletions.
11 changes: 6 additions & 5 deletions tools/mc2bq/pkg/export/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"golang.org/x/sync/errgroup"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
"google.golang.org/protobuf/reflect/protoreflect"

"github.com/GoogleCloudPlatform/migrationcenter-utils/tools/mc2bq/pkg/gapiutil"
"github.com/GoogleCloudPlatform/migrationcenter-utils/tools/mc2bq/pkg/mcutil"
Expand Down Expand Up @@ -187,7 +188,7 @@ type iterable[T any] interface {
Next() (T, error)
}

type objectReader[T any] struct {
type objectReader[T protoreflect.ProtoMessage] struct {
schema bigquery.Schema
it iterable[T]
serializer func(obj T) ([]byte, error)
Expand All @@ -197,9 +198,9 @@ type objectReader[T any] struct {
bytesRead uint64
}

func newObjectReader[T any](it iterable[*T], root string, schema bigquery.Schema) *objectReader[*T] {
return &objectReader[*T]{
serializer: exporterschema.NewSerializer[*T](root, schema),
func newObjectReader[T protoreflect.ProtoMessage](it iterable[T], root string, schema bigquery.Schema) *objectReader[T] {
return &objectReader[T]{
serializer: exporterschema.NewSerializer[T](root, schema),
it: it,
schema: schema,
}
Expand Down Expand Up @@ -247,7 +248,7 @@ func (r *objectReader[T]) Read(buf []byte) (int, error) {
return n, nil
}

func newMigrationCenterLoadSource[T any](r *objectReader[T]) bigquery.LoadSource {
func newMigrationCenterLoadSource[T protoreflect.ProtoMessage](r *objectReader[T]) bigquery.LoadSource {
// Creating a full blown bigquery.LoadSource requires a lot of low level big query operations.
// To save on time we create a ReaderSource and feed it the assets as a json stream.
src := bigquery.NewReaderSource(r)
Expand Down
6 changes: 3 additions & 3 deletions tools/mc2bq/pkg/export/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (mc *MCv1) AssetSource(ctx context.Context, pal mcutil.ProjectAndLocation)
Parent: pal.String(),
PageSize: 1000,
})
r := newObjectReader[migrationcenterpb.Asset](it, "asset", mc.schema.AssetTable)
r := newObjectReader[*migrationcenterpb.Asset](it, "asset", mc.schema.AssetTable)
src := newMigrationCenterLoadSource(r)
return &struct {
bigquery.LoadSource
Expand All @@ -49,7 +49,7 @@ func (mc *MCv1) GroupSource(ctx context.Context, pal mcutil.ProjectAndLocation)
Parent: pal.String(),
PageSize: 1000,
})
r := newObjectReader[migrationcenterpb.Group](it, "asset", mc.schema.GroupTable)
r := newObjectReader[*migrationcenterpb.Group](it, "group", mc.schema.GroupTable)
src := newMigrationCenterLoadSource(r)
return &struct {
bigquery.LoadSource
Expand All @@ -62,7 +62,7 @@ func (mc *MCv1) PreferenceSetSource(ctx context.Context, pal mcutil.ProjectAndLo
Parent: pal.String(),
PageSize: 1000,
})
r := newObjectReader[migrationcenterpb.PreferenceSet](it, "asset", mc.schema.PreferenceSetTable)
r := newObjectReader[*migrationcenterpb.PreferenceSet](it, "preference_set", mc.schema.PreferenceSetTable)
src := newMigrationCenterLoadSource(r)
return &struct {
bigquery.LoadSource
Expand Down
Loading

0 comments on commit e311d5e

Please # to comment.