From ef086375bf768c061e3b32139c71be875445665b Mon Sep 17 00:00:00 2001 From: Harper Date: Fri, 15 Dec 2023 16:28:36 +0000 Subject: [PATCH] fix(network): use correct client state transition for n2n txsub (#348) --- pallas-network/src/facades.rs | 29 +- .../src/miniprotocols/txsubmission/client.rs | 16 +- pallas-network/tests/protocols.rs | 270 +++++++++++++++++- test_data/babbage11.tx | 1 + 4 files changed, 301 insertions(+), 15 deletions(-) create mode 100644 test_data/babbage11.tx diff --git a/pallas-network/src/facades.rs b/pallas-network/src/facades.rs index 8ba4f36..ddbee18 100644 --- a/pallas-network/src/facades.rs +++ b/pallas-network/src/facades.rs @@ -10,7 +10,7 @@ use tokio::net::UnixListener; use crate::miniprotocols::handshake::{n2c, n2n, Confirmation, VersionNumber, VersionTable}; -use crate::miniprotocols::PROTOCOL_N2N_HANDSHAKE; +use crate::miniprotocols::{txsubmission, PROTOCOL_N2N_HANDSHAKE, PROTOCOL_N2N_TX_SUBMISSION}; use crate::{ miniprotocols::{ blockfetch, chainsync, handshake, localstate, PROTOCOL_N2C_CHAIN_SYNC, @@ -38,6 +38,7 @@ pub struct PeerClient { pub handshake: handshake::Confirmation, pub chainsync: chainsync::N2NClient, pub blockfetch: blockfetch::Client, + pub txsubmission: txsubmission::Client, } impl PeerClient { @@ -49,14 +50,15 @@ impl PeerClient { let mut plexer = multiplexer::Plexer::new(bearer); - let channel0 = plexer.subscribe_client(0); - let channel2 = plexer.subscribe_client(2); - let channel3 = plexer.subscribe_client(3); + let hs_channel = plexer.subscribe_client(PROTOCOL_N2N_HANDSHAKE); + let cs_channel = plexer.subscribe_client(PROTOCOL_N2N_CHAIN_SYNC); + let bf_channel = plexer.subscribe_client(PROTOCOL_N2N_BLOCK_FETCH); + let txsub_channel = plexer.subscribe_client(PROTOCOL_N2N_TX_SUBMISSION); let plexer_handle = tokio::spawn(async move { plexer.run().await }); let versions = handshake::n2n::VersionTable::v7_and_above(magic); - let mut client = handshake::Client::new(channel0); + let mut client = handshake::Client::new(hs_channel); let handshake = client .handshake(versions) @@ -71,8 +73,9 @@ impl PeerClient { Ok(Self { plexer_handle, handshake, - chainsync: chainsync::Client::new(channel2), - blockfetch: blockfetch::Client::new(channel3), + chainsync: chainsync::Client::new(cs_channel), + blockfetch: blockfetch::Client::new(bf_channel), + txsubmission: txsubmission::Client::new(txsub_channel), }) } @@ -84,6 +87,10 @@ impl PeerClient { &mut self.blockfetch } + pub fn txsubmission(&mut self) -> &mut txsubmission::Client { + &mut self.txsubmission + } + pub fn abort(&mut self) { self.plexer_handle.abort(); } @@ -95,6 +102,7 @@ pub struct PeerServer { pub version: (VersionNumber, n2n::VersionData), pub chainsync: chainsync::N2NServer, pub blockfetch: blockfetch::Server, + pub txsubmission: txsubmission::Server, } impl PeerServer { @@ -108,10 +116,12 @@ impl PeerServer { let hs_channel = server_plexer.subscribe_server(PROTOCOL_N2N_HANDSHAKE); let cs_channel = server_plexer.subscribe_server(PROTOCOL_N2N_CHAIN_SYNC); let bf_channel = server_plexer.subscribe_server(PROTOCOL_N2N_BLOCK_FETCH); + let txsub_channel = server_plexer.subscribe_server(PROTOCOL_N2N_TX_SUBMISSION); let mut server_hs: handshake::Server = handshake::Server::new(hs_channel); let server_cs = chainsync::N2NServer::new(cs_channel); let server_bf = blockfetch::Server::new(bf_channel); + let server_txsub = txsubmission::Server::new(txsub_channel); let plexer_handle = tokio::spawn(async move { server_plexer.run().await }); @@ -126,6 +136,7 @@ impl PeerServer { version: ver, chainsync: server_cs, blockfetch: server_bf, + txsubmission: server_txsub, }) } else { plexer_handle.abort(); @@ -141,6 +152,10 @@ impl PeerServer { &mut self.blockfetch } + pub fn txsubmission(&mut self) -> &mut txsubmission::Server { + &mut self.txsubmission + } + pub fn abort(&mut self) { self.plexer_handle.abort(); } diff --git a/pallas-network/src/miniprotocols/txsubmission/client.rs b/pallas-network/src/miniprotocols/txsubmission/client.rs index c669660..1dea3cd 100644 --- a/pallas-network/src/miniprotocols/txsubmission/client.rs +++ b/pallas-network/src/miniprotocols/txsubmission/client.rs @@ -132,14 +132,16 @@ where pub async fn next_request(&mut self) -> Result, Error> { match self.recv_message().await? { - Message::RequestTxIds(blocking, ack, req) => { - self.0 = State::TxIdsBlocking; - - match blocking { - true => Ok(Request::TxIds(ack, req)), - false => Ok(Request::TxIdsNonBlocking(ack, req)), + Message::RequestTxIds(blocking, ack, req) => match blocking { + true => { + self.0 = State::TxIdsBlocking; + Ok(Request::TxIds(ack, req)) } - } + false => { + self.0 = State::TxIdsNonBlocking; + Ok(Request::TxIdsNonBlocking(ack, req)) + } + }, Message::RequestTxs(x) => { self.0 = State::Txs; Ok(Request::Txs(x)) diff --git a/pallas-network/tests/protocols.rs b/pallas-network/tests/protocols.rs index bc1970d..576566e 100644 --- a/pallas-network/tests/protocols.rs +++ b/pallas-network/tests/protocols.rs @@ -10,12 +10,13 @@ use pallas_network::miniprotocols::chainsync::{ClientRequest, HeaderContent, Tip use pallas_network::miniprotocols::handshake::n2n::VersionData; use pallas_network::miniprotocols::localstate::queries_v16::{Addr, Addrs, Value}; use pallas_network::miniprotocols::localstate::ClientQueryRequest; +use pallas_network::miniprotocols::txsubmission::{EraTxBody, TxIdAndSize}; use pallas_network::miniprotocols::{ blockfetch, chainsync::{self, NextResponse}, Point, }; -use pallas_network::miniprotocols::{handshake, localstate}; +use pallas_network::miniprotocols::{handshake, localstate, txsubmission, MAINNET_MAGIC}; use pallas_network::multiplexer::{Bearer, Plexer}; use std::path::Path; use tokio::net::{TcpListener, UnixListener}; @@ -769,3 +770,270 @@ pub async fn local_state_query_server_and_client_happy_path() { _ = tokio::join!(client, server); } + +#[tokio::test] +#[ignore] +pub async fn txsubmission_server_and_client_happy_path_n2n() { + let test_txs = vec![(vec![0], vec![0, 0, 0]), (vec![1], vec![1, 1, 1])]; + + let server = tokio::spawn({ + let test_txs = test_txs.clone(); + async move { + let server_listener = TcpListener::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 30001)) + .await + .unwrap(); + + let mut peer_server = PeerServer::accept(&server_listener, 0).await.unwrap(); + + let server_txsub = peer_server.txsubmission(); + + // server waits for init + + server_txsub.wait_for_init().await.unwrap(); + + // server requests some tx ids + + server_txsub + .acknowledge_and_request_tx_ids(false, 0, 2) + .await + .unwrap(); + + assert_eq!(*server_txsub.state(), txsubmission::State::TxIdsNonBlocking); + + // server receives tx ids + + let txids = match server_txsub.receive_next_reply().await.unwrap() { + txsubmission::Reply::TxIds(x) => x, + _ => panic!("unexpected message"), + }; + + assert_eq!(*server_txsub.state(), txsubmission::State::Idle); + + // server requests txs for ids + + let txids: Vec<_> = txids.into_iter().map(|t| t.0).collect(); + + assert_eq!(txids[0].1, test_txs[0].0); + assert_eq!(txids[1].1, test_txs[1].0); + + server_txsub.request_txs(txids).await.unwrap(); + + assert_eq!(*server_txsub.state(), txsubmission::State::Txs); + + // server receives txs + + let txs = match server_txsub.receive_next_reply().await.unwrap() { + txsubmission::Reply::Txs(x) => x, + _ => panic!("unexpected message"), + }; + + assert_eq!(*server_txsub.state(), txsubmission::State::Idle); + + assert_eq!(txs[0].1, test_txs[0].1); + assert_eq!(txs[1].1, test_txs[1].1); + + // server requests more tx ids (blocking) + + server_txsub + .acknowledge_and_request_tx_ids(true, 2, 1) + .await + .unwrap(); + + assert_eq!(*server_txsub.state(), txsubmission::State::TxIdsBlocking); + + // server receives done from client + + match server_txsub.receive_next_reply().await.unwrap() { + txsubmission::Reply::Done => (), + _ => panic!("unexpected message"), + } + + assert_eq!(*server_txsub.state(), txsubmission::State::Done); + } + }); + + let client = tokio::spawn(async move { + tokio::time::sleep(Duration::from_secs(3)).await; + let mut mempool = test_txs.clone(); + + // client setup + + let mut client_to_server_conn = PeerClient::connect("localhost:30001", 0).await.unwrap(); + + let client_txsub = client_to_server_conn.txsubmission(); + + // send init + + client_txsub.send_init().await.unwrap(); + + assert_eq!(*client_txsub.state(), txsubmission::State::Idle); + + // receive ids request from server + + let (_, req) = match client_txsub.next_request().await.unwrap() { + txsubmission::Request::TxIdsNonBlocking(ack, req) => (ack, req), + _ => panic!("unexpected message"), + }; + + assert_eq!(*client_txsub.state(), txsubmission::State::TxIdsNonBlocking); + + // send ids to server + + let to_send = mempool.drain(..req as usize).collect::>(); + + let ids_and_size = to_send + .clone() + .into_iter() + .map(|(h, b)| TxIdAndSize(txsubmission::EraTxId(0, h), b.len() as u32)) + .collect(); + + client_txsub.reply_tx_ids(ids_and_size).await.unwrap(); + + assert_eq!(*client_txsub.state(), txsubmission::State::Idle); + + // receive txs request from server + + let ids = match client_txsub.next_request().await.unwrap() { + txsubmission::Request::Txs(ids) => ids, + _ => panic!("unexpected message"), + }; + + assert_eq!(*client_txsub.state(), txsubmission::State::Txs); + + assert_eq!(ids[0].1, test_txs[0].0); + assert_eq!(ids[1].1, test_txs[1].0); + + // send txs to server + + let txs_to_send: Vec<_> = to_send.into_iter().map(|(_, b)| EraTxBody(0, b)).collect(); + + client_txsub.reply_txs(txs_to_send).await.unwrap(); + + assert_eq!(*client_txsub.state(), txsubmission::State::Idle); + + // receive tx ids request from server (blocking) + + match client_txsub.next_request().await.unwrap() { + txsubmission::Request::TxIds(_, _) => (), + _ => panic!("unexpected message"), + }; + + assert_eq!(*client_txsub.state(), txsubmission::State::TxIdsBlocking); + + // send done to server + + client_txsub.send_done().await.unwrap(); + + assert_eq!(*client_txsub.state(), txsubmission::State::Done); + }); + + _ = tokio::join!(client, server); +} + +#[tokio::test] +#[ignore] +pub async fn txsubmission_submit_to_mainnet_peer_n2n() { + let tx_hash = + hex::decode("8b6e50e09376b5021e93fe688ba9e7100e3682cebcb39970af5f4e5962bc5a3d").unwrap(); + let tx_hex = include_str!("../../test_data/babbage11.tx"); + let tx_bytes = hex::decode(tx_hex).unwrap(); + + let mempool = vec![(tx_hash, tx_bytes)]; + + // client setup + + let mut client_to_server_conn = + PeerClient::connect("relays-new.cardano-mainnet.iohk.io:3001", MAINNET_MAGIC) + .await + .unwrap(); + + let client_txsub = client_to_server_conn.txsubmission(); + + // send init + + client_txsub.send_init().await.unwrap(); + + assert_eq!(*client_txsub.state(), txsubmission::State::Idle); + + // receive ids request from server + + let ack = match client_txsub.next_request().await.unwrap() { + txsubmission::Request::TxIds(ack, _) => { + assert_eq!(*client_txsub.state(), txsubmission::State::TxIdsBlocking); + ack + } + txsubmission::Request::TxIdsNonBlocking(ack, _) => { + assert_eq!(*client_txsub.state(), txsubmission::State::TxIdsNonBlocking); + ack + } + _ => panic!("unexpected message"), + }; + + assert_eq!(ack, 0); + + // send ids to server + + let to_send = mempool.clone(); + + let ids_and_size = to_send + .clone() + .into_iter() + .map(|(h, b)| TxIdAndSize(txsubmission::EraTxId(4, h), b.len() as u32)) + .collect(); + + client_txsub.reply_tx_ids(ids_and_size).await.unwrap(); + + assert_eq!(*client_txsub.state(), txsubmission::State::Idle); + + // receive txs request from server + + let ids = match client_txsub.next_request().await.unwrap() { + txsubmission::Request::Txs(ids) => ids, + _ => panic!("unexpected message"), + }; + + assert_eq!(*client_txsub.state(), txsubmission::State::Txs); + + assert_eq!(ids[0].1, mempool[0].0); + + // send txs to server + + let txs_to_send: Vec<_> = to_send.into_iter().map(|(_, b)| EraTxBody(4, b)).collect(); + + client_txsub.reply_txs(txs_to_send).await.unwrap(); + + assert_eq!(*client_txsub.state(), txsubmission::State::Idle); + + // receive tx ids request from server (blocking) + + // server usually sends another request before processing/acknowledging our + // previous response, so ack is 0. the ack comes in the next message. + match client_txsub.next_request().await.unwrap() { + txsubmission::Request::TxIdsNonBlocking(_, _) => { + assert_eq!(*client_txsub.state(), txsubmission::State::TxIdsNonBlocking); + } + _ => panic!("unexpected message"), + }; + + client_txsub.reply_tx_ids(vec![]).await.unwrap(); + + let ack = match client_txsub.next_request().await.unwrap() { + txsubmission::Request::TxIds(ack, _) => { + assert_eq!(*client_txsub.state(), txsubmission::State::TxIdsBlocking); + + client_txsub.send_done().await.unwrap(); + assert_eq!(*client_txsub.state(), txsubmission::State::Done); + + ack + } + txsubmission::Request::TxIdsNonBlocking(ack, _) => { + assert_eq!(*client_txsub.state(), txsubmission::State::TxIdsNonBlocking); + + ack + } + _ => panic!("unexpected message"), + }; + + // server should acknowledge the one transaction we sent now + assert_eq!(ack, 1); +} diff --git a/test_data/babbage11.tx b/test_data/babbage11.tx new file mode 100644 index 0000000..71c7045 --- /dev/null +++ b/test_data/babbage11.tx @@ -0,0 +1 @@ +84a5008182582030edb55f21419f693940372b080fa931d3f7340f9d362573d1b8a20bcc7f208000018182583901c04c6e21cc83f7322439d5cd6950bd36d114da35859fe01fbb31da1f58646661658b029b6906bd3a5b35150cf1b274cbdffd2f504119eb9f1a009b83c1021a000618f0031a20b40da0048183028200581c58646661658b029b6906bd3a5b35150cf1b274cbdffd2f504119eb9f581cae66e56ab11ccb39e882669f220a37956c683e4ce84fefd910012d7aa1008282582080d6e8a2928c2b54cfe94a607c246de2062dc32fd49bd7e522cc210fe581c45b58404e17fe684a23f53e30abc9024f8333419e8eba5dce149dd8d7d0e641b8e05d893a48202c05bc23c983a15b17f1fc9641b6ebb544e3d787518150f2e6e097990a8258202e4103fac5c3fbd804802647dc1c6c9a3610584ba73a77f70dc5a4f962334a325840635471d189d510d6e8d18f28ec73d9cc191f86e625610710ed1695d484397bd32e0b67078ce70daea328c065af1e887a17bd08950623504b29b2e82c804d7607f5f6 \ No newline at end of file