@@ -9,8 +9,14 @@ import cats.data.NonEmptyList
9
9
import io .iohk .ethereum .network .p2p .messages .{Capability , WireProtocol }
10
10
import io .iohk .ethereum .network .p2p .messages .WireProtocol .Hello
11
11
import io .iohk .ethereum .network .p2p .messages .WireProtocol .Hello .HelloEnc
12
- import io .iohk .ethereum .network .p2p .{Message , MessageDecoder , MessageSerializable , NetworkMessageDecoder }
13
- import io .iohk .ethereum .network .rlpx .RLPxConnectionHandler .{HelloExtractor , RLPxConfiguration }
12
+ import io .iohk .ethereum .network .p2p .{
13
+ EthereumMessageDecoder ,
14
+ Message ,
15
+ MessageDecoder ,
16
+ MessageSerializable ,
17
+ NetworkMessageDecoder
18
+ }
19
+ import io .iohk .ethereum .network .rlpx .RLPxConnectionHandler .{HelloCodec , RLPxConfiguration }
14
20
import io .iohk .ethereum .utils .ByteUtils
15
21
import org .bouncycastle .util .encoders .Hex
16
22
@@ -30,12 +36,11 @@ import scala.util.{Failure, Success, Try}
30
36
* 4. once handshake is done (and secure connection established) actor can send/receive messages (`handshaked` state)
31
37
*/
32
38
class RLPxConnectionHandler (
33
- messageDecoder : MessageDecoder ,
34
39
capabilities : List [Capability ],
35
40
authHandshaker : AuthHandshaker ,
36
- messageCodecFactory : (FrameCodec , MessageDecoder , Capability , Long ) => MessageCodec ,
41
+ messageCodecFactory : (FrameCodec , Capability , Long ) => MessageCodec ,
37
42
rlpxConfiguration : RLPxConfiguration ,
38
- extractor : Secrets => HelloExtractor
43
+ extractor : Secrets => HelloCodec
39
44
) extends Actor
40
45
with ActorLogging {
41
46
@@ -172,7 +177,7 @@ class RLPxConnectionHandler(
172
177
}
173
178
174
179
def awaitInitialHello (
175
- extractor : HelloExtractor ,
180
+ extractor : HelloCodec ,
176
181
cancellableAckTimeout : Option [CancellableAckTimeout ] = None ,
177
182
seqNumber : Int = 0
178
183
): Receive =
@@ -183,7 +188,7 @@ class RLPxConnectionHandler(
183
188
) orElse handleReceiveHello(extractor, cancellableAckTimeout, seqNumber)
184
189
185
190
private def handleSendHello (
186
- extractor : HelloExtractor ,
191
+ extractor : HelloCodec ,
187
192
cancellableAckTimeout : Option [CancellableAckTimeout ] = None ,
188
193
seqNumber : Int = 0
189
194
): Receive = {
@@ -211,7 +216,7 @@ class RLPxConnectionHandler(
211
216
}
212
217
213
218
private def handleReceiveHello (
214
- extractor : HelloExtractor ,
219
+ extractor : HelloCodec ,
215
220
cancellableAckTimeout : Option [CancellableAckTimeout ] = None ,
216
221
seqNumber : Int = 0
217
222
): Receive = { case Received (data) =>
@@ -241,9 +246,9 @@ class RLPxConnectionHandler(
241
246
}
242
247
}
243
248
244
- private def negotiateCodec (hello : Hello , extractor : HelloExtractor ): Option [(MessageCodec , Capability )] =
249
+ private def negotiateCodec (hello : Hello , extractor : HelloCodec ): Option [(MessageCodec , Capability )] =
245
250
Capability .negotiate(hello.capabilities.toList, capabilities).map { negotiated =>
246
- (messageCodecFactory(extractor.frameCodec, messageDecoder, negotiated, hello.p2pVersion), negotiated)
251
+ (messageCodecFactory(extractor.frameCodec, negotiated, hello.p2pVersion), negotiated)
247
252
}
248
253
249
254
private def processFrames (frames : Seq [Frame ], messageCodec : MessageCodec ): Unit =
@@ -373,29 +378,28 @@ class RLPxConnectionHandler(
373
378
374
379
object RLPxConnectionHandler {
375
380
def props (
376
- messageDecoder : MessageDecoder ,
377
381
capabilities : List [Capability ],
378
382
authHandshaker : AuthHandshaker ,
379
383
rlpxConfiguration : RLPxConfiguration
380
384
): Props =
381
385
Props (
382
386
new RLPxConnectionHandler (
383
- messageDecoder,
384
387
capabilities,
385
388
authHandshaker,
386
- messageCodecFactory ,
389
+ ethMessageCodecFactory ,
387
390
rlpxConfiguration,
388
- HelloExtractor .apply
391
+ HelloCodec .apply
389
392
)
390
393
)
391
394
392
- def messageCodecFactory (
395
+ def ethMessageCodecFactory (
393
396
frameCodec : FrameCodec ,
394
- messageDecoder : MessageDecoder ,
395
- protocolVersion : Capability ,
397
+ negotiated : Capability ,
396
398
p2pVersion : Long
397
- ): MessageCodec =
398
- new MessageCodec (frameCodec, messageDecoder, protocolVersion, p2pVersion)
399
+ ): MessageCodec = {
400
+ val md = EthereumMessageDecoder .ethMessageDecoder(negotiated) orElse NetworkMessageDecoder
401
+ new MessageCodec (frameCodec, md, p2pVersion)
402
+ }
399
403
400
404
case class ConnectTo (uri : URI )
401
405
@@ -424,7 +428,7 @@ object RLPxConnectionHandler {
424
428
val waitForTcpAckTimeout : FiniteDuration
425
429
}
426
430
427
- case class HelloExtractor (secrets : Secrets ) {
431
+ case class HelloCodec (secrets : Secrets ) {
428
432
import MessageCodec ._
429
433
lazy val frameCodec = new FrameCodec (secrets)
430
434
@@ -448,7 +452,7 @@ object RLPxConnectionHandler {
448
452
private def extractHello (frame : Frame ): Option [Hello ] = {
449
453
val frameData = frame.payload.toArray
450
454
if (frame.`type` == Hello .code) {
451
- val m = NetworkMessageDecoder .fromBytes(frame.`type`, frameData, Capability . Capabilities . Eth63Capability )
455
+ val m = NetworkMessageDecoder .fromBytes(frame.`type`, frameData)
452
456
Some (m.asInstanceOf [Hello ])
453
457
} else {
454
458
None
0 commit comments