fix(network): add tcp_nodelay to bearer (#365)
This commit is contained in:
parent
1ed2161a43
commit
d1e40907c6
4 changed files with 167 additions and 46 deletions
|
|
@ -11,6 +11,8 @@ pallas = { path = "../../pallas" }
|
|||
net2 = "0.2.37"
|
||||
hex = "0.4.3"
|
||||
log = "0.4.16"
|
||||
thiserror = "1.0.31"
|
||||
futures = "0.3.29"
|
||||
tracing = "0.1.37"
|
||||
tracing-subscriber = "0.3.16"
|
||||
tokio = { version = "1.27.0", features = ["rt-multi-thread"] }
|
||||
|
|
|
|||
|
|
@ -1,77 +1,186 @@
|
|||
use pallas::network::{
|
||||
use pallas::{network::{
|
||||
facades::PeerClient,
|
||||
miniprotocols::{chainsync, Point, MAINNET_MAGIC},
|
||||
};
|
||||
use tokio::time::Instant;
|
||||
use tracing::info;
|
||||
miniprotocols::{chainsync, Point, MAINNET_MAGIC, blockfetch, keepalive},
|
||||
}, ledger::traverse::MultiEraHeader};
|
||||
use tokio::{time::Instant, select};
|
||||
use thiserror::Error;
|
||||
use futures::{future::FutureExt, pin_mut};
|
||||
|
||||
async fn do_blockfetch(peer: &mut PeerClient) {
|
||||
let range = (
|
||||
Point::Specific(
|
||||
43847831,
|
||||
hex::decode("15b9eeee849dd6386d3770b0745e0450190f7560e5159b1b3ab13b14b2684a45")
|
||||
.unwrap(),
|
||||
),
|
||||
Point::Specific(
|
||||
43847844,
|
||||
hex::decode("ff8d558a3d5a0e058beb3d94d26a567f75cd7d09ff5485aa0d0ebc38b61378d4")
|
||||
.unwrap(),
|
||||
),
|
||||
);
|
||||
#[derive(Error, Debug)]
|
||||
pub enum Error {
|
||||
#[error("hex conversion error")]
|
||||
FromHexError(#[from] hex::FromHexError),
|
||||
|
||||
let blocks = peer.blockfetch().fetch_range(range).await.unwrap();
|
||||
#[error("blockfetch error")]
|
||||
BlockFetchError(#[from] blockfetch::ClientError),
|
||||
|
||||
for block in blocks {
|
||||
info!("received block of size: {}", block.len());
|
||||
}
|
||||
#[error("chainsync error")]
|
||||
ChainSyncError(#[from] chainsync::ClientError),
|
||||
|
||||
#[error("keepalive error")]
|
||||
KeepAliveError(#[from] keepalive::Error),
|
||||
|
||||
#[error("pallas_traverse error")]
|
||||
PallasTraverseError(#[from] pallas::ledger::traverse::Error),
|
||||
}
|
||||
|
||||
async fn do_chainsync(peer: &mut PeerClient) {
|
||||
async fn do_blockfetch(blockfetch_client: &mut blockfetch::Client, range: (Point, Point)) -> Result<(), Error> {
|
||||
let blocks = blockfetch_client.fetch_range(range.clone()).await?;
|
||||
|
||||
for block in &blocks {
|
||||
tracing::trace!("received block of size: {}", block.len());
|
||||
}
|
||||
tracing::info!("received {} blocks. last slot: {}", blocks.len(), range.1.slot_or_default());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn do_chainsync(chainsync_client: &mut chainsync::N2NClient, blockfetch_client: &mut blockfetch::Client) -> Result<(), Error> {
|
||||
let known_points = vec![Point::Specific(
|
||||
43847831u64,
|
||||
hex::decode("15b9eeee849dd6386d3770b0745e0450190f7560e5159b1b3ab13b14b2684a45").unwrap(),
|
||||
hex::decode("15b9eeee849dd6386d3770b0745e0450190f7560e5159b1b3ab13b14b2684a45")?,
|
||||
)];
|
||||
|
||||
let (point, _) = peer.chainsync().find_intersect(known_points).await.unwrap();
|
||||
let (point, _) = chainsync_client.find_intersect(known_points).await?;
|
||||
|
||||
info!("intersected point is {:?}", point);
|
||||
tracing::info!("intersected point is {:?}", point);
|
||||
|
||||
let mut keepalive_timer = Instant::now();
|
||||
for _ in 0..10 {
|
||||
if keepalive_timer.elapsed().as_secs() > 20 {
|
||||
peer.keepalive().send_keepalive().await.unwrap();
|
||||
keepalive_timer = Instant::now();
|
||||
}
|
||||
let next = peer.chainsync().request_next().await.unwrap();
|
||||
let mut block_count = 0u16;
|
||||
let mut start_point = Point::Specific(0, vec![]);
|
||||
let mut end_point: Point;
|
||||
let mut next_log = Instant::now();
|
||||
loop {
|
||||
let next = chainsync_client.request_next().await?;
|
||||
|
||||
match next {
|
||||
chainsync::NextResponse::RollForward(h, _) => {
|
||||
log::info!("rolling forward, header size: {}", h.cbor.len())
|
||||
tracing::trace!("rolling forward, header size: {}", h.cbor.len());
|
||||
let point = match h.byron_prefix {
|
||||
None => {
|
||||
let multi_era_header = MultiEraHeader::decode(h.variant, None, &h.cbor)?;
|
||||
let slot = multi_era_header.slot();
|
||||
let hash = multi_era_header.hash().to_vec();
|
||||
let number = multi_era_header.number();
|
||||
match &multi_era_header {
|
||||
MultiEraHeader::EpochBoundary(_) => {
|
||||
tracing::info!("epoch boundary");
|
||||
None
|
||||
},
|
||||
MultiEraHeader::AlonzoCompatible(_) | MultiEraHeader::Babbage(_) => {
|
||||
if next_log.elapsed().as_secs() > 1 {
|
||||
tracing::info!("chainsync block header: {}", number);
|
||||
next_log = Instant::now();
|
||||
}
|
||||
Some(Point::Specific(slot, hash))
|
||||
},
|
||||
MultiEraHeader::Byron(_) => {
|
||||
tracing::info!("ignoring byron header");
|
||||
None
|
||||
},
|
||||
}
|
||||
}
|
||||
Some(_) => {
|
||||
tracing::info!("skipping byron block");
|
||||
None
|
||||
}
|
||||
};
|
||||
match point {
|
||||
Some(p) => {
|
||||
block_count += 1;
|
||||
if block_count == 1 {
|
||||
start_point = p;
|
||||
}
|
||||
else if block_count == 10 {
|
||||
end_point = p;
|
||||
do_blockfetch(blockfetch_client, (start_point.clone(), end_point.clone())).await?;
|
||||
block_count = 0;
|
||||
}
|
||||
},
|
||||
None => {},
|
||||
};
|
||||
}
|
||||
chainsync::NextResponse::RollBackward(x, _) => log::info!("rollback to {:?}", x),
|
||||
chainsync::NextResponse::Await => log::info!("tip of chaing reached"),
|
||||
chainsync::NextResponse::Await => tracing::info!("tip of chaing reached"),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
async fn do_keepalive(keepalive_client: &mut keepalive::Client) -> Result<(), Error> {
|
||||
let mut keepalive_timer = Instant::now();
|
||||
loop {
|
||||
if keepalive_timer.elapsed().as_secs() > 20 {
|
||||
tracing::info!("sending keepalive...");
|
||||
keepalive_client.send_keepalive().await?;
|
||||
tracing::info!("keepalive sent");
|
||||
keepalive_timer = Instant::now();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
tracing::subscriber::set_global_default(
|
||||
tracing_subscriber::FmtSubscriber::builder()
|
||||
.with_max_level(tracing::Level::TRACE)
|
||||
.with_max_level(tracing::Level::INFO)
|
||||
.finish(),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
// setup a TCP socket to act as data bearer between our agents and the remote
|
||||
// relay.
|
||||
let mut peer = PeerClient::connect("relays-new.cardano-mainnet.iohk.io:3001", MAINNET_MAGIC)
|
||||
.await
|
||||
.unwrap();
|
||||
loop {
|
||||
// setup a TCP socket to act as data bearer between our agents and the remote
|
||||
// relay.
|
||||
let server = "backbone.cardano-mainnet.iohk.io:3001";
|
||||
// let server = "localhost:6000";
|
||||
let mut peer = PeerClient::connect(server, MAINNET_MAGIC)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// fetch an arbitrary batch of block
|
||||
do_blockfetch(&mut peer).await;
|
||||
let chainsync_handle = tokio::spawn(async move {
|
||||
do_chainsync(&mut peer.chainsync, &mut peer.blockfetch).await?;
|
||||
Ok::<(), Error>(())
|
||||
}).fuse();
|
||||
let keepalive_handle = tokio::spawn(async move {
|
||||
do_keepalive(&mut peer.keepalive).await?;
|
||||
Ok::<(), Error>(())
|
||||
}).fuse();
|
||||
|
||||
// execute the chainsync flow from an arbitrary point in the chain
|
||||
do_chainsync(&mut peer).await;
|
||||
pin_mut!(chainsync_handle, keepalive_handle);
|
||||
|
||||
// If any of these concurrent tasks exit or fail, the others are canceled.
|
||||
select! {
|
||||
chainsync_result = chainsync_handle => {
|
||||
match chainsync_result {
|
||||
Ok(result) => {
|
||||
match result {
|
||||
Ok(_) => {}
|
||||
Err(error) => {
|
||||
tracing::error!("chainsync error: {:?}", error);
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(error) => {
|
||||
tracing::error!("chainsync error: {:?}", error);
|
||||
}
|
||||
}
|
||||
}
|
||||
keepalive_result = keepalive_handle => {
|
||||
match keepalive_result {
|
||||
Ok(result) => {
|
||||
match result {
|
||||
Ok(_) => {}
|
||||
Err(error) => {
|
||||
tracing::error!("keepalive error: {:?}", error);
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(error) => {
|
||||
tracing::error!("keepalive error: {:?}", error);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
peer.plexer_handle.abort();
|
||||
|
||||
tracing::info!("waiting 10 seconds before reconnecting...");
|
||||
tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@ itertools = "0.10.5"
|
|||
pallas-codec = { version = "=0.20.0", path = "../pallas-codec" }
|
||||
pallas-crypto = { version = "=0.20.0", path = "../pallas-crypto" }
|
||||
rand = "0.8.5"
|
||||
socket2 = "0.5.5"
|
||||
thiserror = "1.0.31"
|
||||
tokio = { version = "1", features = ["rt", "net", "io-util", "time", "sync", "macros"] }
|
||||
tracing = "0.1.37"
|
||||
|
|
|
|||
|
|
@ -77,6 +77,15 @@ const BUFFER_LEN: usize = 1024 * 10;
|
|||
impl Bearer {
|
||||
pub async fn connect_tcp(addr: impl ToSocketAddrs) -> Result<Self, tokio::io::Error> {
|
||||
let stream = TcpStream::connect(addr).await?;
|
||||
// add tcp_keepalive
|
||||
let sock_ref = socket2::SockRef::from(&stream);
|
||||
let mut tcp_keepalive = socket2::TcpKeepalive::new();
|
||||
tcp_keepalive = tcp_keepalive.with_time(tokio::time::Duration::from_secs(20));
|
||||
tcp_keepalive = tcp_keepalive.with_interval(tokio::time::Duration::from_secs(20));
|
||||
let _ = sock_ref.set_tcp_keepalive(&tcp_keepalive);
|
||||
// add tcp_nodelay
|
||||
let _ = sock_ref.set_nodelay(true);
|
||||
|
||||
Ok(Self::Tcp(stream))
|
||||
}
|
||||
|
||||
|
|
@ -353,7 +362,7 @@ impl Plexer {
|
|||
clock: Instant::now(),
|
||||
bearer: SegmentBuffer::new(bearer),
|
||||
ingress: tokio::sync::mpsc::channel(100), // TODO: define buffer
|
||||
egress: tokio::sync::broadcast::channel(100),
|
||||
egress: tokio::sync::broadcast::channel(100000),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue