diff --git a/examples/n2c-miniprotocols/src/main.rs b/examples/n2c-miniprotocols/src/main.rs index 36182de..f40a140 100644 --- a/examples/n2c-miniprotocols/src/main.rs +++ b/examples/n2c-miniprotocols/src/main.rs @@ -1,14 +1,12 @@ use pallas::network::{ - miniprotocols::{ - chainsync, handshake, localstate, Point, MAINNET_MAGIC, PROTOCOL_N2C_CHAIN_SYNC, - PROTOCOL_N2C_HANDSHAKE, PROTOCOL_N2C_STATE_QUERY, - }, - multiplexer::{self, bearers::Bearer}, + miniprotocols::{chainsync, handshake, localstate, Point, MAINNET_MAGIC}, + multiplexer, }; #[derive(Debug)] struct LoggingObserver; +#[allow(dead_code)] fn do_handshake(channel: multiplexer::StdChannel) { let mut client = handshake::N2CClient::new(channel); @@ -26,6 +24,7 @@ fn do_handshake(channel: multiplexer::StdChannel) { } } +#[allow(dead_code)] fn do_localstate_query(channel: multiplexer::StdChannel) { let mut client = localstate::ClientV10::new(channel); client.acquire(None).unwrap(); @@ -37,6 +36,7 @@ fn do_localstate_query(channel: multiplexer::StdChannel) { log::info!("system start result: {:?}", result); } +#[allow(dead_code)] fn do_chainsync(channel: multiplexer::StdChannel) { let known_points = vec![Point::Specific( 43847831u64, @@ -67,30 +67,39 @@ fn main() { .filter_level(log::LevelFilter::Trace) .init(); + #[cfg(not(target_family = "unix"))] + { + panic!("can't use n2c unix socket on non-unix systems"); + } // we connect to the unix socket of the local node. Make sure you have the right // path for your environment #[cfg(target_family = "unix")] - let bearer = Bearer::connect_unix("/tmp/node.socket").unwrap(); + { + use pallas::network::{ + miniprotocols::{ + PROTOCOL_N2C_CHAIN_SYNC, PROTOCOL_N2C_HANDSHAKE, PROTOCOL_N2C_STATE_QUERY, + }, + multiplexer::bearers::Bearer, + }; + let bearer = Bearer::connect_unix("/tmp/node.socket").unwrap(); - #[cfg(not(target_family = "unix"))] - panic!("can't use n2c unix socket on non-unix systems"); + // setup the multiplexer by specifying the bearer and the IDs of the + // miniprotocols to use + let mut plexer = multiplexer::StdPlexer::new(bearer); + let handshake = plexer.use_client_channel(PROTOCOL_N2C_HANDSHAKE); + let statequery = plexer.use_client_channel(PROTOCOL_N2C_STATE_QUERY); + let chainsync = plexer.use_client_channel(PROTOCOL_N2C_CHAIN_SYNC); - // setup the multiplexer by specifying the bearer and the IDs of the - // miniprotocols to use - let mut plexer = multiplexer::StdPlexer::new(bearer); - let handshake = plexer.use_client_channel(PROTOCOL_N2C_HANDSHAKE); - let statequery = plexer.use_client_channel(PROTOCOL_N2C_STATE_QUERY); - let chainsync = plexer.use_client_channel(PROTOCOL_N2C_CHAIN_SYNC); + plexer.muxer.spawn(); + plexer.demuxer.spawn(); - plexer.muxer.spawn(); - plexer.demuxer.spawn(); + // execute the required handshake against the relay + do_handshake(handshake); - // execute the required handshake against the relay - do_handshake(handshake); + // execute an arbitrary "Local State" query against the node + do_localstate_query(statequery); - // execute an arbitrary "Local State" query against the node - do_localstate_query(statequery); - - // execute the chainsync flow from an arbitrary point in the chain - do_chainsync(chainsync); + // execute the chainsync flow from an arbitrary point in the chain + do_chainsync(chainsync); + } } diff --git a/pallas-miniprotocols/src/txsubmission/client.rs b/pallas-miniprotocols/src/txsubmission/client.rs index 9f3c690..79d0a2e 100644 --- a/pallas-miniprotocols/src/txsubmission/client.rs +++ b/pallas-miniprotocols/src/txsubmission/client.rs @@ -3,7 +3,10 @@ use std::marker::PhantomData; use pallas_codec::Fragment; use pallas_multiplexer::agents::{Channel, ChannelBuffer}; -use super::protocol::{Error, Message, State, TxBody, TxIdAndSize}; +use super::{ + protocol::{Error, Message, State, TxIdAndSize}, + EraTxBody, EraTxId, +}; pub enum Request { TxIds(u16, u16), @@ -11,18 +14,32 @@ pub enum Request { Txs(Vec), } -pub struct Client(State, ChannelBuffer, PhantomData) +/// A generic Ouroboros client for submitting a generic notion of "transactions" to another server +pub struct GenericClient( + State, + ChannelBuffer, + PhantomData, + PhantomData, +) where H: Channel, - Message: Fragment; + Message: Fragment; -impl Client +/// A cardano specific instantiation of the ouroboros protocol +pub type Client = GenericClient; + +impl GenericClient where H: Channel, - Message: Fragment, + Message: Fragment, { pub fn new(channel: H) -> Self { - Self(State::Init, ChannelBuffer::new(channel), PhantomData {}) + Self( + State::Init, + ChannelBuffer::new(channel), + PhantomData {}, + PhantomData {}, + ) } pub fn state(&self) -> &State { @@ -54,7 +71,7 @@ where } /// As a client in a specific state, am I allowed to send this message? - fn assert_outbound_state(&self, msg: &Message) -> Result<(), Error> { + fn assert_outbound_state(&self, msg: &Message) -> Result<(), Error> { match (&self.0, msg) { (State::Init, Message::Init) => Ok(()), (State::TxIdsBlocking, Message::ReplyTxIds(..)) => Ok(()), @@ -66,7 +83,7 @@ where } /// As a client in a specific state, am I allowed to receive this message? - fn assert_inbound_state(&self, msg: &Message) -> Result<(), Error> { + fn assert_inbound_state(&self, msg: &Message) -> Result<(), Error> { match (&self.0, msg) { (State::Idle, Message::RequestTxIds(..)) => Ok(()), (State::Idle, Message::RequestTxs(..)) => Ok(()), @@ -74,7 +91,7 @@ where } } - pub fn send_message(&mut self, msg: &Message) -> Result<(), Error> { + pub fn send_message(&mut self, msg: &Message) -> Result<(), Error> { self.assert_agency_is_ours()?; self.assert_outbound_state(msg)?; self.1.send_msg_chunks(msg).map_err(Error::ChannelError)?; @@ -82,7 +99,7 @@ where Ok(()) } - pub fn recv_message(&mut self) -> Result, Error> { + pub fn recv_message(&mut self) -> Result, Error> { self.assert_agency_is_theirs()?; let msg = self.1.recv_full_msg().map_err(Error::ChannelError)?; self.assert_inbound_state(&msg)?; diff --git a/pallas-miniprotocols/src/txsubmission/codec.rs b/pallas-miniprotocols/src/txsubmission/codec.rs index 5ea9352..bb55717 100644 --- a/pallas-miniprotocols/src/txsubmission/codec.rs +++ b/pallas-miniprotocols/src/txsubmission/codec.rs @@ -1,8 +1,8 @@ -use pallas_codec::minicbor::{decode, encode, Decode, Decoder, Encode, Encoder}; +use pallas_codec::minicbor::{data::Tag, decode, encode, Decode, Decoder, Encode, Encoder}; use super::{ protocol::{Message, TxIdAndSize}, - EraTxId, TxBody, + EraTxBody, EraTxId, }; impl> Encode<()> for TxIdAndSize { @@ -31,7 +31,7 @@ impl<'b, TxId: Decode<'b, ()>> Decode<'b, ()> for TxIdAndSize { } } -impl> Encode<()> for Message { +impl, TxBody: Encode<()>> Encode<()> for Message { fn encode( &self, e: &mut Encoder, @@ -71,7 +71,7 @@ impl> Encode<()> for Message { e.array(2)?.u16(3)?; e.begin_array()?; for tx in txs { - e.bytes(&tx.0)?; + e.encode(tx)?; } e.end()?; Ok(()) @@ -84,18 +84,33 @@ impl> Encode<()> for Message { } } -impl<'b> Decode<'b, ()> for TxBody { +impl<'b> Decode<'b, ()> for EraTxBody { fn decode(d: &mut Decoder<'b>, _ctx: &mut ()) -> Result { d.array()?; - // TODO: the TxBody encoding here needs to be pinned down and parameterized, the - // same way we did TxId! - d.u16()?; // Era? - d.tag()?; // tag 24? - Ok(TxBody(d.bytes()?.to_vec())) + let era = d.u16()?; + let tag = d.tag()?; + if tag != Tag::Cbor { + return Err(decode::Error::message("Expected encoded CBOR data item")); + } + Ok(EraTxBody(era, d.bytes()?.to_vec())) } } -impl<'b, TxId: Decode<'b, ()>> Decode<'b, ()> for Message { +impl Encode<()> for EraTxBody { + fn encode( + &self, + e: &mut Encoder, + _ctx: &mut (), + ) -> Result<(), encode::Error> { + e.array(2)?; + e.u16(self.0)?; + e.tag(Tag::Cbor)?; + e.bytes(&self.1)?; + Ok(()) + } +} + +impl<'b, TxId: Decode<'b, ()>, TxBody: Decode<'b, ()>> Decode<'b, ()> for Message { fn decode(d: &mut Decoder<'b>, _ctx: &mut ()) -> Result { d.array()?; let label = d.u16()?; diff --git a/pallas-miniprotocols/src/txsubmission/protocol.rs b/pallas-miniprotocols/src/txsubmission/protocol.rs index 0f126dc..db38009 100644 --- a/pallas-miniprotocols/src/txsubmission/protocol.rs +++ b/pallas-miniprotocols/src/txsubmission/protocol.rs @@ -21,21 +21,13 @@ pub type TxSizeInBytes = u32; #[derive(Debug, Clone)] pub struct EraTxId(pub u16, pub Vec); +// The bytes of a transaction, with an era number and some raw CBOR +#[derive(Debug, Clone, Eq, PartialEq)] +pub struct EraTxBody(pub u16, pub Vec); + #[derive(Debug)] pub struct TxIdAndSize(pub TxID, pub TxSizeInBytes); -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct TxBody(pub Vec); - -#[derive(Debug, Clone)] -pub struct Tx(pub TxId, pub TxBody); - -impl From> for TxIdAndSize { - fn from(other: Tx) -> Self { - TxIdAndSize(other.0, other.1 .0.len() as u32) - } -} - #[derive(Error, Debug)] pub enum Error { #[error("attempted to receive message while agency is ours")] @@ -58,7 +50,7 @@ pub enum Error { } #[derive(Debug)] -pub enum Message { +pub enum Message { Init, RequestTxIds(Blocking, TxCount, TxCount), ReplyTxIds(Vec>), diff --git a/pallas-miniprotocols/src/txsubmission/server.rs b/pallas-miniprotocols/src/txsubmission/server.rs index ce18b5f..431f73c 100644 --- a/pallas-miniprotocols/src/txsubmission/server.rs +++ b/pallas-miniprotocols/src/txsubmission/server.rs @@ -3,26 +3,43 @@ use std::marker::PhantomData; use pallas_codec::Fragment; use pallas_multiplexer::agents::{Channel, ChannelBuffer}; -use super::protocol::{Blocking, Error, Message, State, TxBody, TxCount, TxIdAndSize}; +use super::{ + protocol::{Blocking, Error, Message, State, TxCount, TxIdAndSize}, + EraTxBody, EraTxId, +}; -pub enum Reply { +pub enum Reply { TxIds(Vec>), Txs(Vec), Done, } -pub struct Server(State, ChannelBuffer, PhantomData) +/// A generic implementation of an ouroboros server protocol ready to request and receive transactions from a client +pub struct GenericServer( + State, + ChannelBuffer, + PhantomData, + PhantomData, +) where H: Channel, - Message: Fragment; + Message: Fragment; -impl Server +/// A Cardano specific server for the ouroboros TxSubmission protocol +pub type Server = GenericServer; + +impl GenericServer where H: Channel, - Message: Fragment, + Message: Fragment, { pub fn new(channel: H) -> Self { - Self(State::Init, ChannelBuffer::new(channel), PhantomData {}) + Self( + State::Init, + ChannelBuffer::new(channel), + PhantomData {}, + PhantomData {}, + ) } pub fn state(&self) -> &State { @@ -54,7 +71,7 @@ where } /// As a server in a specific state, am I allowed to send this message? - fn assert_outbound_state(&self, msg: &Message) -> Result<(), Error> { + fn assert_outbound_state(&self, msg: &Message) -> Result<(), Error> { match (&self.0, msg) { (State::Idle, Message::RequestTxIds(..)) => Ok(()), (State::Idle, Message::RequestTxs(..)) => Ok(()), @@ -63,7 +80,7 @@ where } /// As a server in a specific state, am I allowed to receive this message? - fn assert_inbound_state(&self, msg: &Message) -> Result<(), Error> { + fn assert_inbound_state(&self, msg: &Message) -> Result<(), Error> { match (&self.0, msg) { (State::Init, Message::Init) => Ok(()), (State::TxIdsBlocking, Message::ReplyTxIds(..)) => Ok(()), @@ -74,7 +91,7 @@ where } } - pub fn send_message(&mut self, msg: &Message) -> Result<(), Error> { + pub fn send_message(&mut self, msg: &Message) -> Result<(), Error> { self.assert_agency_is_ours()?; self.assert_outbound_state(msg)?; self.1.send_msg_chunks(msg).map_err(Error::ChannelError)?; @@ -82,7 +99,7 @@ where Ok(()) } - pub fn recv_message(&mut self) -> Result, Error> { + pub fn recv_message(&mut self) -> Result, Error> { self.assert_agency_is_theirs()?; let msg = self.1.recv_full_msg().map_err(Error::ChannelError)?; self.assert_inbound_state(&msg)?; @@ -126,7 +143,7 @@ where Ok(()) } - pub fn receive_next_reply(&mut self) -> Result, Error> { + pub fn receive_next_reply(&mut self) -> Result, Error> { match self.recv_message()? { Message::ReplyTxIds(ids_and_sizes) => { self.0 = State::Idle; diff --git a/pallas-miniprotocols/tests/integration.rs b/pallas-miniprotocols/tests/integration.rs index 28e3b56..2724b11 100644 --- a/pallas-miniprotocols/tests/integration.rs +++ b/pallas-miniprotocols/tests/integration.rs @@ -2,7 +2,7 @@ use pallas_miniprotocols::{ blockfetch, chainsync::{self, NextResponse}, handshake::{self, Confirmation}, - txsubmission::{self, EraTxId, Reply, TxIdAndSize}, + txsubmission::{self, EraTxBody, EraTxId, Reply, Server, TxIdAndSize}, Point, PROTOCOL_N2N_BLOCK_FETCH, PROTOCOL_N2N_CHAIN_SYNC, PROTOCOL_N2N_HANDSHAKE, PROTOCOL_N2N_TX_SUBMISSION, }; @@ -187,7 +187,7 @@ pub fn txsubmission_server_happy_path() { Ok(_) )); - let reply = server.receive_next_reply(); + let reply: Result<_, _> = server.receive_next_reply(); assert!(matches!(reply, Ok(Reply::TxIds(_)))); let Ok(Reply::TxIds(tx_ids)) = reply else { unreachable!() };