Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

fix(storage): de-duplicate payments when updating db #286

Merged
merged 1 commit into from
Jan 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion internal/connectors/engine/activities/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ func (a Activities) temporalPluginPollingError(ctx context.Context, err error, p
}

func (a Activities) temporalPluginErrorCheck(ctx context.Context, err error, isPeriodic bool) error {

switch {
// Do not retry the following errors
case errors.Is(err, plugins.ErrNotImplemented):
Expand Down
36 changes: 27 additions & 9 deletions internal/storage/payments.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,11 @@
func (s *store) PaymentsUpsert(ctx context.Context, payments []models.Payment) error {
paymentsToInsert := make([]payment, 0, len(payments))
adjustmentsToInsert := make([]paymentAdjustment, 0)
paymentsRefundedSeen := make(map[models.PaymentID]int)
paymentsRefunded := make([]payment, 0)
paymentsInitialAmountToAdjustSeen := make(map[models.PaymentID]int)
paymentsInitialAmountToAdjust := make([]payment, 0)
paymentsCapturedSeen := make(map[models.PaymentID]int)
paymentsCaptured := make([]payment, 0)
for _, p := range payments {
paymentsToInsert = append(paymentsToInsert, fromPaymentModels(p))
Expand All @@ -76,17 +79,32 @@
adjustmentsToInsert = append(adjustmentsToInsert, fromPaymentAdjustmentModels(a))
switch a.Status {
case models.PAYMENT_STATUS_AMOUNT_ADJUSTEMENT:
res := fromPaymentModels(p)
res.InitialAmount = a.Amount
paymentsInitialAmountToAdjust = append(paymentsInitialAmountToAdjust, res)
if i, ok := paymentsInitialAmountToAdjustSeen[p.ID]; ok {
paymentsInitialAmountToAdjust[i].InitialAmount = a.Amount

Check warning on line 83 in internal/storage/payments.go

View check run for this annotation

Codecov / codecov/patch

internal/storage/payments.go#L83

Added line #L83 was not covered by tests
} else {
res := fromPaymentModels(p)
res.InitialAmount = a.Amount
paymentsInitialAmountToAdjust = append(paymentsInitialAmountToAdjust, res)
paymentsInitialAmountToAdjustSeen[p.ID] = len(paymentsInitialAmountToAdjust) - 1
}
case models.PAYMENT_STATUS_REFUNDED:
res := fromPaymentModels(p)
res.Amount = a.Amount
paymentsRefunded = append(paymentsRefunded, res)
if i, ok := paymentsRefundedSeen[p.ID]; ok {
paymentsRefunded[i].Amount.Add(paymentsRefunded[i].Amount, a.Amount)
} else {
res := fromPaymentModels(p)
res.Amount = a.Amount
paymentsRefunded = append(paymentsRefunded, res)
paymentsRefundedSeen[p.ID] = len(paymentsRefunded) - 1
}
case models.PAYMENT_STATUS_CAPTURE, models.PAYMENT_STATUS_REFUND_REVERSED:
res := fromPaymentModels(p)
res.Amount = a.Amount
paymentsCaptured = append(paymentsCaptured, res)
if i, ok := paymentsCapturedSeen[p.ID]; ok {
paymentsCaptured[i].Amount.Add(paymentsCaptured[i].Amount, a.Amount)

Check warning on line 101 in internal/storage/payments.go

View check run for this annotation

Codecov / codecov/patch

internal/storage/payments.go#L101

Added line #L101 was not covered by tests
} else {
res := fromPaymentModels(p)
res.Amount = a.Amount
paymentsCaptured = append(paymentsCaptured, res)
paymentsCapturedSeen[p.ID] = len(paymentsCaptured) - 1
}
}
}
}
Expand Down
118 changes: 118 additions & 0 deletions internal/storage/payments_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,108 @@ var (
}
)

func defaultPaymentsRefunded() []models.Payment {
defaultAccounts := defaultAccounts()
return []models.Payment{
{
ID: pID1,
ConnectorID: defaultConnector.ID,
Reference: "test1",
CreatedAt: now.Add(-60 * time.Minute).UTC().Time,
Type: models.PAYMENT_TYPE_TRANSFER,
InitialAmount: big.NewInt(100),
Amount: big.NewInt(100),
Asset: "USD/2",
Scheme: models.PAYMENT_SCHEME_OTHER,
SourceAccountID: &defaultAccounts[0].ID,
DestinationAccountID: &defaultAccounts[1].ID,
Metadata: map[string]string{
"key1": "value1",
},
Adjustments: []models.PaymentAdjustment{
{
ID: models.PaymentAdjustmentID{
PaymentID: pID1,
Reference: "test1",
CreatedAt: now.Add(-60 * time.Minute).UTC().Time,
Status: models.PAYMENT_STATUS_SUCCEEDED,
},
Reference: "test1",
CreatedAt: now.Add(-60 * time.Minute).UTC().Time,
Status: models.PAYMENT_STATUS_SUCCEEDED,
Amount: big.NewInt(100),
Asset: pointer.For("USD/2"),
Raw: []byte(`{}`),
},
},
},
{
ID: pID1,
ConnectorID: defaultConnector.ID,
Reference: "test1",
CreatedAt: now.Add(-59 * time.Minute).UTC().Time,
Type: models.PAYMENT_TYPE_TRANSFER,
InitialAmount: big.NewInt(100),
Amount: big.NewInt(100),
Asset: "USD/2",
Scheme: models.PAYMENT_SCHEME_OTHER,
SourceAccountID: &defaultAccounts[0].ID,
DestinationAccountID: &defaultAccounts[1].ID,
Metadata: map[string]string{
"key1": "value1",
},
Adjustments: []models.PaymentAdjustment{
{
ID: models.PaymentAdjustmentID{
PaymentID: pID1,
Reference: "test1",
CreatedAt: now.Add(-59 * time.Minute).UTC().Time,
Status: models.PAYMENT_STATUS_REFUNDED,
},
Reference: "test1",
CreatedAt: now.Add(-59 * time.Minute).UTC().Time,
Status: models.PAYMENT_STATUS_REFUNDED,
Amount: big.NewInt(10),
Asset: pointer.For("USD/2"),
Raw: []byte(`{}`),
},
},
},
{
ID: pID1,
ConnectorID: defaultConnector.ID,
Reference: "test1",
CreatedAt: now.Add(-58 * time.Minute).UTC().Time,
Type: models.PAYMENT_TYPE_TRANSFER,
InitialAmount: big.NewInt(100),
Amount: big.NewInt(100),
Asset: "USD/2",
Scheme: models.PAYMENT_SCHEME_OTHER,
SourceAccountID: &defaultAccounts[0].ID,
DestinationAccountID: &defaultAccounts[1].ID,
Metadata: map[string]string{
"key1": "value1",
},
Adjustments: []models.PaymentAdjustment{
{
ID: models.PaymentAdjustmentID{
PaymentID: pID1,
Reference: "test1",
CreatedAt: now.Add(-58 * time.Minute).UTC().Time,
Status: models.PAYMENT_STATUS_REFUNDED,
},
Reference: "test1",
CreatedAt: now.Add(-58 * time.Minute).UTC().Time,
Status: models.PAYMENT_STATUS_REFUNDED,
Amount: big.NewInt(10),
Asset: pointer.For("USD/2"),
Raw: []byte(`{}`),
},
},
},
}
}

func defaultPayments() []models.Payment {
defaultAccounts := defaultAccounts()
return []models.Payment{
Expand Down Expand Up @@ -472,6 +574,22 @@ func TestPaymentsUpsert(t *testing.T) {
})
}

func TestPaymentsUpsertRefunded(t *testing.T) {
t.Parallel()

ctx := logging.TestingContext()
store := newStore(t)

upsertConnector(t, ctx, store, defaultConnector)
upsertAccounts(t, ctx, store, defaultAccounts())
upsertPayments(t, ctx, store, defaultPaymentsRefunded())

actual, err := store.PaymentsGet(ctx, pID1)
require.NoError(t, err)
// two refunds in the same batch, should be 100 - 10 - 10 = 80
require.Equal(t, big.NewInt(80), actual.Amount)
}

func TestPaymentsUpdateMetadata(t *testing.T) {
t.Parallel()

Expand Down
Loading