From c3c7f818ce6bddfd551da64d59a98df70ee05177 Mon Sep 17 00:00:00 2001 From: Santiago Carmuega Date: Mon, 22 Nov 2021 13:19:33 -0300 Subject: [PATCH] Add basic tx-submission mini-protocol implementation --- Cargo.toml | 1 + pallas-txsubmission/.gitignore | 2 + pallas-txsubmission/Cargo.toml | 25 +++ pallas-txsubmission/examples/naive.rs | 33 +++ pallas-txsubmission/src/lib.rs | 300 ++++++++++++++++++++++++++ pallas/Cargo.toml | 3 +- pallas/src/ouroboros.rs | 3 + 7 files changed, 366 insertions(+), 1 deletion(-) create mode 100644 pallas-txsubmission/.gitignore create mode 100644 pallas-txsubmission/Cargo.toml create mode 100644 pallas-txsubmission/examples/naive.rs create mode 100644 pallas-txsubmission/src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index 7d55aeb..e6ccc6a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,5 +6,6 @@ members = [ "pallas-handshake", "pallas-blockfetch", "pallas-chainsync", + "pallas-txsubmission", "pallas", ] \ No newline at end of file diff --git a/pallas-txsubmission/.gitignore b/pallas-txsubmission/.gitignore new file mode 100644 index 0000000..96ef6c0 --- /dev/null +++ b/pallas-txsubmission/.gitignore @@ -0,0 +1,2 @@ +/target +Cargo.lock diff --git a/pallas-txsubmission/Cargo.toml b/pallas-txsubmission/Cargo.toml new file mode 100644 index 0000000..ecbefae --- /dev/null +++ b/pallas-txsubmission/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "pallas-txsubmission" +version = "0.1.0" +edition = "2021" +repository = "https://github.com/txpipe/pallas" +license = "Apache-2.0" +authors = [ + "Santiago Carmuega " +] + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +pallas-multiplexer = { path = "../pallas-multiplexer/" } +pallas-machines = { path = "../pallas-machines/" } +minicbor = { version="0.11.4", features=["half"] } +minicbor-io = "0.6.0" +log = "0.4.14" +hex = "0.4.3" +itertools = "0.10.1" + +[dev-dependencies] +net2 = "0.2.37" +env_logger = "0.9.0" +pallas-handshake = { path = "../pallas-handshake/" } diff --git a/pallas-txsubmission/examples/naive.rs b/pallas-txsubmission/examples/naive.rs new file mode 100644 index 0000000..806e836 --- /dev/null +++ b/pallas-txsubmission/examples/naive.rs @@ -0,0 +1,33 @@ +use net2::TcpStreamExt; +use std::net::TcpStream; + +use pallas_txsubmission::{NaiveProvider}; +use pallas_handshake::n2c::{Client, VersionTable}; +use pallas_handshake::MAINNET_MAGIC; +use pallas_machines::run_agent; +use pallas_multiplexer::Multiplexer; + +fn main() { + env_logger::init(); + + //let bearer = TcpStream::connect("localhost:6000").unwrap(); + let bearer = TcpStream::connect("relays-new.cardano-mainnet.iohk.io:3001").unwrap(); + + bearer.set_nodelay(true).unwrap(); + bearer.set_keepalive_ms(Some(30_000u32)).unwrap(); + + let mut handles = Multiplexer::new(bearer, &vec![0, 4]).unwrap(); + let (_, rx, tx) = handles.remove(0); + + let versions = VersionTable::v1_and_above(MAINNET_MAGIC); + let last = run_agent(Client::initial(versions), rx, &tx).unwrap(); + println!("{:?}", last); + + + let (_, ts_rx, ts_tx) = handles.remove(0); + + let ts = NaiveProvider::initial(vec![]); + let ts = run_agent(ts, ts_rx, &ts_tx).unwrap(); + + println!("{:?}", ts); +} diff --git a/pallas-txsubmission/src/lib.rs b/pallas-txsubmission/src/lib.rs new file mode 100644 index 0000000..b12824e --- /dev/null +++ b/pallas-txsubmission/src/lib.rs @@ -0,0 +1,300 @@ +use std::fmt::Debug; + +use itertools::Itertools; +use log::debug; + +use pallas_machines::{ + Agent, DecodePayload, EncodePayload, MachineError, MachineOutput, PayloadDecoder, + PayloadEncoder, Transition, +}; + +#[derive(Debug, PartialEq, Clone)] +pub enum State { + Idle, + TxIdsNonBlocking, + TxIdsBlocking, + Txs, + Done, +} + +pub type Blocking = bool; + +pub type TxCount = u16; + +pub type TxSizeInBytes = u32; + +pub type TxId = u64; + +#[derive(Debug)] +pub struct TxIdAndSize(TxId, TxSizeInBytes); + +impl EncodePayload for TxIdAndSize { + fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box> { + e.array(2)?; + e.u64(self.0)?; + e.u32(self.1)?; + + Ok(()) + } +} + +impl DecodePayload for TxIdAndSize { + fn decode_payload(d: &mut PayloadDecoder) -> Result> { + d.array()?; + let id = d.u64()?; + let size = d.u32()?; + + Ok(Self(id, size)) + } +} + +pub type TxBody = Vec; + +#[derive(Debug, Clone)] +pub struct Tx(TxId, TxBody); + +impl Into for &Tx { + fn into(self) -> TxIdAndSize { + TxIdAndSize(self.0, self.1.len() as u32) + } +} + +#[derive(Debug)] +pub enum Message { + RequestTxIds(Blocking, TxCount, TxCount), + ReplyTxIds(Vec), + RequestTxs(Vec), + ReplyTxs(Vec), + Done, +} + +impl EncodePayload for Message { + fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box> { + match self { + Message::RequestTxIds(blocking, ack, req) => { + e.array(4)?.u16(0)?; + e.bool(*blocking)?; + e.u16(*ack)?; + e.u16(*req)?; + Ok(()) + } + Message::ReplyTxIds(ids) => { + e.array(2)?.u16(1)?; + e.array(ids.len() as u64)?; + for id in ids { + id.encode_payload(e)?; + } + Ok(()) + } + Message::RequestTxs(ids) => { + e.array(2)?.u16(2)?; + e.array(ids.len() as u64)?; + for id in ids { + e.u64(*id)?; + } + Ok(()) + } + Message::ReplyTxs(txs) => { + e.array(2)?.u16(3)?; + e.array(txs.len() as u64)?; + for tx in txs { + e.bytes(tx)?; + } + Ok(()) + } + Message::Done => { + e.array(1)?.u16(4)?; + Ok(()) + } + } + } +} + +impl DecodePayload for Message { + fn decode_payload(d: &mut PayloadDecoder) -> Result> { + d.array()?; + let label = d.u16()?; + + match label { + 0 => { + let blocking = d.bool()?; + let ack = d.u16()?; + let req = d.u16()?; + Ok(Message::RequestTxIds(blocking, ack, req)) + } + 1 => { + let items = Vec::::decode_payload(d)?; + Ok(Message::ReplyTxIds(items)) + } + 2 => { + let ids = d.array_iter::()?.try_collect()?; + Ok(Message::RequestTxs(ids)) + } + 3 => { + todo!() + } + 4 => Ok(Message::Done), + x => Err(Box::new(MachineError::BadLabel(x))), + } + } +} + +/// A very basic tx provider agent with a fixed set of tx to submit +/// +/// This provider takes a set of tx from a vec as the single, static source of +/// data to transfer to the consumer. It's main use is for implementing peers +/// that need to answer to v1 implementations of the Tx-Submission +/// mini-protocol. Since v1 nodes dont' wait for a 'Hello' message, the peer +/// needs to be prepared to receive Tx requests. This naive provider serves as a +/// good placeholder for those scenarios. +#[derive(Debug)] +pub struct NaiveProvider { + pub state: State, + pub fifo_txs: Vec, + pub acknowledged_count: usize, + pub requested_ids_count: usize, + pub requested_txs: Option>, +} + +impl NaiveProvider { + pub fn initial(fifo_txs: Vec) -> Self { + Self { + state: State::Idle, + acknowledged_count: 0, + requested_ids_count: 0, + requested_txs: None, + fifo_txs, + } + } + + fn send_done(self, tx: &impl MachineOutput) -> Transition { + let msg = Message::Done; + + tx.send_msg(&msg)?; + + Ok(Self { + state: State::Done, + ..self + }) + } + + fn send_tx_ids(mut self, tx: &impl MachineOutput) -> Transition { + debug!("draining {} from tx fifo queue", self.acknowledged_count); + self.fifo_txs.drain(0..(self.acknowledged_count - 1)); + + debug!( + "sending next {} tx ids from fifo queue", + self.requested_ids_count + ); + let to_send = self.fifo_txs[0..self.requested_ids_count] + .iter() + .map_into() + .collect_vec(); + + let msg = Message::ReplyTxIds(to_send); + tx.send_msg(&msg)?; + + Ok(Self { + state: State::Idle, + acknowledged_count: 0, + requested_ids_count: 0, + ..self + }) + } + + fn send_txs(self, tx: &impl MachineOutput) -> Transition { + let matches = self + .fifo_txs + .iter() + .filter(|Tx(candidate_id, _)| match &self.requested_txs { + Some(requested) => requested.iter().contains(candidate_id), + None => false, + }) + .map(|Tx(_, body)| body.clone()) + .collect_vec(); + + let msg = Message::ReplyTxs(matches); + tx.send_msg(&msg)?; + + Ok(Self { + state: State::Idle, + requested_txs: None, + ..self + }) + } + + fn on_tx_ids_request( + self, + acknowledged_count: usize, + requested_ids_count: usize, + ) -> Transition { + debug!( + "new tx id request {} (ack: {})", + requested_ids_count, acknowledged_count + ); + + Ok(Self { + state: State::Idle, + requested_ids_count, + acknowledged_count, + ..self + }) + } + + fn on_txs_request( + self, + requested_txs: Vec, + ) -> Transition { + debug!( + "new txs request {:?}", + requested_txs, + ); + + Ok(Self { + state: State::Idle, + requested_txs: Some(requested_txs), + ..self + }) + } +} + +impl Agent for NaiveProvider { + type Message = Message; + + fn is_done(&self) -> bool { + self.state == State::Done + } + + fn has_agency(&self) -> bool { + match self.state { + State::Idle => false, + State::TxIdsNonBlocking => true, + State::TxIdsBlocking => true, + State::Txs => true, + State::Done => false, + } + } + + fn send_next(self, tx: &impl MachineOutput) -> Transition { + match self.state { + State::TxIdsBlocking => self.send_done(tx), + State::TxIdsNonBlocking => self.send_tx_ids(tx), + State::Txs => self.send_txs(tx), + _ => panic!("I don't have agency, don't know what to do"), + } + } + + fn receive_next(self, msg: Self::Message) -> Transition { + match (&self.state, msg) { + (State::Idle, Message::RequestTxIds(block, ack, req)) if !block => { + self.on_tx_ids_request(ack as usize, req as usize) + } + (State::Idle, Message::RequestTxIds(block, _, _)) if block => Ok(Self { + state: State::TxIdsBlocking, + ..self + }), + (State::Idle, Message::RequestTxs(ids)) => self.on_txs_request(ids), + _ => Err(Box::new(MachineError::InvalidMsgForState)), + } + } +} diff --git a/pallas/Cargo.toml b/pallas/Cargo.toml index aed51b7..39f3656 100644 --- a/pallas/Cargo.toml +++ b/pallas/Cargo.toml @@ -14,4 +14,5 @@ authors = [ pallas-multiplexer = { path = "../pallas-multiplexer/" } pallas-machines = { path = "../pallas-machines/" } pallas-handshake = { path = "../pallas-handshake/" } -pallas-blockfetch = { path = "../pallas-blockfetch/" } \ No newline at end of file +pallas-blockfetch = { path = "../pallas-blockfetch/" } +pallas-txsubmission = { path = "../pallas-txsubmission/" } \ No newline at end of file diff --git a/pallas/src/ouroboros.rs b/pallas/src/ouroboros.rs index 739e843..38e887e 100644 --- a/pallas/src/ouroboros.rs +++ b/pallas/src/ouroboros.rs @@ -10,3 +10,6 @@ pub use pallas_handshake as handshake; #[doc(inline)] pub use pallas_blockfetch as blockfetch; + +#[doc(inline)] +pub use pallas_txsubmission as txsubmission;