From fc2728639fe3fbb95ed98ce0dd4eea1dbcf5edfe Mon Sep 17 00:00:00 2001 From: Pi Lanningham Date: Mon, 20 Feb 2023 09:19:06 -0600 Subject: [PATCH] feat: Make the underlying TxBody type generic * WIP * Fix compilation on windows machines * Make TxBody generic Technically we should be generic over TxBody for arbitrary ouroboros implementations; however, that makes things awkward. So, we introduce GenericClient and GenericServer, with concrete types that instantiate them to Cardano specific types. We could have done this with default type arguments, but this pushes the type system to it's limits and it often can't infer the correct type * More examples tweaks; clippy and fmt * Remove unneccesary defaults * Tag 24 is no longer mysterious It means raw CBOR * Cargo fmt One day I'll configure vscode to do this on safe --- examples/n2c-miniprotocols/src/main.rs | 55 +++++++++++-------- .../src/txsubmission/client.rs | 37 +++++++++---- .../src/txsubmission/codec.rs | 37 +++++++++---- .../src/txsubmission/protocol.rs | 18 ++---- .../src/txsubmission/server.rs | 41 ++++++++++---- pallas-miniprotocols/tests/integration.rs | 4 +- 6 files changed, 121 insertions(+), 71 deletions(-) diff --git a/examples/n2c-miniprotocols/src/main.rs b/examples/n2c-miniprotocols/src/main.rs index 36182de..f40a140 100644 --- a/examples/n2c-miniprotocols/src/main.rs +++ b/examples/n2c-miniprotocols/src/main.rs @@ -1,14 +1,12 @@ use pallas::network::{ - miniprotocols::{ - chainsync, handshake, localstate, Point, MAINNET_MAGIC, PROTOCOL_N2C_CHAIN_SYNC, - PROTOCOL_N2C_HANDSHAKE, PROTOCOL_N2C_STATE_QUERY, - }, - multiplexer::{self, bearers::Bearer}, + miniprotocols::{chainsync, handshake, localstate, Point, MAINNET_MAGIC}, + multiplexer, }; #[derive(Debug)] struct LoggingObserver; +#[allow(dead_code)] fn do_handshake(channel: multiplexer::StdChannel) { let mut client = handshake::N2CClient::new(channel); @@ -26,6 +24,7 @@ fn do_handshake(channel: multiplexer::StdChannel) { } } +#[allow(dead_code)] fn do_localstate_query(channel: multiplexer::StdChannel) { let mut client = localstate::ClientV10::new(channel); client.acquire(None).unwrap(); @@ -37,6 +36,7 @@ fn do_localstate_query(channel: multiplexer::StdChannel) { log::info!("system start result: {:?}", result); } +#[allow(dead_code)] fn do_chainsync(channel: multiplexer::StdChannel) { let known_points = vec![Point::Specific( 43847831u64, @@ -67,30 +67,39 @@ fn main() { .filter_level(log::LevelFilter::Trace) .init(); + #[cfg(not(target_family = "unix"))] + { + panic!("can't use n2c unix socket on non-unix systems"); + } // we connect to the unix socket of the local node. Make sure you have the right // path for your environment #[cfg(target_family = "unix")] - let bearer = Bearer::connect_unix("/tmp/node.socket").unwrap(); + { + use pallas::network::{ + miniprotocols::{ + PROTOCOL_N2C_CHAIN_SYNC, PROTOCOL_N2C_HANDSHAKE, PROTOCOL_N2C_STATE_QUERY, + }, + multiplexer::bearers::Bearer, + }; + let bearer = Bearer::connect_unix("/tmp/node.socket").unwrap(); - #[cfg(not(target_family = "unix"))] - panic!("can't use n2c unix socket on non-unix systems"); + // setup the multiplexer by specifying the bearer and the IDs of the + // miniprotocols to use + let mut plexer = multiplexer::StdPlexer::new(bearer); + let handshake = plexer.use_client_channel(PROTOCOL_N2C_HANDSHAKE); + let statequery = plexer.use_client_channel(PROTOCOL_N2C_STATE_QUERY); + let chainsync = plexer.use_client_channel(PROTOCOL_N2C_CHAIN_SYNC); - // setup the multiplexer by specifying the bearer and the IDs of the - // miniprotocols to use - let mut plexer = multiplexer::StdPlexer::new(bearer); - let handshake = plexer.use_client_channel(PROTOCOL_N2C_HANDSHAKE); - let statequery = plexer.use_client_channel(PROTOCOL_N2C_STATE_QUERY); - let chainsync = plexer.use_client_channel(PROTOCOL_N2C_CHAIN_SYNC); + plexer.muxer.spawn(); + plexer.demuxer.spawn(); - plexer.muxer.spawn(); - plexer.demuxer.spawn(); + // execute the required handshake against the relay + do_handshake(handshake); - // execute the required handshake against the relay - do_handshake(handshake); + // execute an arbitrary "Local State" query against the node + do_localstate_query(statequery); - // execute an arbitrary "Local State" query against the node - do_localstate_query(statequery); - - // execute the chainsync flow from an arbitrary point in the chain - do_chainsync(chainsync); + // execute the chainsync flow from an arbitrary point in the chain + do_chainsync(chainsync); + } } diff --git a/pallas-miniprotocols/src/txsubmission/client.rs b/pallas-miniprotocols/src/txsubmission/client.rs index 9f3c690..79d0a2e 100644 --- a/pallas-miniprotocols/src/txsubmission/client.rs +++ b/pallas-miniprotocols/src/txsubmission/client.rs @@ -3,7 +3,10 @@ use std::marker::PhantomData; use pallas_codec::Fragment; use pallas_multiplexer::agents::{Channel, ChannelBuffer}; -use super::protocol::{Error, Message, State, TxBody, TxIdAndSize}; +use super::{ + protocol::{Error, Message, State, TxIdAndSize}, + EraTxBody, EraTxId, +}; pub enum Request { TxIds(u16, u16), @@ -11,18 +14,32 @@ pub enum Request { Txs(Vec), } -pub struct Client(State, ChannelBuffer, PhantomData) +/// A generic Ouroboros client for submitting a generic notion of "transactions" to another server +pub struct GenericClient( + State, + ChannelBuffer, + PhantomData, + PhantomData, +) where H: Channel, - Message: Fragment; + Message: Fragment; -impl Client +/// A cardano specific instantiation of the ouroboros protocol +pub type Client = GenericClient; + +impl GenericClient where H: Channel, - Message: Fragment, + Message: Fragment, { pub fn new(channel: H) -> Self { - Self(State::Init, ChannelBuffer::new(channel), PhantomData {}) + Self( + State::Init, + ChannelBuffer::new(channel), + PhantomData {}, + PhantomData {}, + ) } pub fn state(&self) -> &State { @@ -54,7 +71,7 @@ where } /// As a client in a specific state, am I allowed to send this message? - fn assert_outbound_state(&self, msg: &Message) -> Result<(), Error> { + fn assert_outbound_state(&self, msg: &Message) -> Result<(), Error> { match (&self.0, msg) { (State::Init, Message::Init) => Ok(()), (State::TxIdsBlocking, Message::ReplyTxIds(..)) => Ok(()), @@ -66,7 +83,7 @@ where } /// As a client in a specific state, am I allowed to receive this message? - fn assert_inbound_state(&self, msg: &Message) -> Result<(), Error> { + fn assert_inbound_state(&self, msg: &Message) -> Result<(), Error> { match (&self.0, msg) { (State::Idle, Message::RequestTxIds(..)) => Ok(()), (State::Idle, Message::RequestTxs(..)) => Ok(()), @@ -74,7 +91,7 @@ where } } - pub fn send_message(&mut self, msg: &Message) -> Result<(), Error> { + pub fn send_message(&mut self, msg: &Message) -> Result<(), Error> { self.assert_agency_is_ours()?; self.assert_outbound_state(msg)?; self.1.send_msg_chunks(msg).map_err(Error::ChannelError)?; @@ -82,7 +99,7 @@ where Ok(()) } - pub fn recv_message(&mut self) -> Result, Error> { + pub fn recv_message(&mut self) -> Result, Error> { self.assert_agency_is_theirs()?; let msg = self.1.recv_full_msg().map_err(Error::ChannelError)?; self.assert_inbound_state(&msg)?; diff --git a/pallas-miniprotocols/src/txsubmission/codec.rs b/pallas-miniprotocols/src/txsubmission/codec.rs index 5ea9352..bb55717 100644 --- a/pallas-miniprotocols/src/txsubmission/codec.rs +++ b/pallas-miniprotocols/src/txsubmission/codec.rs @@ -1,8 +1,8 @@ -use pallas_codec::minicbor::{decode, encode, Decode, Decoder, Encode, Encoder}; +use pallas_codec::minicbor::{data::Tag, decode, encode, Decode, Decoder, Encode, Encoder}; use super::{ protocol::{Message, TxIdAndSize}, - EraTxId, TxBody, + EraTxBody, EraTxId, }; impl> Encode<()> for TxIdAndSize { @@ -31,7 +31,7 @@ impl<'b, TxId: Decode<'b, ()>> Decode<'b, ()> for TxIdAndSize { } } -impl> Encode<()> for Message { +impl, TxBody: Encode<()>> Encode<()> for Message { fn encode( &self, e: &mut Encoder, @@ -71,7 +71,7 @@ impl> Encode<()> for Message { e.array(2)?.u16(3)?; e.begin_array()?; for tx in txs { - e.bytes(&tx.0)?; + e.encode(tx)?; } e.end()?; Ok(()) @@ -84,18 +84,33 @@ impl> Encode<()> for Message { } } -impl<'b> Decode<'b, ()> for TxBody { +impl<'b> Decode<'b, ()> for EraTxBody { fn decode(d: &mut Decoder<'b>, _ctx: &mut ()) -> Result { d.array()?; - // TODO: the TxBody encoding here needs to be pinned down and parameterized, the - // same way we did TxId! - d.u16()?; // Era? - d.tag()?; // tag 24? - Ok(TxBody(d.bytes()?.to_vec())) + let era = d.u16()?; + let tag = d.tag()?; + if tag != Tag::Cbor { + return Err(decode::Error::message("Expected encoded CBOR data item")); + } + Ok(EraTxBody(era, d.bytes()?.to_vec())) } } -impl<'b, TxId: Decode<'b, ()>> Decode<'b, ()> for Message { +impl Encode<()> for EraTxBody { + fn encode( + &self, + e: &mut Encoder, + _ctx: &mut (), + ) -> Result<(), encode::Error> { + e.array(2)?; + e.u16(self.0)?; + e.tag(Tag::Cbor)?; + e.bytes(&self.1)?; + Ok(()) + } +} + +impl<'b, TxId: Decode<'b, ()>, TxBody: Decode<'b, ()>> Decode<'b, ()> for Message { fn decode(d: &mut Decoder<'b>, _ctx: &mut ()) -> Result { d.array()?; let label = d.u16()?; diff --git a/pallas-miniprotocols/src/txsubmission/protocol.rs b/pallas-miniprotocols/src/txsubmission/protocol.rs index 0f126dc..db38009 100644 --- a/pallas-miniprotocols/src/txsubmission/protocol.rs +++ b/pallas-miniprotocols/src/txsubmission/protocol.rs @@ -21,21 +21,13 @@ pub type TxSizeInBytes = u32; #[derive(Debug, Clone)] pub struct EraTxId(pub u16, pub Vec); +// The bytes of a transaction, with an era number and some raw CBOR +#[derive(Debug, Clone, Eq, PartialEq)] +pub struct EraTxBody(pub u16, pub Vec); + #[derive(Debug)] pub struct TxIdAndSize(pub TxID, pub TxSizeInBytes); -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct TxBody(pub Vec); - -#[derive(Debug, Clone)] -pub struct Tx(pub TxId, pub TxBody); - -impl From> for TxIdAndSize { - fn from(other: Tx) -> Self { - TxIdAndSize(other.0, other.1 .0.len() as u32) - } -} - #[derive(Error, Debug)] pub enum Error { #[error("attempted to receive message while agency is ours")] @@ -58,7 +50,7 @@ pub enum Error { } #[derive(Debug)] -pub enum Message { +pub enum Message { Init, RequestTxIds(Blocking, TxCount, TxCount), ReplyTxIds(Vec>), diff --git a/pallas-miniprotocols/src/txsubmission/server.rs b/pallas-miniprotocols/src/txsubmission/server.rs index ce18b5f..431f73c 100644 --- a/pallas-miniprotocols/src/txsubmission/server.rs +++ b/pallas-miniprotocols/src/txsubmission/server.rs @@ -3,26 +3,43 @@ use std::marker::PhantomData; use pallas_codec::Fragment; use pallas_multiplexer::agents::{Channel, ChannelBuffer}; -use super::protocol::{Blocking, Error, Message, State, TxBody, TxCount, TxIdAndSize}; +use super::{ + protocol::{Blocking, Error, Message, State, TxCount, TxIdAndSize}, + EraTxBody, EraTxId, +}; -pub enum Reply { +pub enum Reply { TxIds(Vec>), Txs(Vec), Done, } -pub struct Server(State, ChannelBuffer, PhantomData) +/// A generic implementation of an ouroboros server protocol ready to request and receive transactions from a client +pub struct GenericServer( + State, + ChannelBuffer, + PhantomData, + PhantomData, +) where H: Channel, - Message: Fragment; + Message: Fragment; -impl Server +/// A Cardano specific server for the ouroboros TxSubmission protocol +pub type Server = GenericServer; + +impl GenericServer where H: Channel, - Message: Fragment, + Message: Fragment, { pub fn new(channel: H) -> Self { - Self(State::Init, ChannelBuffer::new(channel), PhantomData {}) + Self( + State::Init, + ChannelBuffer::new(channel), + PhantomData {}, + PhantomData {}, + ) } pub fn state(&self) -> &State { @@ -54,7 +71,7 @@ where } /// As a server in a specific state, am I allowed to send this message? - fn assert_outbound_state(&self, msg: &Message) -> Result<(), Error> { + fn assert_outbound_state(&self, msg: &Message) -> Result<(), Error> { match (&self.0, msg) { (State::Idle, Message::RequestTxIds(..)) => Ok(()), (State::Idle, Message::RequestTxs(..)) => Ok(()), @@ -63,7 +80,7 @@ where } /// As a server in a specific state, am I allowed to receive this message? - fn assert_inbound_state(&self, msg: &Message) -> Result<(), Error> { + fn assert_inbound_state(&self, msg: &Message) -> Result<(), Error> { match (&self.0, msg) { (State::Init, Message::Init) => Ok(()), (State::TxIdsBlocking, Message::ReplyTxIds(..)) => Ok(()), @@ -74,7 +91,7 @@ where } } - pub fn send_message(&mut self, msg: &Message) -> Result<(), Error> { + pub fn send_message(&mut self, msg: &Message) -> Result<(), Error> { self.assert_agency_is_ours()?; self.assert_outbound_state(msg)?; self.1.send_msg_chunks(msg).map_err(Error::ChannelError)?; @@ -82,7 +99,7 @@ where Ok(()) } - pub fn recv_message(&mut self) -> Result, Error> { + pub fn recv_message(&mut self) -> Result, Error> { self.assert_agency_is_theirs()?; let msg = self.1.recv_full_msg().map_err(Error::ChannelError)?; self.assert_inbound_state(&msg)?; @@ -126,7 +143,7 @@ where Ok(()) } - pub fn receive_next_reply(&mut self) -> Result, Error> { + pub fn receive_next_reply(&mut self) -> Result, Error> { match self.recv_message()? { Message::ReplyTxIds(ids_and_sizes) => { self.0 = State::Idle; diff --git a/pallas-miniprotocols/tests/integration.rs b/pallas-miniprotocols/tests/integration.rs index 28e3b56..2724b11 100644 --- a/pallas-miniprotocols/tests/integration.rs +++ b/pallas-miniprotocols/tests/integration.rs @@ -2,7 +2,7 @@ use pallas_miniprotocols::{ blockfetch, chainsync::{self, NextResponse}, handshake::{self, Confirmation}, - txsubmission::{self, EraTxId, Reply, TxIdAndSize}, + txsubmission::{self, EraTxBody, EraTxId, Reply, Server, TxIdAndSize}, Point, PROTOCOL_N2N_BLOCK_FETCH, PROTOCOL_N2N_CHAIN_SYNC, PROTOCOL_N2N_HANDSHAKE, PROTOCOL_N2N_TX_SUBMISSION, }; @@ -187,7 +187,7 @@ pub fn txsubmission_server_happy_path() { Ok(_) )); - let reply = server.receive_next_reply(); + let reply: Result<_, _> = server.receive_next_reply(); assert!(matches!(reply, Ok(Reply::TxIds(_)))); let Ok(Reply::TxIds(tx_ids)) = reply else { unreachable!() };