feat(miniprotocols): Implement tx submission client (#220)

* feat(miniprotocols): Implement tx submission client

* Match CDDL specs

* Fix build errors

* Add server implementation for txsubmission

Also includes some documentation for how to use both the client and the server

* cargo fmt

* clippy suggestions

clippy pls

* Fail explicitly on missing n2c unix socket

---------

Co-authored-by: Pi Lanningham <pi@sundaeswap.finance>
This commit is contained in:
Santiago Carmuega 2023-02-04 02:38:12 +01:00 committed by GitHub
parent 5edb456c70
commit 16d0211c5b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
19 changed files with 785 additions and 355 deletions

View file

@ -54,7 +54,7 @@ fn do_chainsync(channel: multiplexer::StdChannel) {
log::info!("rolling forward, block size: {}", h.len())
}
chainsync::NextResponse::RollBackward(x, _) => log::info!("rollback to {:?}", x),
chainsync::NextResponse::Await => log::info!("tip of chaing reached"),
chainsync::NextResponse::Await => log::info!("tip of chain reached"),
};
}
}
@ -66,8 +66,12 @@ fn main() {
// we connect to the unix socket of the local node. Make sure you have the right
// path for your environment
#[cfg(target_family = "unix")]
let bearer = Bearer::connect_unix("/tmp/node.socket").unwrap();
#[cfg(not(target_family = "unix"))]
panic!("can't use n2c unix socket on non-unix systems");
// setup the multiplexer by specifying the bearer and the IDs of the
// miniprotocols to use
let mut plexer = multiplexer::StdPlexer::new(bearer);

View file

@ -14,7 +14,7 @@ impl AssetFingerprint {
pub fn from_parts(policy_id: &str, asset_name: &str) -> Result<AssetFingerprint, Box<dyn Err>> {
let mut hasher = Blake2bVar::new(20).unwrap();
let c = format!("{}{}",policy_id,asset_name);
let c = format!("{policy_id}{asset_name}");
let raw = hex::decode(c)?;
hasher.update(raw.as_slice());
let mut buf = [0u8; 20];

View file

@ -13,7 +13,7 @@ impl<'b, C, const N: usize> minicbor::Decode<'b, C> for SkipCbor<N> {
) -> Result<Self, minicbor::decode::Error> {
{
let probe = d.probe();
println!("skipped cbor value {}: {:?}", N, probe.datatype()?);
println!("skipped cbor value {N}: {:?}", probe.datatype()?);
}
d.skip()?;

View file

@ -81,5 +81,5 @@ let agent = handshake::Client::initial(VersionTable::v4_and_above(MAINNET_MAGIC)
let agent = run_agent(agent, &mut channel).unwrap();
// print the final state of the agent
println!("{:?}", agent);
println!("{agent:?}");
```

View file

@ -0,0 +1,194 @@
# TxSubmission
The TxSubmission miniprotocol allows a client with transactions to announce to connect to a server interested in those transactions. This is used, for example, to propogate transactions in each nodes mempool from peer to peer, until they land on a node that is ready to mint a block. Without this protocol, transactions wouldn't get minted until the node they were submitted to produced a block, which may be few and far between.
Initially, the design of this protocol seems backwards: you might expect the server to have the transactions, and the client to download them. However, this design allows robust, two way congestion control: a node with transactions is not required to initiate a connection to anyone, and a node interested in transactions can pick and choose what they download. The name of the miniprotocol, "TxSubmission" rather than "TxDownload", further reinforces this.
Like other miniprotocols, the client and server maintain a state machine between them; each message sent or received transitions the protocol into a new state, where either the client or server is expected to do some work.
The client is expected to initiate a connection and initialize the protocol into the Idle state. While in the Idle state, the client is waiting for the server to request either transaction IDs or transactions; Once the server makes a request, the client responds with the appropriate data.
These state transitions is captured in the mermaid diagram below:
```mermaid
graph TB
A[ ] -->|start| StInit
StInit[State::Init] -->|Message::Init| StIdle
StIdle[::Idle] -->|::RequestTxIds| StTxIdsBlocking
StIdle -->|::RequestTxIds| StTxIdsNonBlocking
StIdle -->|::RequestTxs| StTxs
StTxIdsBlocking[::TxIdsBlocking] -->|::ReplyTxIds| StIdle
StTxIdsBlocking -->|::Done| StDone[::Done]
StTxIdsNonBlocking[::TxIdsNonBlocking] -->|::ReplyTxIds| StIdle
StTxs[::Txs] -->|::ReplyTxs| StIdle
```
This module provides two actors that implement either side of this role: Client and Server, each of which are detailed below.
## Client
You can instantiate a client like so
```rust
let mut client = txsubmission::Client::new(channel4);
```
As the initiator, you then need to send a message to initialize the protocol
```rust
client.send_init()?;
```
The server will then make a series of queries for available transaction Ids and transaction bodies, which you're responsible for handling:
```rust
loop {
match client.next_request()? {
Request::TxIds(acknowledged, next) => {},
Request::TxIdsNonBlocking(acknowledged, next) => {}
Request::Txs(ids) => {}
}
}
```
When the server requests some number of transaction Ids, as the client you should reply with up to that many ids, and the size of the transaction. In the non-blocking variant, you can return what you have immediately, while in the blocking variant you should wait until you can answer with the number requested by the server. We provide some sample code below:
```rust
Request::TxIdsNonBlocking(acknowledged, next) => {
// NOTE: incomplete implementation, see below
client.reply_tx_ids(
// NOTE: we assume some kind of mempool implementation, which maintains a per-connection FIFO
// this is not provided as part of pallas-miniprotocols
mempool.iter()
.take(next)
.map(|tx| TxIdAndSize(tx.id(), tx.len()))
.collect()
)?;
}
```
If the `acknowledged` field is nonzero, the server is letting you know that it either received or doesn't care about some of the previous transactions you sent, so you can advance the queue. Thus, the above code should be extended to
```rust
Request::TxIdsNonBlocking(acknowledged, next) => {
// discards the first N transactions, advancing the queue for this client
mempool.discard(acknowledged);
client.reply_tx_ids(
mempool.iter()
.take(next)
.map(|tx| TxIdAndSize(tx.id(), tx.len()))
.collect()
)?;
}
```
When the server requests to download some transaction bodies, you can respond with the details of those transactions:
```rust
Request::Txs(ids) => {
let txs: Vec<TxBody> = mempool.find(ids);
client.reply_txs(txs)?;
}
```
Finally, if you decide to terminate the miniprotocol while waiting for enough transactions to respond to a blocking `Request::TxIds` request, you can send a Done message to gracefully terminate the protocol.
```rust
client.send_done()?;
```
All together, this becomes:
```rust
let mut client = txsubmission::Client::new(channel4);
client.send_init()?;
loop {
match client.next_request()? {
Request::TxIds(acknowledged, next) => {
mempool.discard(acknowledged)?;
if !mempool.wait_for_at_least(next)? {
client.send_done()?;
break;
}
client.reply_tx_ids(
mempool.iter()
.take(next)
.map(|tx| TxIdAndSize(tx.id(), tx.len()))
.collect()
)?;
},
Request::TxIdsNonBlocking(acknowledged, next) => {
mempool.discard(acknowledged)?;
client.reply_tx_ids(
mempool.iter()
.take(next)
.map(|tx| TxIdAndSize(tx.id(), tx.len()))
)?;
}
Request::Txs(ids) => {
let txs = mempool.find(ids);
client.reply_txs(txs)?;
}
}
}
```
## Server
Conversely, you can instantiate a server ready to learn about new transactions like so
```rust
let mut server = txsubmission::Server::new(channel4);
```
Since this is a client initiated protocol, you first need to wait to receive an initialization message:
```rust
server.wait_for_init()?;
```
Then, you can begin querying for some number of transaction Ids
```rust
server.acknowledge_and_request_ids(true, 0, 16)?;
```
And wait for a response
```rust
match server.receive_next_reply()? {
Reply::TxIds(ids_and_sizes) => { }
Reply::Txs(txs) => { }
Reply::Done => { }
}
```
You can download those transactions with
```rust
server.request_txs(ids.iter().map(|tx_and_size| tx_and_size.0).collect());
```
After you receive some transaction Ids, if you request more you should acknowledge the ones you received
```rust
server.acknowledge_and_request_ids(true, 16, 16)?
```
All-together, this event loop could look something like this:
```rust
let mut server = txsubmission::Server::new(channel4);
server.wait_for_init()?;
let mut previous_count = 0;
server.acknowledge_and_request_ids(true, previous_count, 16);
loop {
match server.receive_next_reply()? {
Reply::TxIds(ids_and_sizes) => {
server.request_txs(ids_and_sizes.iter().map(|tx| tx.0).collect())?;
},
Reply::Txs(txs) => {
tx_channel.send(txs);
server.acknowledge_and_request_ids(true, txs.len(), 16)?;
},
Reply::Done => {
break;
}
}
}
```

View file

@ -0,0 +1,141 @@
use pallas_codec::Fragment;
use pallas_multiplexer::agents::{Channel, ChannelBuffer};
use super::protocol::{Error, Message, State, TxBody, TxId, TxIdAndSize};
pub enum Request {
TxIds(u16, u16),
TxIdsNonBlocking(u16, u16),
Txs(Vec<TxId>),
}
pub struct Client<H>(State, ChannelBuffer<H>)
where
H: Channel,
Message: Fragment;
impl<H> Client<H>
where
H: Channel,
Message: Fragment,
{
pub fn new(channel: H) -> Self {
Self(State::Init, ChannelBuffer::new(channel))
}
pub fn state(&self) -> &State {
&self.0
}
pub fn is_done(&self) -> bool {
self.0 == State::Done
}
// NOTE(pi): as of this writing, the network spec has a typo; this is the correct behavior
fn has_agency(&self) -> bool {
!matches!(self.state(), State::Idle)
}
fn assert_agency_is_ours(&self) -> Result<(), Error> {
if !self.has_agency() {
Err(Error::AgencyIsTheirs)
} else {
Ok(())
}
}
fn assert_agency_is_theirs(&self) -> Result<(), Error> {
if self.has_agency() {
Err(Error::AgencyIsOurs)
} else {
Ok(())
}
}
/// As a client in a specific state, am I allowed to send this message?
fn assert_outbound_state(&self, msg: &Message) -> Result<(), Error> {
match (&self.0, msg) {
(State::Init, Message::Init) => Ok(()),
(State::TxIdsBlocking, Message::ReplyTxIds(..)) => Ok(()),
(State::TxIdsBlocking, Message::Done) => Ok(()),
(State::TxIdsNonBlocking, Message::ReplyTxIds(..)) => Ok(()),
(State::Txs, Message::ReplyTxs(..)) => Ok(()),
_ => Err(Error::InvalidOutbound),
}
}
/// As a client in a specific state, am I allowed to receive this message?
fn assert_inbound_state(&self, msg: &Message) -> Result<(), Error> {
match (&self.0, msg) {
(State::Idle, Message::RequestTxIds(..)) => Ok(()),
(State::Idle, Message::RequestTxs(..)) => Ok(()),
_ => Err(Error::InvalidInbound),
}
}
pub fn send_message(&mut self, msg: &Message) -> Result<(), Error> {
self.assert_agency_is_ours()?;
self.assert_outbound_state(msg)?;
self.1.send_msg_chunks(msg).map_err(Error::ChannelError)?;
Ok(())
}
pub fn recv_message(&mut self) -> Result<Message, Error> {
self.assert_agency_is_theirs()?;
let msg = self.1.recv_full_msg().map_err(Error::ChannelError)?;
self.assert_inbound_state(&msg)?;
Ok(msg)
}
pub fn send_init(&mut self) -> Result<(), Error> {
let msg = Message::Init;
self.send_message(&msg)?;
self.0 = State::Idle;
Ok(())
}
pub fn reply_tx_ids(&mut self, ids: Vec<TxIdAndSize>) -> Result<(), Error> {
let msg = Message::ReplyTxIds(ids);
self.send_message(&msg)?;
self.0 = State::Idle;
Ok(())
}
pub fn reply_txs(&mut self, txs: Vec<TxBody>) -> Result<(), Error> {
let msg = Message::ReplyTxs(txs);
self.send_message(&msg)?;
self.0 = State::Idle;
Ok(())
}
pub fn next_request(&mut self) -> Result<Request, Error> {
match self.recv_message()? {
Message::RequestTxIds(blocking, ack, req) => {
self.0 = State::TxIdsBlocking;
match blocking {
true => Ok(Request::TxIds(ack, req)),
false => Ok(Request::TxIdsNonBlocking(ack, req)),
}
}
Message::RequestTxs(x) => {
self.0 = State::Txs;
Ok(Request::Txs(x))
}
_ => Err(Error::InvalidInbound),
}
}
pub fn send_done(&mut self) -> Result<(), Error> {
let msg = Message::Done;
self.send_message(&msg)?;
self.0 = State::Done;
Ok(())
}
}

View file

@ -0,0 +1,109 @@
use pallas_codec::minicbor::{decode, encode, Decode, Decoder, Encode, Encoder};
use super::protocol::{Message, TxIdAndSize};
impl Encode<()> for TxIdAndSize {
fn encode<W: encode::Write>(
&self,
e: &mut Encoder<W>,
_ctx: &mut (),
) -> Result<(), encode::Error<W::Error>> {
e.array(2)?;
e.bytes(&self.0)?;
e.u32(self.1)?;
Ok(())
}
}
impl<'b> Decode<'b, ()> for TxIdAndSize {
fn decode(d: &mut Decoder<'b>, _ctx: &mut ()) -> Result<Self, decode::Error> {
d.array()?;
let id = d.bytes()?;
let size = d.u32()?;
Ok(Self(Vec::from(id), size))
}
}
impl Encode<()> for Message {
fn encode<W: encode::Write>(
&self,
e: &mut Encoder<W>,
_ctx: &mut (),
) -> Result<(), encode::Error<W::Error>> {
match self {
Message::Init => {
e.array(1)?.u16(6)?;
Ok(())
}
Message::RequestTxIds(blocking, ack, req) => {
e.array(4)?.u16(0)?;
e.bool(*blocking)?;
e.u16(*ack)?;
e.u16(*req)?;
Ok(())
}
Message::ReplyTxIds(ids) => {
e.array(2)?.u16(1)?;
e.array(ids.len() as u64)?;
for id in ids {
e.encode(id)?;
}
Ok(())
}
Message::RequestTxs(ids) => {
e.array(2)?.u16(2)?;
e.array(ids.len() as u64)?;
for id in ids {
e.bytes(id)?;
}
Ok(())
}
Message::ReplyTxs(txs) => {
e.array(2)?.u16(3)?;
e.array(txs.len() as u64)?;
for tx in txs {
e.bytes(tx)?;
}
Ok(())
}
Message::Done => {
e.array(1)?.u16(4)?;
Ok(())
}
}
}
}
impl<'b> Decode<'b, ()> for Message {
fn decode(d: &mut Decoder<'b>, _ctx: &mut ()) -> Result<Self, decode::Error> {
d.array()?;
let label = d.u16()?;
match label {
0 => {
let blocking = d.bool()?;
let ack = d.u16()?;
let req = d.u16()?;
Ok(Message::RequestTxIds(blocking, ack, req))
}
1 => {
let items = d.decode()?;
Ok(Message::ReplyTxIds(items))
}
2 => {
let ids = d.decode()?;
Ok(Message::RequestTxs(ids))
}
3 => {
todo!()
}
4 => Ok(Message::Done),
6 => Ok(Message::Init),
_ => Err(decode::Error::message(
"unknown variant for txsubmission message",
)),
}
}
}

View file

@ -1,306 +1,9 @@
use std::fmt::Debug;
mod client;
mod codec;
mod protocol;
mod server;
use itertools::Itertools;
use pallas_codec::minicbor::{decode, encode, Decode, Decoder, Encode, Encoder};
use tracing::debug;
use crate::machines::{Agent, MachineError, Transition};
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum State {
Idle,
TxIdsNonBlocking,
TxIdsBlocking,
Txs,
Done,
}
pub type Blocking = bool;
pub type TxCount = u16;
pub type TxSizeInBytes = u32;
pub type TxId = u64;
#[derive(Debug)]
pub struct TxIdAndSize(TxId, TxSizeInBytes);
impl Encode<()> for TxIdAndSize {
fn encode<W: encode::Write>(
&self,
e: &mut Encoder<W>,
_ctx: &mut (),
) -> Result<(), encode::Error<W::Error>> {
e.array(2)?;
e.u64(self.0)?;
e.u32(self.1)?;
Ok(())
}
}
impl<'b> Decode<'b, ()> for TxIdAndSize {
fn decode(d: &mut Decoder<'b>, _ctx: &mut ()) -> Result<Self, decode::Error> {
d.array()?;
let id = d.u64()?;
let size = d.u32()?;
Ok(Self(id, size))
}
}
pub type TxBody = Vec<u8>;
#[derive(Debug, Clone)]
pub struct Tx(TxId, TxBody);
impl From<&Tx> for TxIdAndSize {
fn from(other: &Tx) -> Self {
TxIdAndSize(other.0, other.1.len() as u32)
}
}
#[derive(Debug)]
pub enum Message {
RequestTxIds(Blocking, TxCount, TxCount),
ReplyTxIds(Vec<TxIdAndSize>),
RequestTxs(Vec<TxId>),
ReplyTxs(Vec<TxBody>),
Done,
}
impl Encode<()> for Message {
fn encode<W: encode::Write>(
&self,
e: &mut Encoder<W>,
_ctx: &mut (),
) -> Result<(), encode::Error<W::Error>> {
match self {
Message::RequestTxIds(blocking, ack, req) => {
e.array(4)?.u16(0)?;
e.bool(*blocking)?;
e.u16(*ack)?;
e.u16(*req)?;
Ok(())
}
Message::ReplyTxIds(ids) => {
e.array(2)?.u16(1)?;
e.array(ids.len() as u64)?;
for id in ids {
e.encode(id)?;
}
Ok(())
}
Message::RequestTxs(ids) => {
e.array(2)?.u16(2)?;
e.array(ids.len() as u64)?;
for id in ids {
e.u64(*id)?;
}
Ok(())
}
Message::ReplyTxs(txs) => {
e.array(2)?.u16(3)?;
e.array(txs.len() as u64)?;
for tx in txs {
e.bytes(tx)?;
}
Ok(())
}
Message::Done => {
e.array(1)?.u16(4)?;
Ok(())
}
}
}
}
impl<'b> Decode<'b, ()> for Message {
fn decode(d: &mut Decoder<'b>, _ctx: &mut ()) -> Result<Self, decode::Error> {
d.array()?;
let label = d.u16()?;
match label {
0 => {
let blocking = d.bool()?;
let ack = d.u16()?;
let req = d.u16()?;
Ok(Message::RequestTxIds(blocking, ack, req))
}
1 => {
let items = d.decode()?;
Ok(Message::ReplyTxIds(items))
}
2 => {
let ids = d.array_iter::<TxId>()?.try_collect()?;
Ok(Message::RequestTxs(ids))
}
3 => {
todo!()
}
4 => Ok(Message::Done),
_ => Err(decode::Error::message(
"unknown variant for txsubmission message",
)),
}
}
}
/// A very basic tx provider agent with a fixed set of tx to submit
///
/// This provider takes a set of tx from a vec as the single, static source of
/// data to transfer to the consumer. It's main use is for implementing peers
/// that need to answer to v1 implementations of the Tx-Submission
/// mini-protocol. Since v1 nodes dont' wait for a 'Hello' message, the peer
/// needs to be prepared to receive Tx requests. This naive provider serves as a
/// good placeholder for those scenarios.
#[derive(Debug)]
pub struct NaiveProvider {
pub state: State,
pub fifo_txs: Vec<Tx>,
pub requested_ids_count: usize,
pub requested_txs: Option<Vec<TxId>>,
}
impl NaiveProvider {
pub fn initial(fifo_txs: Vec<Tx>) -> Self {
Self {
state: State::Idle,
requested_ids_count: 0,
requested_txs: None,
fifo_txs,
}
}
fn reply_tx_ids_msg(&self) -> Message {
debug!(
"sending next {} tx ids from fifo queue",
self.requested_ids_count
);
let to_send = self.fifo_txs[0..self.requested_ids_count]
.iter()
.map_into()
.collect_vec();
Message::ReplyTxIds(to_send)
}
fn reply_txs_msg(&self) -> Message {
let matches = self
.fifo_txs
.iter()
.filter(|Tx(candidate_id, _)| match &self.requested_txs {
Some(requested) => requested.iter().contains(candidate_id),
None => false,
})
.map(|Tx(_, body)| body.clone())
.collect_vec();
Message::ReplyTxs(matches)
}
fn on_tx_ids_request(
self,
acknowledged_count: usize,
requested_ids_count: usize,
) -> Transition<Self> {
debug!(
"new tx id request {} (ack: {})",
requested_ids_count, acknowledged_count
);
debug!("draining {} from tx fifo queue", acknowledged_count);
let new_fifo: Vec<_> = self
.fifo_txs
.into_iter()
.skip(acknowledged_count - 1)
.collect();
Ok(Self {
state: State::Idle,
requested_ids_count,
fifo_txs: new_fifo,
..self
})
}
fn on_txs_request(self, requested_txs: Vec<TxId>) -> Transition<Self> {
debug!("new txs request {:?}", requested_txs,);
Ok(Self {
state: State::Idle,
requested_txs: Some(requested_txs),
..self
})
}
}
impl Agent for NaiveProvider {
type Message = Message;
type State = State;
fn state(&self) -> &Self::State {
&self.state
}
fn is_done(&self) -> bool {
self.state == State::Done
}
fn has_agency(&self) -> bool {
match self.state {
State::Idle => false,
State::TxIdsNonBlocking => true,
State::TxIdsBlocking => true,
State::Txs => true,
State::Done => false,
}
}
fn build_next(&self) -> Self::Message {
match &self.state {
State::TxIdsNonBlocking => self.reply_tx_ids_msg(),
State::TxIdsBlocking => Message::Done,
State::Txs => self.reply_txs_msg(),
_ => panic!(""),
}
}
fn apply_start(self) -> Transition<Self> {
Ok(self)
}
fn apply_outbound(self, msg: Self::Message) -> Transition<Self> {
match (self.state, msg) {
(State::TxIdsNonBlocking, Message::ReplyTxIds(_)) => Ok(Self {
state: State::Idle,
..self
}),
(State::TxIdsBlocking, Message::Done) => Ok(Self {
state: State::Done,
..self
}),
(State::Txs, Message::ReplyTxs(_)) => Ok(Self {
state: State::Idle,
..self
}),
_ => panic!(),
}
}
fn apply_inbound(self, msg: Self::Message) -> Transition<Self> {
match (&self.state, msg) {
(State::Idle, Message::RequestTxIds(block, ack, req)) if !block => {
self.on_tx_ids_request(ack as usize, req as usize)
}
(State::Idle, Message::RequestTxIds(block, _, _)) if block => Ok(Self {
state: State::TxIdsBlocking,
..self
}),
(State::Idle, Message::RequestTxs(ids)) => self.on_txs_request(ids),
(state, msg) => Err(MachineError::invalid_msg::<Self>(state, &msg)),
}
}
}
pub use client::*;
pub use codec::*;
pub use protocol::*;
pub use server::*;

View file

@ -0,0 +1,71 @@
use pallas_multiplexer::agents::ChannelError;
use thiserror::Error;
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum State {
Init,
Idle,
TxIdsNonBlocking,
TxIdsBlocking,
Txs,
Done,
}
pub type Blocking = bool;
pub type TxCount = u16;
pub type TxSizeInBytes = u32;
pub type TxId = Vec<u8>;
#[derive(Debug)]
pub struct TxIdAndSize(pub TxId, pub TxSizeInBytes);
pub type TxBody = Vec<u8>;
#[derive(Debug, Clone)]
pub struct Tx(pub TxId, pub TxBody);
impl From<Tx> for TxIdAndSize {
fn from(other: Tx) -> Self {
TxIdAndSize(other.0, other.1.len() as u32)
}
}
impl From<TxIdAndSize> for TxId {
fn from(value: TxIdAndSize) -> Self {
value.0
}
}
#[derive(Error, Debug)]
pub enum Error {
#[error("attempted to receive message while agency is ours")]
AgencyIsOurs,
#[error("attempted to send message while agency is theirs")]
AgencyIsTheirs,
#[error("inbound message is not valid for current state")]
InvalidInbound,
#[error("outbound message is not valid for current state")]
InvalidOutbound,
#[error("protocol is already initialized, no need to wait for init message")]
AlreadyInitialized,
#[error("error while sending or receiving data through the channel")]
ChannelError(ChannelError),
}
#[derive(Debug)]
pub enum Message {
Init,
RequestTxIds(Blocking, TxCount, TxCount),
ReplyTxIds(Vec<TxIdAndSize>),
RequestTxs(Vec<TxId>),
ReplyTxs(Vec<TxBody>),
Done,
}

View file

@ -0,0 +1,146 @@
use pallas_codec::Fragment;
use pallas_multiplexer::agents::{Channel, ChannelBuffer};
use super::protocol::{Blocking, Error, Message, State, TxBody, TxCount, TxId, TxIdAndSize};
pub enum Reply {
TxIds(Vec<TxIdAndSize>),
Txs(Vec<TxBody>),
Done,
}
pub struct Server<H>(State, ChannelBuffer<H>)
where
H: Channel,
Message: Fragment;
impl<H> Server<H>
where
H: Channel,
Message: Fragment,
{
pub fn new(channel: H) -> Self {
Self(State::Init, ChannelBuffer::new(channel))
}
pub fn state(&self) -> &State {
&self.0
}
pub fn is_done(&self) -> bool {
self.0 == State::Done
}
// NOTE(pi): as of this writing, the network spec has a typo; this is the correct behavior
fn has_agency(&self) -> bool {
matches!(self.state(), State::Idle)
}
fn assert_agency_is_ours(&self) -> Result<(), Error> {
if !self.has_agency() {
Err(Error::AgencyIsTheirs)
} else {
Ok(())
}
}
fn assert_agency_is_theirs(&self) -> Result<(), Error> {
if self.has_agency() {
Err(Error::AgencyIsOurs)
} else {
Ok(())
}
}
/// As a server in a specific state, am I allowed to send this message?
fn assert_outbound_state(&self, msg: &Message) -> Result<(), Error> {
match (&self.0, msg) {
(State::Idle, Message::RequestTxIds(..)) => Ok(()),
(State::Idle, Message::RequestTxs(..)) => Ok(()),
_ => Err(Error::InvalidInbound),
}
}
/// As a server in a specific state, am I allowed to receive this message?
fn assert_inbound_state(&self, msg: &Message) -> Result<(), Error> {
match (&self.0, msg) {
(State::Init, Message::Init) => Ok(()),
(State::TxIdsBlocking, Message::ReplyTxIds(..)) => Ok(()),
(State::TxIdsBlocking, Message::Done) => Ok(()),
(State::TxIdsNonBlocking, Message::ReplyTxIds(..)) => Ok(()),
(State::Txs, Message::ReplyTxs(..)) => Ok(()),
_ => Err(Error::InvalidOutbound),
}
}
pub fn send_message(&mut self, msg: &Message) -> Result<(), Error> {
self.assert_agency_is_ours()?;
self.assert_outbound_state(msg)?;
self.1.send_msg_chunks(msg).map_err(Error::ChannelError)?;
Ok(())
}
pub fn recv_message(&mut self) -> Result<Message, Error> {
self.assert_agency_is_theirs()?;
let msg = self.1.recv_full_msg().map_err(Error::ChannelError)?;
self.assert_inbound_state(&msg)?;
Ok(msg)
}
pub fn wait_for_init(&mut self) -> Result<(), Error> {
if self.0 != State::Init {
return Err(Error::AlreadyInitialized);
}
// recv_message calls assert_inbound_state, which ensures we get an init message
self.recv_message()?;
self.0 = State::Idle;
Ok(())
}
pub fn acknowledge_and_request_tx_ids(
&mut self,
blocking: Blocking,
acknowledge: TxCount,
count: TxCount,
) -> Result<(), Error> {
let msg = Message::RequestTxIds(blocking, acknowledge, count);
self.send_message(&msg)?;
match blocking {
true => self.0 = State::TxIdsBlocking,
false => self.0 = State::TxIdsNonBlocking,
}
Ok(())
}
pub fn request_txs(&mut self, ids: Vec<TxId>) -> Result<(), Error> {
let msg = Message::RequestTxs(ids);
self.send_message(&msg)?;
self.0 = State::Txs;
Ok(())
}
pub fn receive_next_reply(&mut self) -> Result<Reply, Error> {
match self.recv_message()? {
Message::ReplyTxIds(ids_and_sizes) => {
self.0 = State::Idle;
Ok(Reply::TxIds(ids_and_sizes))
}
Message::ReplyTxs(bodies) => {
self.0 = State::Txs;
Ok(Reply::Txs(bodies))
}
Message::Done => {
self.0 = State::Done;
Ok(Reply::Done)
}
_ => Err(Error::InvalidInbound),
}
}
}

View file

@ -2,6 +2,7 @@ use pallas_miniprotocols::{
blockfetch,
chainsync::{self, NextResponse},
handshake::{self, Confirmation},
txsubmission::{self, Reply},
Point,
};
use pallas_multiplexer::{bearers::Bearer, StdChannel, StdPlexer};
@ -9,15 +10,17 @@ use pallas_multiplexer::{bearers::Bearer, StdChannel, StdPlexer};
struct N2NChannels {
channel2: StdChannel,
channel3: StdChannel,
channel4: StdChannel,
}
fn setup_n2n_connection() -> N2NChannels {
fn setup_n2n_client_connection() -> N2NChannels {
let bearer = Bearer::connect_tcp("preview-node.world.dev.cardano.org:30002").unwrap();
let mut plexer = StdPlexer::new(bearer);
let channel0 = plexer.use_channel(0);
let channel2 = plexer.use_channel(2);
let channel3 = plexer.use_channel(3);
let channel4 = plexer.use_channel(4);
plexer.muxer.spawn();
plexer.demuxer.spawn();
@ -34,13 +37,17 @@ fn setup_n2n_connection() -> N2NChannels {
assert!(v >= 7);
}
N2NChannels { channel2, channel3 }
N2NChannels {
channel2,
channel3,
channel4,
}
}
#[test]
#[ignore]
pub fn chainsync_history_happy_path() {
let N2NChannels { channel2, .. } = setup_n2n_connection();
let N2NChannels { channel2, .. } = setup_n2n_client_connection();
let known_point = Point::Specific(
1654413,
@ -86,7 +93,7 @@ pub fn chainsync_history_happy_path() {
#[test]
#[ignore]
pub fn chainsync_tip_happy_path() {
let N2NChannels { channel2, .. } = setup_n2n_connection();
let N2NChannels { channel2, .. } = setup_n2n_client_connection();
let mut client = chainsync::N2NClient::new(channel2);
@ -125,7 +132,7 @@ pub fn chainsync_tip_happy_path() {
#[test]
#[ignore]
pub fn blockfetch_happy_path() {
let N2NChannels { channel3, .. } = setup_n2n_connection();
let N2NChannels { channel3, .. } = setup_n2n_client_connection();
let known_point = Point::Specific(
1654413,
@ -159,3 +166,61 @@ pub fn blockfetch_happy_path() {
assert!(matches!(client.state(), blockfetch::State::Done));
}
#[test]
#[ignore]
pub fn txsubmission_server_happy_path() {
// TODO(pi): Note that the below doesn't work; we need a node to connect *to us* during the integration test
// which seems awkward;
// Alternatively, we can just set up both a client and server connecting to themselves for testing!
let N2NChannels { channel4, .. } = setup_n2n_client_connection();
let mut server = txsubmission::Server::new(channel4);
assert!(matches!(server.wait_for_init(), Ok(_)));
assert!(matches!(
server.acknowledge_and_request_tx_ids(false, 0, 3),
Ok(_)
));
let reply = server.receive_next_reply();
assert!(matches!(reply, Ok(Reply::TxIds(_))));
let Ok(Reply::TxIds(tx_ids)) = reply else { unreachable!() };
assert!(tx_ids.len() <= 3);
assert!(matches!(
server.request_txs(tx_ids.into_iter().map(Into::into).collect()),
Ok(_)
));
let reply = server.receive_next_reply();
assert!(matches!(reply, Ok(Reply::Txs(_))));
let Ok(Reply::Txs(first_txs)) = reply else { unreachable!() };
assert!(matches!(
server.acknowledge_and_request_tx_ids(false, 1, 3),
Ok(_)
));
let reply = server.receive_next_reply();
assert!(matches!(reply, Ok(Reply::Txs(_))));
let Ok(Reply::Txs(second_txs)) = reply else { unreachable!() };
// Make sure we receive the second and third tx again, indicating we sent the `acknowledge 1` bit correctly
assert_eq!(second_txs[0], first_txs[1]);
assert_eq!(second_txs[1], first_txs[2]);
assert!(matches!(
server.acknowledge_and_request_tx_ids(true, 3, 3),
Ok(_)
));
match server.receive_next_reply() {
Ok(Reply::Done) => return, // Server aint havin none of our sh*t
Ok(Reply::TxIds(tx_ids)) => assert_eq!(tx_ids.len(), 3),
Ok(_) | Err(_) => assert!(false),
}
}

View file

@ -64,7 +64,7 @@ thread::spawn(move || {
// whatever get received for this mini-protocol.
loop {
let payload = rx.recv().unwrap();
println!("id:{}, length:{}", protocol, payload.len());
println!("id:{protocol}, length:{}", payload.len());
}
});
```

View file

@ -96,10 +96,10 @@ mod tests {
for (idx, (block_str, jsonl_str)) in test_blocks.iter().enumerate() {
println!("decoding json block {}", idx + 1);
let bytes = hex::decode(block_str).expect(&format!("bad block file {}", idx));
let bytes = hex::decode(block_str).expect(&format!("bad block file {idx}"));
let (_, block): BlockWrapper = minicbor::decode(&bytes[..])
.expect(&format!("error decoding cbor for file {}", idx));
let (_, block): BlockWrapper =
minicbor::decode(&bytes[..]).expect(&format!("error decoding cbor for file {idx}"));
let mut datums = jsonl_str.lines();
@ -126,10 +126,10 @@ mod tests {
for (idx, (block_str, jsonl_str)) in test_blocks.iter().enumerate() {
println!("decoding json block {}", idx + 1);
let bytes = hex::decode(block_str).expect(&format!("bad block file {}", idx));
let bytes = hex::decode(block_str).expect(&format!("bad block file {idx}"));
let (_, block): BlockWrapper = minicbor::decode(&bytes[..])
.expect(&format!("error decoding cbor for file {}", idx));
let (_, block): BlockWrapper =
minicbor::decode(&bytes[..]).expect(&format!("error decoding cbor for file {idx}"));
let mut scripts = jsonl_str.lines();

View file

@ -1618,13 +1618,12 @@ mod tests {
for (idx, block_str) in test_blocks.iter().enumerate() {
println!("decoding test block {}", idx + 1);
let bytes = hex::decode(block_str).expect(&format!("bad block file {}", idx));
let bytes = hex::decode(block_str).expect(&format!("bad block file {idx}"));
let block: BlockWrapper = minicbor::decode(&bytes[..])
.expect(&format!("error decoding cbor for file {}", idx));
let block: BlockWrapper =
minicbor::decode(&bytes[..]).expect(&format!("error decoding cbor for file {idx}"));
let bytes2 =
to_vec(block).expect(&format!("error encoding block cbor for file {}", idx));
let bytes2 = to_vec(block).expect(&format!("error encoding block cbor for file {idx}"));
assert!(bytes.eq(&bytes2), "re-encoded bytes didn't match original");
}
@ -1639,13 +1638,13 @@ mod tests {
for (idx, header_str) in test_headers.iter().enumerate() {
println!("decoding test header {}", idx + 1);
let bytes = hex::decode(header_str).expect(&format!("bad header file {}", idx));
let bytes = hex::decode(header_str).expect(&format!("bad header file {idx}"));
let header: Header = minicbor::decode(&bytes[..])
.expect(&format!("error decoding cbor for file {}", idx));
let header: Header =
minicbor::decode(&bytes[..]).expect(&format!("error decoding cbor for file {idx}"));
let bytes2 =
to_vec(header).expect(&format!("error encoding header cbor for file {}", idx));
to_vec(header).expect(&format!("error encoding header cbor for file {idx}"));
assert!(bytes.eq(&bytes2), "re-encoded bytes didn't match original");
}

View file

@ -764,13 +764,13 @@ mod tests {
for (idx, block_str) in test_blocks.iter().enumerate() {
println!("decoding test block {}", idx + 1);
let bytes = hex::decode(block_str).expect(&format!("bad block file {}", idx));
let bytes = hex::decode(block_str).expect(&format!("bad block file {idx}"));
let block: BlockWrapper = minicbor::decode(&bytes[..])
.expect(&format!("error decoding cbor for file {}", idx));
let block: BlockWrapper =
minicbor::decode(&bytes[..]).expect(&format!("error decoding cbor for file {idx}"));
let bytes2 = minicbor::to_vec(block)
.expect(&format!("error encoding block cbor for file {}", idx));
.expect(&format!("error encoding block cbor for file {idx}"));
assert!(bytes.eq(&bytes2), "re-encoded bytes didn't match original");
}

View file

@ -833,13 +833,12 @@ mod tests {
for (idx, block_str) in test_blocks.iter().enumerate() {
println!("decoding test block {}", idx + 1);
let bytes = hex::decode(block_str).expect(&format!("bad block file {}", idx));
let bytes = hex::decode(block_str).expect(&format!("bad block file {idx}"));
let block: BlockWrapper = minicbor::decode(&bytes[..])
.expect(&format!("error decoding cbor for file {}", idx));
let block: BlockWrapper =
minicbor::decode(&bytes[..]).expect(&format!("error decoding cbor for file {idx}"));
let bytes2 =
to_vec(block).expect(&format!("error encoding block cbor for file {}", idx));
let bytes2 = to_vec(block).expect(&format!("error encoding block cbor for file {idx}"));
assert_eq!(hex::encode(bytes), hex::encode(bytes2));
}
@ -862,13 +861,12 @@ mod tests {
for (idx, block_str) in test_blocks.iter().enumerate() {
println!("decoding test block {}", idx + 1);
let bytes = hex::decode(block_str).expect(&format!("bad block file {}", idx));
let bytes = hex::decode(block_str).expect(&format!("bad block file {idx}"));
let block: BlockWrapper = minicbor::decode(&bytes[..])
.expect(&format!("error decoding cbor for file {}", idx));
let block: BlockWrapper =
minicbor::decode(&bytes[..]).expect(&format!("error decoding cbor for file {idx}"));
let bytes2 =
to_vec(block).expect(&format!("error encoding block cbor for file {}", idx));
let bytes2 = to_vec(block).expect(&format!("error encoding block cbor for file {idx}"));
assert_eq!(hex::encode(bytes), hex::encode(bytes2));
}
@ -880,13 +878,13 @@ mod tests {
for (idx, str) in subjects.iter().enumerate() {
println!("decoding test header {}", idx + 1);
let bytes = hex::decode(str).expect(&format!("bad header file {}", idx));
let bytes = hex::decode(str).expect(&format!("bad header file {idx}"));
let block: BlockHead = minicbor::decode(&bytes[..])
.expect(&format!("error decoding cbor for file {}", idx));
let bytes2 =
to_vec(block).expect(&format!("error encoding header cbor for file {}", idx));
to_vec(block).expect(&format!("error encoding header cbor for file {idx}"));
assert_eq!(bytes, bytes2);
}

View file

@ -47,7 +47,7 @@ mod tests {
let block_idx = 1;
let block_str = include_str!("../../test_data/byron4.block");
let block_bytes = hex::decode(block_str).expect(&format!("bad block file {}", block_idx));
let block_bytes = hex::decode(block_str).expect(&format!("bad block file {block_idx}"));
let block = crate::MultiEraBlock::decode_byron(&block_bytes).unwrap();
let txs = block.txs();

View file

@ -162,9 +162,9 @@ mod tests {
let block_idx = 1;
let block_str = include_str!("../../test_data/byron1.block");
let block_bytes = hex::decode(block_str).expect(&format!("bad block file {}", block_idx));
let block_bytes = hex::decode(block_str).expect(&format!("bad block file {block_idx}"));
let (_, block_model): BlockWrapper = minicbor::decode(&block_bytes[..])
.expect(&format!("error decoding cbor for file {}", block_idx));
.expect(&format!("error decoding cbor for file {block_idx}"));
let computed_hash = block_model.header.original_hash();
@ -182,9 +182,9 @@ mod tests {
let block_idx = 1;
let block_str = include_str!("../../test_data/alonzo1.block");
let block_bytes = hex::decode(block_str).expect(&format!("bad block file {}", block_idx));
let block_bytes = hex::decode(block_str).expect(&format!("bad block file {block_idx}"));
let (_, block_model): BlockWrapper = minicbor::decode(&block_bytes[..])
.expect(&format!("error decoding cbor for file {}", block_idx));
.expect(&format!("error decoding cbor for file {block_idx}"));
let valid_hashes = vec![
"8ae0cd531635579a9b52b954a840782d12235251fb1451e5c699e864c677514a",
@ -209,9 +209,9 @@ mod tests {
let block_idx = 1;
let block_str = include_str!("../../test_data/babbage1.block");
let block_bytes = hex::decode(block_str).expect(&format!("bad block file {}", block_idx));
let block_bytes = hex::decode(block_str).expect(&format!("bad block file {block_idx}"));
let (_, block_model): BlockWrapper = minicbor::decode(&block_bytes[..])
.expect(&format!("error decoding cbor for file {}", block_idx));
.expect(&format!("error decoding cbor for file {block_idx}"));
let valid_hashes = vec!["3fad302595665b004971a6b76909854a39a0a7ecdbff3692f37b77ae37dbe882"];

View file

@ -20,9 +20,9 @@ mod tests {
let block_idx = 1;
let block_str = include_str!("../../test_data/byron1.block");
let block_bytes = hex::decode(block_str).expect(&format!("bad block file {}", block_idx));
let block_bytes = hex::decode(block_str).expect(&format!("bad block file {block_idx}"));
let (_, block): BlockWrapper = minicbor::decode(&block_bytes[..])
.expect(&format!("error decoding cbor for file {}", block_idx));
.expect(&format!("error decoding cbor for file {block_idx}"));
let computed_slot = byron_epoch_slot_to_absolute(
block.header.consensus_data.0.epoch,