Add basic tx-submission mini-protocol implementation

This commit is contained in:
Santiago Carmuega 2021-11-22 13:19:33 -03:00
parent 83ddff3f7e
commit c3c7f818ce
7 changed files with 366 additions and 1 deletions

View file

@ -6,5 +6,6 @@ members = [
"pallas-handshake",
"pallas-blockfetch",
"pallas-chainsync",
"pallas-txsubmission",
"pallas",
]

2
pallas-txsubmission/.gitignore vendored Normal file
View file

@ -0,0 +1,2 @@
/target
Cargo.lock

View file

@ -0,0 +1,25 @@
[package]
name = "pallas-txsubmission"
version = "0.1.0"
edition = "2021"
repository = "https://github.com/txpipe/pallas"
license = "Apache-2.0"
authors = [
"Santiago Carmuega <santiago@carmuega.me>"
]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
pallas-multiplexer = { path = "../pallas-multiplexer/" }
pallas-machines = { path = "../pallas-machines/" }
minicbor = { version="0.11.4", features=["half"] }
minicbor-io = "0.6.0"
log = "0.4.14"
hex = "0.4.3"
itertools = "0.10.1"
[dev-dependencies]
net2 = "0.2.37"
env_logger = "0.9.0"
pallas-handshake = { path = "../pallas-handshake/" }

View file

@ -0,0 +1,33 @@
use net2::TcpStreamExt;
use std::net::TcpStream;
use pallas_txsubmission::{NaiveProvider};
use pallas_handshake::n2c::{Client, VersionTable};
use pallas_handshake::MAINNET_MAGIC;
use pallas_machines::run_agent;
use pallas_multiplexer::Multiplexer;
fn main() {
env_logger::init();
//let bearer = TcpStream::connect("localhost:6000").unwrap();
let bearer = TcpStream::connect("relays-new.cardano-mainnet.iohk.io:3001").unwrap();
bearer.set_nodelay(true).unwrap();
bearer.set_keepalive_ms(Some(30_000u32)).unwrap();
let mut handles = Multiplexer::new(bearer, &vec![0, 4]).unwrap();
let (_, rx, tx) = handles.remove(0);
let versions = VersionTable::v1_and_above(MAINNET_MAGIC);
let last = run_agent(Client::initial(versions), rx, &tx).unwrap();
println!("{:?}", last);
let (_, ts_rx, ts_tx) = handles.remove(0);
let ts = NaiveProvider::initial(vec![]);
let ts = run_agent(ts, ts_rx, &ts_tx).unwrap();
println!("{:?}", ts);
}

View file

@ -0,0 +1,300 @@
use std::fmt::Debug;
use itertools::Itertools;
use log::debug;
use pallas_machines::{
Agent, DecodePayload, EncodePayload, MachineError, MachineOutput, PayloadDecoder,
PayloadEncoder, Transition,
};
#[derive(Debug, PartialEq, Clone)]
pub enum State {
Idle,
TxIdsNonBlocking,
TxIdsBlocking,
Txs,
Done,
}
pub type Blocking = bool;
pub type TxCount = u16;
pub type TxSizeInBytes = u32;
pub type TxId = u64;
#[derive(Debug)]
pub struct TxIdAndSize(TxId, TxSizeInBytes);
impl EncodePayload for TxIdAndSize {
fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box<dyn std::error::Error>> {
e.array(2)?;
e.u64(self.0)?;
e.u32(self.1)?;
Ok(())
}
}
impl DecodePayload for TxIdAndSize {
fn decode_payload(d: &mut PayloadDecoder) -> Result<Self, Box<dyn std::error::Error>> {
d.array()?;
let id = d.u64()?;
let size = d.u32()?;
Ok(Self(id, size))
}
}
pub type TxBody = Vec<u8>;
#[derive(Debug, Clone)]
pub struct Tx(TxId, TxBody);
impl Into<TxIdAndSize> for &Tx {
fn into(self) -> TxIdAndSize {
TxIdAndSize(self.0, self.1.len() as u32)
}
}
#[derive(Debug)]
pub enum Message {
RequestTxIds(Blocking, TxCount, TxCount),
ReplyTxIds(Vec<TxIdAndSize>),
RequestTxs(Vec<TxId>),
ReplyTxs(Vec<TxBody>),
Done,
}
impl EncodePayload for Message {
fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box<dyn std::error::Error>> {
match self {
Message::RequestTxIds(blocking, ack, req) => {
e.array(4)?.u16(0)?;
e.bool(*blocking)?;
e.u16(*ack)?;
e.u16(*req)?;
Ok(())
}
Message::ReplyTxIds(ids) => {
e.array(2)?.u16(1)?;
e.array(ids.len() as u64)?;
for id in ids {
id.encode_payload(e)?;
}
Ok(())
}
Message::RequestTxs(ids) => {
e.array(2)?.u16(2)?;
e.array(ids.len() as u64)?;
for id in ids {
e.u64(*id)?;
}
Ok(())
}
Message::ReplyTxs(txs) => {
e.array(2)?.u16(3)?;
e.array(txs.len() as u64)?;
for tx in txs {
e.bytes(tx)?;
}
Ok(())
}
Message::Done => {
e.array(1)?.u16(4)?;
Ok(())
}
}
}
}
impl DecodePayload for Message {
fn decode_payload(d: &mut PayloadDecoder) -> Result<Self, Box<dyn std::error::Error>> {
d.array()?;
let label = d.u16()?;
match label {
0 => {
let blocking = d.bool()?;
let ack = d.u16()?;
let req = d.u16()?;
Ok(Message::RequestTxIds(blocking, ack, req))
}
1 => {
let items = Vec::<TxIdAndSize>::decode_payload(d)?;
Ok(Message::ReplyTxIds(items))
}
2 => {
let ids = d.array_iter::<TxId>()?.try_collect()?;
Ok(Message::RequestTxs(ids))
}
3 => {
todo!()
}
4 => Ok(Message::Done),
x => Err(Box::new(MachineError::BadLabel(x))),
}
}
}
/// A very basic tx provider agent with a fixed set of tx to submit
///
/// This provider takes a set of tx from a vec as the single, static source of
/// data to transfer to the consumer. It's main use is for implementing peers
/// that need to answer to v1 implementations of the Tx-Submission
/// mini-protocol. Since v1 nodes dont' wait for a 'Hello' message, the peer
/// needs to be prepared to receive Tx requests. This naive provider serves as a
/// good placeholder for those scenarios.
#[derive(Debug)]
pub struct NaiveProvider {
pub state: State,
pub fifo_txs: Vec<Tx>,
pub acknowledged_count: usize,
pub requested_ids_count: usize,
pub requested_txs: Option<Vec<TxId>>,
}
impl NaiveProvider {
pub fn initial(fifo_txs: Vec<Tx>) -> Self {
Self {
state: State::Idle,
acknowledged_count: 0,
requested_ids_count: 0,
requested_txs: None,
fifo_txs,
}
}
fn send_done(self, tx: &impl MachineOutput) -> Transition<Self> {
let msg = Message::Done;
tx.send_msg(&msg)?;
Ok(Self {
state: State::Done,
..self
})
}
fn send_tx_ids(mut self, tx: &impl MachineOutput) -> Transition<Self> {
debug!("draining {} from tx fifo queue", self.acknowledged_count);
self.fifo_txs.drain(0..(self.acknowledged_count - 1));
debug!(
"sending next {} tx ids from fifo queue",
self.requested_ids_count
);
let to_send = self.fifo_txs[0..self.requested_ids_count]
.iter()
.map_into()
.collect_vec();
let msg = Message::ReplyTxIds(to_send);
tx.send_msg(&msg)?;
Ok(Self {
state: State::Idle,
acknowledged_count: 0,
requested_ids_count: 0,
..self
})
}
fn send_txs(self, tx: &impl MachineOutput) -> Transition<Self> {
let matches = self
.fifo_txs
.iter()
.filter(|Tx(candidate_id, _)| match &self.requested_txs {
Some(requested) => requested.iter().contains(candidate_id),
None => false,
})
.map(|Tx(_, body)| body.clone())
.collect_vec();
let msg = Message::ReplyTxs(matches);
tx.send_msg(&msg)?;
Ok(Self {
state: State::Idle,
requested_txs: None,
..self
})
}
fn on_tx_ids_request(
self,
acknowledged_count: usize,
requested_ids_count: usize,
) -> Transition<Self> {
debug!(
"new tx id request {} (ack: {})",
requested_ids_count, acknowledged_count
);
Ok(Self {
state: State::Idle,
requested_ids_count,
acknowledged_count,
..self
})
}
fn on_txs_request(
self,
requested_txs: Vec<TxId>,
) -> Transition<Self> {
debug!(
"new txs request {:?}",
requested_txs,
);
Ok(Self {
state: State::Idle,
requested_txs: Some(requested_txs),
..self
})
}
}
impl Agent for NaiveProvider {
type Message = Message;
fn is_done(&self) -> bool {
self.state == State::Done
}
fn has_agency(&self) -> bool {
match self.state {
State::Idle => false,
State::TxIdsNonBlocking => true,
State::TxIdsBlocking => true,
State::Txs => true,
State::Done => false,
}
}
fn send_next(self, tx: &impl MachineOutput) -> Transition<Self> {
match self.state {
State::TxIdsBlocking => self.send_done(tx),
State::TxIdsNonBlocking => self.send_tx_ids(tx),
State::Txs => self.send_txs(tx),
_ => panic!("I don't have agency, don't know what to do"),
}
}
fn receive_next(self, msg: Self::Message) -> Transition<Self> {
match (&self.state, msg) {
(State::Idle, Message::RequestTxIds(block, ack, req)) if !block => {
self.on_tx_ids_request(ack as usize, req as usize)
}
(State::Idle, Message::RequestTxIds(block, _, _)) if block => Ok(Self {
state: State::TxIdsBlocking,
..self
}),
(State::Idle, Message::RequestTxs(ids)) => self.on_txs_request(ids),
_ => Err(Box::new(MachineError::InvalidMsgForState)),
}
}
}

View file

@ -14,4 +14,5 @@ authors = [
pallas-multiplexer = { path = "../pallas-multiplexer/" }
pallas-machines = { path = "../pallas-machines/" }
pallas-handshake = { path = "../pallas-handshake/" }
pallas-blockfetch = { path = "../pallas-blockfetch/" }
pallas-blockfetch = { path = "../pallas-blockfetch/" }
pallas-txsubmission = { path = "../pallas-txsubmission/" }

View file

@ -10,3 +10,6 @@ pub use pallas_handshake as handshake;
#[doc(inline)]
pub use pallas_blockfetch as blockfetch;
#[doc(inline)]
pub use pallas_txsubmission as txsubmission;