Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 24 additions & 2 deletions crates/p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,18 @@ use libp2p::{
tcp, yamux,
};
use multiaddr::Protocol;
use std::{collections::HashMap, time::Duration};
use std::{
collections::{HashMap, HashSet},
time::Duration,
};
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use tracing::{debug, warn};

pub use libp2p::{Multiaddr, StreamProtocol};

const DEFAULT_MAX_PEER_COUNT: u32 = 50;
const DEFAULT_PEER_RETRY_INTERVAL: Duration = Duration::from_secs(60);

/// A message that can be sent between peers.
pub trait Message:
Expand Down Expand Up @@ -119,6 +123,10 @@ impl<M: Message + 'static> Node<M> {
.wrap_err("swarm failed to listen on multiaddr")?;
}

let mut known_peers_info = Vec::new();
let mut retry_interval = tokio::time::interval(DEFAULT_PEER_RETRY_INTERVAL);
retry_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

for mut address in known_peers {
let peer_id = match address.pop() {
Some(multiaddr::Protocol::P2p(peer_id)) => peer_id,
Expand All @@ -128,8 +136,9 @@ impl<M: Message + 'static> Node<M> {
};
swarm.add_peer_address(peer_id, address.clone());
swarm
.dial(address)
.dial(address.clone())
.wrap_err("swarm failed to dial known peer")?;
known_peers_info.push((peer_id, address));
}

let handles = incoming_streams_handlers
Expand All @@ -145,6 +154,19 @@ impl<M: Message + 'static> Node<M> {
handles.into_iter().for_each(|h| h.abort());
break Ok(());
}
_ = retry_interval.tick() => {
// Check for disconnected known peers and retry connection
let connected_peers: HashSet<PeerId> = swarm.connected_peers().copied().collect();
for (peer_id, address) in &known_peers_info {
if !connected_peers.contains(peer_id) {
debug!("retrying connection to disconnected known peer {peer_id} at {address}");
swarm.add_peer_address(*peer_id, address.clone());
if let Err(e) = swarm.dial(address.clone()) {
warn!("failed to retry dial to known peer {peer_id} at {address}: {e:?}");
}
}
}
}
Some(message) = outgoing_message_rx.recv() => {
let protocol = message.protocol();
debug!("received message to broadcast on protocol {protocol}");
Expand Down