Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

feat: Enable storing _cq_client_id. #2046

Merged
merged 2 commits into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/simple_plugin/plugin/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ func getTables() schema.Tables {
}
for _, t := range tables {
schema.AddCqIDs(t)
schema.AddCqClientID(t)
}
return tables
}
3 changes: 3 additions & 0 deletions scheduler/queue/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,9 @@ func (w *worker) resolveResource(ctx context.Context, table *schema.Table, clien
atomic.AddUint64(&tableMetrics.Errors, 1)
return
}
if err := resolvedResource.StoreCQClientID(client.ID()); err != nil {
w.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("failed to store _cq_client_id")
}
if err := resolvedResource.Validate(); err != nil {
switch err.(type) {
case *schema.PKError:
Expand Down
3 changes: 3 additions & 0 deletions scheduler/scheduler_dfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,9 @@ func (s *syncClient) resolveResourcesDfs(ctx context.Context, table *schema.Tabl
atomic.AddUint64(&tableMetrics.Errors, 1)
return
}
if err := resolvedResource.StoreCQClientID(client.ID()); err != nil {
s.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("failed to store _cq_client_id")
}
if err := resolvedResource.Validate(); err != nil {
switch err.(type) {
case *schema.PKError:
Expand Down
7 changes: 7 additions & 0 deletions schema/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@ var CqParentIDColumn = Column{
IgnoreInTests: true,
}

var CqClientIDColumn = Column{
Name: "_cq_client_id",
Type: arrow.BinaryTypes.String,
Description: "Internal CQ ID of the multiplexed client",
NotNull: true,
}

// These columns are managed and populated by the destination plugin.
var CqSyncTimeColumn = Column{
Name: "_cq_sync_time",
Expand Down
8 changes: 8 additions & 0 deletions schema/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,14 @@ func (r *Resource) storeCQID(value uuid.UUID) error {
return r.Set(CqIDColumn.Name, b)
}

func (r *Resource) StoreCQClientID(clientID string) error {
// We skip if _cq_client_id is not present.
if r.Table.Columns.Get(CqClientIDColumn.Name) == nil {
return nil
}
return r.Set(CqClientIDColumn.Name, clientID)
}

type PKError struct {
MissingPKs []string
}
Expand Down
11 changes: 11 additions & 0 deletions schema/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,17 @@ func AddCqIDs(table *Table) {
}
}

// AddCqClientID adds the cq_client_id column to the table,
// which is used to identify the multiplexed client that fetched the resource
func AddCqClientID(t *Table) {
if t.Columns.Get(CqClientIDColumn.Name) == nil {
t.Columns = append(ColumnList{CqClientIDColumn}, t.Columns...)
}
for _, rel := range t.Relations {
AddCqClientID(rel)
}
}

// CqIDAsPK sets the cq_id column as primary key if it exists
// and removes the primary key from all other columns
func CqIDAsPK(t *Table) {
Expand Down
Loading