Skip to content

Commit 0ce494b

Browse files
holimanfjl
andauthored
eth/fetcher: don't spend too much time on transaction inclusion (ethereum#25524)
* eth/fetcher: introduce some lag in tx fetching * eth/fetcher: change conditions a bit * eth/fetcher: use per-batch quota check * eth/fetcher: fix some comments * eth/fetcher: address review concerns * eth/fetcher: fix panic + add warn log * eth/fetcher: fix log * eth/fetcher: fix log * cmd/devp2p/internal/ethtest: fix ignorign tx announcements from prev. tests * cmd/devp2p/internal/ethtest: fix TestLargeTxRequest This increases the number of tx relay messages the test waits for. Since go-ethereum now processes incoming txs in smaller batches, the announcement messages it sends are also smaller. Co-authored-by: Felix Lange <fjl@twurst.com>
1 parent ac7ad81 commit 0ce494b

File tree

4 files changed

+72
-42
lines changed

4 files changed

+72
-42
lines changed

cmd/devp2p/internal/ethtest/helpers.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -357,9 +357,13 @@ func (s *Suite) waitAnnounce(conn *Conn, blockAnnouncement *NewBlock) error {
357357
return fmt.Errorf("wrong block hash in announcement: expected %v, got %v", blockAnnouncement.Block.Hash(), hashes[0].Hash)
358358
}
359359
return nil
360+
361+
// ignore tx announcements from previous tests
360362
case *NewPooledTransactionHashes:
361-
// ignore tx announcements from previous tests
362363
continue
364+
case *Transactions:
365+
continue
366+
363367
default:
364368
return fmt.Errorf("unexpected: %s", pretty.Sdump(msg))
365369
}

cmd/devp2p/internal/ethtest/suite.go

+4
Original file line numberDiff line numberDiff line change
@@ -544,9 +544,13 @@ func (s *Suite) TestNewPooledTxs(t *utesting.T) {
544544
t.Fatalf("unexpected number of txs requested: wanted %d, got %d", len(hashes), len(msg.GetPooledTransactionsPacket))
545545
}
546546
return
547+
547548
// ignore propagated txs from previous tests
548549
case *NewPooledTransactionHashes:
549550
continue
551+
case *Transactions:
552+
continue
553+
550554
// ignore block announcements from previous tests
551555
case *NewBlockHashes:
552556
continue

cmd/devp2p/internal/ethtest/transaction.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import (
2929
"github.com/ethereum/go-ethereum/params"
3030
)
3131

32-
//var faucetAddr = common.HexToAddress("0x71562b71999873DB5b286dF957af199Ec94617F7")
32+
// var faucetAddr = common.HexToAddress("0x71562b71999873DB5b286dF957af199Ec94617F7")
3333
var faucetKey, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
3434

3535
func (s *Suite) sendSuccessfulTxs(t *utesting.T) error {
@@ -192,10 +192,10 @@ func sendMultipleSuccessfulTxs(t *utesting.T, s *Suite, txs []*types.Transaction
192192
nonce = txs[len(txs)-1].Nonce()
193193

194194
// Wait for the transaction announcement(s) and make sure all sent txs are being propagated.
195-
// all txs should be announced within 3 announcements.
195+
// all txs should be announced within a couple announcements.
196196
recvHashes := make([]common.Hash, 0)
197197

198-
for i := 0; i < 3; i++ {
198+
for i := 0; i < 20; i++ {
199199
switch msg := recvConn.readAndServe(s.chain, timeout).(type) {
200200
case *Transactions:
201201
for _, tx := range *msg {

eth/fetcher/tx_fetcher.go

+60-38
Original file line numberDiff line numberDiff line change
@@ -262,57 +262,79 @@ func (f *TxFetcher) Notify(peer string, hashes []common.Hash) error {
262262
// direct request replies. The differentiation is important so the fetcher can
263263
// re-schedule missing transactions as soon as possible.
264264
func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool) error {
265-
// Keep track of all the propagated transactions
266-
if direct {
267-
txReplyInMeter.Mark(int64(len(txs)))
268-
} else {
269-
txBroadcastInMeter.Mark(int64(len(txs)))
265+
var (
266+
inMeter = txReplyInMeter
267+
knownMeter = txReplyKnownMeter
268+
underpricedMeter = txReplyUnderpricedMeter
269+
otherRejectMeter = txReplyOtherRejectMeter
270+
)
271+
if !direct {
272+
inMeter = txBroadcastInMeter
273+
knownMeter = txBroadcastKnownMeter
274+
underpricedMeter = txBroadcastUnderpricedMeter
275+
otherRejectMeter = txBroadcastOtherRejectMeter
270276
}
277+
// Keep track of all the propagated transactions
278+
inMeter.Mark(int64(len(txs)))
279+
271280
// Push all the transactions into the pool, tracking underpriced ones to avoid
272281
// re-requesting them and dropping the peer in case of malicious transfers.
273282
var (
274-
added = make([]common.Hash, 0, len(txs))
275-
duplicate int64
276-
underpriced int64
277-
otherreject int64
283+
added = make([]common.Hash, 0, len(txs))
284+
delay time.Duration
278285
)
279-
errs := f.addTxs(txs)
280-
for i, err := range errs {
281-
// Track the transaction hash if the price is too low for us.
282-
// Avoid re-request this transaction when we receive another
283-
// announcement.
284-
if errors.Is(err, core.ErrUnderpriced) || errors.Is(err, core.ErrReplaceUnderpriced) {
285-
for f.underpriced.Cardinality() >= maxTxUnderpricedSetSize {
286-
f.underpriced.Pop()
287-
}
288-
f.underpriced.Add(txs[i].Hash())
286+
// proceed in batches
287+
for i := 0; i < len(txs); i += 128 {
288+
end := i + 128
289+
if end > len(txs) {
290+
end = len(txs)
289291
}
290-
// Track a few interesting failure types
291-
switch {
292-
case err == nil: // Noop, but need to handle to not count these
292+
var (
293+
duplicate int64
294+
underpriced int64
295+
otherreject int64
296+
)
297+
batch := txs[i:end]
298+
for j, err := range f.addTxs(batch) {
299+
// Track the transaction hash if the price is too low for us.
300+
// Avoid re-request this transaction when we receive another
301+
// announcement.
302+
if errors.Is(err, core.ErrUnderpriced) || errors.Is(err, core.ErrReplaceUnderpriced) {
303+
for f.underpriced.Cardinality() >= maxTxUnderpricedSetSize {
304+
f.underpriced.Pop()
305+
}
306+
f.underpriced.Add(batch[j].Hash())
307+
}
308+
// Track a few interesting failure types
309+
switch {
310+
case err == nil: // Noop, but need to handle to not count these
293311

294-
case errors.Is(err, core.ErrAlreadyKnown):
295-
duplicate++
312+
case errors.Is(err, core.ErrAlreadyKnown):
313+
duplicate++
296314

297-
case errors.Is(err, core.ErrUnderpriced) || errors.Is(err, core.ErrReplaceUnderpriced):
298-
underpriced++
315+
case errors.Is(err, core.ErrUnderpriced) || errors.Is(err, core.ErrReplaceUnderpriced):
316+
underpriced++
299317

300-
default:
301-
otherreject++
318+
default:
319+
otherreject++
320+
}
321+
added = append(added, batch[j].Hash())
322+
}
323+
knownMeter.Mark(duplicate)
324+
underpricedMeter.Mark(underpriced)
325+
otherRejectMeter.Mark(otherreject)
326+
327+
// If 'other reject' is >25% of the deliveries in any batch, abort. Either we are
328+
// out of sync with the chain or the peer is griefing us.
329+
if otherreject > 128/4 {
330+
delay = 200 * time.Millisecond
331+
log.Warn("Peer delivering useless transactions", "peer", peer, "ignored", len(txs)-end)
332+
break
302333
}
303-
added = append(added, txs[i].Hash())
304-
}
305-
if direct {
306-
txReplyKnownMeter.Mark(duplicate)
307-
txReplyUnderpricedMeter.Mark(underpriced)
308-
txReplyOtherRejectMeter.Mark(otherreject)
309-
} else {
310-
txBroadcastKnownMeter.Mark(duplicate)
311-
txBroadcastUnderpricedMeter.Mark(underpriced)
312-
txBroadcastOtherRejectMeter.Mark(otherreject)
313334
}
314335
select {
315336
case f.cleanup <- &txDelivery{origin: peer, hashes: added, direct: direct}:
337+
time.Sleep(delay)
316338
return nil
317339
case <-f.quit:
318340
return errTerminated

0 commit comments

Comments
 (0)