From dea731d58b09aaa77b52001c77bcf66f57e04c08 Mon Sep 17 00:00:00 2001 From: Ilya Date: Thu, 21 Sep 2023 02:41:37 +0300 Subject: [PATCH] feat(network): implement LocalTxSubmission client (#289) --- pallas-network/src/miniprotocols/README.md | 4 +- .../miniprotocols/localtxsubmission/client.rs | 208 ++++++++++++++++++ .../miniprotocols/localtxsubmission/codec.rs | 152 +++++++++++++ .../miniprotocols/localtxsubmission/mod.rs | 7 + .../localtxsubmission/protocol.rs | 22 ++ pallas-network/src/miniprotocols/mod.rs | 1 + 6 files changed, 392 insertions(+), 2 deletions(-) create mode 100644 pallas-network/src/miniprotocols/localtxsubmission/client.rs create mode 100644 pallas-network/src/miniprotocols/localtxsubmission/codec.rs create mode 100644 pallas-network/src/miniprotocols/localtxsubmission/mod.rs create mode 100644 pallas-network/src/miniprotocols/localtxsubmission/protocol.rs diff --git a/pallas-network/src/miniprotocols/README.md b/pallas-network/src/miniprotocols/README.md index 69c23a0..ff92593 100644 --- a/pallas-network/src/miniprotocols/README.md +++ b/pallas-network/src/miniprotocols/README.md @@ -13,14 +13,14 @@ The following architectural decisions were made for this particular Rust impleme ## Development Status | mini-protocol | initiator | responder | -| ------------------------------------------- | --------- | --------- | +| ------------------------------------------- |-----------| --------- | | block-fetch | done | planned | | chain-sync | done | planned | | [handshake](src/handshake/README.md) | done | done | | local-state | done | planned | | [tx-submission](src/txsubmission/README.md) | done | done | | local tx monitor | done | planned | -| local-tx-submission | ongoing | planned | +| local-tx-submission | done | planned | ## Implementation Details diff --git a/pallas-network/src/miniprotocols/localtxsubmission/client.rs b/pallas-network/src/miniprotocols/localtxsubmission/client.rs new file mode 100644 index 0000000..c6e5e15 --- /dev/null +++ b/pallas-network/src/miniprotocols/localtxsubmission/client.rs @@ -0,0 +1,208 @@ +use std::marker::PhantomData; + +use thiserror::Error; +use tracing::debug; + +use pallas_codec::Fragment; + +use crate::miniprotocols::localtxsubmission::{EraTx, Message, RejectReason, State}; +use crate::multiplexer; + +/// Cardano specific instantiation of LocalTxSubmission client. +pub type Client = GenericClient; + +/// A generic Ouroboros client for submitting a generic transaction +/// to a server, which possibly results in a generic rejection. +pub struct GenericClient { + state: State, + muxer: multiplexer::ChannelBuffer, + pd_tx: PhantomData, + pd_reject: PhantomData, +} + +impl GenericClient +where + Message: Fragment, +{ + /// Constructs a new LocalTxSubmission `Client` instance. + /// + /// # Arguments + /// * `channel` - An instance of `multiplexer::AgentChannel` to be used for + /// communication. + pub fn new(channel: multiplexer::AgentChannel) -> Self { + Self { + state: State::Idle, + muxer: multiplexer::ChannelBuffer::new(channel), + pd_tx: Default::default(), + pd_reject: Default::default(), + } + } + + /// Submits the given `tx` to the server. + /// + /// # Arguments + /// * `tx` - transaction to submit. + /// + /// # Errors + /// Returns an error if the agency is not ours or if the outbound state is + /// invalid. + pub async fn submit_tx(&mut self, tx: Tx) -> Result<(), Error> { + self.send_submit_tx(tx).await?; + self.recv_submit_tx_response().await + } + + /// Terminates the protocol gracefully. + /// + /// # Errors + /// Returns an error if the agency is not ours or if the outbound state is + /// invalid. + pub async fn terminate_gracefully(&mut self) -> Result<(), Error> { + let msg = Message::Done; + self.send_message(&msg).await?; + self.state = State::Done; + + Ok(()) + } + + /// Returns the current state of the client. + fn state(&self) -> &State { + &self.state + } + + /// Checks if the client has agency. + fn has_agency(&self) -> bool { + match self.state() { + State::Idle => true, + State::Busy | State::Done => false, + } + } + + fn assert_agency_is_ours(&self) -> Result<(), Error> { + if !self.has_agency() { + Err(Error::AgencyIsTheirs) + } else { + Ok(()) + } + } + + fn assert_agency_is_theirs(&self) -> Result<(), Error> { + if self.has_agency() { + Err(Error::AgencyIsOurs) + } else { + Ok(()) + } + } + + fn assert_outbound_state(&self, msg: &Message) -> Result<(), Error> { + match (&self.state, msg) { + (State::Idle, Message::SubmitTx(_) | Message::Done) => Ok(()), + _ => Err(Error::InvalidOutbound), + } + } + + fn assert_inbound_state(&self, msg: &Message) -> Result<(), Error> { + match (&self.state, msg) { + (State::Busy, Message::AcceptTx | Message::RejectTx(_)) => Ok(()), + _ => Err(Error::InvalidInbound), + } + } + + /// Sends a message to the server + /// + /// # Arguments + /// + /// * `msg` - A reference to the `Message` to be sent. + /// + /// # Errors + /// Returns an error if the agency is not ours or if the outbound state is + /// invalid. + async fn send_message(&mut self, msg: &Message) -> Result<(), Error> { + self.assert_agency_is_ours()?; + self.assert_outbound_state(msg)?; + + self.muxer + .send_msg_chunks(msg) + .await + .map_err(Error::ChannelError)?; + + Ok(()) + } + + /// Receives the next message from the server. + /// + /// # Errors + /// Returns an error if the agency is not theirs or if the inbound state is + /// invalid. + async fn recv_message(&mut self) -> Result, Error> { + self.assert_agency_is_theirs()?; + + let msg = self + .muxer + .recv_full_msg() + .await + .map_err(Error::ChannelError)?; + + self.assert_inbound_state(&msg)?; + + Ok(msg) + } + + /// Sends SubmitTx message to the server. + /// + /// # Arguments + /// * `tx` - transaction to submit. + /// + /// # Errors + /// Returns an error if the agency is not ours or if the outbound state is + /// invalid. + async fn send_submit_tx(&mut self, tx: Tx) -> Result<(), Error> { + let msg = Message::SubmitTx(tx); + self.send_message(&msg).await?; + self.state = State::Busy; + + debug!("sent SubmitTx"); + + Ok(()) + } + + /// Receives SubmitTx response from the server. + /// + /// # Errors + /// Returns an error if the inbound message is invalid. + async fn recv_submit_tx_response(&mut self) -> Result<(), Error> { + debug!("waiting for SubmitTx response"); + + match self.recv_message().await? { + Message::AcceptTx => { + self.state = State::Idle; + Ok(()) + } + Message::RejectTx(rejection) => { + self.state = State::Idle; + Err(Error::TxRejected(rejection)) + } + _ => Err(Error::InvalidInbound), + } + } +} + +#[derive(Error, Debug)] +pub enum Error { + #[error("attempted to receive message while agency is ours")] + AgencyIsOurs, + + #[error("attempted to send message while agency is theirs")] + AgencyIsTheirs, + + #[error("inbound message is not valid for current state")] + InvalidInbound, + + #[error("outbound message is not valid for current state")] + InvalidOutbound, + + #[error("error while sending or receiving data through the channel")] + ChannelError(multiplexer::Error), + + #[error("tx was rejected by the server")] + TxRejected(Reject), +} diff --git a/pallas-network/src/miniprotocols/localtxsubmission/codec.rs b/pallas-network/src/miniprotocols/localtxsubmission/codec.rs new file mode 100644 index 0000000..c05d264 --- /dev/null +++ b/pallas-network/src/miniprotocols/localtxsubmission/codec.rs @@ -0,0 +1,152 @@ +use pallas_codec::minicbor::{decode, Decode, Decoder, encode, Encode, Encoder}; +use pallas_codec::minicbor::data::Tag; + +use crate::miniprotocols::localtxsubmission::{EraTx, Message, RejectReason}; + +impl Encode<()> for Message + where + Tx: Encode<()>, + Reject: Encode<()>, +{ + fn encode( + &self, + e: &mut Encoder, + _ctx: &mut (), + ) -> Result<(), encode::Error> { + match self { + Message::SubmitTx(tx) => { + e.array(2)?.u16(0)?; + e.encode(tx)?; + Ok(()) + } + Message::AcceptTx => { + e.array(1)?.u16(1)?; + Ok(()) + } + Message::RejectTx(rejection) => { + e.array(2)?.u16(2)?; + e.encode(rejection)?; + Ok(()) + } + Message::Done => { + e.array(1)?.u16(3)?; + Ok(()) + } + } + } +} + +impl<'b, Tx: Decode<'b, ()>, Reject: Decode<'b, ()>> Decode<'b, ()> for Message { + fn decode(d: &mut Decoder<'b>, _ctx: &mut ()) -> Result { + d.array()?; + let label = d.u16()?; + match label { + 0 => { + let tx = d.decode()?; + Ok(Message::SubmitTx(tx)) + } + 1 => Ok(Message::AcceptTx), + 2 => { + let rejection = d.decode()?; + Ok(Message::RejectTx(rejection)) + } + 3 => Ok(Message::Done), + _ => Err(decode::Error::message("can't decode Message")), + } + } +} + +impl<'b> Decode<'b, ()> for EraTx { + fn decode(d: &mut Decoder<'b>, _ctx: &mut ()) -> Result { + d.array()?; + let era = d.u16()?; + let tag = d.tag()?; + if tag != Tag::Cbor { + return Err(decode::Error::message("Expected encoded CBOR data item")); + } + Ok(EraTx(era, d.bytes()?.to_vec())) + } +} + +impl Encode<()> for EraTx { + fn encode( + &self, + e: &mut Encoder, + _ctx: &mut (), + ) -> Result<(), encode::Error> { + e.array(2)?; + e.u16(self.0)?; + e.tag(Tag::Cbor)?; + e.bytes(&self.1)?; + Ok(()) + } +} + +impl<'b> Decode<'b, ()> for RejectReason { + fn decode(d: &mut Decoder<'b>, _ctx: &mut ()) -> Result { + let remainder = d.input().to_vec(); + Ok(RejectReason(remainder)) + } +} + +impl Encode<()> for RejectReason { + fn encode( + &self, + e: &mut Encoder, + _ctx: &mut (), + ) -> Result<(), encode::Error> { + e.writer_mut() + .write_all(&self.0) + .map_err(|w_err| encode::Error::write(w_err))?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use pallas_codec::{Fragment, minicbor}; + + use crate::miniprotocols::localtxsubmission::{EraTx, Message, RejectReason}; + use crate::multiplexer::Error; + + #[test] + fn decode_reject_message() { + let mut bytes = hex::decode(RAW_REJECT_RESPONSE).unwrap(); + let msg_res = try_decode_message::>(&mut bytes); + assert!(msg_res.is_ok()) + } + + fn try_decode_message(buffer: &mut Vec) -> Result, Error> + where + M: Fragment, + { + let mut decoder = minicbor::Decoder::new(buffer); + let maybe_msg = decoder.decode(); + + match maybe_msg { + Ok(msg) => { + let pos = decoder.position(); + buffer.drain(0..pos); + Ok(Some(msg)) + } + Err(err) if err.is_end_of_input() => Ok(None), + Err(err) => Err(Error::Decoding(err.to_string())), + } + } + + const RAW_REJECT_RESPONSE: &str = + "82028182059f820082018200820a81581c3b890fb5449baedf5342a48ee9c9ec6acbc995641be92ad21f08c686\ + 8200820183038158202628ce6ff8cc7ff0922072d930e4a693c17f991748dedece0be64819a2f9ef7782582031d\ + 54ce8d7e8cb262fc891282f44e9d24c3902dc38fac63fd469e8bf3006376b5820750852fdaf0f2dd724291ce007\ + b8e76d74bcf28076ed0c494cd90c0cfe1c9ca582008201820782000000018200820183048158201a547638b4cf4\ + a3cec386e2f898ac6bc987fadd04277e1d3c8dab5c505a5674e8158201457e4107607f83a80c3c4ffeb70910c2b\ + a3a35cf1699a2a7375f50fcc54a931820082028201830500821a00636185a2581c6f1a1f0c7ccf632cc9ff4b796\ + 87ed13ffe5b624cce288b364ebdce50a144414749581b000000032a9f8800581c795ecedb09821cb922c13060c8\ + f6377c3344fa7692551e865d86ac5da158205399c766fb7c494cddb2f7ae53cc01285474388757bc05bd575c14a\ + 713a432a901820082028201820085825820497fe6401e25733c073c01164c7f2a1a05de8c95e36580f9d1b05123\ + 70040def028258207911ba2b7d91ac56b05ea351282589fe30f4717a707a1b9defaf282afe5ba44200825820791\ + 1ba2b7d91ac56b05ea351282589fe30f4717a707a1b9defaf282afe5ba44201825820869bcb6f35e6b7912c25e5\ + cb33fb9906b097980a83f2b8ef40b51c4ef52eccd402825820efc267ad2c15c34a117535eecc877241ed836eb3e\ + 643ec90de21ca1b12fd79c20282008202820181148200820283023a000f0f6d1a004944ce820082028201830d3a\ + 000f0f6d1a00106253820082028201830182811a02409e10811a024138c01a0255e528ff"; +} diff --git a/pallas-network/src/miniprotocols/localtxsubmission/mod.rs b/pallas-network/src/miniprotocols/localtxsubmission/mod.rs new file mode 100644 index 0000000..a9d1658 --- /dev/null +++ b/pallas-network/src/miniprotocols/localtxsubmission/mod.rs @@ -0,0 +1,7 @@ +pub use client::*; +pub use codec::*; +pub use protocol::*; + +mod client; +mod codec; +mod protocol; diff --git a/pallas-network/src/miniprotocols/localtxsubmission/protocol.rs b/pallas-network/src/miniprotocols/localtxsubmission/protocol.rs new file mode 100644 index 0000000..3a2cc4d --- /dev/null +++ b/pallas-network/src/miniprotocols/localtxsubmission/protocol.rs @@ -0,0 +1,22 @@ +#[derive(Debug, PartialEq, Eq, Clone)] +pub enum State { + Idle, + Busy, + Done, +} + +#[derive(Debug)] +pub enum Message { + SubmitTx(Tx), + AcceptTx, + RejectTx(Reject), + Done, +} + +// The bytes of a transaction with an era number. +#[derive(Debug, Clone, Eq, PartialEq)] +pub struct EraTx(pub u16, pub Vec); + +// Raw reject reason. +#[derive(Debug, Clone, Eq, PartialEq)] +pub struct RejectReason(pub Vec); diff --git a/pallas-network/src/miniprotocols/mod.rs b/pallas-network/src/miniprotocols/mod.rs index e261485..6c452db 100644 --- a/pallas-network/src/miniprotocols/mod.rs +++ b/pallas-network/src/miniprotocols/mod.rs @@ -6,6 +6,7 @@ pub mod blockfetch; pub mod chainsync; pub mod handshake; pub mod localstate; +pub mod localtxsubmission; pub mod txmonitor; pub mod txsubmission;