Skip to content

Commit

Permalink
Merge pull request exo-explore#528 from exo-explore/interfaceprio
Browse files Browse the repository at this point in the history
show interface type and name, fix propagation of topology
  • Loading branch information
AlexCheema authored Dec 5, 2024
2 parents ec333f1 + db7c388 commit 17411df
Show file tree
Hide file tree
Showing 7 changed files with 24 additions and 43 deletions.
2 changes: 1 addition & 1 deletion exo/networking/grpc/grpc_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ async def GetInferenceResult(self, request, context):
async def CollectTopology(self, request, context):
max_depth = request.max_depth
visited = set(request.visited)
topology = await self.node.collect_topology(visited, max_depth)
topology = self.node.current_topology
nodes = {
node_id:
node_service_pb2.DeviceCapabilities(
Expand Down
6 changes: 3 additions & 3 deletions exo/networking/grpc/node_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ message PeerConnections {
}

message DeviceFlops {
float fp32 = 1;
float fp16 = 2;
float int8 = 3;
double fp32 = 1;
double fp16 = 2;
double int8 = 3;
}

message DeviceCapabilities {
Expand Down
2 changes: 1 addition & 1 deletion exo/networking/grpc/node_service_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions exo/networking/udp/udp_discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,15 @@ async def task_broadcast_presence(self):
# Explicitly broadcasting on all assigned ips since broadcasting on `0.0.0.0` on MacOS does not broadcast over
# the Thunderbolt bridge when other connection modalities exist such as WiFi or Ethernet
for addr, interface_name in get_all_ip_addresses_and_interfaces():
interface_priority, _ = get_interface_priority_and_type(interface_name)
interface_priority, interface_type = get_interface_priority_and_type(interface_name)
message = json.dumps({
"type": "discovery",
"node_id": self.node_id,
"grpc_port": self.node_port,
"device_capabilities": self.device_capabilities.to_dict(),
"priority": interface_priority, # TODO: Prioritise interfaces based on bandwidth, latency, and jitter e.g. prioritise Thunderbolt over WiFi.
"interface_name": interface_name,
"interface_type": interface_type,
})
if DEBUG_DISCOVERY >= 3: print(f"Broadcasting presence at ({addr} - {interface_name} - {interface_priority}): {message}")

Expand Down Expand Up @@ -145,6 +146,7 @@ async def on_listen_message(self, data, addr):
peer_port = message["grpc_port"]
peer_prio = message["priority"]
peer_interface_name = message["interface_name"]
peer_interface_type = message["interface_type"]
device_capabilities = DeviceCapabilities(**message["device_capabilities"])

if peer_id not in self.known_peers or self.known_peers[peer_id][0].addr() != f"{peer_host}:{peer_port}":
Expand All @@ -154,7 +156,7 @@ async def on_listen_message(self, data, addr):
if DEBUG >= 1:
print(f"Ignoring peer {peer_id} at {peer_host}:{peer_port} with priority {peer_prio} because we already know about a peer with higher or equal priority: {existing_peer_prio}")
return
new_peer_handle = self.create_peer_handle(peer_id, f"{peer_host}:{peer_port}", peer_interface_name, device_capabilities)
new_peer_handle = self.create_peer_handle(peer_id, f"{peer_host}:{peer_port}", f"{peer_interface_type} ({peer_interface_name})", device_capabilities)
if not await new_peer_handle.health_check():
if DEBUG >= 1: print(f"Peer {peer_id} at {peer_host}:{peer_port} is not healthy. Skipping.")
return
Expand Down
16 changes: 8 additions & 8 deletions exo/orchestration/standard_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ async def start(self, wait_for_peers: int = 0) -> None:
await self.server.start()
await self.discovery.start()
await self.update_peers(wait_for_peers)
await self.collect_topology()
await self.collect_topology(set())
if DEBUG >= 2: print(f"Collected topology: {self.topology}")
asyncio.create_task(self.periodic_topology_collection(1.0))

Expand All @@ -83,7 +83,7 @@ def on_node_status(self, request_id, opaque_status):
download_progress = RepoProgressEvent.from_dict(status_data.get('progress'))
self.node_download_progress[status_data.get('node_id')] = download_progress
if self.topology_viz:
self.topology_viz.update_visualization(self.current_topology, self.partitioning_strategy.partition(self.current_topology), self.id, self.node_download_progress)
self.topology_viz.update_visualization(self.topology, self.partitioning_strategy.partition(self.topology), self.id, self.node_download_progress)
except Exception as e:
if DEBUG >= 1: print(f"Error updating visualization: {e}")
if DEBUG >= 1: traceback.print_exc()
Expand Down Expand Up @@ -374,8 +374,8 @@ async def periodic_topology_collection(self, interval: int):
try:
did_peers_change = await self.update_peers()
if DEBUG >= 2: print(f"{did_peers_change=}")
await self.collect_topology(set())
if did_peers_change:
await self.collect_topology()
await self.select_best_inference_engine()
except Exception as e:
print(f"Error collecting topology: {e}")
Expand All @@ -386,7 +386,7 @@ async def get_inference_result(self, request_id: str) -> Tuple[Optional[np.ndarr
return None, False
return np.array(self.buffered_token_output[request_id][0]), self.buffered_token_output[request_id][1]

async def collect_topology(self, visited: set[str] = set(), max_depth: int = 4) -> Topology:
async def collect_topology(self, visited: set[str], max_depth: int = 4) -> Topology:
next_topology = Topology()
next_topology.update_node(self.id, self.device_capabilities)

Expand All @@ -410,16 +410,16 @@ async def collect_topology(self, visited: set[str] = set(), max_depth: int = 4)
try:
other_topology = await asyncio.wait_for(peer.collect_topology(visited, max_depth=max_depth - 1), timeout=5.0)
if DEBUG >= 2: print(f"Collected topology from: {peer.id()}: {other_topology}")
self.topology.merge(other_topology)
next_topology.merge(other_topology)
except Exception as e:
print(f"Error collecting topology from {peer.id()}: {e}")
traceback.print_exc()

next_topology.active_node_id = self.topology.active_node_id # this is not so clean.
next_topology.active_node_id = self.topology.active_node_id
self.topology = next_topology
if self.topology_viz:
self.topology_viz.update_visualization(self.current_topology, self.partitioning_strategy.partition(self.current_topology), self.id)
return next_topology
self.topology_viz.update_visualization(self.topology, self.partitioning_strategy.partition(self.topology), self.id)
return self.topology

@property
def on_token(self) -> AsyncCallbackSystem[str, Tuple[str, List[int], bool]]:
Expand Down
33 changes: 6 additions & 27 deletions exo/topology/topology.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from .device_capabilities import DeviceCapabilities
from typing import Dict, Set, Optional, NamedTuple
from typing import Dict, Set, Optional
from dataclasses import dataclass

@dataclass
Expand All @@ -21,7 +21,6 @@ def __eq__(self, other):
class Topology:
def __init__(self):
self.nodes: Dict[str, DeviceCapabilities] = {}
# Store PeerConnection objects in the adjacency lists
self.peer_graph: Dict[str, Set[PeerConnection]] = {}
self.active_node_id: Optional[str] = None

Expand All @@ -34,31 +33,11 @@ def get_node(self, node_id: str) -> DeviceCapabilities:
def all_nodes(self):
return self.nodes.items()

def add_edge(self, node1_id: str, node2_id: str, description: Optional[str] = None):
if node1_id not in self.peer_graph:
self.peer_graph[node1_id] = set()
if node2_id not in self.peer_graph:
self.peer_graph[node2_id] = set()

# Create bidirectional connections with the same description
conn1 = PeerConnection(node1_id, node2_id, description)
conn2 = PeerConnection(node2_id, node1_id, description)

self.peer_graph[node1_id].add(conn1)
self.peer_graph[node2_id].add(conn2)

def get_neighbors(self, node_id: str) -> Set[str]:
# Convert PeerConnection objects back to just destination IDs
return {conn.to_id for conn in self.peer_graph.get(node_id, set())}

def all_edges(self):
edges = []
for node_id, connections in self.peer_graph.items():
for conn in connections:
# Only include each edge once by checking if reverse already exists
if not any(e[0] == conn.to_id and e[1] == conn.from_id for e in edges):
edges.append((conn.from_id, conn.to_id, conn.description))
return edges
def add_edge(self, from_id: str, to_id: str, description: Optional[str] = None):
if from_id not in self.peer_graph:
self.peer_graph[from_id] = set()
conn = PeerConnection(from_id, to_id, description)
self.peer_graph[from_id].add(conn)

def merge(self, other: "Topology"):
for node_id, capabilities in other.nodes.items():
Expand Down
2 changes: 1 addition & 1 deletion exo/viz/topology_viz.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ def _generate_main_layout(self) -> str:
conn2 = self.topology.peer_graph.get(self.partitions[next_i].node_id, set())
description1 = next((c.description for c in conn1 if c.to_id == self.partitions[next_i].node_id), "")
description2 = next((c.description for c in conn2 if c.to_id == partition.node_id), "")
connection_description = f"{description1}/{description2}" if description1 != description2 else description1
connection_description = f"{description1}/{description2}"

# Simple line drawing
steps = max(abs(next_x - x), abs(next_y - y))
Expand Down

0 comments on commit 17411df

Please sign in to comment.