From e1b8eb0f36682b2d783f28106d66c8e909eb5a11 Mon Sep 17 00:00:00 2001 From: Paul Nicolas Date: Mon, 2 Dec 2024 09:21:11 +0100 Subject: [PATCH] feat(payments/transferinitiations): store reference + list by reference (#1740) Co-authored-by: Crimson --- .../internal/storage/transfer_initiation.go | 15 ++++- .../storage/transfer_initiation_test.go | 2 + .../api/service/transfer_initiation.go | 1 + .../api/service/transfer_initiation_test.go | 3 + .../internal/storage/transfer_initiation.go | 4 +- .../storage/transfer_initiation_test.go | 3 + .../internal/models/transfer_initiation.go | 1 + .../internal/storage/migrations_v1.x.go | 56 +++++++++++++++++++ 8 files changed, 81 insertions(+), 4 deletions(-) diff --git a/components/payments/cmd/api/internal/storage/transfer_initiation.go b/components/payments/cmd/api/internal/storage/transfer_initiation.go index 06f258f1d5..a4f12ff724 100644 --- a/components/payments/cmd/api/internal/storage/transfer_initiation.go +++ b/components/payments/cmd/api/internal/storage/transfer_initiation.go @@ -14,7 +14,7 @@ func (s *Storage) GetTransferInitiation(ctx context.Context, id models.TransferI var transferInitiation models.TransferInitiation query := s.db.NewSelect(). - Column("id", "connector_id", "created_at", "scheduled_at", "description", "type", "source_account_id", "destination_account_id", "provider", "initial_amount", "amount", "asset", "metadata"). + Column("id", "reference", "connector_id", "created_at", "scheduled_at", "description", "type", "source_account_id", "destination_account_id", "provider", "initial_amount", "amount", "asset", "metadata"). Model(&transferInitiation). Relation("RelatedAdjustments"). Where("id = ?", id) @@ -68,7 +68,7 @@ func (s *Storage) ListTransferInitiations(ctx context.Context, q ListTransferIni (*bunpaginate.OffsetPaginatedQuery[PaginatedQueryOptions[TransferInitiationQuery]])(&q), func(query *bun.SelectQuery) *bun.SelectQuery { query = query. - Column("id", "connector_id", "created_at", "scheduled_at", "description", "type", "source_account_id", "destination_account_id", "provider", "initial_amount", "amount", "asset", "metadata"). + Column("id", "reference", "connector_id", "created_at", "scheduled_at", "description", "type", "source_account_id", "destination_account_id", "provider", "initial_amount", "amount", "asset", "metadata"). Relation("RelatedAdjustments") if q.Options.QueryBuilder != nil { @@ -105,6 +105,17 @@ func (s *Storage) transferInitiationQueryContext(qb query.Builder) (string, []an default: return "", nil, fmt.Errorf("unexpected type %T for column '%s'", accountID, key) } + case key == "reference": + if operator != "$match" { + return "", nil, fmt.Errorf("'%s' column can only be used with $match", key) + } + + switch reference := value.(type) { + case string: + return fmt.Sprintf("%s = ?", key), []any{reference}, nil + default: + return "", nil, fmt.Errorf("unexpected type %T for column %q", reference, key) + } default: return "", nil, fmt.Errorf("unknown key '%s' when building query", key) } diff --git a/components/payments/cmd/api/internal/storage/transfer_initiation_test.go b/components/payments/cmd/api/internal/storage/transfer_initiation_test.go index 0c614f0a18..a9ce2a2874 100644 --- a/components/payments/cmd/api/internal/storage/transfer_initiation_test.go +++ b/components/payments/cmd/api/internal/storage/transfer_initiation_test.go @@ -17,6 +17,7 @@ func insertTransferInitiation(t *testing.T, store *Storage, connectorID models.C Reference: "tf_1", ConnectorID: connectorID, }, + Reference: "tf_1", CreatedAt: time.Date(2023, 11, 14, 8, 0, 0, 0, time.UTC), ScheduledAt: time.Date(2023, 11, 14, 8, 0, 0, 0, time.UTC), Description: "test_1", @@ -48,6 +49,7 @@ func insertTransferInitiation(t *testing.T, store *Storage, connectorID models.C Reference: "tf_2", ConnectorID: connectorID, }, + Reference: "tf_2", CreatedAt: time.Date(2023, 11, 14, 9, 0, 0, 0, time.UTC), ScheduledAt: time.Date(2023, 11, 14, 9, 0, 0, 0, time.UTC), Description: "test_2", diff --git a/components/payments/cmd/connectors/internal/api/service/transfer_initiation.go b/components/payments/cmd/connectors/internal/api/service/transfer_initiation.go index e72df5f5f6..86e3e32840 100644 --- a/components/payments/cmd/connectors/internal/api/service/transfer_initiation.go +++ b/components/payments/cmd/connectors/internal/api/service/transfer_initiation.go @@ -140,6 +140,7 @@ func (s *Service) CreateTransferInitiation(ctx context.Context, req *CreateTrans createdAt := time.Now().UTC() tf := &models.TransferInitiation{ ID: id, + Reference: req.Reference, CreatedAt: createdAt, ScheduledAt: req.ScheduledAt, Description: req.Description, diff --git a/components/payments/cmd/connectors/internal/api/service/transfer_initiation_test.go b/components/payments/cmd/connectors/internal/api/service/transfer_initiation_test.go index 943547540d..9ae944978d 100644 --- a/components/payments/cmd/connectors/internal/api/service/transfer_initiation_test.go +++ b/components/payments/cmd/connectors/internal/api/service/transfer_initiation_test.go @@ -59,6 +59,7 @@ func TestCreateTransferInitiation(t *testing.T) { Reference: "ref1", ConnectorID: connectorDummyPay.ID, }, + Reference: "ref1", ScheduledAt: time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC), Description: "test", DestinationAccountID: destinationAccountID, @@ -99,6 +100,7 @@ func TestCreateTransferInitiation(t *testing.T) { Reference: "ref1", ConnectorID: connectorDummyPay.ID, }, + Reference: "ref1", ScheduledAt: time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC), DestinationAccountID: destinationAccountID, SourceAccountID: &sourceAccountID, @@ -139,6 +141,7 @@ func TestCreateTransferInitiation(t *testing.T) { Reference: "ref1", ConnectorID: connectorDummyPay.ID, }, + Reference: "ref1", ScheduledAt: time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC), Description: "test", DestinationAccountID: destinationAccountID, diff --git a/components/payments/cmd/connectors/internal/storage/transfer_initiation.go b/components/payments/cmd/connectors/internal/storage/transfer_initiation.go index cbaa4dd319..48c102efb6 100644 --- a/components/payments/cmd/connectors/internal/storage/transfer_initiation.go +++ b/components/payments/cmd/connectors/internal/storage/transfer_initiation.go @@ -19,7 +19,7 @@ func (s *Storage) CreateTransferInitiation(ctx context.Context, transferInitiati }() query := tx.NewInsert(). - Column("id", "created_at", "scheduled_at", "description", "type", "destination_account_id", "provider", "connector_id", "initial_amount", "amount", "asset", "metadata"). + Column("id", "reference", "created_at", "scheduled_at", "description", "type", "destination_account_id", "provider", "connector_id", "initial_amount", "amount", "asset", "metadata"). Model(transferInitiation) if transferInitiation.SourceAccountID != nil { @@ -53,7 +53,7 @@ func (s *Storage) ReadTransferInitiation(ctx context.Context, id models.Transfer var transferInitiation models.TransferInitiation query := s.db.NewSelect(). - Column("id", "created_at", "scheduled_at", "description", "type", "source_account_id", "destination_account_id", "provider", "connector_id", "amount", "asset", "metadata"). + Column("id", "reference", "created_at", "scheduled_at", "description", "type", "source_account_id", "destination_account_id", "provider", "connector_id", "amount", "asset", "metadata"). Model(&transferInitiation). Relation("RelatedAdjustments"). Where("id = ?", id) diff --git a/components/payments/cmd/connectors/internal/storage/transfer_initiation_test.go b/components/payments/cmd/connectors/internal/storage/transfer_initiation_test.go index 1dc127c48e..111e077c91 100644 --- a/components/payments/cmd/connectors/internal/storage/transfer_initiation_test.go +++ b/components/payments/cmd/connectors/internal/storage/transfer_initiation_test.go @@ -49,6 +49,7 @@ func testCreateTransferInitiations(t *testing.T, store *storage.Storage) { } t1 = &models.TransferInitiation{ ID: t1ID, + Reference: "test1", CreatedAt: t1T, ScheduledAt: t1T, Description: "test_description", @@ -73,6 +74,7 @@ func testCreateTransferInitiations(t *testing.T, store *storage.Storage) { } t2 = &models.TransferInitiation{ ID: t2ID, + Reference: "test2", CreatedAt: t2T, ScheduledAt: t2T, Description: "test_description2", @@ -136,6 +138,7 @@ func testGetTransferInitiation( func checkTransferInitiationsEqual(t *testing.T, t1, t2 *models.TransferInitiation, checkRelatedAdjusment bool) { require.Equal(t, t1.ID, t2.ID) + require.Equal(t, t1.Reference, t2.Reference) require.Equal(t, t1.CreatedAt.UTC(), t2.CreatedAt.UTC()) require.Equal(t, t1.ScheduledAt.UTC(), t2.ScheduledAt.UTC()) require.Equal(t, t1.Description, t2.Description) diff --git a/components/payments/internal/models/transfer_initiation.go b/components/payments/internal/models/transfer_initiation.go index 71860aba04..84cc61b4f1 100644 --- a/components/payments/internal/models/transfer_initiation.go +++ b/components/payments/internal/models/transfer_initiation.go @@ -115,6 +115,7 @@ type TransferInitiation struct { // Filled when created in DB ID TransferInitiationID `bun:",pk,nullzero"` + Reference string CreatedAt time.Time `bun:",nullzero"` ScheduledAt time.Time `bun:",nullzero"` Description string diff --git a/components/payments/internal/storage/migrations_v1.x.go b/components/payments/internal/storage/migrations_v1.x.go index 720586adec..94d5f0d259 100644 --- a/components/payments/internal/storage/migrations_v1.x.go +++ b/components/payments/internal/storage/migrations_v1.x.go @@ -391,6 +391,11 @@ func registerMigrationsV1(ctx context.Context, migrator *migrations.Migrator) { return fixExpandingChangelogs(ctx, tx) }, }, + migrations.Migration{ + Up: func(tx bun.Tx) error { + return fixMissingReferenceTransferInitiation(ctx, tx) + }, + }, ) } @@ -1090,3 +1095,54 @@ func fixExpandingChangelogs(ctx context.Context, tx bun.Tx) error { return nil } + +func fixMissingReferenceTransferInitiation(ctx context.Context, tx bun.Tx) error { + _, err := tx.Exec(` + ALTER TABLE transfers.transfer_initiation ADD COLUMN IF NOT EXISTS reference text; + `) + if err != nil { + return err + } + + var createdAt time.Time + for { + var transferInitiations []models.TransferInitiation + query := tx.NewSelect(). + Model(&transferInitiations). + Order("created_at ASC"). + Limit(100) + + if !createdAt.IsZero() { + query.Where("created_at > ?", createdAt) + } + + err := query.Scan(ctx) + if err != nil { + return err + } + + fmt.Println("Processing", len(transferInitiations), "transfer initiations") + if len(transferInitiations) == 0 { + break + } + + for i := range transferInitiations { + if transferInitiations[i].Reference == "" { + transferInitiations[i].Reference = transferInitiations[i].ID.Reference + } + createdAt = transferInitiations[i].CreatedAt + } + + fmt.Println("Updating", len(transferInitiations), "transfer initiations") + _, err = tx.NewInsert(). + Model(&transferInitiations). + On("CONFLICT (id) DO UPDATE"). + Set("reference = EXCLUDED.reference"). + Exec(ctx) + if err != nil { + return err + } + } + + return nil +}