Skip to content

Commit d8b5fe9

Browse files
committed
Turn blacklist reasons into proper types, small improvements based on PR comments
1 parent ff865d2 commit d8b5fe9

File tree

5 files changed

+159
-62
lines changed

5 files changed

+159
-62
lines changed

src/it/scala/io/iohk/ethereum/sync/util/FastSyncItSpecUtils.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,17 @@ import monix.eval.Task
1717
import scala.annotation.tailrec
1818
import scala.concurrent.duration._
1919
import scala.util.Try
20+
import io.iohk.ethereum.blockchain.sync.CacheBasedBlacklist
2021
object FastSyncItSpecUtils {
2122

2223
class FakePeer(peerName: String, fakePeerCustomConfig: FakePeerCustomConfig)
2324
extends CommonFakePeer(peerName, fakePeerCustomConfig) {
2425

2526
lazy val validators = new MockValidatorsAlwaysSucceed
2627

28+
val maxSize = 1000
29+
val blacklist = CacheBasedBlacklist.empty(maxSize)
30+
2731
lazy val fastSync = system.actorOf(
2832
FastSync.props(
2933
storagesInstance.storages.fastSyncStateStorage,
@@ -32,6 +36,7 @@ object FastSyncItSpecUtils {
3236
validators,
3337
peerEventBus,
3438
etcPeerManager,
39+
blacklist,
3540
testSyncConfig,
3641
system.scheduler
3742
)

src/main/scala/io/iohk/ethereum/blockchain/sync/Blacklist.scala

Lines changed: 112 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -10,57 +10,151 @@ import scala.jdk.CollectionConverters._
1010
import scala.jdk.OptionConverters._
1111
import scala.jdk.DurationConverters._
1212

13-
import Blacklist.BlacklistId
13+
import Blacklist._
14+
import io.iohk.ethereum.network.PeerId
15+
import io.iohk.ethereum.blockchain.sync.Blacklist.BlacklistReason.BlacklistReasonType.WrongBlockHeadersType
16+
import io.iohk.ethereum.consensus.validators.std.StdBlockValidator.BlockError
17+
import io.iohk.ethereum.blockchain.sync.Blacklist.BlacklistReason.BlacklistReasonType
1418

1519
trait Blacklist {
1620
def isBlacklisted(id: BlacklistId): Boolean
17-
def add(id: BlacklistId, duration: FiniteDuration, reason: String): Unit
21+
def add(id: BlacklistId, duration: FiniteDuration, reason: BlacklistReason): Unit
1822
def remove(id: BlacklistId): Unit
1923
def keys: Set[BlacklistId]
2024
}
2125

2226
object Blacklist {
27+
import BlacklistReason._
28+
import BlacklistReasonType._
29+
2330
trait BlacklistId {
2431
def value: String
2532
}
33+
34+
sealed trait BlacklistReason {
35+
def reasonType: BlacklistReasonType
36+
def description: String
37+
}
38+
object BlacklistReason {
39+
trait BlacklistReasonType {
40+
def code: Int
41+
def name: String
42+
}
43+
object BlacklistReasonType {
44+
case object WrongBlockHeadersType extends BlacklistReasonType {
45+
val code: Int = 1
46+
val name: String = "WrongBlockHeadersType"
47+
}
48+
case object BlockHeaderValidationFailedType extends BlacklistReasonType {
49+
val code: Int = 2
50+
val name: String = "BlockHeaderValidationFailed"
51+
}
52+
case object ErrorInBlockHeadersType extends BlacklistReasonType {
53+
val code: Int = 3
54+
val name: String = "ErrorInBlockHeaders"
55+
}
56+
case object EmptyBlockBodiesType extends BlacklistReasonType {
57+
val code: Int = 4
58+
val name: String = "EmptyBlockBodies"
59+
}
60+
case object BlockBodiesNotMatchingHeadersType extends BlacklistReasonType {
61+
val code: Int = 5
62+
val name: String = "BlockBodiesNotMatchingHeaders"
63+
}
64+
case object EmptyReceiptsType extends BlacklistReasonType {
65+
val code: Int = 6
66+
val name: String = "EmptyReceipts"
67+
}
68+
case object InvalidReceiptsType extends BlacklistReasonType {
69+
val code: Int = 7
70+
val name: String = "InvalidReceipts"
71+
}
72+
case object RequestFailedType extends BlacklistReasonType {
73+
val code: Int = 8
74+
val name: String = "RequestFailed"
75+
}
76+
case object PeerActorTerminatedType extends BlacklistReasonType {
77+
val code: Int = 9
78+
val name: String = "PeerActorTerminated"
79+
}
80+
}
81+
82+
case object WrongBlockHeaders extends BlacklistReason {
83+
val reasonType: BlacklistReasonType = WrongBlockHeadersType
84+
val description: String = "Wrong blockheaders response (empty or not chain forming)"
85+
}
86+
case object BlockHeaderValidationFailed extends BlacklistReason {
87+
val reasonType: BlacklistReasonType = BlockHeaderValidationFailedType
88+
val description: String = "Block header validation failed"
89+
}
90+
case object ErrorInBlockHeaders extends BlacklistReason {
91+
val reasonType: BlacklistReasonType = ErrorInBlockHeadersType
92+
val description: String = "Error in block headers response"
93+
}
94+
final case class EmptyBlockBodies(knownHashes: Seq[String]) extends BlacklistReason {
95+
val reasonType: BlacklistReasonType = EmptyBlockBodiesType
96+
val description: String = s"Got empty block bodies response for known hashes: $knownHashes"
97+
}
98+
case object BlockBodiesNotMatchingHeaders extends BlacklistReason {
99+
val reasonType: BlacklistReasonType = BlockBodiesNotMatchingHeadersType
100+
val description = "Block bodies not matching block headers"
101+
}
102+
final case class EmptyReceipts(knownHashes: Seq[String]) extends BlacklistReason {
103+
val reasonType: BlacklistReasonType = EmptyReceiptsType
104+
val description: String = s"Got empty receipts for known hashes: $knownHashes"
105+
}
106+
final case class InvalidReceipts(knownHashes: Seq[String], error: BlockError) extends BlacklistReason {
107+
val reasonType: BlacklistReasonType = InvalidReceiptsType
108+
val description: String = s"Got invalid receipts for known hashes: $knownHashes due to: $error"
109+
}
110+
final case class RequestFailed(error: String) extends BlacklistReason {
111+
val reasonType: BlacklistReasonType = RequestFailedType
112+
val description: String = s"Request failed with error: $error"
113+
}
114+
case object PeerActorTerminated extends BlacklistReason {
115+
val reasonType: BlacklistReasonType = PeerActorTerminatedType
116+
val description: String = "Peer actor terminated"
117+
}
118+
}
26119
}
27120

28-
final case class CacheBasedBlacklist(cache: Cache[BlacklistId, String]) extends Blacklist with Logger {
121+
final case class CacheBasedBlacklist(cache: Cache[BlacklistId, BlacklistReasonType]) extends Blacklist with Logger {
122+
123+
import CacheBasedBlacklist._
29124

30125
override def isBlacklisted(id: BlacklistId): Boolean = cache.getIfPresent(id).isDefined
31126

32-
override def add(id: BlacklistId, duration: FiniteDuration, reason: String): Unit =
33-
cache
34-
.policy()
35-
.expireVariably()
36-
.toScala
37-
.fold {
38-
log.warn(
39-
s"Unexpected error while adding peer [${id.value}] to blacklist using custom expiration time. Falling back to default expiration."
40-
)
41-
cache.put(id, reason)
42-
} { varExpirationPolicy =>
43-
varExpirationPolicy.put(id, reason, duration.toJava)
44-
}
127+
override def add(id: BlacklistId, duration: FiniteDuration, reason: BlacklistReason): Unit = {
128+
log.debug("Blacklisting peer [{}] for {}. Reason: {}", id, duration, reason)
129+
cache.policy().expireVariably().toScala match {
130+
case Some(varExpiration) => varExpiration.put(id, reason.reasonType, duration.toJava)
131+
case None =>
132+
log.warn(customExpirationError(id))
133+
cache.put(id, reason.reasonType)
134+
}
135+
}
45136
override def remove(id: BlacklistId): Unit = cache.invalidate(id)
46137

47138
override def keys: Set[BlacklistId] = cache.underlying.asMap().keySet().asScala.toSet
48139
}
49140

50141
object CacheBasedBlacklist {
51142

143+
def customExpirationError(id: BlacklistId): String =
144+
s"Unexpected error while adding peer [${id.value}] to blacklist using custom expiration time. Falling back to default expiration."
145+
52146
def empty(maxSize: Int): CacheBasedBlacklist = {
53147
val cache =
54148
Scaffeine()
55-
.expireAfter[BlacklistId, String](
149+
.expireAfter[BlacklistId, BlacklistReasonType](
56150
create = (_, _) => 60.minutes,
57151
update = (_, _, _) => 60.minutes,
58152
read = (_, _, _) => 60.minutes
59153
) // required to enable VarExpiration policy (i.e. set custom expiration time per element)
60154
.maximumSize(
61155
maxSize
62156
) // uses Window TinyLfu eviction policy, see https://github.com/ben-manes/caffeine/wiki/Efficiency
63-
.build[BlacklistId, String]()
157+
.build[BlacklistId, BlacklistReasonType]()
64158
CacheBasedBlacklist(cache)
65159
}
66160

src/main/scala/io/iohk/ethereum/blockchain/sync/PeerListSupportNg.scala

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import scala.concurrent.duration._
1313

1414
trait PeerListSupportNg { self: Actor with ActorLogging =>
1515
import PeerListSupportNg._
16+
import Blacklist._
1617

1718
private implicit val ec: ExecutionContext = context.dispatcher
1819

@@ -33,17 +34,17 @@ trait PeerListSupportNg { self: Actor with ActorLogging =>
3334

3435
def handlePeerListMessages: Receive = {
3536
case EtcPeerManagerActor.HandshakedPeers(peers) => updatePeers(peers)
36-
case PeerDisconnected(peerId) => removePeer(peerId)
37+
case PeerDisconnected(peerId) => removePeerById(peerId)
3738
}
3839

3940
def peersToDownloadFrom: Map[PeerId, PeerWithInfo] =
4041
handshakedPeers.filterNot { case (peerId, _) =>
4142
blacklist.isBlacklisted(peerId)
4243
}
4344

44-
def peerById(peerId: PeerId): Option[Peer] = handshakedPeers.get(peerId).map(_.peer)
45+
def getPeerById(peerId: PeerId): Option[Peer] = handshakedPeers.get(peerId).map(_.peer)
4546

46-
def blacklistIfHandshaked(peerId: PeerId, duration: FiniteDuration, reason: String): Unit =
47+
def blacklistIfHandshaked(peerId: PeerId, duration: FiniteDuration, reason: BlacklistReason): Unit =
4748
handshakedPeers.get(peerId).foreach(_ => blacklist.add(peerId, duration, reason))
4849

4950
private def updatePeers(peers: Map[Peer, PeerInfo]): Unit = {
@@ -56,7 +57,7 @@ trait PeerListSupportNg { self: Actor with ActorLogging =>
5657
handshakedPeers = updated
5758
}
5859

59-
private def removePeer(peerId: PeerId): Unit = {
60+
private def removePeerById(peerId: PeerId): Unit = {
6061
if (handshakedPeers.keySet.contains(peerId)) {
6162
peerEventBus ! Unsubscribe(PeerDisconnectedClassifier(PeerSelector.WithId(peerId)))
6263
blacklist.remove(peerId)

src/main/scala/io/iohk/ethereum/blockchain/sync/fast/FastSync.scala

Lines changed: 15 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import io.iohk.ethereum.blockchain.sync.PeerListSupportNg.PeerWithInfo
88
import io.iohk.ethereum.blockchain.sync.PeerRequestHandler.ResponseReceived
99
import io.iohk.ethereum.blockchain.sync.SyncProtocol.Status.Progress
1010
import io.iohk.ethereum.blockchain.sync._
11+
import io.iohk.ethereum.blockchain.sync.Blacklist._
12+
import io.iohk.ethereum.blockchain.sync.Blacklist.BlacklistReason._
1113
import io.iohk.ethereum.blockchain.sync.fast.ReceiptsValidator.ReceiptsValidationResult
1214
import io.iohk.ethereum.blockchain.sync.fast.SyncBlocksValidator.BlockBodyValidationResult
1315
import io.iohk.ethereum.blockchain.sync.fast.SyncStateSchedulerActor.{
@@ -183,7 +185,7 @@ class FastSync(
183185
)
184186
handleBlockHeaders(peer, blockHeaders)
185187
else
186-
blacklist.add(peer.id, blacklistDuration, "wrong blockheaders response (empty or not chain forming)")
188+
blacklist.add(peer.id, blacklistDuration, WrongBlockHeaders)
187189
}
188190

189191
case ResponseReceived(peer, BlockBodies(blockBodies), timeTaken) =>
@@ -205,10 +207,10 @@ class FastSync(
205207
handleReceipts(peer, requestedHashes, receipts)
206208

207209
case PeerRequestHandler.RequestFailed(peer, reason) =>
208-
handleRequestFailure(peer, sender(), reason)
210+
handleRequestFailure(peer, sender(), RequestFailed(reason))
209211

210212
case Terminated(ref) if assignedHandlers.contains(ref) =>
211-
handleRequestFailure(assignedHandlers(ref), ref, "Unexpected error")
213+
handleRequestFailure(assignedHandlers(ref), ref, PeerActorTerminated)
212214
}
213215

214216
def askForPivotBlockUpdate(updateReason: PivotBlockUpdateReason): Unit = {
@@ -423,7 +425,7 @@ class FastSync(
423425
}
424426

425427
private def handleRewind(header: BlockHeader, peer: Peer, N: Int, duration: FiniteDuration): Unit = {
426-
blacklist.add(peer.id, duration, "block header validation failed")
428+
blacklist.add(peer.id, duration, BlockHeaderValidationFailed)
427429
if (header.number <= syncState.safeDownloadTarget) {
428430
discardLastBlocks(header.number, N)
429431
syncState = syncState.updateDiscardedBlocks(header, N)
@@ -461,27 +463,22 @@ class FastSync(
461463
)
462464
}
463465
} else {
464-
blacklist.add(peer.id, blacklistDuration, "error in block headers response")
466+
blacklist.add(peer.id, blacklistDuration, ErrorInBlockHeaders)
465467
processSyncing()
466468
}
467469
}
468470

469471
private def handleBlockBodies(peer: Peer, requestedHashes: Seq[ByteString], blockBodies: Seq[BlockBody]): Unit = {
470472
if (blockBodies.isEmpty) {
471-
val reason =
472-
s"got empty block bodies response for known hashes: ${requestedHashes.map(ByteStringUtils.hash2string)}"
473-
blacklist.add(peer.id, blacklistDuration, reason)
473+
val knownHashes = requestedHashes.map(ByteStringUtils.hash2string)
474+
blacklist.add(peer.id, blacklistDuration, EmptyBlockBodies(knownHashes))
474475
syncState = syncState.enqueueBlockBodies(requestedHashes)
475476
} else {
476477
validateBlocks(requestedHashes, blockBodies) match {
477478
case BlockBodyValidationResult.Valid =>
478479
insertBlocks(requestedHashes, blockBodies)
479480
case BlockBodyValidationResult.Invalid =>
480-
blacklist.add(
481-
peer.id,
482-
blacklistDuration,
483-
s"responded with block bodies not matching block headers, blacklisting for $blacklistDuration"
484-
)
481+
blacklist.add(peer.id, blacklistDuration, BlockBodiesNotMatchingHeaders)
485482
syncState = syncState.enqueueBlockBodies(requestedHashes)
486483
case BlockBodyValidationResult.DbError =>
487484
redownloadBlockchain()
@@ -493,8 +490,8 @@ class FastSync(
493490

494491
private def handleReceipts(peer: Peer, requestedHashes: Seq[ByteString], receipts: Seq[Seq[Receipt]]): Unit = {
495492
if (receipts.isEmpty) {
496-
val reason = s"got empty receipts for known hashes: ${requestedHashes.map(ByteStringUtils.hash2string)}"
497-
blacklist.add(peer.id, blacklistDuration, reason)
493+
val knownHashes = requestedHashes.map(ByteStringUtils.hash2string)
494+
blacklist.add(peer.id, blacklistDuration, EmptyReceipts(knownHashes))
498495
syncState = syncState.enqueueReceipts(requestedHashes)
499496
} else {
500497
validateReceipts(requestedHashes, receipts) match {
@@ -515,10 +512,8 @@ class FastSync(
515512
}
516513

517514
case ReceiptsValidationResult.Invalid(error) =>
518-
val reason =
519-
s"got invalid receipts for known hashes: ${requestedHashes.map(h => Hex.toHexString(h.toArray[Byte]))}" +
520-
s" due to: $error"
521-
blacklist.add(peer.id, blacklistDuration, reason)
515+
val knownHashes = requestedHashes.map(h => Hex.toHexString(h.toArray[Byte]))
516+
blacklist.add(peer.id, blacklistDuration, InvalidReceipts(knownHashes, error))
522517
syncState = syncState.enqueueReceipts(requestedHashes)
523518

524519
case ReceiptsValidationResult.DbError =>
@@ -529,7 +524,7 @@ class FastSync(
529524
processSyncing()
530525
}
531526

532-
private def handleRequestFailure(peer: Peer, handler: ActorRef, reason: String): Unit = {
527+
private def handleRequestFailure(peer: Peer, handler: ActorRef, reason: BlacklistReason): Unit = {
533528
removeRequestHandler(handler)
534529

535530
syncState = syncState

0 commit comments

Comments
 (0)