Conversation

noot
Contributor

@noot noot commented Jul 16, 2025

changes:

  • add bootnode service; this is a very basic p2p node that doesn't do anything itself but helps other nodes join the p2p network. the alternative is to have a worker/validator/orchestrator act as a bootnode, but that would be less than ideal, as bootnodes should be specialized (high uptime, able to handle many incoming connections)
  • add kademlia to the p2p service and expose a KademliaAction handler to allow services to make DHT queries (a rough sketch of the handler shape follows this list)
  • update worker to advertise itself as a worker in the DHT upon startup
  • update validator to fetch workers from the DHT during the validation loop
  • update orchestrator DiscoveryMonitor to fetch workers from the DHT to update its node store
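
A rough, self-contained sketch of what the KademliaAction request channel could look like; all type and variant names here are stand-ins for illustration, not the crate's actual API:

```rust
use tokio::sync::{mpsc, oneshot};

// Stand-ins so the sketch is self-contained; not the crate's real types.
type PeerId = String;
type RecordValue = Vec<u8>;

// Requests other services can send to the p2p service's event loop.
enum KademliaAction {
    GetProviders {
        key: Vec<u8>,
        resp: oneshot::Sender<Vec<PeerId>>,
    },
    GetRecord {
        key: Vec<u8>,
        resp: oneshot::Sender<Option<RecordValue>>,
    },
    PutRecord {
        key: Vec<u8>,
        value: RecordValue,
    },
}

// Cloneable handle that callers (validator, orchestrator, worker) hold.
#[derive(Clone)]
struct KademliaHandle {
    tx: mpsc::Sender<KademliaAction>,
}

impl KademliaHandle {
    async fn get_providers(&self, key: Vec<u8>) -> Vec<PeerId> {
        let (resp, rx) = oneshot::channel();
        // Send/recv errors ignored for brevity; a real handle would surface them.
        let _ = self.tx.send(KademliaAction::GetProviders { key, resp }).await;
        rx.await.unwrap_or_default()
    }
}
```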

testing:

  • tested the local setup with make up and make watch-worker, and verified that the worker is discovered by the validator and orchestrator and invited to the compute pool.

@noot noot requested a review from Copilot July 16, 2025 20:54
Copilot

This comment was marked as outdated.

@noot noot requested a review from Copilot July 17, 2025 02:48
Copilot

This comment was marked as outdated.

@noot noot requested a review from Copilot July 17, 2025 16:39
Contributor

@Copilot Copilot AI left a comment

Pull Request Overview

This PR replaces the centralized discovery service with a peer-to-peer Distributed Hash Table (DHT) discovery mechanism. This architectural change improves decentralization by removing the single point of failure and dependency on centralized discovery services.

Key changes include:

  • Introduction of bootnode service for network bootstrapping
  • Integration of Kademlia DHT for worker discovery and advertising
  • Replacement of HTTP-based discovery with P2P DHT queries

Reviewed Changes

Copilot reviewed 111 out of 114 changed files in this pull request and generated 8 comments.

| File | Description |
| --- | --- |
| docs/development-setup.md | Updates documentation to reflect the bootnode service replacing the discovery service |
| docker-compose.yml | Replaces the discovery service with the bootnode service configuration |
| deployment/k8s/ | Complete migration from discovery-chart to bootnode-chart with template updates |
| crates/worker/src/ | Removes discovery service code and integrates DHT advertising for workers |
| crates/validator/src/ | Refactors to use DHT-based node discovery instead of HTTP discovery endpoints |
| crates/orchestrator/src/ | Updates to fetch worker nodes from DHT for node management |
| crates/shared/src/ | Adds DHT discovery utilities and updates node models |
| crates/p2p/src/ | Implements Kademlia DHT functionality with discovery capabilities |

Comments suppressed due to low confidence (1)

crates/p2p/src/lib.rs:448

  • The test is sleeping for 2 seconds instead of properly waiting for peer connection, which could lead to flaky tests. Consider implementing a proper mechanism to verify peer connectivity before proceeding with the test.
        // TODO: implement a way to get peer count (https://github.com/PrimeIntellect-ai/protocol/issues/628)
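
One hedged way to replace the fixed sleep, sketched with a closure standing in for the peer-count accessor the service doesn't expose yet (issue #628): poll until the expected peers show up or a deadline passes.

```rust
use std::time::Duration;
use tokio::time::{sleep, Instant};

// Poll `peer_count` until it reaches `want` or `timeout` elapses; returns
// whether the target was reached. Swap the closure for a real accessor
// once the p2p service exposes one.
async fn wait_for_peers<F>(mut peer_count: F, want: usize, timeout: Duration) -> bool
where
    F: FnMut() -> usize,
{
    let deadline = Instant::now() + timeout;
    while Instant::now() < deadline {
        if peer_count() >= want {
            return true;
        }
        sleep(Duration::from_millis(50)).await;
    }
    false
}
```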

provider_address: String::new(),
compute_pool_id: 0,
worker_p2p_id: None,
worker_p2p_id: "empty".to_string(), // TODO: this should be a different type, as peer id is not needed for this code path

Copilot AI Jul 17, 2025

Using a placeholder value like "empty" for worker_p2p_id indicates a design issue. Consider using an Option or creating a separate type for cases where peer ID is not needed.

Contributor Author

agreed, there is a todo
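
For illustration only, one shape the suggested separate type could take (names hypothetical; today the field is a String, per the diff):

```rust
enum WorkerP2pId {
    /// Known once the worker has joined the p2p network
    /// (ideally a real p2p::PeerId, per the TODO in the diff).
    Known(String),
    /// Code paths, like this rejection path, that never need a peer ID,
    /// replacing the "empty" placeholder string.
    NotApplicable,
}
```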

- REDIS_URL=redis://redis:6379
- PLATFORM_API_KEY=prime
- MAX_NODES_PER_IP=3
- LIBP2P_PRIVATE_KEY="d0884c9823a0a2c846dbf5e71853bc5f80b2ec5d2de46532cdbe8ab46f020836"

Copilot AI Jul 17, 2025

Hardcoded private key in docker-compose.yml poses a security risk. Use environment variables or secure key management instead.

Contributor Author

agreed, but this is a known key (it's in .env.example)
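
A sketch of the suggested hardening in the compose file's own style, reusing the variable name from the entry above; the actual key would live in an untracked .env file or the host environment:

```yaml
environment:
  # Interpolated at compose time; ":?" aborts with an error if the variable
  # is unset, so a real key never has to be committed to the file.
  - LIBP2P_PRIVATE_KEY=${LIBP2P_PRIVATE_KEY:?set LIBP2P_PRIVATE_KEY in .env}
```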

env:
RPC_URL: "http://anvil.example-namespace:8545"
DISCOVERY_URLS: "http://discovery.example-namespace:8089"
BOOTNODES: "/ip4/127.0.0.1/tcp/4005/p2p/12D3KooWJj3haDEzxGSbGSAvXCiE9pDYC9xHDdtQe8B2donhfwXL"

Copilot AI Jul 17, 2025

Using localhost (127.0.0.1) as a bootnode address in Kubernetes deployment configuration will not work in a distributed environment. This should be updated to use proper cluster addresses.

Suggested change:
```diff
- BOOTNODES: "/ip4/127.0.0.1/tcp/4005/p2p/12D3KooWJj3haDEzxGSbGSAvXCiE9pDYC9xHDdtQe8B2donhfwXL"
+ BOOTNODES: "/dns4/bootnode-service/tcp/4005/p2p/12D3KooWJj3haDEzxGSbGSAvXCiE9pDYC9xHDdtQe8B2donhfwXL"
```

Contributor Author

agreed, i have not tested this with kubernetes

}

#[tokio::main]
async fn main() {
Contributor Author

the bootnode binary is new, but it's very simple: it only runs a p2p node.
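
A minimal sketch of that shape (illustrative only; the actual service startup and config handling are in the diff, elided here):

```rust
#[tokio::main]
async fn main() {
    // A bootnode's whole job: bring up the p2p service with Kademlia
    // enabled, listen on a well-known address, and stay alive so new
    // peers can bootstrap into the DHT through it. (p2p startup elided.)
    println!("bootnode up, serving DHT bootstrap requests");
    std::future::pending::<()>().await;
}
```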

@@ -1,59 +1,90 @@
use crate::discovery::location_service::LocationService;
Contributor Author

the DiscoveryMonitor was refactored into 2 internal components: NodeFetcher and Updater. NodeFetcher fetches the nodes from the DHT and returns them, while Updater performs the same syncing logic and node-store updating as before.
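
A rough sketch of the described split, with stand-in types and guessed signatures rather than the actual code:

```rust
struct Node; // stand-in for the shared node model

// Reads worker records out of the DHT (GetProviders + GetRecord).
struct NodeFetcher;

impl NodeFetcher {
    async fn fetch_nodes(&self) -> Vec<Node> {
        Vec::new() // real impl: query the DHT via the kademlia action channel
    }
}

// Keeps the previous behavior: reconcile fetched nodes into the node store.
struct Updater;

impl Updater {
    async fn sync(&self, nodes: Vec<Node>) {
        let _ = nodes; // real impl: the same syncing logic as before
    }
}

struct DiscoveryMonitor {
    fetcher: NodeFetcher,
    updater: Updater,
}

impl DiscoveryMonitor {
    // One iteration of the monitor loop: fetch, then sync.
    async fn tick(&self) {
        let nodes = self.fetcher.fetch_nodes().await;
        self.updater.sync(nodes).await;
    }
}
```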

// Check if there's any healthy node with the same IP and port
let count_healthy_nodes_with_same_endpoint = self
.count_healthy_nodes_with_same_endpoint(
// Check if there's any healthy node with the same peer ID
Contributor Author

this was changed from ip:port collisions to p2p peer ID collisions under the assumption that distinct nodes will have distinct peer IDs; if two nodes have the same peer ID, they're actually the same node.
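
For illustration, the changed check might look roughly like this (field and function names are stand-ins):

```rust
// Stand-in entry type; fields are illustrative.
struct NodeEntry {
    peer_id: String,
    healthy: bool,
}

// Distinct nodes are assumed to have distinct peer IDs, so any match here
// means the candidate is already present rather than a genuine collision.
fn count_healthy_nodes_with_same_peer_id(nodes: &[NodeEntry], peer_id: &str) -> usize {
    nodes
        .iter()
        .filter(|n| n.healthy && n.peer_id == peer_id)
        .count()
}
```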

.collect())
use crate::store::NodeStore;

async fn enrich_nodes_without_location(
Contributor Author

this was moved from the discovery service

pub port: u16,
pub compute_pool_id: u32,
pub compute_specs: Option<ComputeSpecs>,
pub worker_p2p_id: String, // TODO: change to p2p::PeerId
Contributor Author

@noot noot Jul 17, 2025

this was changed to not be an Option. i would also like to make worker_p2p_addresses non-optional and change the types from String, but the diff was already huge

/// Given a kademlia action channel backed by a `p2p::Service`,
/// perform a DHT lookup for all nodes that claim to be a worker using `GetProviders` for `p2p::WORKER_DHT_KEY`.
/// Then, for each of these nodes, query the DHT for their record (which stores their node information) using `GetRecord`.
pub async fn get_worker_nodes_from_dht(
Contributor Author

this is new logic which fetches the workers from the DHT
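
A self-contained sketch of the two-step flow the doc comment describes, with a stand-in trait for the kademlia action channel and an assumed key layout (the real constants and helpers live in the p2p crate):

```rust
// Stand-in for the kademlia action channel, reduced to the two queries used.
trait Dht {
    async fn get_providers(&self, key: &[u8]) -> Vec<String>; // worker peer IDs
    async fn get_record(&self, key: &[u8]) -> Option<Vec<u8>>; // serialized node info
}

// Assumed key layout; not the actual value of p2p::WORKER_DHT_KEY.
const WORKER_DHT_KEY: &[u8] = b"worker";

fn worker_dht_key_with_peer_id(peer_id: &str) -> Vec<u8> {
    format!("worker/{peer_id}").into_bytes()
}

async fn get_worker_nodes_from_dht(dht: &impl Dht) -> Vec<Vec<u8>> {
    let mut records = Vec::new();
    // Step 1: GetProviders -> which peers claim to be workers?
    for peer_id in dht.get_providers(WORKER_DHT_KEY).await {
        // Step 2: GetRecord -> that worker's node info; skip peers without one.
        if let Some(record) = dht.get_record(&worker_dht_key_with_peer_id(&peer_id)).await {
            records.push(record);
        }
    }
    records
}
```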

// Track the last loop duration in milliseconds
static LAST_LOOP_DURATION_MS: AtomicI64 = AtomicI64::new(0);

async fn get_rejections(
Contributor Author

didn't delete this, just moved to end of file

}
});

loop {
Contributor Author

i moved this loop's logic to the validator module

}

#[allow(clippy::too_many_arguments)]
async fn perform_validation(
Contributor Author

same logic as the validation loop that was in main before

// TODO: should update p2p service to expose this better (https://github.com/PrimeIntellect-ai/protocol/issues/628)
tokio::time::sleep(Duration::from_secs(1)).await;

let record_key = p2p::worker_dht_key_with_peer_id(&peer_id);
Contributor Author

instead of uploading to the discovery service, we now advertise on the DHT
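
A sketch of the worker-side counterpart under the same assumptions as the fetch sketch above (stand-in trait, assumed key layout): announce ourselves as a provider of the worker key, then publish our node info under the per-peer record key.

```rust
// Stand-in for the kademlia action channel, write side only.
trait DhtWriter {
    async fn start_providing(&self, key: &[u8]);
    async fn put_record(&self, key: &[u8], value: Vec<u8>);
}

async fn advertise_worker(dht: &impl DhtWriter, peer_id: &str, node_info: Vec<u8>) {
    // Appear in GetProviders results for the shared worker key...
    dht.start_providing(b"worker").await;
    // ...and publish our node info under the per-peer record key
    // (assumed layout matching worker_dht_key_with_peer_id above).
    let record_key = format!("worker/{peer_id}").into_bytes();
    dht.put_record(&record_key, node_info).await;
}
```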

@noot noot marked this pull request as ready for review July 17, 2025 16:58