diff --git a/loopdb/sqlc/batch.sql.go b/loopdb/sqlc/batch.sql.go index 9c8aedc1c..02b231ca8 100644 --- a/loopdb/sqlc/batch.sql.go +++ b/loopdb/sqlc/batch.sql.go @@ -10,20 +10,6 @@ import ( "database/sql" ) -const confirmBatch = `-- name: ConfirmBatch :exec -UPDATE - sweep_batches -SET - confirmed = TRUE -WHERE - id = $1 -` - -func (q *Queries) ConfirmBatch(ctx context.Context, id int32) error { - _, err := q.db.ExecContext(ctx, confirmBatch, id) - return err -} - const dropBatch = `-- name: DropBatch :exec DELETE FROM sweep_batches WHERE id = $1 ` diff --git a/loopdb/sqlc/querier.go b/loopdb/sqlc/querier.go index 9b600727e..b243b54e9 100644 --- a/loopdb/sqlc/querier.go +++ b/loopdb/sqlc/querier.go @@ -12,7 +12,6 @@ import ( type Querier interface { AllDeposits(ctx context.Context) ([]Deposit, error) AllStaticAddresses(ctx context.Context) ([]StaticAddress, error) - ConfirmBatch(ctx context.Context, id int32) error CreateDeposit(ctx context.Context, arg CreateDepositParams) error CreateReservation(ctx context.Context, arg CreateReservationParams) error CreateStaticAddress(ctx context.Context, arg CreateStaticAddressParams) error diff --git a/loopdb/sqlc/queries/batch.sql b/loopdb/sqlc/queries/batch.sql index b02241273..095992684 100644 --- a/loopdb/sqlc/queries/batch.sql +++ b/loopdb/sqlc/queries/batch.sql @@ -35,14 +35,6 @@ UPDATE sweep_batches SET last_rbf_sat_per_kw = $6 WHERE id = $1; --- name: ConfirmBatch :exec -UPDATE - sweep_batches -SET - confirmed = TRUE -WHERE - id = $1; - -- name: UpsertSweep :exec INSERT INTO sweeps ( swap_hash, diff --git a/loopout.go b/loopout.go index fde6d2826..9cce79d82 100644 --- a/loopout.go +++ b/loopout.go @@ -1148,7 +1148,7 @@ func (s *loopOutSwap) waitForHtlcSpendConfirmedV2(globalCtx context.Context, quitChan := make(chan bool, 1) defer func() { - quitChan <- true + close(quitChan) }() notifier := sweepbatcher.SpendNotifier{ diff --git a/loopout_feerate.go b/loopout_feerate.go index 4cebd1e32..4540644c1 100644 --- a/loopout_feerate.go +++ b/loopout_feerate.go @@ -7,6 +7,7 @@ import ( "github.com/btcsuite/btcd/btcutil" "github.com/btcsuite/btcd/chaincfg" "github.com/btcsuite/btcd/txscript" + "github.com/btcsuite/btcd/wire" "github.com/lightninglabs/loop/loopdb" "github.com/lightninglabs/loop/swap" "github.com/lightninglabs/loop/utils" @@ -71,7 +72,8 @@ func newLoopOutSweepFeerateProvider(sweeper sweeper, // GetMinFeeRate returns minimum required feerate for a sweep by swap hash. func (p *loopOutSweepFeerateProvider) GetMinFeeRate(ctx context.Context, - swapHash lntypes.Hash) (chainfee.SatPerKWeight, error) { + swapHash lntypes.Hash, + _ wire.OutPoint) (chainfee.SatPerKWeight, error) { _, feeRate, err := p.GetConfTargetAndFeeRate(ctx, swapHash) diff --git a/sweepbatcher/presigned.go b/sweepbatcher/presigned.go index 0d52bcbfe..9db0d56ca 100644 --- a/sweepbatcher/presigned.go +++ b/sweepbatcher/presigned.go @@ -3,6 +3,7 @@ package sweepbatcher import ( "bytes" "context" + "encoding/hex" "fmt" "github.com/btcsuite/btcd/blockchain" @@ -36,13 +37,7 @@ func (b *batch) ensurePresigned(ctx context.Context, newSweeps []*sweep, // presignedTxChecker has methods to check if the inputs are presigned. type presignedTxChecker interface { destPkScripter - - // SignTx signs an unsigned transaction or returns a pre-signed tx. - // It is only called with loadOnly=true by ensurePresigned. - SignTx(ctx context.Context, primarySweepID wire.OutPoint, - tx *wire.MsgTx, inputAmt btcutil.Amount, - minRelayFee, feeRate chainfee.SatPerKWeight, - loadOnly bool) (*wire.MsgTx, error) + presigner } // ensurePresigned checks that there is a presigned transaction spending the @@ -251,7 +246,7 @@ func (b *batch) presign(ctx context.Context, newSweeps []*sweep) error { // Cache the destination address. destAddr, err := getPresignedSweepsDestAddr( - ctx, b.cfg.presignedHelper, b.primarySweepID, + ctx, b.cfg.presignedHelper, primarySweepID, b.cfg.chainParams, ) if err != nil { @@ -287,11 +282,12 @@ func (b *batch) presign(ctx context.Context, newSweeps []*sweep) error { // presigner tries to presign a batch transaction. type presigner interface { - // Presign tries to presign a batch transaction. If the method returns - // nil, it is guaranteed that future calls to SignTx on this set of - // sweeps return valid signed transactions. - Presign(ctx context.Context, primarySweepID wire.OutPoint, - tx *wire.MsgTx, inputAmt btcutil.Amount) error + // SignTx signs an unsigned transaction or returns a pre-signed tx. + // It is only called with loadOnly=true by ensurePresigned. + SignTx(ctx context.Context, primarySweepID wire.OutPoint, + tx *wire.MsgTx, inputAmt btcutil.Amount, + minRelayFee, feeRate chainfee.SatPerKWeight, + loadOnly bool) (*wire.MsgTx, error) } // presign tries to presign batch sweep transactions of the sweeps. It signs @@ -370,7 +366,14 @@ func presign(ctx context.Context, presigner presigner, destAddr btcutil.Address, } // Try to presign this transaction. - err = presigner.Presign(ctx, primarySweepID, tx, batchAmt) + const ( + loadOnly = false + minRelayFee = chainfee.AbsoluteFeePerKwFloor + ) + _, err = presigner.SignTx( + ctx, primarySweepID, tx, batchAmt, minRelayFee, fr, + loadOnly, + ) if err != nil { return fmt.Errorf("failed to presign unsigned tx %v "+ "for feeRate %v: %w", tx.TxHash(), fr, err) @@ -405,9 +408,16 @@ func (b *batch) publishPresigned(ctx context.Context) (btcutil.Amount, error, } } + // Determine the current minimum relay fee based on our chain backend. + minRelayFee, err := b.wallet.MinRelayFee(ctx) + if err != nil { + return 0, fmt.Errorf("failed to get minRelayFee: %w", err), + false + } + // Cache current height and desired feerate of the batch. currentHeight := b.currentHeight - feeRate := b.rbfCache.FeeRate + feeRate := max(b.rbfCache.FeeRate, minRelayFee) // Append this sweep to an array of sweeps. This is needed to keep the // order of sweeps stored, as iterating the sweeps map does not @@ -445,13 +455,6 @@ func (b *batch) publishPresigned(ctx context.Context) (btcutil.Amount, error, batchAmt += sweep.value } - // Determine the current minimum relay fee based on our chain backend. - minRelayFee, err := b.wallet.MinRelayFee(ctx) - if err != nil { - return 0, fmt.Errorf("failed to get minRelayFee: %w", err), - false - } - // Get a pre-signed transaction. const loadOnly = false signedTx, err := b.cfg.presignedHelper.SignTx( @@ -506,6 +509,9 @@ func (b *batch) publishPresigned(ctx context.Context) (btcutil.Amount, error, b.batchTxid = &txHash b.batchPkScript = tx.TxOut[0].PkScript + // Update cached FeeRate not to broadcast a tx with lower feeRate. + b.rbfCache.FeeRate = max(b.rbfCache.FeeRate, signedFeeRate) + return fee, nil, true } @@ -597,8 +603,9 @@ func CheckSignedTx(unsignedTx, signedTx *wire.MsgTx, inputAmt btcutil.Amount, unsignedOut := unsignedTx.TxOut[0] signedOut := signedTx.TxOut[0] if !bytes.Equal(unsignedOut.PkScript, signedOut.PkScript) { - return fmt.Errorf("mismatch of output pkScript: %v, %v", - unsignedOut.PkScript, signedOut.PkScript) + return fmt.Errorf("mismatch of output pkScript: %s, %s", + hex.EncodeToString(unsignedOut.PkScript), + hex.EncodeToString(signedOut.PkScript)) } // Find the feerate of signedTx. diff --git a/sweepbatcher/presigned_test.go b/sweepbatcher/presigned_test.go index 60f287764..9f0d8b29f 100644 --- a/sweepbatcher/presigned_test.go +++ b/sweepbatcher/presigned_test.go @@ -553,24 +553,30 @@ type mockPresigner struct { failAt int } -// Presign memorizes the value of the output and fails if the number of +// SignTx memorizes the value of the output and fails if the number of // calls previously made is failAt. -func (p *mockPresigner) Presign(ctx context.Context, - primarySweepID wire.OutPoint, tx *wire.MsgTx, - inputAmt btcutil.Amount) error { +func (p *mockPresigner) SignTx(ctx context.Context, + primarySweepID wire.OutPoint, tx *wire.MsgTx, inputAmt btcutil.Amount, + minRelayFee, feeRate chainfee.SatPerKWeight, + loadOnly bool) (*wire.MsgTx, error) { + + if ctx.Err() != nil { + return nil, ctx.Err() + } if !hasInput(tx, primarySweepID) { - return fmt.Errorf("primarySweepID %v not in tx", primarySweepID) + return nil, fmt.Errorf("primarySweepID %v not in tx", + primarySweepID) } if len(p.outputs)+1 == p.failAt { - return fmt.Errorf("test error in Presign") + return nil, fmt.Errorf("test error in SignTx") } p.outputs = append(p.outputs, btcutil.Amount(tx.TxOut[0].Value)) p.lockTimes = append(p.lockTimes, tx.LockTime) - return nil + return tx, nil } // TestPresign checks that function presign presigns correct set of transactions diff --git a/sweepbatcher/store.go b/sweepbatcher/store.go index 1b87cde8b..3eb59f8d5 100644 --- a/sweepbatcher/store.go +++ b/sweepbatcher/store.go @@ -17,9 +17,6 @@ import ( // Querier is the interface that contains all the queries generated // by sqlc for sweep batcher. type Querier interface { - // ConfirmBatch confirms a batch by setting the state to confirmed. - ConfirmBatch(ctx context.Context, id int32) error - // GetBatchSweeps fetches all the sweeps that are part a batch. GetBatchSweeps(ctx context.Context, batchID int32) ( []sqlc.Sweep, error) @@ -136,11 +133,6 @@ func (s *SQLStore) UpdateSweepBatch(ctx context.Context, batch *dbBatch) error { return s.baseDb.UpdateBatch(ctx, batchToUpdateArgs(*batch)) } -// ConfirmBatch confirms a batch by setting the state to confirmed. -func (s *SQLStore) ConfirmBatch(ctx context.Context, id int32) error { - return s.baseDb.ConfirmBatch(ctx, id) -} - // FetchBatchSweeps fetches all the sweeps that are part a batch. func (s *SQLStore) FetchBatchSweeps(ctx context.Context, id int32) ( []*dbSweep, error) { @@ -248,7 +240,7 @@ type dbSweep struct { // Amount is the amount of the sweep. Amount btcutil.Amount - // Completed indicates whether this sweep is completed. + // Completed indicates whether this sweep is fully-confirmed. Completed bool } diff --git a/sweepbatcher/store_mock.go b/sweepbatcher/store_mock.go index d5a3ffbc1..0533949b1 100644 --- a/sweepbatcher/store_mock.go +++ b/sweepbatcher/store_mock.go @@ -81,22 +81,6 @@ func (s *StoreMock) UpdateSweepBatch(ctx context.Context, return nil } -// ConfirmBatch confirms a batch. -func (s *StoreMock) ConfirmBatch(ctx context.Context, id int32) error { - s.mu.Lock() - defer s.mu.Unlock() - - batch, ok := s.batches[id] - if !ok { - return errors.New("batch not found") - } - - batch.Confirmed = true - s.batches[batch.ID] = batch - - return nil -} - // FetchBatchSweeps fetches all the sweeps that belong to a batch. func (s *StoreMock) FetchBatchSweeps(ctx context.Context, id int32) ([]*dbSweep, error) { diff --git a/sweepbatcher/sweep_batch.go b/sweepbatcher/sweep_batch.go index 3084a26aa..33d63f4b1 100644 --- a/sweepbatcher/sweep_batch.go +++ b/sweepbatcher/sweep_batch.go @@ -481,7 +481,7 @@ func (b *batch) Errorf(format string, params ...interface{}) { // checkSweepToAdd checks if a sweep can be added or updated in the batch. The // caller must lock the event loop using scheduleNextCall. The function returns // if the sweep already exists in the batch. If presigned mode is enabled, the -// result depends on the outcome of the method presignedHelper.Presign for a +// result depends on the outcome of the method presignedHelper.SignTx for a // non-empty batch. For an empty batch, the input needs to pass // PresignSweepsGroup. func (b *batch) checkSweepToAdd(_ context.Context, sweep *sweep) (bool, error) { @@ -1281,6 +1281,10 @@ func constructUnsignedTx(sweeps []sweep, address btcutil.Address, return nil, 0, 0, 0, fmt.Errorf("txscript.PayToAddrScript "+ "failed: %w", err) } + if len(batchPkScript) == 0 { + return nil, 0, 0, 0, fmt.Errorf("txscript.PayToAddrScript " + + "returned an empty pkScript") + } // Add the output to weight estimates. err = sweeppkg.AddOutputEstimate(&weightEstimate, address) @@ -1923,12 +1927,12 @@ func getFeePortionForSweep(spendTx *wire.MsgTx, numSweeps int, } // getFeePortionPaidBySweep returns the fee portion that the sweep should pay -// for the batch transaction. If the sweep is the first sweep in the batch, it +// for the batch transaction. If the sweep is the primary sweep in the batch, it // pays the rounding difference. -func getFeePortionPaidBySweep(spendTx *wire.MsgTx, feePortionPerSweep, - roundingDiff btcutil.Amount, sweep *sweep) btcutil.Amount { +func getFeePortionPaidBySweep(feePortionPerSweep, roundingDiff btcutil.Amount, + primary bool) btcutil.Amount { - if bytes.Equal(spendTx.TxIn[0].SignatureScript, sweep.htlc.SigScript) { + if primary { return feePortionPerSweep + roundingDiff } @@ -1939,7 +1943,6 @@ func getFeePortionPaidBySweep(spendTx *wire.MsgTx, feePortionPerSweep, func (b *batch) handleSpend(ctx context.Context, spendTx *wire.MsgTx) error { var ( txHash = spendTx.TxHash() - purgeList = make([]SweepRequest, 0, len(b.sweeps)) notifyList = make([]sweep, 0, len(b.sweeps)) ) b.batchTxid = &txHash @@ -1949,7 +1952,126 @@ func (b *batch) handleSpend(ctx context.Context, spendTx *wire.MsgTx) error { b.Warnf("transaction %v has no outputs", txHash) } - // Determine if we should use presigned mode for the batch. + // Make a set of confirmed sweeps. + confirmedSet := make(map[wire.OutPoint]struct{}, len(spendTx.TxIn)) + for _, txIn := range spendTx.TxIn { + confirmedSet[txIn.PreviousOutPoint] = struct{}{} + } + + // As a previous version of the batch transaction may get confirmed, + // which does not contain the latest sweeps, we need to detect the + // sweeps that did not make it to the confirmed transaction and feed + // them back to the batcher. This will ensure that the sweeps will enter + // a new batch instead of remaining dangling. + var ( + totalSweptAmt btcutil.Amount + confirmedSweeps = []wire.OutPoint{} + ) + for _, sweep := range b.sweeps { + // Skip sweeps that were not included into the confirmed tx. + _, found := confirmedSet[sweep.outpoint] + if !found { + continue + } + + totalSweptAmt += sweep.value + notifyList = append(notifyList, sweep) + confirmedSweeps = append(confirmedSweeps, sweep.outpoint) + } + + // Calculate the fee portion that each sweep should pay for the batch. + feePortionPaidPerSweep, roundingDifference := getFeePortionForSweep( + spendTx, len(notifyList), totalSweptAmt, + ) + + // Calculate fees per swaps. Only the first sweep in a swap has a + // notifier, so we calculate total fee per swap and send it to a sweep + // having that swap and a notifier. + swap2fee := make(map[lntypes.Hash]btcutil.Amount) + for _, sweep := range notifyList { + primary := sweep.outpoint == b.primarySweepID + + swap2fee[sweep.swapHash] += getFeePortionPaidBySweep( + feePortionPaidPerSweep, roundingDifference, primary, + ) + } + + // Now send notifications to notifiers. + for _, sweep := range notifyList { + // If the sweep's notifier is empty then this means that a swap + // is not waiting to read an update from it or this is not the + // first sweep in a swap, so we can skip the notification part. + if sweep.notifier == nil || + *sweep.notifier == (SpendNotifier{}) { + + continue + } + + // Make sure there is only one sweep with a notifier per swap + // hash, otherwise our fee calculation is incorrect. + fee, has := swap2fee[sweep.swapHash] + if !has { + return fmt.Errorf("no fee for swap %v; maybe "+ + "multiple sweeps with a notifier per swap?", + sweep.swapHash) + } + delete(swap2fee, sweep.swapHash) + + spendDetail := SpendDetail{ + Tx: spendTx, + OnChainFeePortion: fee, + } + + // Dispatch the sweep notifier, we don't care about the outcome + // of this action so we don't wait for it. + go func() { + // Make sure this context doesn't expire so we + // successfully notify the caller. + ctx := context.WithoutCancel(ctx) + + sweep.notifySweepSpend(ctx, &spendDetail) + }() + } + + b.Infof("spent, confirmed sweeps: %v", confirmedSweeps) + + // We are no longer able to accept new sweeps, so we mark the batch as + // closed and persist on storage. + b.state = Closed + + if err := b.persist(ctx); err != nil { + return fmt.Errorf("saving batch failed: %w", err) + } + + if err := b.monitorConfirmations(ctx); err != nil { + return fmt.Errorf("monitorConfirmations failed: %w", err) + } + + return nil +} + +// handleConf handles a confirmation notification. This is the final step of the +// batch. Here we signal to the batcher that this batch was completed. +func (b *batch) handleConf(ctx context.Context, + conf *chainntnfs.TxConfirmation) error { + + spendTx := conf.Tx + txHash := spendTx.TxHash() + if b.batchTxid == nil || *b.batchTxid != txHash { + b.Warnf("Mismatch of batch txid: tx in spend notification had "+ + "txid %v, but confirmation notification has txif %v. "+ + "Using the later.", b.batchTxid, txHash) + } + b.batchTxid = &txHash + + b.Infof("confirmed in txid %s", b.batchTxid) + b.state = Confirmed + + if err := b.persist(ctx); err != nil { + return fmt.Errorf("saving batch failed: %w", err) + } + + // If the batch is in presigned mode, cleanup presignedHelper. presigned, err := b.isPresigned() if err != nil { return fmt.Errorf("failed to determine if the batch %d uses "+ @@ -1967,40 +2089,46 @@ func (b *batch) handleSpend(ctx context.Context, spendTx *wire.MsgTx) error { b.id, err) } + // Make a set of confirmed sweeps. + confirmedSet := make(map[wire.OutPoint]struct{}, len(spendTx.TxIn)) + for _, txIn := range spendTx.TxIn { + confirmedSet[txIn.PreviousOutPoint] = struct{}{} + } + // As a previous version of the batch transaction may get confirmed, // which does not contain the latest sweeps, we need to detect the // sweeps that did not make it to the confirmed transaction and feed // them back to the batcher. This will ensure that the sweeps will enter // a new batch instead of remaining dangling. var ( - totalSweptAmt btcutil.Amount confirmedSweeps = []wire.OutPoint{} - purgedSweeps = []wire.OutPoint{} - purgedSwaps = []lntypes.Hash{} + purgeList = make([]SweepRequest, 0, len(b.sweeps)) + totalSweptAmt btcutil.Amount ) for _, sweep := range allSweeps { - found := false - - for _, txIn := range spendTx.TxIn { - if txIn.PreviousOutPoint == sweep.outpoint { - found = true - totalSweptAmt += sweep.value - notifyList = append(notifyList, sweep) - confirmedSweeps = append( - confirmedSweeps, sweep.outpoint, - ) - - break + _, found := confirmedSet[sweep.outpoint] + if found { + // Save the sweep as completed. Note that sweeps are + // marked completed after the batch is marked confirmed + // because the check in handleSweeps checks sweep's + // status first and then checks the batch status. + err := b.persistSweep(ctx, sweep, true) + if err != nil { + return err } + + confirmedSweeps = append( + confirmedSweeps, sweep.outpoint, + ) + + totalSweptAmt += sweep.value + + continue } // If the sweep's outpoint was not found in the transaction's // inputs this means it was left out. So we delete it from this // batch and feed it back to the batcher. - if found { - continue - } - newSweep := sweep delete(b.sweeps, sweep.outpoint) @@ -2032,6 +2160,10 @@ func (b *batch) handleSpend(ctx context.Context, spendTx *wire.MsgTx) error { }) } } + var ( + purgedSweeps = []wire.OutPoint{} + purgedSwaps = []lntypes.Hash{} + ) for _, sweepReq := range purgeList { purgedSwaps = append(purgedSwaps, sweepReq.SwapHash) for _, input := range sweepReq.Inputs { @@ -2039,45 +2171,8 @@ func (b *batch) handleSpend(ctx context.Context, spendTx *wire.MsgTx) error { } } - // Calculate the fee portion that each sweep should pay for the batch. - feePortionPaidPerSweep, roundingDifference := getFeePortionForSweep( - spendTx, len(notifyList), totalSweptAmt, - ) - - for _, sweep := range notifyList { - // Save the sweep as completed. - err := b.persistSweep(ctx, sweep, true) - if err != nil { - return err - } - - // If the sweep's notifier is empty then this means that a swap - // is not waiting to read an update from it, so we can skip - // the notification part. - if sweep.notifier == nil || - *sweep.notifier == (SpendNotifier{}) { - - continue - } - - spendDetail := SpendDetail{ - Tx: spendTx, - OnChainFeePortion: getFeePortionPaidBySweep( - spendTx, feePortionPaidPerSweep, - roundingDifference, &sweep, - ), - } - - // Dispatch the sweep notifier, we don't care about the outcome - // of this action so we don't wait for it. - go func() { - // Make sure this context doesn't expire so we - // successfully notify the caller. - ctx := context.WithoutCancel(ctx) - - sweep.notifySweepSpend(ctx, &spendDetail) - }() - } + b.Infof("fully confirmed sweeps: %v, purged sweeps: %v, "+ + "purged swaps: %v", confirmedSweeps, purgedSweeps, purgedSwaps) // Proceed with purging the sweeps. This will feed the sweeps that // didn't make it to the confirmed batch transaction back to the batcher @@ -2099,49 +2194,6 @@ func (b *batch) handleSpend(ctx context.Context, spendTx *wire.MsgTx) error { } }() - b.Infof("spent, confirmed sweeps: %v, purged sweeps: %v, "+ - "purged swaps: %v, purged groups: %v", confirmedSweeps, - purgedSweeps, purgedSwaps, len(purgeList)) - - // We are no longer able to accept new sweeps, so we mark the batch as - // closed and persist on storage. - b.state = Closed - - if err = b.persist(ctx); err != nil { - return fmt.Errorf("saving batch failed: %w", err) - } - - if err = b.monitorConfirmations(ctx); err != nil { - return fmt.Errorf("monitorConfirmations failed: %w", err) - } - - return nil -} - -// handleConf handles a confirmation notification. This is the final step of the -// batch. Here we signal to the batcher that this batch was completed. We also -// cleanup up presigned transactions whose primarySweepID is one of the sweeps -// that were spent and fully confirmed: such a transaction can't be broadcasted -// since it is either in a block or double-spends one of spent outputs. -func (b *batch) handleConf(ctx context.Context, - conf *chainntnfs.TxConfirmation) error { - - spendTx := conf.Tx - txHash := spendTx.TxHash() - if b.batchTxid == nil || *b.batchTxid != txHash { - b.Warnf("Mismatch of batch txid: tx in spend notification had "+ - "txid %v, but confirmation notification has txif %v. "+ - "Using the later.", b.batchTxid, txHash) - } - b.batchTxid = &txHash - - // If the batch is in presigned mode, cleanup presignedHelper. - presigned, err := b.isPresigned() - if err != nil { - return fmt.Errorf("failed to determine if the batch %d uses "+ - "presigned mode: %w", b.id, err) - } - if presigned { b.Infof("Cleaning up presigned store") @@ -2157,23 +2209,23 @@ func (b *batch) handleConf(ctx context.Context, } } - b.Infof("confirmed in txid %s", b.batchTxid) - b.state = Confirmed - - if err := b.store.ConfirmBatch(ctx, b.id); err != nil { - return fmt.Errorf("failed to store confirmed state: %w", err) - } - // Calculate the fee portion that each sweep should pay for the batch. - // TODO: make sure spendTx matches b.sweeps. - var totalSweptAmt btcutil.Amount - for _, s := range b.sweeps { - totalSweptAmt += s.value - } feePortionPaidPerSweep, roundingDifference := getFeePortionForSweep( spendTx, len(b.sweeps), totalSweptAmt, ) + // Calculate fees per swaps. Only the first sweep in a swap has a + // notifier, so we calculate total fee per swap and send it to a sweep + // having that swap and a notifier. + swap2fee := make(map[lntypes.Hash]btcutil.Amount) + for _, sweep := range b.sweeps { + primary := sweep.outpoint == b.primarySweepID + + swap2fee[sweep.swapHash] += getFeePortionPaidBySweep( + feePortionPaidPerSweep, roundingDifference, primary, + ) + } + // Send the confirmation to all the notifiers. for _, s := range b.sweeps { // If the sweep's notifier is empty then this means that @@ -2183,12 +2235,19 @@ func (b *batch) handleConf(ctx context.Context, continue } + // Make sure there is only one sweep with a notifier per swap + // hash, otherwise our fee calculation is incorrect. + fee, has := swap2fee[s.swapHash] + if !has { + return fmt.Errorf("no fee for swap %v; maybe "+ + "multiple sweeps with a notifier per swap?", + s.swapHash) + } + delete(swap2fee, s.swapHash) + confDetail := &ConfDetail{ - TxConfirmation: conf, - OnChainFeePortion: getFeePortionPaidBySweep( - spendTx, feePortionPaidPerSweep, - roundingDifference, &s, - ), + TxConfirmation: conf, + OnChainFeePortion: fee, } // Notify the caller in a goroutine to avoid possible dead-lock. diff --git a/sweepbatcher/sweep_batcher.go b/sweepbatcher/sweep_batcher.go index b89ba5f6b..18bd41f35 100644 --- a/sweepbatcher/sweep_batcher.go +++ b/sweepbatcher/sweep_batcher.go @@ -2,6 +2,7 @@ package sweepbatcher import ( "context" + "encoding/hex" "errors" "fmt" "strings" @@ -12,6 +13,7 @@ import ( "github.com/btcsuite/btcd/btcutil" "github.com/btcsuite/btcd/chaincfg" "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/txscript" "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btclog/v2" "github.com/btcsuite/btcwallet/chain" @@ -57,9 +59,6 @@ type BatcherStore interface { // UpdateSweepBatch updates a batch in the database. UpdateSweepBatch(ctx context.Context, batch *dbBatch) error - // ConfirmBatch confirms a batch by setting its state to confirmed. - ConfirmBatch(ctx context.Context, id int32) error - // FetchBatchSweeps fetches all the sweeps that belong to a batch. FetchBatchSweeps(ctx context.Context, id int32) ([]*dbSweep, error) @@ -158,18 +157,11 @@ type SignMuSig2 func(ctx context.Context, muSig2Version input.MuSig2Version, // fails (e.g. because one of the inputs is offline), an input can't be added to // a batch. type PresignedHelper interface { - // Presign tries to presign a batch transaction. If the method returns - // nil, it is guaranteed that future calls to SignTx on this set of - // sweeps return valid signed transactions. The implementation should - // first check if this transaction already exists in the store to skip - // cosigning if possible. - Presign(ctx context.Context, primarySweepID wire.OutPoint, - tx *wire.MsgTx, inputAmt btcutil.Amount) error - // DestPkScript returns destination pkScript used by the sweep batch // with the primary outpoint specified. Returns an error, if such tx // doesn't exist. If there are many such transactions, returns any of // pkScript's; all of them should have the same destination pkScript. + // TODO: embed this data into SweepInfo. DestPkScript(ctx context.Context, primarySweepID wire.OutPoint) ([]byte, error) @@ -201,8 +193,8 @@ type VerifySchnorrSig func(pubKey *btcec.PublicKey, hash, sig []byte) error // FeeRateProvider is a function that returns min fee rate of a batch sweeping // the UTXO of the swap. -type FeeRateProvider func(ctx context.Context, - swapHash lntypes.Hash) (chainfee.SatPerKWeight, error) +type FeeRateProvider func(ctx context.Context, swapHash lntypes.Hash, + utxo wire.OutPoint) (chainfee.SatPerKWeight, error) // InitialDelayProvider returns the duration after which a newly created batch // is first published. It allows to customize the duration based on total value @@ -273,7 +265,7 @@ type addSweepsRequest struct { notifier *SpendNotifier // completed is set if the sweep is spent and the spending transaction - // is confirmed. + // is fully confirmed. completed bool // parentBatch is the parent batch of this sweep. It is loaded ony if @@ -705,7 +697,14 @@ func (b *Batcher) PresignSweepsGroup(ctx context.Context, inputs []Input, if err != nil { return fmt.Errorf("failed to get nextBlockFeeRate: %w", err) } - infof("PresignSweepsGroup: nextBlockFeeRate is %v", nextBlockFeeRate) + destPkscript, err := txscript.PayToAddrScript(destAddress) + if err != nil { + return fmt.Errorf("txscript.PayToAddrScript failed: %w", err) + } + infof("PresignSweepsGroup: nextBlockFeeRate is %v, inputs: %v, "+ + "destAddress: %v, destPkscript: %v sweepTimeout: %d", + nextBlockFeeRate, inputs, destAddress, + hex.EncodeToString(destPkscript), sweepTimeout) sweeps := make([]sweep, len(inputs)) for i, input := range inputs { @@ -792,8 +791,8 @@ func (b *Batcher) AddSweep(ctx context.Context, sweepReq *SweepRequest) error { } infof("Batcher adding sweep group of %d sweeps with primarySweep %x, "+ - "presigned=%v, completed=%v", len(sweeps), sweep.swapHash[:6], - sweep.presigned, completed) + "presigned=%v, fully_confirmed=%v", len(sweeps), + sweep.swapHash[:6], sweep.presigned, completed) req := &addSweepsRequest{ sweeps: sweeps, @@ -853,14 +852,10 @@ func (b *Batcher) handleSweeps(ctx context.Context, sweeps []*sweep, // If the sweep has already been completed in a confirmed batch then we // can't attach its notifier to the batch as that is no longer running. // Instead we directly detect and return the spend here. - if completed && *notifier != (SpendNotifier{}) { - // The parent batch is indeed confirmed, meaning it is complete - // and we won't be able to attach this sweep to it. - if parentBatch.Confirmed { - return b.monitorSpendAndNotify( - ctx, sweep, parentBatch.ID, notifier, - ) - } + if completed && parentBatch.Confirmed { + return b.monitorSpendAndNotify( + ctx, sweeps, parentBatch.ID, notifier, + ) } sweep.notifier = notifier @@ -924,7 +919,7 @@ func (b *Batcher) handleSweeps(ctx context.Context, sweeps []*sweep, // spinUpNewBatch creates new batch, starts it and adds the sweeps to it. If // presigned mode is enabled, the result also depends on outcome of -// presignedHelper.Presign. +// presignedHelper.SignTx. func (b *Batcher) spinUpNewBatch(ctx context.Context, sweeps []*sweep) error { // Spin up a fresh batch. newBatch, err := b.spinUpBatch(ctx) @@ -1141,9 +1136,14 @@ func (b *Batcher) FetchUnconfirmedBatches(ctx context.Context) ([]*batch, // the response back to the response channel. It is called if the batch is fully // confirmed and we just need to deliver the data back to the caller though // SpendNotifier. -func (b *Batcher) monitorSpendAndNotify(ctx context.Context, sweep *sweep, +func (b *Batcher) monitorSpendAndNotify(ctx context.Context, sweeps []*sweep, parentBatchID int32, notifier *SpendNotifier) error { + // If the caller has not provided a notifier, stop. + if notifier == nil || *notifier == (SpendNotifier{}) { + return nil + } + spendCtx, cancel := context.WithCancel(ctx) // Then we get the total amount that was swept by the batch. @@ -1154,6 +1154,17 @@ func (b *Batcher) monitorSpendAndNotify(ctx context.Context, sweep *sweep, return err } + // Find the primarySweepID. + dbSweeps, err := b.store.FetchBatchSweeps(ctx, parentBatchID) + if err != nil { + cancel() + + return err + } + primarySweepID := dbSweeps[0].Outpoint + + sweep := sweeps[0] + spendChan, spendErr, err := b.chainNotifier.RegisterSpendNtfn( spendCtx, &sweep.outpoint, sweep.htlc.PkScript, sweep.initiationHeight, @@ -1174,6 +1185,7 @@ func (b *Batcher) monitorSpendAndNotify(ctx context.Context, sweep *sweep, select { case spend := <-spendChan: spendTx := spend.SpendingTx + // Calculate the fee portion that each sweep should pay // for the batch. feePortionPerSweep, roundingDifference := @@ -1182,17 +1194,23 @@ func (b *Batcher) monitorSpendAndNotify(ctx context.Context, sweep *sweep, totalSwept, ) - onChainFeePortion := getFeePortionPaidBySweep( - spendTx, feePortionPerSweep, - roundingDifference, sweep, - ) + // Sum onchain fee across all the sweeps of the swap. + var fee btcutil.Amount + for _, s := range sweeps { + isFirst := s.outpoint == primarySweepID + + fee += getFeePortionPaidBySweep( + feePortionPerSweep, roundingDifference, + isFirst, + ) + } // Notify the requester of the spend with the spend // details, including the fee portion for this // particular sweep. spendDetail := &SpendDetail{ Tx: spendTx, - OnChainFeePortion: onChainFeePortion, + OnChainFeePortion: fee, } select { @@ -1200,7 +1218,7 @@ func (b *Batcher) monitorSpendAndNotify(ctx context.Context, sweep *sweep, case notifier.SpendChan <- spendDetail: err := b.monitorConfAndNotify( ctx, sweep, notifier, spendTx, - onChainFeePortion, + fee, ) if err != nil { b.writeToErrChan( @@ -1447,11 +1465,18 @@ func (b *Batcher) loadSweep(ctx context.Context, swapHash lntypes.Hash, swapHash[:6], err) } + // Make sure that PkScript of the coin is filled. Otherwise + // RegisterSpendNtfn fails. + if len(s.HTLC.PkScript) == 0 { + return nil, fmt.Errorf("sweep data for %x doesn't have "+ + "HTLC.PkScript set", swapHash[:6]) + } + // Find minimum fee rate for the sweep. Use customFeeRate if it is // provided, otherwise use wallet's EstimateFeeRate. var minFeeRate chainfee.SatPerKWeight if b.customFeeRate != nil { - minFeeRate, err = b.customFeeRate(ctx, swapHash) + minFeeRate, err = b.customFeeRate(ctx, swapHash, outpoint) if err != nil { return nil, fmt.Errorf("failed to fetch min fee rate "+ "for %x: %w", swapHash[:6], err) diff --git a/sweepbatcher/sweep_batcher_presigned_test.go b/sweepbatcher/sweep_batcher_presigned_test.go index 86f626cbc..36f11f750 100644 --- a/sweepbatcher/sweep_batcher_presigned_test.go +++ b/sweepbatcher/sweep_batcher_presigned_test.go @@ -14,6 +14,7 @@ import ( "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btclog/v2" "github.com/lightninglabs/loop/loopdb" + "github.com/lightninglabs/loop/swap" "github.com/lightninglabs/loop/test" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/lntypes" @@ -95,42 +96,6 @@ func (h *mockPresignedHelper) getTxFeerate(tx *wire.MsgTx, return chainfee.NewSatPerKWeight(fee, weight) } -// Presign tries to presign the transaction. It succeeds if all the inputs -// are online. In case of success it adds the transaction to presignedBatches. -func (h *mockPresignedHelper) Presign(ctx context.Context, - primarySweepID wire.OutPoint, tx *wire.MsgTx, - inputAmt btcutil.Amount) error { - - h.mu.Lock() - defer h.mu.Unlock() - - // Check if such a transaction already exists. This is not only an - // optimization, but also enables re-adding multiple groups if sweeps - // are offline. - wantTxHash := tx.TxHash() - for _, candidate := range h.presignedBatches[primarySweepID] { - if candidate.TxHash() == wantTxHash { - return nil - } - } - - if !hasInput(tx, primarySweepID) { - return fmt.Errorf("primarySweepID %v not in tx", primarySweepID) - } - - if offline := h.offlineInputs(tx); len(offline) != 0 { - return fmt.Errorf("some inputs of tx are offline: %v", offline) - } - - tx = tx.Copy() - h.sign(tx) - h.presignedBatches[primarySweepID] = append( - h.presignedBatches[primarySweepID], tx, - ) - - return nil -} - // DestPkScript returns destination pkScript used in presigned tx sweeping // these inputs. func (h *mockPresignedHelper) DestPkScript(ctx context.Context, @@ -158,6 +123,16 @@ func (h *mockPresignedHelper) SignTx(ctx context.Context, h.mu.Lock() defer h.mu.Unlock() + if feeRate < minRelayFee { + return nil, fmt.Errorf("feeRate (%v) is below minRelayFee (%v)", + feeRate, minRelayFee) + } + + if !hasInput(tx, primarySweepID) { + return nil, fmt.Errorf("primarySweepID %v not in tx", + primarySweepID) + } + // If all the inputs are online and loadOnly is not set, sign this exact // transaction. if offline := h.offlineInputs(tx); len(offline) == 0 && !loadOnly { @@ -199,7 +174,8 @@ func (h *mockPresignedHelper) SignTx(ctx context.Context, } if bestTx == nil { - return nil, fmt.Errorf("no such presigned tx found") + return nil, fmt.Errorf("some outpoint is offline and no " + + "suitable presigned tx found") } return bestTx.Copy(), nil @@ -232,13 +208,18 @@ func (h *mockPresignedHelper) FetchSweep(_ context.Context, h.mu.Lock() defer h.mu.Unlock() - _, has := h.onlineOutpoints[utxo] + // Find IsPresigned. + _, isPresigned := h.onlineOutpoints[utxo] return &SweepInfo{ // Set Timeout to prevent warning messages about timeout=0. Timeout: sweepTimeout, - IsPresigned: has, + IsPresigned: isPresigned, + + HTLC: swap.Htlc{ + PkScript: []byte{10, 11, 12}, + }, }, nil } @@ -254,8 +235,8 @@ func testPresigned_forgotten_presign(t *testing.T, ctx, cancel := context.WithCancel(context.Background()) defer cancel() - customFeeRate := func(_ context.Context, - _ lntypes.Hash) (chainfee.SatPerKWeight, error) { + customFeeRate := func(_ context.Context, _ lntypes.Hash, + _ wire.OutPoint) (chainfee.SatPerKWeight, error) { return chainfee.SatPerKWeight(10_000), nil } @@ -330,8 +311,8 @@ func testPresigned_input1_offline_then_input2(t *testing.T, setFeeRate := func(feeRate chainfee.SatPerKWeight) { currentFeeRate = feeRate } - customFeeRate := func(_ context.Context, - _ lntypes.Hash) (chainfee.SatPerKWeight, error) { + customFeeRate := func(_ context.Context, _ lntypes.Hash, + _ wire.OutPoint) (chainfee.SatPerKWeight, error) { return currentFeeRate, nil } @@ -486,6 +467,118 @@ func testPresigned_input1_offline_then_input2(t *testing.T, require.NoError(t, err) } +// testPresigned_min_relay_fee tests that online and presigned transactions +// comply with min_relay_fee. +func testPresigned_min_relay_fee(t *testing.T, + batcherStore testBatcherStore) { + + defer test.Guard(t)() + + lnd := test.NewMockLnd() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + const inputAmt = 1_000_000 + + customFeeRate := func(_ context.Context, _ lntypes.Hash, + _ wire.OutPoint) (chainfee.SatPerKWeight, error) { + + return chainfee.FeePerKwFloor, nil + } + + presignedHelper := newMockPresignedHelper() + + batcher := NewBatcher(lnd.WalletKit, lnd.ChainNotifier, lnd.Signer, + testMuSig2SignSweep, testVerifySchnorrSig, lnd.ChainParams, + batcherStore, presignedHelper, + WithCustomFeeRate(customFeeRate), + WithPresignedHelper(presignedHelper)) + go func() { + err := batcher.Run(ctx) + checkBatcherError(t, err) + }() + + // Set high min_relay_fee. + lnd.SetMinRelayFee(400) + + // Create the first sweep. + swapHash1 := lntypes.Hash{1, 1, 1} + op1 := wire.OutPoint{ + Hash: chainhash.Hash{1, 1}, + Index: 1, + } + sweepReq1 := SweepRequest{ + SwapHash: swapHash1, + Inputs: []Input{{ + Value: inputAmt, + Outpoint: op1, + }}, + Notifier: &dummyNotifier, + } + + // Enable the input and presign. + presignedHelper.SetOutpointOnline(op1, true) + err := batcher.PresignSweepsGroup( + ctx, []Input{{Outpoint: op1, Value: inputAmt}}, + sweepTimeout, destAddr, + ) + require.NoError(t, err) + + // Deliver sweep request to batcher. + require.NoError(t, batcher.AddSweep(ctx, &sweepReq1)) + + // Since a batch was created we check that it registered for its primary + // sweep's spend. + <-lnd.RegisterSpendChannel + + // Wait for a transactions to be published. + tx := <-lnd.TxPublishChannel + gotFeeRate := presignedHelper.getTxFeerate(tx, inputAmt) + require.Equal(t, chainfee.SatPerKWeight(402), gotFeeRate) + + // Now decrease min_relay_fee and make sure fee rate doesn't decrease. + // The only difference of tx2 is a higher lock_time. + lnd.SetMinRelayFee(300) + require.NoError(t, lnd.NotifyHeight(601)) + tx2 := <-lnd.TxPublishChannel + require.Equal(t, tx.TxOut[0].Value, tx2.TxOut[0].Value) + gotFeeRate = presignedHelper.getTxFeerate(tx2, inputAmt) + require.Equal(t, chainfee.SatPerKWeight(402), gotFeeRate) + require.Equal(t, uint32(601), tx2.LockTime) + + // Set a higher min_relay_fee, turn off the client and try presigned tx. + lnd.SetMinRelayFee(500) + presignedHelper.SetOutpointOnline(op1, false) + + // Check fee rate of the presigned tx broadcasted. + require.NoError(t, lnd.NotifyHeight(602)) + tx = <-lnd.TxPublishChannel + gotFeeRate = presignedHelper.getTxFeerate(tx, inputAmt) + require.Equal(t, chainfee.SatPerKWeight(523), gotFeeRate) + // LockTime of a presigned tx is 0. + require.Equal(t, uint32(0), tx.LockTime) + + // Now decrease min_relay_fee and make sure fee rate doesn't decrease. + // It should re-broadcast the same presigned tx. + lnd.SetMinRelayFee(450) + require.NoError(t, lnd.NotifyHeight(603)) + tx2 = <-lnd.TxPublishChannel + require.Equal(t, tx.TxHash(), tx2.TxHash()) + gotFeeRate = presignedHelper.getTxFeerate(tx2, inputAmt) + require.Equal(t, chainfee.SatPerKWeight(523), gotFeeRate) + // LockTime of a presigned tx is 0. + require.Equal(t, uint32(0), tx2.LockTime) + + // Even if the client is back online, fee rate doesn't decrease. + presignedHelper.SetOutpointOnline(op1, true) + require.NoError(t, lnd.NotifyHeight(604)) + tx3 := <-lnd.TxPublishChannel + require.Equal(t, tx2.TxOut[0].Value, tx3.TxOut[0].Value) + gotFeeRate = presignedHelper.getTxFeerate(tx3, inputAmt) + require.Equal(t, chainfee.SatPerKWeight(523), gotFeeRate) + require.Equal(t, uint32(604), tx3.LockTime) +} + // testPresigned_two_inputs_one_goes_offline tests presigned mode for the // following scenario: two online inputs are added, then one of them goes // offline, then feerate grows and a presigned transaction is used. @@ -511,8 +604,8 @@ func testPresigned_two_inputs_one_goes_offline(t *testing.T, setFeeRate := func(feeRate chainfee.SatPerKWeight) { currentFeeRate = feeRate } - customFeeRate := func(_ context.Context, - _ lntypes.Hash) (chainfee.SatPerKWeight, error) { + customFeeRate := func(_ context.Context, _ lntypes.Hash, + _ wire.OutPoint) (chainfee.SatPerKWeight, error) { return currentFeeRate, nil } @@ -647,8 +740,8 @@ func testPresigned_first_publish_fails(t *testing.T, setFeeRate := func(feeRate chainfee.SatPerKWeight) { currentFeeRate = feeRate } - customFeeRate := func(_ context.Context, - _ lntypes.Hash) (chainfee.SatPerKWeight, error) { + customFeeRate := func(_ context.Context, _ lntypes.Hash, + _ wire.OutPoint) (chainfee.SatPerKWeight, error) { return currentFeeRate, nil } @@ -770,8 +863,8 @@ func testPresigned_locktime(t *testing.T, setFeeRate := func(feeRate chainfee.SatPerKWeight) { currentFeeRate = feeRate } - customFeeRate := func(_ context.Context, - _ lntypes.Hash) (chainfee.SatPerKWeight, error) { + customFeeRate := func(_ context.Context, _ lntypes.Hash, + _ wire.OutPoint) (chainfee.SatPerKWeight, error) { return currentFeeRate, nil } @@ -854,8 +947,8 @@ func testPresigned_presigned_group(t *testing.T, ctx, cancel := context.WithCancel(context.Background()) defer cancel() - customFeeRate := func(_ context.Context, - _ lntypes.Hash) (chainfee.SatPerKWeight, error) { + customFeeRate := func(_ context.Context, _ lntypes.Hash, + _ wire.OutPoint) (chainfee.SatPerKWeight, error) { return chainfee.SatPerKWeight(10_000), nil } @@ -902,7 +995,7 @@ func testPresigned_presigned_group(t *testing.T, // An attempt to presign must fail. err = batcher.PresignSweepsGroup(ctx, group1, sweepTimeout, destAddr) - require.ErrorContains(t, err, "some inputs of tx are offline") + require.ErrorContains(t, err, "some outpoint is offline") // Enable both outpoints. presignedHelper.SetOutpointOnline(op2, true) @@ -1091,8 +1184,8 @@ func testPresigned_presigned_and_regular_sweeps(t *testing.T, store testStore, setFeeRate := func(feeRate chainfee.SatPerKWeight) { currentFeeRate = feeRate } - customFeeRate := func(_ context.Context, - _ lntypes.Hash) (chainfee.SatPerKWeight, error) { + customFeeRate := func(_ context.Context, _ lntypes.Hash, + _ wire.OutPoint) (chainfee.SatPerKWeight, error) { return currentFeeRate, nil } @@ -1354,23 +1447,28 @@ func testPresigned_presigned_and_regular_sweeps(t *testing.T, store testStore, // to another online batch. In offline case they must are added to a new batch // having valid presigned transactions. func testPresigned_purging(t *testing.T, numSwaps, numConfirmedSwaps int, - store testStore, batcherStore testBatcherStore, online bool) { + batcherStore testBatcherStore, online bool) { defer test.Guard(t)() require.LessOrEqual(t, numConfirmedSwaps, numSwaps) - const sweepsPerSwap = 2 + const ( + sweepsPerSwap = 2 + feeRate = chainfee.SatPerKWeight(10_000) + swapAmount = 3_000_001 + ) + sweepAmounts := []btcutil.Amount{1_000_001, 2_000_000} lnd := test.NewMockLnd() ctx, cancel := context.WithCancel(context.Background()) defer cancel() - customFeeRate := func(_ context.Context, - _ lntypes.Hash) (chainfee.SatPerKWeight, error) { + customFeeRate := func(_ context.Context, _ lntypes.Hash, + _ wire.OutPoint) (chainfee.SatPerKWeight, error) { - return chainfee.SatPerKWeight(10_000), nil + return feeRate, nil } presignedHelper := newMockPresignedHelper() @@ -1388,12 +1486,17 @@ func testPresigned_purging(t *testing.T, numSwaps, numConfirmedSwaps int, checkBatcherError(t, err) }() + swapHashes := make([]lntypes.Hash, numSwaps) + groups := make([][]Input, numSwaps) txs := make([]*wire.MsgTx, numSwaps) allOps := make([]wire.OutPoint, 0, numSwaps*sweepsPerSwap) + spendChans := make([]<-chan *SpendDetail, numSwaps) + confChans := make([]<-chan *ConfDetail, numSwaps) for i := range numSwaps { // Create a swap of sweepsPerSwap sweeps. swapHash := lntypes.Hash{byte(i + 1)} + swapHashes[i] = swapHash ops := make([]wire.OutPoint, sweepsPerSwap) group := make([]Input, sweepsPerSwap) for j := range sweepsPerSwap { @@ -1405,29 +1508,10 @@ func testPresigned_purging(t *testing.T, numSwaps, numConfirmedSwaps int, group[j] = Input{ Outpoint: ops[j], - Value: btcutil.Amount(1_000_000 * (j + 1)), + Value: sweepAmounts[j], } } - - // Create a swap in DB. - swap := &loopdb.LoopOutContract{ - SwapContract: loopdb.SwapContract{ - CltvExpiry: 111, - AmountRequested: 3_000_000, - ProtocolVersion: loopdb.ProtocolVersionMuSig2, - HtlcKeys: htlcKeys, - - // Make preimage unique to pass SQL constraints. - Preimage: lntypes.Preimage{byte(i + 1)}, - }, - - DestAddr: destAddr, - SwapInvoice: swapInvoice, - SweepConfTarget: 111, - } - err := store.CreateLoopOut(ctx, swapHash, swap) - require.NoError(t, err) - store.AssertLoopOutStored() + groups[i] = group // Enable all the sweeps. for _, op := range ops { @@ -1435,16 +1519,29 @@ func testPresigned_purging(t *testing.T, numSwaps, numConfirmedSwaps int, } // An attempt to presign must succeed. - err = batcher.PresignSweepsGroup( + err := batcher.PresignSweepsGroup( ctx, group, sweepTimeout, destAddr, ) require.NoError(t, err) + // Create a spending notification channel. + spendChan := make(chan *SpendDetail, 1) + spendChans[i] = spendChan + confChan := make(chan *ConfDetail, 1) + confChans[i] = confChan + notifier := &SpendNotifier{ + SpendChan: spendChan, + SpendErrChan: make(chan error, 1), + ConfChan: confChan, + ConfErrChan: make(chan error, 1), + QuitChan: make(chan bool, 1), + } + // Add the sweep, triggering the publish attempt. require.NoError(t, batcher.AddSweep(ctx, &SweepRequest{ SwapHash: swapHash, Inputs: group, - Notifier: &dummyNotifier, + Notifier: notifier, })) // For the first group it should register for the sweep's spend @@ -1482,31 +1579,11 @@ func testPresigned_purging(t *testing.T, numSwaps, numConfirmedSwaps int, }, } - // Create a swap in DB. - swap := &loopdb.LoopOutContract{ - SwapContract: loopdb.SwapContract{ - CltvExpiry: 111, - AmountRequested: amount, - ProtocolVersion: loopdb.ProtocolVersionMuSig2, - HtlcKeys: htlcKeys, - - // Make preimage unique to pass SQL constraints. - Preimage: lntypes.Preimage{1, 2, 3}, - }, - - DestAddr: destAddr, - SwapInvoice: swapInvoice, - SweepConfTarget: 111, - } - err := store.CreateLoopOut(ctx, swapHash, swap) - require.NoError(t, err) - store.AssertLoopOutStored() - // Enable the sweep. presignedHelper.SetOutpointOnline(opx, true) // An attempt to presign must succeed. - err = batcher.PresignSweepsGroup( + err := batcher.PresignSweepsGroup( ctx, group, sweepTimeout, destAddr, ) require.NoError(t, err) @@ -1543,6 +1620,34 @@ func testPresigned_purging(t *testing.T, numSwaps, numConfirmedSwaps int, SpendingHeight: int32(601 + numSwaps + 1), } lnd.SpendChannel <- spendDetail + + // Calculate the expected on-chain fee of the swap. + wantFee := make([]btcutil.Amount, numConfirmedSwaps) + for i := range numConfirmedSwaps { + batchAmount := swapAmount * btcutil.Amount(numConfirmedSwaps) + txFee := batchAmount - btcutil.Amount(tx.TxOut[0].Value) + numConfirmedSweeps := numConfirmedSwaps * sweepsPerSwap + feePerSweep := txFee / btcutil.Amount(numConfirmedSweeps) + roundingDiff := txFee - feePerSweep*btcutil.Amount( + numConfirmedSweeps, + ) + swapFee := feePerSweep * 2 + + // Add rounding difference to the first swap. + if i == 0 { + swapFee += roundingDiff + } + + wantFee[i] = swapFee + } + + // Make sure that notifiers of confirmed sweeps received notifications. + for i := range numConfirmedSwaps { + spend := <-spendChans[i] + require.Equal(t, txHash, spend.Tx.TxHash()) + require.Equal(t, wantFee[i], spend.OnChainFeePortion) + } + <-lnd.RegisterConfChannel require.NoError(t, lnd.NotifyHeight( int32(601+numSwaps+1+batchConfHeight), @@ -1554,12 +1659,19 @@ func testPresigned_purging(t *testing.T, numSwaps, numConfirmedSwaps int, // CleanupTransactions is called here. <-presignedHelper.cleanupCalled - // If all the swaps were confirmed, stop. - if numConfirmedSwaps == numSwaps { - return + // Increasing block height caused the second batch to re-publish. + if online && numConfirmedSwaps < numSwaps { + <-lnd.TxPublishChannel + } + + // Make sure that notifiers of confirmed sweeps received notifications. + for i := range numConfirmedSwaps { + conf := <-confChans[i] + require.Equal(t, txHash, conf.Tx.TxHash()) + require.Equal(t, wantFee[i], conf.OnChainFeePortion) } - if !online { + if !online && numConfirmedSwaps != numSwaps { // If the sweeps are offline, the missing sweeps in the // confirmed transaction should be re-added to the batcher as // new batch. The groups are added incrementally, so we need @@ -1568,6 +1680,49 @@ func testPresigned_purging(t *testing.T, numSwaps, numConfirmedSwaps int, <-lnd.TxPublishChannel } + // Now make sure that a correct spend and conf contification is sent if + // AddSweep is called after confirming the sweeps. + for i := range numConfirmedSwaps { + // Create a spending notification channel. + spendChan := make(chan *SpendDetail, 1) + confChan := make(chan *ConfDetail) + notifier := &SpendNotifier{ + SpendChan: spendChan, + SpendErrChan: make(chan error, 1), + ConfChan: confChan, + ConfErrChan: make(chan error, 1), + QuitChan: make(chan bool, 1), + } + + // Add the sweep, triggering the publish attempt. + require.NoError(t, batcher.AddSweep(ctx, &SweepRequest{ + SwapHash: swapHashes[i], + Inputs: groups[i], + Notifier: notifier, + })) + + spendReg := <-lnd.RegisterSpendChannel + spendReg.SpendChannel <- spendDetail + + spend := <-spendChan + require.Equal(t, txHash, spend.Tx.TxHash()) + require.Equal(t, wantFee[i], spend.OnChainFeePortion) + + <-lnd.RegisterConfChannel + lnd.ConfChannel <- &chainntnfs.TxConfirmation{ + Tx: tx, + } + + conf := <-confChan + require.Equal(t, tx.TxHash(), conf.Tx.TxHash()) + require.Equal(t, wantFee[i], conf.OnChainFeePortion) + } + + // If all the swaps were confirmed, stop. + if numConfirmedSwaps == numSwaps { + return + } + // Wait to new batch to appear and to have the expected size. wantSize := (numSwaps - numConfirmedSwaps) * sweepsPerSwap if online { @@ -1624,6 +1779,10 @@ func TestPresigned(t *testing.T) { testPresigned_input1_offline_then_input2(t, NewStoreMock()) }) + t.Run("min_relay_fee", func(t *testing.T) { + testPresigned_min_relay_fee(t, NewStoreMock()) + }) + t.Run("two_inputs_one_goes_offline", func(t *testing.T) { testPresigned_two_inputs_one_goes_offline(t, NewStoreMock()) }) @@ -1657,14 +1816,10 @@ func TestPresigned(t *testing.T) { } t.Run(name, func(t *testing.T) { - runTests(t, func(t *testing.T, store testStore, - batcherStore testBatcherStore) { - - testPresigned_purging( - t, numSwaps, numConfirmedSwaps, - store, batcherStore, online, - ) - }) + testPresigned_purging( + t, numSwaps, numConfirmedSwaps, + NewStoreMock(), online, + ) }) } @@ -1675,11 +1830,13 @@ func TestPresigned(t *testing.T) { testPurging(3, 1, false) testPurging(3, 2, false) testPurging(5, 2, false) + testPurging(5, 3, false) // Test cases in which the sweeps are online. testPurging(2, 1, true) testPurging(3, 1, true) testPurging(3, 2, true) testPurging(5, 2, true) + testPurging(5, 3, true) }) } diff --git a/sweepbatcher/sweep_batcher_test.go b/sweepbatcher/sweep_batcher_test.go index fa871829f..2f14d4fd7 100644 --- a/sweepbatcher/sweep_batcher_test.go +++ b/sweepbatcher/sweep_batcher_test.go @@ -408,8 +408,8 @@ func testFeeBumping(t *testing.T, store testStore, // Disable fee bumping, if requested. var opts []BatcherOption if noFeeBumping { - customFeeRate := func(ctx context.Context, - swapHash lntypes.Hash) (chainfee.SatPerKWeight, error) { + customFeeRate := func(_ context.Context, _ lntypes.Hash, + _ wire.OutPoint) (chainfee.SatPerKWeight, error) { // Always provide the same value, no bumping. return test.DefaultMockFee, nil @@ -2304,22 +2304,15 @@ func testSweepBatcherSweepReentry(t *testing.T, store testStore, return b.state == Closed }, test.Timeout, eventuallyCheckFrequency) - // Since second batch was created we check that it registered for its - // primary sweep's spend. - <-lnd.RegisterSpendChannel - - // While handling the spend notification the batch should detect that - // some sweeps did not appear in the spending tx, therefore it redirects - // them back to the batcher and the batcher inserts them in a new batch. - require.Eventually(t, func() bool { - return batcher.numBatches(ctx) == 2 - }, test.Timeout, eventuallyCheckFrequency) - // We mock the confirmation notification. lnd.ConfChannel <- &chainntnfs.TxConfirmation{ Tx: spendingTx, } + // Since second batch was created we check that it registered for its + // primary sweep's spend. + <-lnd.RegisterSpendChannel + // Wait for tx to be published. // Here is a race condition, which is unlikely to cause a crash: if we // wait for publish tx before sending a conf notification (previous @@ -3698,8 +3691,8 @@ func testSweepFetcher(t *testing.T, store testStore, require.NoError(t, err) store.AssertLoopOutStored() - customFeeRate := func(ctx context.Context, - swapHash lntypes.Hash) (chainfee.SatPerKWeight, error) { + customFeeRate := func(_ context.Context, _ lntypes.Hash, + _ wire.OutPoint) (chainfee.SatPerKWeight, error) { // Always provide the same value, no bumping. return feeRate, nil @@ -4545,8 +4538,8 @@ func testFeeRateGrows(t *testing.T, store testStore, swap2feeRate[swapHash] = rate } - customFeeRate := func(ctx context.Context, - swapHash lntypes.Hash) (chainfee.SatPerKWeight, error) { + customFeeRate := func(_ context.Context, swapHash lntypes.Hash, + _ wire.OutPoint) (chainfee.SatPerKWeight, error) { swap2feeRateMu.Lock() defer swap2feeRateMu.Unlock() diff --git a/test/chainnotifier_mock.go b/test/chainnotifier_mock.go index 96e72f23f..a4fae0e77 100644 --- a/test/chainnotifier_mock.go +++ b/test/chainnotifier_mock.go @@ -33,10 +33,11 @@ func (c *mockChainNotifier) RawClientWithMacAuth( // SpendRegistration contains registration details. type SpendRegistration struct { - Outpoint *wire.OutPoint - PkScript []byte - HeightHint int32 - ErrChan chan<- error + Outpoint *wire.OutPoint + PkScript []byte + HeightHint int32 + SpendChannel chan<- *chainntnfs.SpendDetail + ErrChan chan<- error } // ConfRegistration contains registration details. @@ -53,13 +54,15 @@ func (c *mockChainNotifier) RegisterSpendNtfn(ctx context.Context, outpoint *wire.OutPoint, pkScript []byte, heightHint int32) ( chan *chainntnfs.SpendDetail, chan error, error) { + spendChan0 := make(chan *chainntnfs.SpendDetail) spendErrChan := make(chan error, 1) reg := &SpendRegistration{ - HeightHint: heightHint, - Outpoint: outpoint, - PkScript: pkScript, - ErrChan: spendErrChan, + HeightHint: heightHint, + Outpoint: outpoint, + PkScript: pkScript, + SpendChannel: spendChan0, + ErrChan: spendErrChan, } c.lnd.RegisterSpendChannel <- reg @@ -78,6 +81,12 @@ func (c *mockChainNotifier) RegisterSpendNtfn(ctx context.Context, case <-ctx.Done(): } + case m := <-spendChan0: + select { + case spendChan <- m: + case <-ctx.Done(): + } + case err := <-spendErrChan: select { case errChan <- err: