diff --git a/pallas-miniprotocols/README.md b/pallas-miniprotocols/README.md index bac00be..69c23a0 100644 --- a/pallas-miniprotocols/README.md +++ b/pallas-miniprotocols/README.md @@ -12,15 +12,15 @@ The following architectural decisions were made for this particular Rust impleme ## Development Status -| mini-protocol | initiator | responder | -| ------------------------------------------------- | --------- | --------- | -| block-fetch | done | planned | -| chain-sync | done | planned | -| handshake | done | planned | -| local-state | done | planned | -| [tx-submission](src/txsubmission/README.md) | done | done | -| local tx monitor | done | planned | -| local-tx-submission | ongoing | planned | +| mini-protocol | initiator | responder | +| ------------------------------------------- | --------- | --------- | +| block-fetch | done | planned | +| chain-sync | done | planned | +| [handshake](src/handshake/README.md) | done | done | +| local-state | done | planned | +| [tx-submission](src/txsubmission/README.md) | done | done | +| local tx monitor | done | planned | +| local-tx-submission | ongoing | planned | ## Implementation Details diff --git a/pallas-miniprotocols/src/handshake/README.md b/pallas-miniprotocols/src/handshake/README.md new file mode 100644 index 0000000..19d2eb9 --- /dev/null +++ b/pallas-miniprotocols/src/handshake/README.md @@ -0,0 +1,93 @@ +# Handshake + +The Handshake miniprotocol allows a client and server to negotiate a specific protocol version and set of parameters. Note: the specification refers to these as "protocol parameters", but this should not be confused with the usual notion of "protocol parameters" used by the Cardano ledger. + +The Handshake miniprotocol is the first protocol to run when a connection opens; in fact, in rare cases, two nodes may connect to eachother simultaneously. This is known as a "simultaneous TCP open", and the protocol is designed to be robust to this. + +This miniprotocol simulates a very simple state machine with three states, seen in the mermaid diagram below: + +```mermaid +graph LR + A[ ] -->|start| StPropose + StPropose[State::Propose] -->|Message::Propose| StConfirm + StConfirm[::Confirm] -->|::Accept| StDone[::Done] + StConfirm -->|::Propose| StDone + StConfirm -->|::Refuse| StDone +``` + +There are separate versions of this protocol depending on whether you're communicating between two nodes over TCP, or between a node and a local process, via a unix socket. + +This module provides two actors that implement either side of this role: Client and Server, each of which are detailed below. + +## Client + +You can instantiate a client like so +```rust + let mut n2n_client = handshake::N2NClient::new(channel0); + let mut n2c_client = handshake::N2CClient::new(channel0); +``` + +As the initiator, you then propose a set of versions you are aware of: +```rust + // Note: Other helper methods exist as well + n2n_client.send_propose(handshake::n2n::VersionTable::v7_and_above(MAINNET_MAGIC))?; +``` + +The server will then respond, either indicating which version they accept, or an outright refusal: + +```rust + match n2n_client.recv_while_confirm()? { + Confirmation::Accepted(version, parameters) => {}, + Request::Rejected(reason) => {} + } +``` + +For convenience, these two steps are wrapped in a `handshake` helper method: +```rust + n2n_client.handshake(handshake::n2n::VersionTable::v7_and_above(MAINNET_MAGIC))?; +``` + +Putting this all together, it looks something like this: + +```rust + let mut client = handshake::N2NClient::new(channel0); + client.handshake(handshake::n2n::VersionTable::v7_and_above(MAINNET_MAGIC))?; +``` + +## Server + +Conversely, you can instantiate a node to node or node to client server ready to shake hands like so + +```rust + let mut n2n_server = handshake::N2NServer::new(channel0); + let mut n2c_server = handshake::N2NServer::new(channel0); +``` + +You should first recieve the set of versions that the client is proposing: + +```rust + let versions = n2n_server.receive_proposed_versions()?; +``` + +Then, you can select one that you understand, and accept it, or refuse the handshake: + +```rust + // NOTE: in practice, your version selection is probably more complicated than this + if let Some(params) = versions.values.get(7) { + n2n_server.send_accept_version(7, params)?; + } else { + n2n_server.send_refuse(RefuseReason::VersionMismatch(vec![7]))?; + } +``` + +All-together, this might look something like: + +```rust + let mut server = handshake::N2NServer::new(channel0); + let versions = server.receive_proposed_versions()?; + if let Some(params) = versions.values.get(7) { + server.send_accept_version(7, params)?; + } else { + server.send_refuse(RefuseReason::VersionMismatch(vec![7]))?; + } +``` \ No newline at end of file diff --git a/pallas-miniprotocols/src/handshake/client.rs b/pallas-miniprotocols/src/handshake/client.rs index 8d9e8b6..e307706 100644 --- a/pallas-miniprotocols/src/handshake/client.rs +++ b/pallas-miniprotocols/src/handshake/client.rs @@ -1,27 +1,8 @@ use pallas_codec::Fragment; -use pallas_multiplexer::agents::{Channel, ChannelBuffer, ChannelError}; +use pallas_multiplexer::agents::{Channel, ChannelBuffer}; use std::marker::PhantomData; -use thiserror::*; -use super::{Message, RefuseReason, State, VersionNumber, VersionTable}; - -#[derive(Error, Debug)] -pub enum Error { - #[error("attempted to receive message while agency is ours")] - AgencyIsOurs, - - #[error("attempted to send message while agency is theirs")] - AgencyIsTheirs, - - #[error("inbound message is not valid for current state")] - InvalidInbound, - - #[error("outbound message is not valid for current state")] - InvalidOutbound, - - #[error("error while sending or receiving data through the channel")] - ChannelError(ChannelError), -} +use super::{Error, Message, RefuseReason, State, VersionNumber, VersionTable}; #[derive(Debug)] pub enum Confirmation { diff --git a/pallas-miniprotocols/src/handshake/mod.rs b/pallas-miniprotocols/src/handshake/mod.rs index f4d2634..a4319d8 100644 --- a/pallas-miniprotocols/src/handshake/mod.rs +++ b/pallas-miniprotocols/src/handshake/mod.rs @@ -1,8 +1,10 @@ mod client; mod protocol; +mod server; pub mod n2c; pub mod n2n; pub use client::*; pub use protocol::*; +pub use server::*; diff --git a/pallas-miniprotocols/src/handshake/protocol.rs b/pallas-miniprotocols/src/handshake/protocol.rs index 6d4d585..05a0912 100644 --- a/pallas-miniprotocols/src/handshake/protocol.rs +++ b/pallas-miniprotocols/src/handshake/protocol.rs @@ -1,6 +1,26 @@ use itertools::Itertools; use pallas_codec::minicbor::{decode, encode, Decode, Decoder, Encode, Encoder}; +use pallas_multiplexer::agents::ChannelError; use std::{collections::HashMap, fmt::Debug}; +use thiserror::*; + +#[derive(Error, Debug)] +pub enum Error { + #[error("attempted to receive message while agency is ours")] + AgencyIsOurs, + + #[error("attempted to send message while agency is theirs")] + AgencyIsTheirs, + + #[error("inbound message is not valid for current state")] + InvalidInbound, + + #[error("outbound message is not valid for current state")] + InvalidOutbound, + + #[error("error while sending or receiving data through the channel")] + ChannelError(ChannelError), +} #[derive(Debug, Clone)] pub struct VersionTable @@ -34,8 +54,9 @@ impl<'b, T> Decode<'b, ()> for VersionTable where T: Debug + Clone + Decode<'b, ()>, { - fn decode(_d: &mut Decoder<'b>, _ctx: &mut ()) -> Result { - todo!() + fn decode(d: &mut Decoder<'b>, ctx: &mut ()) -> Result { + let values = d.map_iter_with(ctx)?.collect::>()?; + Ok(VersionTable { values }) } } @@ -93,7 +114,10 @@ where d.array()?; match d.u16()? { - 0 => todo!(), + 0 => { + let version_table = d.decode()?; + Ok(Message::Propose(version_table)) + } 1 => { let version_number = d.u64()?; let version_data = d.decode()?; diff --git a/pallas-miniprotocols/src/handshake/server.rs b/pallas-miniprotocols/src/handshake/server.rs new file mode 100644 index 0000000..fddd1cb --- /dev/null +++ b/pallas-miniprotocols/src/handshake/server.rs @@ -0,0 +1,114 @@ +use std::marker::PhantomData; + +use pallas_codec::Fragment; +use pallas_multiplexer::agents::{Channel, ChannelBuffer}; + +use super::{Error, Message, RefuseReason, State, VersionNumber, VersionTable}; + +pub struct Server(State, ChannelBuffer, PhantomData) +where + H: Channel; + +impl Server +where + H: Channel, + D: std::fmt::Debug + Clone, + Message: Fragment, +{ + pub fn new(channel: H) -> Self { + Self(State::Propose, ChannelBuffer::new(channel), PhantomData {}) + } + + pub fn state(&self) -> &State { + &self.0 + } + + pub fn is_done(&self) -> bool { + self.0 == State::Done + } + + pub fn has_agency(&self) -> bool { + matches!(self.state(), State::Confirm) + } + + fn assert_agency_is_ours(&self) -> Result<(), Error> { + if !self.has_agency() { + Err(Error::AgencyIsTheirs) + } else { + Ok(()) + } + } + + fn assert_agency_is_theirs(&self) -> Result<(), Error> { + if self.has_agency() { + Err(Error::AgencyIsOurs) + } else { + Ok(()) + } + } + + fn assert_outbound_state(&self, msg: &Message) -> Result<(), Error> { + match (&self.0, msg) { + (State::Confirm, Message::Accept(..)) => Ok(()), + (State::Confirm, Message::Refuse(_)) => Ok(()), + _ => Err(Error::InvalidOutbound), + } + } + + fn assert_inbound_state(&self, msg: &Message) -> Result<(), Error> { + match (&self.0, msg) { + (State::Propose, Message::Propose(..)) => Ok(()), + _ => Err(Error::InvalidInbound), + } + } + + pub fn send_message(&mut self, msg: &Message) -> Result<(), Error> { + self.assert_agency_is_ours()?; + self.assert_outbound_state(msg)?; + self.1.send_msg_chunks(msg).map_err(Error::ChannelError)?; + + Ok(()) + } + + pub fn recv_message(&mut self) -> Result, Error> { + self.assert_agency_is_theirs()?; + let msg = self.1.recv_full_msg().map_err(Error::ChannelError)?; + self.assert_inbound_state(&msg)?; + + Ok(msg) + } + + pub fn receive_proposed_versions(&mut self) -> Result, Error> { + match self.recv_message()? { + Message::Propose(v) => { + self.0 = State::Confirm; + Ok(v) + } + _ => Err(Error::InvalidOutbound), + } + } + + pub fn accept_version(&mut self, version: VersionNumber, extra_params: D) -> Result<(), Error> { + let message = Message::Accept(version, extra_params); + self.send_message(&message)?; + self.0 = State::Done; + + Ok(()) + } + + pub fn refuse(&mut self, reason: RefuseReason) -> Result<(), Error> { + let message = Message::Refuse(reason); + self.send_message(&message)?; + self.0 = State::Done; + + Ok(()) + } + + pub fn unwrap(self) -> H { + self.1.unwrap() + } +} + +pub type N2NServer = Server; + +pub type N2CServer = Server; diff --git a/pallas-miniprotocols/src/txsubmission/client.rs b/pallas-miniprotocols/src/txsubmission/client.rs index d791220..9f3c690 100644 --- a/pallas-miniprotocols/src/txsubmission/client.rs +++ b/pallas-miniprotocols/src/txsubmission/client.rs @@ -1,26 +1,28 @@ +use std::marker::PhantomData; + use pallas_codec::Fragment; use pallas_multiplexer::agents::{Channel, ChannelBuffer}; -use super::protocol::{Error, Message, State, TxBody, TxId, TxIdAndSize}; +use super::protocol::{Error, Message, State, TxBody, TxIdAndSize}; -pub enum Request { +pub enum Request { TxIds(u16, u16), TxIdsNonBlocking(u16, u16), Txs(Vec), } -pub struct Client(State, ChannelBuffer) +pub struct Client(State, ChannelBuffer, PhantomData) where H: Channel, - Message: Fragment; + Message: Fragment; -impl Client +impl Client where H: Channel, - Message: Fragment, + Message: Fragment, { pub fn new(channel: H) -> Self { - Self(State::Init, ChannelBuffer::new(channel)) + Self(State::Init, ChannelBuffer::new(channel), PhantomData {}) } pub fn state(&self) -> &State { @@ -31,8 +33,6 @@ where self.0 == State::Done } - // NOTE(pi): as of this writing, the network spec has a typo; this is the - // correct behavior fn has_agency(&self) -> bool { !matches!(self.state(), State::Idle) } @@ -54,7 +54,7 @@ where } /// As a client in a specific state, am I allowed to send this message? - fn assert_outbound_state(&self, msg: &Message) -> Result<(), Error> { + fn assert_outbound_state(&self, msg: &Message) -> Result<(), Error> { match (&self.0, msg) { (State::Init, Message::Init) => Ok(()), (State::TxIdsBlocking, Message::ReplyTxIds(..)) => Ok(()), @@ -66,7 +66,7 @@ where } /// As a client in a specific state, am I allowed to receive this message? - fn assert_inbound_state(&self, msg: &Message) -> Result<(), Error> { + fn assert_inbound_state(&self, msg: &Message) -> Result<(), Error> { match (&self.0, msg) { (State::Idle, Message::RequestTxIds(..)) => Ok(()), (State::Idle, Message::RequestTxs(..)) => Ok(()), @@ -74,7 +74,7 @@ where } } - pub fn send_message(&mut self, msg: &Message) -> Result<(), Error> { + pub fn send_message(&mut self, msg: &Message) -> Result<(), Error> { self.assert_agency_is_ours()?; self.assert_outbound_state(msg)?; self.1.send_msg_chunks(msg).map_err(Error::ChannelError)?; @@ -82,7 +82,7 @@ where Ok(()) } - pub fn recv_message(&mut self) -> Result { + pub fn recv_message(&mut self) -> Result, Error> { self.assert_agency_is_theirs()?; let msg = self.1.recv_full_msg().map_err(Error::ChannelError)?; self.assert_inbound_state(&msg)?; @@ -98,7 +98,7 @@ where Ok(()) } - pub fn reply_tx_ids(&mut self, ids: Vec) -> Result<(), Error> { + pub fn reply_tx_ids(&mut self, ids: Vec>) -> Result<(), Error> { let msg = Message::ReplyTxIds(ids); self.send_message(&msg)?; self.0 = State::Idle; @@ -114,7 +114,7 @@ where Ok(()) } - pub fn next_request(&mut self) -> Result { + pub fn next_request(&mut self) -> Result, Error> { match self.recv_message()? { Message::RequestTxIds(blocking, ack, req) => { self.0 = State::TxIdsBlocking; diff --git a/pallas-miniprotocols/src/txsubmission/codec.rs b/pallas-miniprotocols/src/txsubmission/codec.rs index a17ec09..5ea9352 100644 --- a/pallas-miniprotocols/src/txsubmission/codec.rs +++ b/pallas-miniprotocols/src/txsubmission/codec.rs @@ -1,32 +1,37 @@ use pallas_codec::minicbor::{decode, encode, Decode, Decoder, Encode, Encoder}; -use super::protocol::{Message, TxIdAndSize}; +use super::{ + protocol::{Message, TxIdAndSize}, + EraTxId, TxBody, +}; -impl Encode<()> for TxIdAndSize { +impl> Encode<()> for TxIdAndSize { fn encode( &self, e: &mut Encoder, _ctx: &mut (), ) -> Result<(), encode::Error> { e.array(2)?; - e.bytes(&self.0)?; + e.encode(&self.0)?; e.u32(self.1)?; Ok(()) } } -impl<'b> Decode<'b, ()> for TxIdAndSize { +impl<'b, TxId: Decode<'b, ()>> Decode<'b, ()> for TxIdAndSize { fn decode(d: &mut Decoder<'b>, _ctx: &mut ()) -> Result { d.array()?; - let id = d.bytes()?; + + let tx_id = d.decode()?; + let size = d.u32()?; - Ok(Self(Vec::from(id), size)) + Ok(Self(tx_id, size)) } } -impl Encode<()> for Message { +impl> Encode<()> for Message { fn encode( &self, e: &mut Encoder, @@ -46,26 +51,29 @@ impl Encode<()> for Message { } Message::ReplyTxIds(ids) => { e.array(2)?.u16(1)?; - e.array(ids.len() as u64)?; + e.begin_array()?; for id in ids { e.encode(id)?; } + e.end()?; Ok(()) } Message::RequestTxs(ids) => { e.array(2)?.u16(2)?; - e.array(ids.len() as u64)?; + e.begin_array()?; for id in ids { - e.bytes(id)?; + e.encode(id)?; } + e.end()?; Ok(()) } Message::ReplyTxs(txs) => { e.array(2)?.u16(3)?; - e.array(txs.len() as u64)?; + e.begin_array()?; for tx in txs { - e.bytes(tx)?; + e.bytes(&tx.0)?; } + e.end()?; Ok(()) } Message::Done => { @@ -76,7 +84,18 @@ impl Encode<()> for Message { } } -impl<'b> Decode<'b, ()> for Message { +impl<'b> Decode<'b, ()> for TxBody { + fn decode(d: &mut Decoder<'b>, _ctx: &mut ()) -> Result { + d.array()?; + // TODO: the TxBody encoding here needs to be pinned down and parameterized, the + // same way we did TxId! + d.u16()?; // Era? + d.tag()?; // tag 24? + Ok(TxBody(d.bytes()?.to_vec())) + } +} + +impl<'b, TxId: Decode<'b, ()>> Decode<'b, ()> for Message { fn decode(d: &mut Decoder<'b>, _ctx: &mut ()) -> Result { d.array()?; let label = d.u16()?; @@ -96,9 +115,9 @@ impl<'b> Decode<'b, ()> for Message { let ids = d.decode()?; Ok(Message::RequestTxs(ids)) } - 3 => { - todo!() - } + 3 => Ok(Message::ReplyTxs( + d.array_iter()?.collect::>()?, + )), 4 => Ok(Message::Done), 6 => Ok(Message::Init), _ => Err(decode::Error::message( @@ -107,3 +126,29 @@ impl<'b> Decode<'b, ()> for Message { } } } + +impl Encode<()> for EraTxId { + fn encode( + &self, + e: &mut Encoder, + _ctx: &mut (), + ) -> Result<(), encode::Error> { + e.array(2)?; + e.encode(self.0)?; + e.bytes(&self.1)?; + + Ok(()) + } +} + +impl<'b> Decode<'b, ()> for EraTxId { + fn decode(d: &mut Decoder<'b>, _ctx: &mut ()) -> Result { + d.array()?; + + let era = d.u16()?; + + let tx_id = d.bytes()?; + + Ok(Self(era, tx_id.to_vec())) + } +} diff --git a/pallas-miniprotocols/src/txsubmission/protocol.rs b/pallas-miniprotocols/src/txsubmission/protocol.rs index 1967f93..0f126dc 100644 --- a/pallas-miniprotocols/src/txsubmission/protocol.rs +++ b/pallas-miniprotocols/src/txsubmission/protocol.rs @@ -17,25 +17,22 @@ pub type TxCount = u16; pub type TxSizeInBytes = u32; -pub type TxId = Vec; +// The bytes of a txId, tagged with an era number +#[derive(Debug, Clone)] +pub struct EraTxId(pub u16, pub Vec); #[derive(Debug)] -pub struct TxIdAndSize(pub TxId, pub TxSizeInBytes); +pub struct TxIdAndSize(pub TxID, pub TxSizeInBytes); -pub type TxBody = Vec; +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct TxBody(pub Vec); #[derive(Debug, Clone)] -pub struct Tx(pub TxId, pub TxBody); +pub struct Tx(pub TxId, pub TxBody); -impl From for TxIdAndSize { - fn from(other: Tx) -> Self { - TxIdAndSize(other.0, other.1.len() as u32) - } -} - -impl From for TxId { - fn from(value: TxIdAndSize) -> Self { - value.0 +impl From> for TxIdAndSize { + fn from(other: Tx) -> Self { + TxIdAndSize(other.0, other.1 .0.len() as u32) } } @@ -61,10 +58,10 @@ pub enum Error { } #[derive(Debug)] -pub enum Message { +pub enum Message { Init, RequestTxIds(Blocking, TxCount, TxCount), - ReplyTxIds(Vec), + ReplyTxIds(Vec>), RequestTxs(Vec), ReplyTxs(Vec), Done, diff --git a/pallas-miniprotocols/src/txsubmission/server.rs b/pallas-miniprotocols/src/txsubmission/server.rs index a71bb1b..ce18b5f 100644 --- a/pallas-miniprotocols/src/txsubmission/server.rs +++ b/pallas-miniprotocols/src/txsubmission/server.rs @@ -1,26 +1,28 @@ +use std::marker::PhantomData; + use pallas_codec::Fragment; use pallas_multiplexer::agents::{Channel, ChannelBuffer}; -use super::protocol::{Blocking, Error, Message, State, TxBody, TxCount, TxId, TxIdAndSize}; +use super::protocol::{Blocking, Error, Message, State, TxBody, TxCount, TxIdAndSize}; -pub enum Reply { - TxIds(Vec), +pub enum Reply { + TxIds(Vec>), Txs(Vec), Done, } -pub struct Server(State, ChannelBuffer) +pub struct Server(State, ChannelBuffer, PhantomData) where H: Channel, - Message: Fragment; + Message: Fragment; -impl Server +impl Server where H: Channel, - Message: Fragment, + Message: Fragment, { pub fn new(channel: H) -> Self { - Self(State::Init, ChannelBuffer::new(channel)) + Self(State::Init, ChannelBuffer::new(channel), PhantomData {}) } pub fn state(&self) -> &State { @@ -31,8 +33,6 @@ where self.0 == State::Done } - // NOTE(pi): as of this writing, the network spec has a typo; this is the - // correct behavior fn has_agency(&self) -> bool { matches!(self.state(), State::Idle) } @@ -54,7 +54,7 @@ where } /// As a server in a specific state, am I allowed to send this message? - fn assert_outbound_state(&self, msg: &Message) -> Result<(), Error> { + fn assert_outbound_state(&self, msg: &Message) -> Result<(), Error> { match (&self.0, msg) { (State::Idle, Message::RequestTxIds(..)) => Ok(()), (State::Idle, Message::RequestTxs(..)) => Ok(()), @@ -63,7 +63,7 @@ where } /// As a server in a specific state, am I allowed to receive this message? - fn assert_inbound_state(&self, msg: &Message) -> Result<(), Error> { + fn assert_inbound_state(&self, msg: &Message) -> Result<(), Error> { match (&self.0, msg) { (State::Init, Message::Init) => Ok(()), (State::TxIdsBlocking, Message::ReplyTxIds(..)) => Ok(()), @@ -74,7 +74,7 @@ where } } - pub fn send_message(&mut self, msg: &Message) -> Result<(), Error> { + pub fn send_message(&mut self, msg: &Message) -> Result<(), Error> { self.assert_agency_is_ours()?; self.assert_outbound_state(msg)?; self.1.send_msg_chunks(msg).map_err(Error::ChannelError)?; @@ -82,7 +82,7 @@ where Ok(()) } - pub fn recv_message(&mut self) -> Result { + pub fn recv_message(&mut self) -> Result, Error> { self.assert_agency_is_theirs()?; let msg = self.1.recv_full_msg().map_err(Error::ChannelError)?; self.assert_inbound_state(&msg)?; @@ -126,7 +126,7 @@ where Ok(()) } - pub fn receive_next_reply(&mut self) -> Result { + pub fn receive_next_reply(&mut self) -> Result, Error> { match self.recv_message()? { Message::ReplyTxIds(ids_and_sizes) => { self.0 = State::Idle; @@ -134,7 +134,7 @@ where Ok(Reply::TxIds(ids_and_sizes)) } Message::ReplyTxs(bodies) => { - self.0 = State::Txs; + self.0 = State::Idle; Ok(Reply::Txs(bodies)) } Message::Done => { diff --git a/pallas-miniprotocols/tests/integration.rs b/pallas-miniprotocols/tests/integration.rs index 0ffde5c..28e3b56 100644 --- a/pallas-miniprotocols/tests/integration.rs +++ b/pallas-miniprotocols/tests/integration.rs @@ -2,7 +2,7 @@ use pallas_miniprotocols::{ blockfetch, chainsync::{self, NextResponse}, handshake::{self, Confirmation}, - txsubmission::{self, Reply}, + txsubmission::{self, EraTxId, Reply, TxIdAndSize}, Point, PROTOCOL_N2N_BLOCK_FETCH, PROTOCOL_N2N_CHAIN_SYNC, PROTOCOL_N2N_HANDSHAKE, PROTOCOL_N2N_TX_SUBMISSION, }; @@ -194,7 +194,12 @@ pub fn txsubmission_server_happy_path() { assert!(tx_ids.len() <= 3); assert!(matches!( - server.request_txs(tx_ids.into_iter().map(Into::into).collect()), + server.request_txs( + tx_ids + .into_iter() + .map(|txid: TxIdAndSize| txid.0) + .collect() + ), Ok(_) )); diff --git a/pallas-multiplexer/src/bearers.rs b/pallas-multiplexer/src/bearers.rs index b18a093..ef5ddc1 100644 --- a/pallas-multiplexer/src/bearers.rs +++ b/pallas-multiplexer/src/bearers.rs @@ -36,6 +36,7 @@ fn write_segment(writer: &mut impl Write, segment: Segment) -> Result<(), std::i msg.write_u32::(timestamp)?; msg.write_u16::(protocol)?; msg.write_u16::(payload.len() as u16)?; + msg.write_all(&payload)?; if event_enabled!(tracing::Level::TRACE) { trace!( @@ -46,8 +47,6 @@ fn write_segment(writer: &mut impl Write, segment: Segment) -> Result<(), std::i ); } - msg.write_all(&payload)?; - writer.write_all(&msg)?; writer.flush() }