diff --git a/src/main/java/org/peergos/protocol/dht/Kademlia.java b/src/main/java/org/peergos/protocol/dht/Kademlia.java index 6a39c0c2..e4bc969c 100644 --- a/src/main/java/org/peergos/protocol/dht/Kademlia.java +++ b/src/main/java/org/peergos/protocol/dht/Kademlia.java @@ -115,12 +115,12 @@ public void bootstrap(Host us) { // lookup our own peer id to keep our nearest neighbours up-to-date, // and connect to all of them, so they know about our addresses List closestToUs = findClosestPeers(Multihash.deserialize(us.getPeerId().getBytes()), 20, us); - int connectedClosest = 0; + List connected = new ArrayList<>(); for (PeerAddresses peer : closestToUs) { if (connectTo(us, peer)) - connectedClosest++; + connected.add(peer.peerId); } - LOG.info("Bootstrap connected to " + connectedClosest + " nodes close to us."); + LOG.info("Bootstrap connected to " + connected.size() + " nodes close to us. " + connected.stream().map(Multihash::toString).sorted().limit(5).collect(Collectors.toList())); } static class RoutingEntry { @@ -233,12 +233,16 @@ public CompletableFuture> findProviders(Multihash block, Hos List> futures = queryThisRound.stream() .parallel() .map(r -> { - KademliaController res = null; + StreamPromise conn = null; try { - res = dialPeer(r.addresses, us).join(); - return res.getProviders(block).orTimeout(2, TimeUnit.SECONDS); - }catch (Exception e) { + conn = dialPeer(r.addresses, us); + return conn.getController().join() + .getProviders(block).orTimeout(2, TimeUnit.SECONDS); + } catch (Exception e) { return null; + } finally { + if (conn != null) + conn.getStream().thenApply(s -> s.close()); } }).filter(prov -> prov != null) .collect(Collectors.toList()); @@ -269,8 +273,11 @@ public CompletableFuture> findProviders(Multihash block, Hos } private CompletableFuture> getCloserPeers(byte[] key, PeerAddresses target, Host us) { + StreamPromise conn = null; try { - return dialPeer(target, us).orTimeout(2, TimeUnit.SECONDS).join().closerPeers(key); + conn = dialPeer(target, us); + KademliaController contr = conn.getController().orTimeout(2, TimeUnit.SECONDS).join(); + return contr.closerPeers(key); } catch (Exception e) { // we can't dial quic only nodes until it's implemented if (target.addresses.stream().allMatch(a -> a.toString().contains("quic"))) @@ -282,6 +289,9 @@ private CompletableFuture> getCloserPeers(byte[] key, PeerAd else if (e.getCause() instanceof ConnectionClosedException) {} else e.printStackTrace(); + } finally { + if (conn != null) + conn.getStream().thenApply(s -> s.close()); } return CompletableFuture.completedFuture(Collections.emptyList()); } @@ -292,26 +302,34 @@ private Multiaddr[] getPublic(PeerAddresses target) { .collect(Collectors.toList()).toArray(new Multiaddr[0]); } - private CompletableFuture dialPeer(PeerAddresses target, Host us) { + private StreamPromise dialPeer(PeerAddresses target, Host us) { Multiaddr[] multiaddrs = target.addresses.stream() .map(a -> Multiaddr.fromString(a.toString())) .filter(a -> ! a.has(Protocol.DNS) && ! a.has(Protocol.DNS4) && ! a.has(Protocol.DNS6)) .collect(Collectors.toList()).toArray(new Multiaddr[0]); - return dial(us, PeerId.fromBase58(target.peerId.toBase58()), multiaddrs).getController(); + return dial(us, PeerId.fromBase58(target.peerId.toBase58()), multiaddrs); } public CompletableFuture provideBlock(Multihash block, Host us, PeerAddresses ourAddrs) { List closestPeers = findClosestPeers(block, 20, us); List> provides = closestPeers.stream() .parallel() - .map(p -> dialPeer(p, us) - .thenCompose(contr -> contr.provide(block, ourAddrs)) - .exceptionally(t -> { - if (t.getCause() instanceof NonCompleteException) + .map(p -> { + StreamPromise conn = dialPeer(p, us); + return conn.getController() + .thenCompose(contr -> contr.provide(block, ourAddrs)) + .thenApply(res -> { + conn.getStream().thenApply(s -> s.close()); + return res; + }) + .exceptionally(t -> { + if (t.getCause() instanceof NonCompleteException) + return true; + LOG.log(Level.FINE, t, t::getMessage); + conn.getStream().thenApply(s -> s.close()); return true; - LOG.log(Level.FINE, t, t::getMessage); - return true; - })) + }); + }) .collect(Collectors.toList()); return CompletableFuture.allOf(provides.toArray(new CompletableFuture[0])); } @@ -333,10 +351,15 @@ private CompletableFuture putValue(Multihash publisher, byte[] signedRecord, PeerAddresses peer, Host us) { + StreamPromise conn = null; try { - return dialPeer(peer, us).join() + conn = dialPeer(peer, us); + return conn.getController().join() .putValue(publisher, signedRecord); - } catch (Exception e) {} + } catch (Exception e) {} finally { + if (conn != null) + conn.getStream().thenApply(s -> s.close()); + } return CompletableFuture.completedFuture(false); } @@ -446,8 +469,11 @@ public CompletableFuture resolveIpnsValue(Multihash publisher, Host us, } private CompletableFuture> getValueFromPeer(PeerAddresses peer, Multihash publisher, Host us) { + StreamPromise conn = null; try { - return dialPeer(peer, us) + conn = dialPeer(peer, us); + return conn + .getController() .orTimeout(1, TimeUnit.SECONDS) .join() .getValue(publisher) @@ -455,6 +481,9 @@ private CompletableFuture> getValueFromPeer(PeerAddresses pe .thenApply(Optional::of); } catch (Exception e) { return CompletableFuture.completedFuture(Optional.empty()); + } finally { + if (conn != null) + conn.getStream().thenApply(s -> s.close()); } } public List resolveValue(Multihash publisher, int minResults, Host us) {