fix(network): use correct client state transition for n2n txsub (#348)

This commit is contained in:
Harper 2023-12-15 16:28:36 +00:00 committed by GitHub
parent 408a41a9ea
commit ef086375bf
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 301 additions and 15 deletions

View file

@ -10,7 +10,7 @@ use tokio::net::UnixListener;
use crate::miniprotocols::handshake::{n2c, n2n, Confirmation, VersionNumber, VersionTable};
use crate::miniprotocols::PROTOCOL_N2N_HANDSHAKE;
use crate::miniprotocols::{txsubmission, PROTOCOL_N2N_HANDSHAKE, PROTOCOL_N2N_TX_SUBMISSION};
use crate::{
miniprotocols::{
blockfetch, chainsync, handshake, localstate, PROTOCOL_N2C_CHAIN_SYNC,
@ -38,6 +38,7 @@ pub struct PeerClient {
pub handshake: handshake::Confirmation<handshake::n2n::VersionData>,
pub chainsync: chainsync::N2NClient,
pub blockfetch: blockfetch::Client,
pub txsubmission: txsubmission::Client,
}
impl PeerClient {
@ -49,14 +50,15 @@ impl PeerClient {
let mut plexer = multiplexer::Plexer::new(bearer);
let channel0 = plexer.subscribe_client(0);
let channel2 = plexer.subscribe_client(2);
let channel3 = plexer.subscribe_client(3);
let hs_channel = plexer.subscribe_client(PROTOCOL_N2N_HANDSHAKE);
let cs_channel = plexer.subscribe_client(PROTOCOL_N2N_CHAIN_SYNC);
let bf_channel = plexer.subscribe_client(PROTOCOL_N2N_BLOCK_FETCH);
let txsub_channel = plexer.subscribe_client(PROTOCOL_N2N_TX_SUBMISSION);
let plexer_handle = tokio::spawn(async move { plexer.run().await });
let versions = handshake::n2n::VersionTable::v7_and_above(magic);
let mut client = handshake::Client::new(channel0);
let mut client = handshake::Client::new(hs_channel);
let handshake = client
.handshake(versions)
@ -71,8 +73,9 @@ impl PeerClient {
Ok(Self {
plexer_handle,
handshake,
chainsync: chainsync::Client::new(channel2),
blockfetch: blockfetch::Client::new(channel3),
chainsync: chainsync::Client::new(cs_channel),
blockfetch: blockfetch::Client::new(bf_channel),
txsubmission: txsubmission::Client::new(txsub_channel),
})
}
@ -84,6 +87,10 @@ impl PeerClient {
&mut self.blockfetch
}
pub fn txsubmission(&mut self) -> &mut txsubmission::Client {
&mut self.txsubmission
}
pub fn abort(&mut self) {
self.plexer_handle.abort();
}
@ -95,6 +102,7 @@ pub struct PeerServer {
pub version: (VersionNumber, n2n::VersionData),
pub chainsync: chainsync::N2NServer,
pub blockfetch: blockfetch::Server,
pub txsubmission: txsubmission::Server,
}
impl PeerServer {
@ -108,10 +116,12 @@ impl PeerServer {
let hs_channel = server_plexer.subscribe_server(PROTOCOL_N2N_HANDSHAKE);
let cs_channel = server_plexer.subscribe_server(PROTOCOL_N2N_CHAIN_SYNC);
let bf_channel = server_plexer.subscribe_server(PROTOCOL_N2N_BLOCK_FETCH);
let txsub_channel = server_plexer.subscribe_server(PROTOCOL_N2N_TX_SUBMISSION);
let mut server_hs: handshake::Server<n2n::VersionData> = handshake::Server::new(hs_channel);
let server_cs = chainsync::N2NServer::new(cs_channel);
let server_bf = blockfetch::Server::new(bf_channel);
let server_txsub = txsubmission::Server::new(txsub_channel);
let plexer_handle = tokio::spawn(async move { server_plexer.run().await });
@ -126,6 +136,7 @@ impl PeerServer {
version: ver,
chainsync: server_cs,
blockfetch: server_bf,
txsubmission: server_txsub,
})
} else {
plexer_handle.abort();
@ -141,6 +152,10 @@ impl PeerServer {
&mut self.blockfetch
}
pub fn txsubmission(&mut self) -> &mut txsubmission::Server {
&mut self.txsubmission
}
pub fn abort(&mut self) {
self.plexer_handle.abort();
}

View file

@ -132,14 +132,16 @@ where
pub async fn next_request(&mut self) -> Result<Request<TxId>, Error> {
match self.recv_message().await? {
Message::RequestTxIds(blocking, ack, req) => {
self.0 = State::TxIdsBlocking;
match blocking {
true => Ok(Request::TxIds(ack, req)),
false => Ok(Request::TxIdsNonBlocking(ack, req)),
Message::RequestTxIds(blocking, ack, req) => match blocking {
true => {
self.0 = State::TxIdsBlocking;
Ok(Request::TxIds(ack, req))
}
}
false => {
self.0 = State::TxIdsNonBlocking;
Ok(Request::TxIdsNonBlocking(ack, req))
}
},
Message::RequestTxs(x) => {
self.0 = State::Txs;
Ok(Request::Txs(x))

View file

@ -10,12 +10,13 @@ use pallas_network::miniprotocols::chainsync::{ClientRequest, HeaderContent, Tip
use pallas_network::miniprotocols::handshake::n2n::VersionData;
use pallas_network::miniprotocols::localstate::queries_v16::{Addr, Addrs, Value};
use pallas_network::miniprotocols::localstate::ClientQueryRequest;
use pallas_network::miniprotocols::txsubmission::{EraTxBody, TxIdAndSize};
use pallas_network::miniprotocols::{
blockfetch,
chainsync::{self, NextResponse},
Point,
};
use pallas_network::miniprotocols::{handshake, localstate};
use pallas_network::miniprotocols::{handshake, localstate, txsubmission, MAINNET_MAGIC};
use pallas_network::multiplexer::{Bearer, Plexer};
use std::path::Path;
use tokio::net::{TcpListener, UnixListener};
@ -769,3 +770,270 @@ pub async fn local_state_query_server_and_client_happy_path() {
_ = tokio::join!(client, server);
}
#[tokio::test]
#[ignore]
pub async fn txsubmission_server_and_client_happy_path_n2n() {
let test_txs = vec![(vec![0], vec![0, 0, 0]), (vec![1], vec![1, 1, 1])];
let server = tokio::spawn({
let test_txs = test_txs.clone();
async move {
let server_listener = TcpListener::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 30001))
.await
.unwrap();
let mut peer_server = PeerServer::accept(&server_listener, 0).await.unwrap();
let server_txsub = peer_server.txsubmission();
// server waits for init
server_txsub.wait_for_init().await.unwrap();
// server requests some tx ids
server_txsub
.acknowledge_and_request_tx_ids(false, 0, 2)
.await
.unwrap();
assert_eq!(*server_txsub.state(), txsubmission::State::TxIdsNonBlocking);
// server receives tx ids
let txids = match server_txsub.receive_next_reply().await.unwrap() {
txsubmission::Reply::TxIds(x) => x,
_ => panic!("unexpected message"),
};
assert_eq!(*server_txsub.state(), txsubmission::State::Idle);
// server requests txs for ids
let txids: Vec<_> = txids.into_iter().map(|t| t.0).collect();
assert_eq!(txids[0].1, test_txs[0].0);
assert_eq!(txids[1].1, test_txs[1].0);
server_txsub.request_txs(txids).await.unwrap();
assert_eq!(*server_txsub.state(), txsubmission::State::Txs);
// server receives txs
let txs = match server_txsub.receive_next_reply().await.unwrap() {
txsubmission::Reply::Txs(x) => x,
_ => panic!("unexpected message"),
};
assert_eq!(*server_txsub.state(), txsubmission::State::Idle);
assert_eq!(txs[0].1, test_txs[0].1);
assert_eq!(txs[1].1, test_txs[1].1);
// server requests more tx ids (blocking)
server_txsub
.acknowledge_and_request_tx_ids(true, 2, 1)
.await
.unwrap();
assert_eq!(*server_txsub.state(), txsubmission::State::TxIdsBlocking);
// server receives done from client
match server_txsub.receive_next_reply().await.unwrap() {
txsubmission::Reply::Done => (),
_ => panic!("unexpected message"),
}
assert_eq!(*server_txsub.state(), txsubmission::State::Done);
}
});
let client = tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(3)).await;
let mut mempool = test_txs.clone();
// client setup
let mut client_to_server_conn = PeerClient::connect("localhost:30001", 0).await.unwrap();
let client_txsub = client_to_server_conn.txsubmission();
// send init
client_txsub.send_init().await.unwrap();
assert_eq!(*client_txsub.state(), txsubmission::State::Idle);
// receive ids request from server
let (_, req) = match client_txsub.next_request().await.unwrap() {
txsubmission::Request::TxIdsNonBlocking(ack, req) => (ack, req),
_ => panic!("unexpected message"),
};
assert_eq!(*client_txsub.state(), txsubmission::State::TxIdsNonBlocking);
// send ids to server
let to_send = mempool.drain(..req as usize).collect::<Vec<_>>();
let ids_and_size = to_send
.clone()
.into_iter()
.map(|(h, b)| TxIdAndSize(txsubmission::EraTxId(0, h), b.len() as u32))
.collect();
client_txsub.reply_tx_ids(ids_and_size).await.unwrap();
assert_eq!(*client_txsub.state(), txsubmission::State::Idle);
// receive txs request from server
let ids = match client_txsub.next_request().await.unwrap() {
txsubmission::Request::Txs(ids) => ids,
_ => panic!("unexpected message"),
};
assert_eq!(*client_txsub.state(), txsubmission::State::Txs);
assert_eq!(ids[0].1, test_txs[0].0);
assert_eq!(ids[1].1, test_txs[1].0);
// send txs to server
let txs_to_send: Vec<_> = to_send.into_iter().map(|(_, b)| EraTxBody(0, b)).collect();
client_txsub.reply_txs(txs_to_send).await.unwrap();
assert_eq!(*client_txsub.state(), txsubmission::State::Idle);
// receive tx ids request from server (blocking)
match client_txsub.next_request().await.unwrap() {
txsubmission::Request::TxIds(_, _) => (),
_ => panic!("unexpected message"),
};
assert_eq!(*client_txsub.state(), txsubmission::State::TxIdsBlocking);
// send done to server
client_txsub.send_done().await.unwrap();
assert_eq!(*client_txsub.state(), txsubmission::State::Done);
});
_ = tokio::join!(client, server);
}
#[tokio::test]
#[ignore]
pub async fn txsubmission_submit_to_mainnet_peer_n2n() {
let tx_hash =
hex::decode("8b6e50e09376b5021e93fe688ba9e7100e3682cebcb39970af5f4e5962bc5a3d").unwrap();
let tx_hex = include_str!("../../test_data/babbage11.tx");
let tx_bytes = hex::decode(tx_hex).unwrap();
let mempool = vec![(tx_hash, tx_bytes)];
// client setup
let mut client_to_server_conn =
PeerClient::connect("relays-new.cardano-mainnet.iohk.io:3001", MAINNET_MAGIC)
.await
.unwrap();
let client_txsub = client_to_server_conn.txsubmission();
// send init
client_txsub.send_init().await.unwrap();
assert_eq!(*client_txsub.state(), txsubmission::State::Idle);
// receive ids request from server
let ack = match client_txsub.next_request().await.unwrap() {
txsubmission::Request::TxIds(ack, _) => {
assert_eq!(*client_txsub.state(), txsubmission::State::TxIdsBlocking);
ack
}
txsubmission::Request::TxIdsNonBlocking(ack, _) => {
assert_eq!(*client_txsub.state(), txsubmission::State::TxIdsNonBlocking);
ack
}
_ => panic!("unexpected message"),
};
assert_eq!(ack, 0);
// send ids to server
let to_send = mempool.clone();
let ids_and_size = to_send
.clone()
.into_iter()
.map(|(h, b)| TxIdAndSize(txsubmission::EraTxId(4, h), b.len() as u32))
.collect();
client_txsub.reply_tx_ids(ids_and_size).await.unwrap();
assert_eq!(*client_txsub.state(), txsubmission::State::Idle);
// receive txs request from server
let ids = match client_txsub.next_request().await.unwrap() {
txsubmission::Request::Txs(ids) => ids,
_ => panic!("unexpected message"),
};
assert_eq!(*client_txsub.state(), txsubmission::State::Txs);
assert_eq!(ids[0].1, mempool[0].0);
// send txs to server
let txs_to_send: Vec<_> = to_send.into_iter().map(|(_, b)| EraTxBody(4, b)).collect();
client_txsub.reply_txs(txs_to_send).await.unwrap();
assert_eq!(*client_txsub.state(), txsubmission::State::Idle);
// receive tx ids request from server (blocking)
// server usually sends another request before processing/acknowledging our
// previous response, so ack is 0. the ack comes in the next message.
match client_txsub.next_request().await.unwrap() {
txsubmission::Request::TxIdsNonBlocking(_, _) => {
assert_eq!(*client_txsub.state(), txsubmission::State::TxIdsNonBlocking);
}
_ => panic!("unexpected message"),
};
client_txsub.reply_tx_ids(vec![]).await.unwrap();
let ack = match client_txsub.next_request().await.unwrap() {
txsubmission::Request::TxIds(ack, _) => {
assert_eq!(*client_txsub.state(), txsubmission::State::TxIdsBlocking);
client_txsub.send_done().await.unwrap();
assert_eq!(*client_txsub.state(), txsubmission::State::Done);
ack
}
txsubmission::Request::TxIdsNonBlocking(ack, _) => {
assert_eq!(*client_txsub.state(), txsubmission::State::TxIdsNonBlocking);
ack
}
_ => panic!("unexpected message"),
};
// server should acknowledge the one transaction we sent now
assert_eq!(ack, 1);
}

1
test_data/babbage11.tx Normal file
View file

@ -0,0 +1 @@
84a5008182582030edb55f21419f693940372b080fa931d3f7340f9d362573d1b8a20bcc7f208000018182583901c04c6e21cc83f7322439d5cd6950bd36d114da35859fe01fbb31da1f58646661658b029b6906bd3a5b35150cf1b274cbdffd2f504119eb9f1a009b83c1021a000618f0031a20b40da0048183028200581c58646661658b029b6906bd3a5b35150cf1b274cbdffd2f504119eb9f581cae66e56ab11ccb39e882669f220a37956c683e4ce84fefd910012d7aa1008282582080d6e8a2928c2b54cfe94a607c246de2062dc32fd49bd7e522cc210fe581c45b58404e17fe684a23f53e30abc9024f8333419e8eba5dce149dd8d7d0e641b8e05d893a48202c05bc23c983a15b17f1fc9641b6ebb544e3d787518150f2e6e097990a8258202e4103fac5c3fbd804802647dc1c6c9a3610584ba73a77f70dc5a4f962334a325840635471d189d510d6e8d18f28ec73d9cc191f86e625610710ed1695d484397bd32e0b67078ce70daea328c065af1e887a17bd08950623504b29b2e82c804d7607f5f6