Skip to content

address: Add bucketed address store to improve CPU usage #410

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions src/protocol/libp2p/kademlia/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ impl KademliaMessage {
4 => {
let peers = message
.closer_peers
.iter()
.into_iter()
.filter_map(|peer| KademliaPeer::try_from(peer).ok())
.take(replication_factor)
.collect();
Expand Down Expand Up @@ -285,7 +285,7 @@ impl KademliaMessage {
record,
peers: message
.closer_peers
.iter()
.into_iter()
.filter_map(|peer| KademliaPeer::try_from(peer).ok())
.take(replication_factor)
.collect(),
Expand All @@ -296,7 +296,7 @@ impl KademliaMessage {
let key = (!message.key.is_empty()).then_some(message.key.into())?;
let providers = message
.provider_peers
.iter()
.into_iter()
.filter_map(|peer| KademliaPeer::try_from(peer).ok())
.take(replication_factor)
.collect();
Expand All @@ -308,13 +308,13 @@ impl KademliaMessage {
let key = (!message.key.is_empty()).then_some(message.key.into());
let peers = message
.closer_peers
.iter()
.into_iter()
.filter_map(|peer| KademliaPeer::try_from(peer).ok())
.take(replication_factor)
.collect();
let providers = message
.provider_peers
.iter()
.into_iter()
.filter_map(|peer| KademliaPeer::try_from(peer).ok())
.take(replication_factor)
.collect();
Expand Down
17 changes: 10 additions & 7 deletions src/protocol/libp2p/kademlia/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ impl Kademlia {
.await;

for info in peers {
let addresses = info.addresses();
let addresses: Vec<Multiaddr> = info.addresses().cloned().collect();
self.service.add_known_address(&info.peer, addresses.clone().into_iter());

if std::matches!(self.update_mode, RoutingTableUpdateMode::Automatic) {
Expand Down Expand Up @@ -544,7 +544,7 @@ impl Kademlia {

match (providers.len(), providers.pop()) {
(1, Some(provider)) => {
let addresses = provider.addresses();
let addresses: Vec<Multiaddr> = provider.addresses().cloned().collect();

if provider.peer == peer {
self.store.put_provider(
Expand Down Expand Up @@ -797,7 +797,7 @@ impl Kademlia {
query_id: query,
peers: peers
.into_iter()
.map(|info| (info.peer, info.addresses()))
.map(|info| (info.peer, info.addresses().cloned().collect()))
.collect(),
})
.await;
Expand Down Expand Up @@ -1431,7 +1431,10 @@ mod tests {
// Check peer addresses.
match kademlia.routing_table.entry(Key::from(peer)) {
KBucketEntry::Occupied(entry) => {
assert_eq!(entry.addresses(), vec![address_a.clone()]);
assert_eq!(
entry.addresses().cloned().collect::<Vec<_>>(),
vec![address_a.clone()]
);
}
_ => panic!("Peer not found in routing table"),
};
Expand All @@ -1450,7 +1453,7 @@ mod tests {
match kademlia.routing_table.entry(Key::from(peer)) {
KBucketEntry::Occupied(entry) => {
assert_eq!(
entry.addresses(),
entry.addresses().cloned().collect::<Vec<_>>(),
vec![address_b.clone(), address_a.clone()]
);
}
Expand All @@ -1469,7 +1472,7 @@ mod tests {
match kademlia.routing_table.entry(Key::from(peer)) {
KBucketEntry::Occupied(entry) => {
assert_eq!(
entry.addresses(),
entry.addresses().cloned().collect::<Vec<_>>(),
vec![address_b.clone(), address_a.clone()]
);
}
Expand All @@ -1483,7 +1486,7 @@ mod tests {
match kademlia.routing_table.entry(Key::from(peer)) {
KBucketEntry::Occupied(entry) => {
assert_eq!(
entry.addresses(),
entry.addresses().cloned().collect::<Vec<_>>(),
vec![address_a.clone(), address_b.clone()]
);
}
Expand Down
5 changes: 4 additions & 1 deletion src/protocol/libp2p/kademlia/query/get_providers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,10 @@ impl GetProvidersContext {
// Merge addresses of different provider records of the same peer.
let mut providers = HashMap::<PeerId, HashSet<Multiaddr>>::new();
found_providers.into_iter().for_each(|provider| {
providers.entry(provider.peer).or_default().extend(provider.addresses())
providers
.entry(provider.peer)
.or_default()
.extend(provider.addresses().cloned())
});

// Convert into `Vec<KademliaPeer>`
Expand Down
9 changes: 2 additions & 7 deletions src/protocol/libp2p/kademlia/record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::{
protocol::libp2p::kademlia::types::{
ConnectionType, Distance, KademliaPeer, Key as KademliaKey,
},
transport::manager::address::{AddressRecord, AddressStore},
transport::manager::address::AddressStoreBuckets,
Multiaddr, PeerId,
};

Expand Down Expand Up @@ -170,15 +170,10 @@ pub struct ContentProvider {

impl From<ContentProvider> for KademliaPeer {
fn from(provider: ContentProvider) -> Self {
let mut address_store = AddressStore::new();
for address in provider.addresses.iter() {
address_store.insert(AddressRecord::from_raw_multiaddr(address.clone()));
}

Self {
key: KademliaKey::from(provider.peer),
peer: provider.peer,
address_store,
address_store: AddressStoreBuckets::from_unknown(provider.addresses),
connection: ConnectionType::NotConnected,
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/protocol/libp2p/kademlia/routing_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ mod tests {
KBucketEntry::Occupied(entry) => {
assert_eq!(entry.key, key);
assert_eq!(entry.peer, peer);
assert_eq!(entry.addresses(), addresses);
assert_eq!(entry.addresses().cloned().collect::<Vec<_>>(), addresses);
assert_eq!(entry.connection, ConnectionType::Connected);
}
state => panic!("invalid state for `KBucketEntry`: {state:?}"),
Expand All @@ -418,7 +418,7 @@ mod tests {
KBucketEntry::Occupied(entry) => {
assert_eq!(entry.key, key);
assert_eq!(entry.peer, peer);
assert_eq!(entry.addresses(), addresses);
assert_eq!(entry.addresses().cloned().collect::<Vec<_>>(), addresses);
assert_eq!(entry.connection, ConnectionType::NotConnected);
}
state => panic!("invalid state for `KBucketEntry`: {state:?}"),
Expand Down Expand Up @@ -508,7 +508,7 @@ mod tests {
KBucketEntry::Occupied(entry) => {
assert_eq!(entry.key, key);
assert_eq!(entry.peer, peer);
assert_eq!(entry.addresses(), addresses);
assert_eq!(entry.addresses().cloned().collect::<Vec<_>>(), addresses);
assert_eq!(entry.connection, ConnectionType::CanConnect);
}
state => panic!("invalid state for `KBucketEntry`: {state:?}"),
Expand Down
30 changes: 8 additions & 22 deletions src/protocol/libp2p/kademlia/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

use crate::{
protocol::libp2p::kademlia::schema,
transport::manager::address::{AddressRecord, AddressStore},
transport::manager::address::{AddressRecord, AddressStoreBuckets},
PeerId,
};

Expand Down Expand Up @@ -254,7 +254,7 @@ pub struct KademliaPeer {
pub(super) peer: PeerId,

/// Known addresses of peer.
pub(super) address_store: AddressStore,
pub(super) address_store: AddressStoreBuckets,

/// Connection type.
pub(super) connection: ConnectionType,
Expand All @@ -263,15 +263,9 @@ pub struct KademliaPeer {
impl KademliaPeer {
/// Create new [`KademliaPeer`].
pub fn new(peer: PeerId, addresses: Vec<Multiaddr>, connection: ConnectionType) -> Self {
let mut address_store = AddressStore::new();

for address in addresses.into_iter() {
address_store.insert(AddressRecord::from_raw_multiaddr(address));
}

Self {
peer,
address_store,
address_store: AddressStoreBuckets::from_unknown(addresses),
connection,
key: Key::from(peer),
}
Expand All @@ -285,29 +279,22 @@ impl KademliaPeer {
}

/// Returns the addresses of the peer.
pub fn addresses(&self) -> Vec<Multiaddr> {
pub fn addresses(&self) -> impl Iterator<Item = &Multiaddr> {
self.address_store.addresses(MAX_ADDRESSES)
}
}

impl TryFrom<&schema::kademlia::Peer> for KademliaPeer {
impl TryFrom<schema::kademlia::Peer> for KademliaPeer {
type Error = ();

fn try_from(record: &schema::kademlia::Peer) -> Result<Self, Self::Error> {
fn try_from(record: schema::kademlia::Peer) -> Result<Self, Self::Error> {
let peer = PeerId::from_bytes(&record.id).map_err(|_| ())?;

let mut address_store = AddressStore::new();
for address in record.addrs.iter() {
let Ok(address) = Multiaddr::try_from(address.clone()) else {
continue;
};
address_store.insert(AddressRecord::from_raw_multiaddr(address));
}
let addresses = record.addrs.into_iter().filter_map(|addr| Multiaddr::try_from(addr).ok());

Ok(KademliaPeer {
key: Key::from(peer),
peer,
address_store,
address_store: AddressStoreBuckets::from_unknown(addresses),
connection: ConnectionType::try_from(record.connection)?,
})
}
Expand All @@ -320,7 +307,6 @@ impl From<&KademliaPeer> for schema::kademlia::Peer {
addrs: peer
.address_store
.addresses(MAX_ADDRESSES)
.iter()
.map(|address| address.to_vec())
.collect(),
connection: peer.connection.into(),
Expand Down
110 changes: 109 additions & 1 deletion src/transport/manager/address.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,20 @@ use crate::{error::DialError, PeerId};
use multiaddr::{Multiaddr, Protocol};
use multihash::Multihash;

use std::collections::{hash_map::Entry, HashMap};
use std::collections::{hash_map::Entry, HashMap, HashSet};

/// Maximum number of addresses tracked for a peer.
const MAX_ADDRESSES: usize = 64;

/// Maximum number of addresses tracked for a peer in the success bucket.
const MAX_SUCCESS_ADDRESSES: usize = 32;

/// Maximum number of addresses tracked for a peer in the unknown bucket.
const MAX_UNKNOWN_ADDRESSES: usize = 16;

/// Maximum number of addresses tracked for a peer in the failure bucket.
const MAX_FAILURE_ADDRESSES: usize = 16;

/// Scores for address records.
pub mod scores {
/// Score indicating that the connection was successfully established.
Expand Down Expand Up @@ -261,6 +270,105 @@ impl AddressStore {
}
}

/// Buckets for storing addresses based on dial results.
///
/// This is a more optimized version of [`AddressStore`] that separates addresses
/// based on their dial results (success, unknown, failure).
///
/// It allows for more efficient management of addresses based on their dial outcomes,
/// reducing the need for sorting and filtering during address selection.
#[derive(Debug, Clone, Default)]
pub struct AddressStoreBuckets {
/// Addresses with successful dials.
pub success: HashSet<Multiaddr>,

/// Addresses not yet dialed.
pub unknown: HashSet<Multiaddr>,

/// Addresses with dial failures.
pub failure: HashSet<Multiaddr>,
}

impl AddressStoreBuckets {
/// Create new [`AddressStoreBuckets`].
pub fn new() -> Self {
Self {
success: HashSet::with_capacity(MAX_SUCCESS_ADDRESSES),
unknown: HashSet::with_capacity(MAX_UNKNOWN_ADDRESSES),
failure: HashSet::with_capacity(MAX_FAILURE_ADDRESSES),
}
}

/// Create [`AddressStoreBuckets`] from a set of unknown addresses.
///
/// If the addresses exceed the maximum capacity, they will be truncated.
pub fn from_unknown(addresses: impl IntoIterator<Item = Multiaddr>) -> Self {
let mut store = Self::new();
for address in addresses.into_iter().take(MAX_UNKNOWN_ADDRESSES) {
store.unknown.insert(address);
}
store
}

/// Insert an address record into the appropriate bucket based on its score.
pub fn insert(&mut self, record: AddressRecord) {
let AddressRecord { score, address } = record;

match score {
score if score > 0 => {
// Moves directly to the success bucket.
self.unknown.remove(&address);
self.failure.remove(&address);

Self::ensure_space(&mut self.success);
self.success.insert(address);
}
0 => {
// Moves to the unknown bucket.
self.success.remove(&address);
self.failure.remove(&address);

Self::ensure_space(&mut self.unknown);
self.unknown.insert(address);
}
_ => {
// Moves to the failure bucket.
self.success.remove(&address);
self.unknown.remove(&address);

Self::ensure_space(&mut self.failure);
self.failure.insert(address);
}
}
}

/// Ensure that there is space in the bucket.
fn ensure_space(bucket: &mut HashSet<Multiaddr>) {
if bucket.len() < bucket.capacity() {
return;
}

// Remove the first element to ensure space.
if let Some(first) = bucket.iter().next().cloned() {
bucket.remove(&first);
}
}

/// Check if the store is empty.
pub fn is_empty(&self) -> bool {
self.success.is_empty() && self.unknown.is_empty() && self.failure.is_empty()
}

/// Return the available addresses from all buckets.
pub fn addresses(&self, limit: usize) -> impl Iterator<Item = &Multiaddr> {
self.success
.iter()
.chain(self.unknown.iter())
.chain(self.failure.iter())
.take(limit)
}
}

#[cfg(test)]
mod tests {
use std::{
Expand Down
Loading