Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Use discv5 topic-search to find compatible peers #209

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 144 additions & 36 deletions evm/p2p/discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
More information at https://github.com/ethereum/devp2p/blob/master/rlpx.md#node-discovery
"""
import asyncio
import copy
import logging
import os
import sha3
import time
from typing import (
Any,
Expand Down Expand Up @@ -63,11 +66,16 @@ def __repr__(self):
return 'Command(%s:%d)' % (self.name, self.id)


CMD_PING = Command("ping", 1, 4)
CMD_PONG = Command("pong", 2, 3)
CMD_PING = Command("ping", 1, 5)
CMD_PONG = Command("pong", 2, 6)
CMD_FIND_NODE = Command("find_node", 3, 2)
CMD_NEIGHBOURS = Command("neighbours", 4, 2)
CMD_ID_MAP = dict((cmd.id, cmd) for cmd in [CMD_PING, CMD_PONG, CMD_FIND_NODE, CMD_NEIGHBOURS])
CMD_TOPIC_QUERY = Command("topic_query", 7, 2)
CMD_TOPIC_NODES = Command("topic_nodes", 8, 2)
CMD_ID_MAP = dict(
(cmd.id, cmd)
for cmd in [
CMD_PING, CMD_PONG, CMD_FIND_NODE, CMD_NEIGHBOURS, CMD_TOPIC_QUERY, CMD_TOPIC_NODES])


class DiscoveryProtocol(asyncio.DatagramProtocol):
Expand All @@ -77,12 +85,17 @@ class DiscoveryProtocol(asyncio.DatagramProtocol):
_max_neighbours_per_packet_cache = None

def __init__(self, privkey: datatypes.PrivateKey, address: kademlia.Address,
bootstrap_nodes: List[kademlia.Node]) -> None:
bootstrap_nodes: List[kademlia.Node],
topic: bytes) -> None:
self.privkey = privkey
self.address = address
self.bootstrap_nodes = bootstrap_nodes
self.topic = topic
self.this_node = kademlia.Node(self.pubkey, address)
self.kademlia = kademlia.KademliaProtocol(self.this_node, wire=self)
# XXX: Dirty hack to keep track of nodes that have a matching topic with us
self.matching_nodes = set()
self.queried_nodes = {}

@property
def pubkey(self) -> datatypes.PublicKey:
Expand All @@ -97,8 +110,13 @@ def _get_handler(self, cmd) -> Callable[[kademlia.Node, List[Any], AnyStr], None
return self.recv_find_node
elif cmd == CMD_NEIGHBOURS:
return self.recv_neighbours
elif cmd == CMD_TOPIC_NODES:
return self.recv_topic_nodes
else:
raise ValueError("Unknwon command: {}".format(cmd))
# XXX: Temporary, while we don't support all discv5 commands
# raise ValueError("Unknown command: %s", cmd)
self.logger.debug('no handler for discv5 command %s', cmd)
return lambda *args: None

def _get_max_neighbours_per_packet(self):
if self._max_neighbours_per_packet_cache is not None:
Expand All @@ -114,6 +132,38 @@ async def listen(self, loop: asyncio.AbstractEventLoop) -> Tuple[
def connection_made(self, transport):
    """Keep a reference to the transport handed to this protocol."""
    self.transport = transport

async def run(self):
    """Loop forever, trying to find new peers that support our topic.

    This is a bit of a hack, just to experiment with discv5's topic-search functionality.

    Starts listening, bonds with the bootstrap nodes and returns early if none of them
    bonded. Otherwise, every round it performs a kademlia lookup for a random target whose
    first 8 bytes come from the keccak hash of our topic, sends a topic query to every
    returned node that has not been queried within the last hour, logs the nodes known to
    match our topic and sleeps for 5 seconds.
    """
    await self.listen(asyncio.get_event_loop())
    bonded = await asyncio.gather(*[self.kademlia.bond(n) for n in self.bootstrap_nodes])
    if not any(bonded):
        self.logger.warning("Failed to bond with bootstrap nodes %s", self.bootstrap_nodes)
        # TODO: Must raise or call sys.exit
        return

    # Minimum number of seconds between two topic queries to the same node.
    requery_interval = 60 * 60
    topic_hash = sha3.keccak_256(self.topic).digest()
    while True:
        target = big_endian_to_int(topic_hash[:8] + os.urandom(24))
        try:
            nodes = await self.kademlia.lookup(target)
        except kademlia.AlreadyWaiting as e:
            self.logger.warning("Error when looking up: %s", e)
            # Fall through to the sleep below instead of retrying immediately, so that a
            # lookup which keeps failing cannot turn this into a busy loop.
            nodes = []
        now = time.time()
        for node in nodes:
            last_query = self.queried_nodes.get(node)
            if last_query is not None and last_query > now - requery_interval:
                continue
            self.queried_nodes[node] = now
            self.send_topic_query(node)
        self.logger.info(
            "Nodes with matching topics: %s",
            [(n.pubkey.to_bytes(), n.address) for n in self.matching_nodes])
        await asyncio.sleep(5)

async def bootstrap(self):
while self.transport is None:
# FIXME: Instead of sleeping here to wait until connection_made() is called to set
Expand All @@ -139,51 +189,91 @@ def stop(self):
self.transport.close()

def receive(self, address: kademlia.Address, message: AnyStr) -> None:
# XXX: Quick hack while we don't support all of discv5 commands
cmd_id = safe_ord(message[HEAD_SIZE])
if cmd_id not in CMD_ID_MAP:
self.logger.debug('ignoring msg with unsupported cmd id: %d', cmd_id)
return

try:
remote_pubkey, cmd_id, payload, message_hash = _unpack(message)
except DefectiveMessage as e:
self.logger.error('error unpacking message: {}'.format(e))
return

# XXX: Commented out as on v5 the expiration is not always the last element.
# As of discovery version 4, expiration is the last element for all packets, so
# we can validate that here, but if it changes we may have to do so on the
# handler methods.
expiration = rlp.sedes.big_endian_int.deserialize(payload[-1])
if time.time() > expiration:
self.logger.error('received message already expired')
return
# expiration = rlp.sedes.big_endian_int.deserialize(payload[-1])
# if time.time() > expiration:
# self.logger.error('received message already expired')
# return

cmd = CMD_ID_MAP[cmd_id]
if len(payload) != cmd.elem_count:
self.logger.error('invalid {} payload: {}'.format(cmd.name, payload))
self.logger.error('invalid %s payload: %s', cmd.name, payload)
return
node = kademlia.Node(remote_pubkey, address)
handler = self._get_handler(cmd)
handler(node, payload, message_hash)

def recv_pong(self, node: kademlia.Node, payload: List[Any], _: AnyStr) -> None:
# The pong payload should have 3 elements: to, token, expiration
_, token, _ = payload
# The pong payload should have 6 elements: to, token, expiration, topic_hash,
# ticket_serial and wait_periods
_, token, _, topic_hash, ticket_serial, wait_periods = payload
self.kademlia.recv_pong(node, token)

def recv_neighbours(self, node: kademlia.Node, payload: List[Any], _: AnyStr) -> None:
    """Decode a neighbours packet and pass the nodes it carries on to kademlia.

    The payload must have exactly 2 elements: nodes, expiration. The expiration is not
    looked at here.
    """
    encoded_nodes, _expiration = payload
    neighbours = _extract_nodes_from_payload(encoded_nodes)
    self.kademlia.recv_neighbours(node, neighbours)

def recv_ping(self, node: kademlia.Node, _, message_hash: AnyStr) -> None:
self.kademlia.recv_ping(node, message_hash)
def recv_ping(self, remote: kademlia.Node, payload: List[Any], message_hash: AnyStr) -> None:
_, _, _, _, topics = payload
self.kademlia.recv_ping(remote, message_hash, topics)
# self.send_topic_query(remote)

def recv_find_node(self, node: kademlia.Node, payload: List[Any], _: AnyStr) -> None:
    """Decode a find_node packet and pass the requested target id on to kademlia.

    The payload must have exactly 2 elements: node_id, expiration. The expiration is not
    looked at here.
    """
    self.logger.debug('<<< find_node from {}'.format(node))
    raw_target, _expiration = payload
    target_id = big_endian_to_int(raw_target)
    self.kademlia.recv_find_node(node, target_id)

def recv_topic_nodes(self, remote: kademlia.Node, payload: List[Any], _: AnyStr) -> None:
    """Record the nodes listed in a topic_nodes packet and query each of them in turn.

    The payload must have exactly 2 elements: echo, nodes. Every listed node is added to
    self.matching_nodes (with its ports decremented by one), then pinged and sent a topic
    query at the address it was advertised with.
    """
    _echo, encoded_nodes = payload
    nodes = _extract_nodes_from_payload(encoded_nodes)
    self.logger.debug('<<< topic_nodes from %s: %s', remote, nodes)
    for candidate in nodes:
        # XXX: Discovery v5 (while in test mode) runs on UDPport+1, so the ports are
        # shifted back by one on the copy that we remember.
        # https://github.com/ethereum/go-ethereum/blob/bf62acf0332c962916787a23c78a2513137625ea/p2p/discv5/ticket.go#L646
        adjusted = copy.copy(candidate.address)
        adjusted.udp_port -= 1
        adjusted.tcp_port -= 1
        self.matching_nodes.add(kademlia.Node(candidate.pubkey, adjusted))
        # The ping and the topic query go to the address as advertised (unadjusted).
        self.send_ping(candidate)
        self.send_topic_query(candidate)

def send_topic_query(self, remote: kademlia.Node) -> None:
    """Send a topic_query packet for our topic to the given node.

    The payload has 2 elements: our topic and an expiration timestamp, which is the
    current time plus EXPIRATION.
    """
    self.logger.debug('>>> topic_query to %s', remote)
    expiration = int(time.time() + EXPIRATION)
    message = _pack(CMD_TOPIC_QUERY.id, [self.topic, expiration], self.privkey)
    self.send(remote, message)

def send_ping(self, node: kademlia.Node) -> bytes:
self.logger.debug('>>> pinging {}'.format(node))
version = rlp.sedes.big_endian_int.serialize(PROTO_VERSION)
payload = [version, self.address.to_endpoint(), node.address.to_endpoint()]
payload = [
version,
self.address.to_endpoint(),
node.address.to_endpoint(),
int(time.time() + EXPIRATION),
[self.topic]
]
message = _pack(CMD_PING.id, payload, self.privkey)
self.send(node, message)
# Return the msg hash, which is used as a token to identify pongs.
Expand All @@ -193,12 +283,29 @@ def send_find_node(self, node: kademlia.Node, target_node_id: int) -> None:
target_node_id = int_to_big_endian(
target_node_id).rjust(kademlia.k_pubkey_size // 8, b'\0')
self.logger.debug('>>> find_node to {}'.format(node))
message = _pack(CMD_FIND_NODE.id, [target_node_id], self.privkey)
payload = [
target_node_id,
int(time.time() + EXPIRATION),
]
message = _pack(CMD_FIND_NODE.id, payload, self.privkey)
self.send(node, message)

def send_pong(self, node: kademlia.Node, token: AnyStr) -> None:
def send_pong(self, node: kademlia.Node, token: AnyStr, topics) -> None:
self.logger.debug('>>> ponging {}'.format(node))
payload = [node.address.to_endpoint(), token]
h = sha3.keccak_256()
h.update(rlp.encode(topics))
topic_hash = h.digest()
# XXX: No idea what would be "correct" values for those
ticket_serial = 0
wait_periods = [60]
payload = [
node.address.to_endpoint(),
token,
int(time.time() + EXPIRATION),
topic_hash,
ticket_serial,
wait_periods,
]
message = _pack(CMD_PONG.id, payload, self.privkey)
self.send(node, message)

Expand Down Expand Up @@ -250,8 +357,8 @@ def _pack(cmd_id: int, payload: List[Any], privkey: datatypes.PrivateKey) -> byt
how UDP packets are structured.
"""
cmd_id = force_bytes(chr(cmd_id))
expiration = rlp.sedes.big_endian_int.serialize(int(time.time() + EXPIRATION))
encoded_data = cmd_id + rlp.encode(payload + [expiration])
# expiration = rlp.sedes.big_endian_int.serialize(int(time.time() + EXPIRATION))
encoded_data = cmd_id + rlp.encode(payload)
signature = privkey.sign_msg(encoded_data)
message_hash = keccak(signature.to_bytes() + encoded_data)
return message_hash + signature.to_bytes() + encoded_data
Expand Down Expand Up @@ -291,39 +398,40 @@ async def show_tasks():
listen_host = '0.0.0.0'
listen_port = 30303
bootstrap_uris = [
# Discv5 topic discovery bootnode
b"enode://0cc5f5ffb5d9098c8b8c62325f3797f56509bff942704687b6530992ac706e2cb946b90a34f1f19548cd3c7baccbcaea354531e5983c7d1bc0dee16ce4b6440b@40.118.3.223:30305", # noqa: E501
# b"enode://1c7a64d76c0334b0418c004af2f67c50e36a3be60b5e4790bdac0439d21603469a85fad36f2473c9a80eb043ae60936df905fa28f1ff614c3e5dc34f15dcd2dc@40.118.3.223:30308", # noqa: E501
# b"enode://85c85d7143ae8bb96924f2b54f1b3e70d8c4d367af305325d30a61385a432f247d2c75c45c6b4a60335060d072d7f5b35dd1d4c45f76941f62a4f83b6e75daaf@40.118.3.223:30309", # noqa: E501
# Local geth bootnodes
# b'enode://3a514176466fa815ed481ffad09110a2d344f6c9b78c1d14afc351c3a51be33d8072e77939dc03ba44790779b7a1025baf3003f6732430e20cd9b76d953391b3@127.0.0.1:30301', # noqa: E501
# b'enode://3a514176466fa815ed481ffad09110a2d344f6c9b78c1d14afc351c3a51be33d8072e77939dc03ba44790779b7a1025baf3003f6732430e20cd9b76d953391b3@0.0.0.0:30304', # noqa: E501
# Testnet bootnodes
# b'enode://6ce05930c72abc632c58e2e4324f7c7ea478cec0ed4fa2528982cf34483094e9cbc9216e7aa349691242576d552a2a56aaeae426c5303ded677ce455ba1acd9d@13.84.180.240:30303', # noqa: E501
# b'enode://20c9ad97c081d63397d7b685a412227a40e23c8bdc6688c6f37e97cfbc22d2b4d1db1510d8f61e6a8866ad7f0e17c02b14182d37ea7c3c8b9c2683aeb6b733a1@52.169.14.227:30303', # noqa: E501
# b'enode://6ce05930c72abc632c58e2e4324f7c7ea478cec0ed4fa2528982cf34483094e9cbc9216e7aa349691242576d552a2a56aaeae426c5303ded677ce455ba1acd9d@13.84.180.240:30304', # noqa: E501
# b'enode://20c9ad97c081d63397d7b685a412227a40e23c8bdc6688c6f37e97cfbc22d2b4d1db1510d8f61e6a8866ad7f0e17c02b14182d37ea7c3c8b9c2683aeb6b733a1@52.169.14.227:30304', # noqa: E501
# Mainnet bootnodes
# b'enode://a979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c@52.16.188.185:30303', # noqa: E501
# b'enode://3f1d12044546b76342d59d4a05532c14b85aa669704bfe1f864fe079415aa2c02d743e03218e57a33fb94523adb54032871a6c51b2cc5514cb7c7e35b3ed0a99@13.93.211.84:30303', # noqa: E501
b'enode://78de8a0916848093c73790ead81d1928bec737d565119932b98c6b100d944b7a95e94f847f689fc723399d2e31129d182f7ef3863f2b4c820abbf3ab2722344d@191.235.84.50:30303', # noqa: E501
b'enode://158f8aab45f6d19c6cbf4a089c2670541a8da11978a2f90dbf6a502a4a3bab80d288afdbeb7ec0ef6d92de563767f3b1ea9e8e334ca711e9f8e2df5a0385e8e6@13.75.154.138:30303', # noqa: E501
b'enode://1118980bf48b0a3640bdba04e0fe78b1add18e1cd99bf22d53daac1fd9972ad650df52176e7c7d89d1114cfef2bc23a2959aa54998a46afcf7d91809f0855082@52.74.57.123:30303', # noqa: E501
# b'enode://78de8a0916848093c73790ead81d1928bec737d565119932b98c6b100d944b7a95e94f847f689fc723399d2e31129d182f7ef3863f2b4c820abbf3ab2722344d@191.235.84.50:30303', # noqa: E501
# b'enode://158f8aab45f6d19c6cbf4a089c2670541a8da11978a2f90dbf6a502a4a3bab80d288afdbeb7ec0ef6d92de563767f3b1ea9e8e334ca711e9f8e2df5a0385e8e6@13.75.154.138:30303', # noqa: E501
# b'enode://1118980bf48b0a3640bdba04e0fe78b1add18e1cd99bf22d53daac1fd9972ad650df52176e7c7d89d1114cfef2bc23a2959aa54998a46afcf7d91809f0855082@52.74.57.123:30303', # noqa: E501
]

# LES@ + <first 8 bytes of genesis hash>, hex encoded
topic = b'LES@41941023680923e0' # LES/ropsten
# topic = b"LES@d4e56740f876aef8" # LES/mainnet

logger = logging.getLogger("evm.p2p.discovery")
logging.basicConfig(level=logging.DEBUG, format='%(levelname)s: %(message)s')
logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s')

loop = asyncio.get_event_loop()
loop.set_debug(True)

privkey = keys.PrivateKey(decode_hex(privkey_hex))
addr = kademlia.Address(listen_host, listen_port, listen_port)
bootstrap_nodes = [kademlia.Node.from_uri(x) for x in bootstrap_uris]
discovery = DiscoveryProtocol(privkey, addr, bootstrap_nodes)
loop.run_until_complete(discovery.listen(loop))

# There's no need to wait for bootstrap because we run_forever().
asyncio.ensure_future(discovery.bootstrap())

# This helps when debugging asyncio issues.
# task_monitor = asyncio.ensure_future(show_tasks())
discovery = DiscoveryProtocol(privkey, addr, bootstrap_nodes, topic)

try:
loop.run_forever()
loop.run_until_complete(discovery.run())
except KeyboardInterrupt:
pass

Expand Down
10 changes: 6 additions & 4 deletions evm/p2p/kademlia.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,9 @@ def from_uri(cls, uri: bytes) -> 'Node':
return cls(pubkey, Address(ip.decode(), port))

def __repr__(self):
return '<Node(%s@%s)>' % (self.pubkey.to_hex()[:6], self.address.ip)
addr = self.address
return '<Node(%s@%s:%d:%d)>' % (
self.pubkey.to_hex()[:6], addr.ip, addr.udp_port, addr.tcp_port)

def distance_to(self, id):
    """Return the bitwise XOR of this node's id and the given ``id``."""
    return self.id ^ id
Expand Down Expand Up @@ -346,7 +348,7 @@ def recv_pong(self, remote: Node, token: AnyStr) -> None:
'unexpected pong from {} with pingid {}, probably came too late'.format(
remote, encode_hex(pingid)))

def recv_ping(self, remote: Node, hash_: AnyStr) -> None:
def recv_ping(self, remote: Node, hash_: AnyStr, topics: List[AnyStr]) -> None:
"""Process a received ping packet.

A ping packet may come any time, unrequested, or may be prompted by us bond()ing with a
Expand All @@ -355,7 +357,7 @@ def recv_ping(self, remote: Node, hash_: AnyStr) -> None:
"""
self.logger.debug('<<< ping from {}'.format(remote))
self.update_routing_table(remote)
self.wire.send_pong(remote, hash_)
self.wire.send_pong(remote, hash_, topics)
# Sometimes a ping will be sent to us as part of the bond()ing performed the first time we
# see a node, and it is in those cases that a callback will exist.
callback = self.ping_callbacks.get(remote)
Expand Down Expand Up @@ -545,7 +547,7 @@ def _exclude_if_asked(nodes):
closest = sort_by_distance(closest, node_id)[:k_bucket_size]
nodes_to_ask = _exclude_if_asked(closest)

self.logger.info("lookup finished for {}: {}".format(node_id, closest))
self.logger.debug("lookup finished for {}: {}".format(node_id, closest))
return closest

# TODO: Run this as a coroutine that loops forever and after each iteration sleeps until the
Expand Down
Loading