Skip to content

Repeat ping request periodically #416

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/protocol/libp2p/ping/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use std::time::Duration;
use crate::{
codec::ProtocolCodec, protocol::libp2p::ping::PingEvent, types::protocol::ProtocolName,
DEFAULT_CHANNEL_SIZE,
Expand All @@ -36,6 +37,8 @@ const PING_PAYLOAD_SIZE: usize = 32;
/// Maximum PING failures.
const MAX_FAILURES: usize = 3;

pub const PING_INTERVAL: Duration = Duration::from_secs(15);

/// Ping configuration.
pub struct Config {
/// Protocol name.
Expand All @@ -49,6 +52,8 @@ pub struct Config {

/// TX channel for sending events to the user protocol.
pub(crate) tx_event: Sender<PingEvent>,

pub(crate) ping_interval: Duration,
}

impl Config {
Expand All @@ -61,6 +66,7 @@ impl Config {
(
Self {
tx_event,
ping_interval: PING_INTERVAL,
max_failures: MAX_FAILURES,
protocol: ProtocolName::from(PROTOCOL_NAME),
codec: ProtocolCodec::Identity(PING_PAYLOAD_SIZE),
Expand All @@ -80,6 +86,7 @@ pub struct ConfigBuilder {

/// Maximum failures before the peer is considered unreachable.
max_failures: usize,
ping_interval: Duration,
}

impl Default for ConfigBuilder {
Expand All @@ -92,6 +99,7 @@ impl ConfigBuilder {
/// Create new default [`Config`] which can be modified by the user.
pub fn new() -> Self {
Self {
ping_interval: PING_INTERVAL,
max_failures: MAX_FAILURES,
protocol: ProtocolName::from(PROTOCOL_NAME),
codec: ProtocolCodec::Identity(PING_PAYLOAD_SIZE),
Expand All @@ -104,13 +112,19 @@ impl ConfigBuilder {
self
}

pub fn with_ping_interval(mut self, ping_interval: Duration) -> Self {
self.ping_interval = ping_interval;
self
}

/// Build [`Config`].
pub fn build(self) -> (Config, Box<dyn Stream<Item = PingEvent> + Send + Unpin>) {
let (tx_event, rx_event) = channel(DEFAULT_CHANNEL_SIZE);

(
Config {
tx_event,
ping_interval: self.ping_interval,
max_failures: self.max_failures,
protocol: self.protocol,
codec: self.codec,
Expand Down
20 changes: 18 additions & 2 deletions src/protocol/libp2p/ping/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,16 @@ pub(crate) struct Ping {

/// Pending inbound substreams.
pending_inbound: FuturesUnordered<BoxFuture<'static, crate::Result<()>>>,

ping_interval: Duration,
}

impl Ping {
/// Create new [`Ping`] protocol.
pub fn new(service: TransportService, config: Config) -> Self {
Self {
service,
ping_interval: config.ping_interval,
tx: config.tx_event,
peers: HashSet::new(),
pending_outbound: FuturesUnordered::new(),
Expand All @@ -96,7 +99,6 @@ impl Ping {
fn on_connection_established(&mut self, peer: PeerId) -> crate::Result<()> {
tracing::trace!(target: LOG_TARGET, ?peer, "connection established");

self.service.open_substream(peer)?;
self.peers.insert(peer);

Ok(())
Expand Down Expand Up @@ -166,12 +168,13 @@ impl Ping {
/// Start [`Ping`] event loop.
pub async fn run(mut self) {
tracing::debug!(target: LOG_TARGET, "starting ping event loop");
let mut interval = tokio::time::interval(self.ping_interval);

loop {
tokio::select! {
event = self.service.next() => match event {
Some(TransportEvent::ConnectionEstablished { peer, .. }) => {
let _ = self.on_connection_established(peer);
self.on_connection_established(peer);
}
Some(TransportEvent::ConnectionClosed { peer }) => {
self.on_connection_closed(peer);
Expand All @@ -192,6 +195,19 @@ impl Ping {
Some(_) => {}
None => return,
},
_ = interval.tick() => {
for peer in &self.peers {
tracing::trace!(target: LOG_TARGET, ?peer, "sending ping");
if let Err(error) = self.service.open_substream(*peer) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The libp2p ping protocol spec states that the repeated pings should go on the same substream: https://github.com/libp2p/specs/blob/master/ping/ping.md

Because of this, the issue is actually more difficult due to the need to not keep the connection alive by this ping substream.

I don't think we should break the spec even if it works with the current libp2p implementation. @lexnv what do you think?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, I would comply with the libp2p spec as much as possible. The main issue would be that we cannot distinguish ping susbtreams for the keep alive connection tracker 🤔

Are we entirely convinced we need to implement this for ping?

  • we'll need to adjust the substream API to expose that the substream is ping (accounting towards the keepalive)
  • how do we treat failures here? If we cannot open a substream should we terminate the connection? If so, we need to propagate again to the higher levels the signal to terminate the connection
    • offhand, from ping we'll need to propagate towards the transport manager, and from the transport manager we'll need to propagate to individual connections

tracing::debug!(
target: LOG_TARGET,
?peer,
?error,
"failed to open substream for ping"
);
}
}
}
_event = self.pending_inbound.next(), if !self.pending_inbound.is_empty() => {}
event = self.pending_outbound.next(), if !self.pending_outbound.is_empty() => {
match event {
Expand Down
Loading