feat(network): implement LocalTxSubmission client (#289)

This commit is contained in:
Ilya 2023-09-21 02:41:37 +03:00 committed by GitHub
parent 18428298a4
commit dea731d58b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 392 additions and 2 deletions

View file

@ -13,14 +13,14 @@ The following architectural decisions were made for this particular Rust impleme
## Development Status
| mini-protocol | initiator | responder |
| ------------------------------------------- | --------- | --------- |
| ------------------------------------------- |-----------| --------- |
| block-fetch | done | planned |
| chain-sync | done | planned |
| [handshake](src/handshake/README.md) | done | done |
| local-state | done | planned |
| [tx-submission](src/txsubmission/README.md) | done | done |
| local tx monitor | done | planned |
| local-tx-submission | ongoing | planned |
| local-tx-submission | done | planned |
## Implementation Details

View file

@ -0,0 +1,208 @@
use std::marker::PhantomData;
use thiserror::Error;
use tracing::debug;
use pallas_codec::Fragment;
use crate::miniprotocols::localtxsubmission::{EraTx, Message, RejectReason, State};
use crate::multiplexer;
/// Cardano specific instantiation of LocalTxSubmission client.
pub type Client = GenericClient<EraTx, RejectReason>;
/// A generic Ouroboros client for submitting a generic transaction
/// to a server, which possibly results in a generic rejection.
pub struct GenericClient<Tx, Reject> {
state: State,
muxer: multiplexer::ChannelBuffer,
pd_tx: PhantomData<Tx>,
pd_reject: PhantomData<Reject>,
}
impl<Tx, Reject> GenericClient<Tx, Reject>
where
Message<Tx, Reject>: Fragment,
{
/// Constructs a new LocalTxSubmission `Client` instance.
///
/// # Arguments
/// * `channel` - An instance of `multiplexer::AgentChannel` to be used for
/// communication.
pub fn new(channel: multiplexer::AgentChannel) -> Self {
Self {
state: State::Idle,
muxer: multiplexer::ChannelBuffer::new(channel),
pd_tx: Default::default(),
pd_reject: Default::default(),
}
}
/// Submits the given `tx` to the server.
///
/// # Arguments
/// * `tx` - transaction to submit.
///
/// # Errors
/// Returns an error if the agency is not ours or if the outbound state is
/// invalid.
pub async fn submit_tx(&mut self, tx: Tx) -> Result<(), Error<Reject>> {
self.send_submit_tx(tx).await?;
self.recv_submit_tx_response().await
}
/// Terminates the protocol gracefully.
///
/// # Errors
/// Returns an error if the agency is not ours or if the outbound state is
/// invalid.
pub async fn terminate_gracefully(&mut self) -> Result<(), Error<Reject>> {
let msg = Message::Done;
self.send_message(&msg).await?;
self.state = State::Done;
Ok(())
}
/// Returns the current state of the client.
fn state(&self) -> &State {
&self.state
}
/// Checks if the client has agency.
fn has_agency(&self) -> bool {
match self.state() {
State::Idle => true,
State::Busy | State::Done => false,
}
}
fn assert_agency_is_ours(&self) -> Result<(), Error<Reject>> {
if !self.has_agency() {
Err(Error::AgencyIsTheirs)
} else {
Ok(())
}
}
fn assert_agency_is_theirs(&self) -> Result<(), Error<Reject>> {
if self.has_agency() {
Err(Error::AgencyIsOurs)
} else {
Ok(())
}
}
fn assert_outbound_state(&self, msg: &Message<Tx, Reject>) -> Result<(), Error<Reject>> {
match (&self.state, msg) {
(State::Idle, Message::SubmitTx(_) | Message::Done) => Ok(()),
_ => Err(Error::InvalidOutbound),
}
}
fn assert_inbound_state(&self, msg: &Message<Tx, Reject>) -> Result<(), Error<Reject>> {
match (&self.state, msg) {
(State::Busy, Message::AcceptTx | Message::RejectTx(_)) => Ok(()),
_ => Err(Error::InvalidInbound),
}
}
/// Sends a message to the server
///
/// # Arguments
///
/// * `msg` - A reference to the `Message` to be sent.
///
/// # Errors
/// Returns an error if the agency is not ours or if the outbound state is
/// invalid.
async fn send_message(&mut self, msg: &Message<Tx, Reject>) -> Result<(), Error<Reject>> {
self.assert_agency_is_ours()?;
self.assert_outbound_state(msg)?;
self.muxer
.send_msg_chunks(msg)
.await
.map_err(Error::ChannelError)?;
Ok(())
}
/// Receives the next message from the server.
///
/// # Errors
/// Returns an error if the agency is not theirs or if the inbound state is
/// invalid.
async fn recv_message(&mut self) -> Result<Message<Tx, Reject>, Error<Reject>> {
self.assert_agency_is_theirs()?;
let msg = self
.muxer
.recv_full_msg()
.await
.map_err(Error::ChannelError)?;
self.assert_inbound_state(&msg)?;
Ok(msg)
}
/// Sends SubmitTx message to the server.
///
/// # Arguments
/// * `tx` - transaction to submit.
///
/// # Errors
/// Returns an error if the agency is not ours or if the outbound state is
/// invalid.
async fn send_submit_tx(&mut self, tx: Tx) -> Result<(), Error<Reject>> {
let msg = Message::SubmitTx(tx);
self.send_message(&msg).await?;
self.state = State::Busy;
debug!("sent SubmitTx");
Ok(())
}
/// Receives SubmitTx response from the server.
///
/// # Errors
/// Returns an error if the inbound message is invalid.
async fn recv_submit_tx_response(&mut self) -> Result<(), Error<Reject>> {
debug!("waiting for SubmitTx response");
match self.recv_message().await? {
Message::AcceptTx => {
self.state = State::Idle;
Ok(())
}
Message::RejectTx(rejection) => {
self.state = State::Idle;
Err(Error::TxRejected(rejection))
}
_ => Err(Error::InvalidInbound),
}
}
}
#[derive(Error, Debug)]
pub enum Error<Reject> {
#[error("attempted to receive message while agency is ours")]
AgencyIsOurs,
#[error("attempted to send message while agency is theirs")]
AgencyIsTheirs,
#[error("inbound message is not valid for current state")]
InvalidInbound,
#[error("outbound message is not valid for current state")]
InvalidOutbound,
#[error("error while sending or receiving data through the channel")]
ChannelError(multiplexer::Error),
#[error("tx was rejected by the server")]
TxRejected(Reject),
}

View file

@ -0,0 +1,152 @@
use pallas_codec::minicbor::{decode, Decode, Decoder, encode, Encode, Encoder};
use pallas_codec::minicbor::data::Tag;
use crate::miniprotocols::localtxsubmission::{EraTx, Message, RejectReason};
impl<Tx, Reject> Encode<()> for Message<Tx, Reject>
where
Tx: Encode<()>,
Reject: Encode<()>,
{
fn encode<W: encode::Write>(
&self,
e: &mut Encoder<W>,
_ctx: &mut (),
) -> Result<(), encode::Error<W::Error>> {
match self {
Message::SubmitTx(tx) => {
e.array(2)?.u16(0)?;
e.encode(tx)?;
Ok(())
}
Message::AcceptTx => {
e.array(1)?.u16(1)?;
Ok(())
}
Message::RejectTx(rejection) => {
e.array(2)?.u16(2)?;
e.encode(rejection)?;
Ok(())
}
Message::Done => {
e.array(1)?.u16(3)?;
Ok(())
}
}
}
}
impl<'b, Tx: Decode<'b, ()>, Reject: Decode<'b, ()>> Decode<'b, ()> for Message<Tx, Reject> {
fn decode(d: &mut Decoder<'b>, _ctx: &mut ()) -> Result<Self, decode::Error> {
d.array()?;
let label = d.u16()?;
match label {
0 => {
let tx = d.decode()?;
Ok(Message::SubmitTx(tx))
}
1 => Ok(Message::AcceptTx),
2 => {
let rejection = d.decode()?;
Ok(Message::RejectTx(rejection))
}
3 => Ok(Message::Done),
_ => Err(decode::Error::message("can't decode Message")),
}
}
}
impl<'b> Decode<'b, ()> for EraTx {
fn decode(d: &mut Decoder<'b>, _ctx: &mut ()) -> Result<Self, decode::Error> {
d.array()?;
let era = d.u16()?;
let tag = d.tag()?;
if tag != Tag::Cbor {
return Err(decode::Error::message("Expected encoded CBOR data item"));
}
Ok(EraTx(era, d.bytes()?.to_vec()))
}
}
impl Encode<()> for EraTx {
fn encode<W: encode::Write>(
&self,
e: &mut Encoder<W>,
_ctx: &mut (),
) -> Result<(), encode::Error<W::Error>> {
e.array(2)?;
e.u16(self.0)?;
e.tag(Tag::Cbor)?;
e.bytes(&self.1)?;
Ok(())
}
}
impl<'b> Decode<'b, ()> for RejectReason {
fn decode(d: &mut Decoder<'b>, _ctx: &mut ()) -> Result<Self, decode::Error> {
let remainder = d.input().to_vec();
Ok(RejectReason(remainder))
}
}
impl Encode<()> for RejectReason {
fn encode<W: encode::Write>(
&self,
e: &mut Encoder<W>,
_ctx: &mut (),
) -> Result<(), encode::Error<W::Error>> {
e.writer_mut()
.write_all(&self.0)
.map_err(|w_err| encode::Error::write(w_err))?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use pallas_codec::{Fragment, minicbor};
use crate::miniprotocols::localtxsubmission::{EraTx, Message, RejectReason};
use crate::multiplexer::Error;
#[test]
fn decode_reject_message() {
let mut bytes = hex::decode(RAW_REJECT_RESPONSE).unwrap();
let msg_res = try_decode_message::<Message<EraTx, RejectReason>>(&mut bytes);
assert!(msg_res.is_ok())
}
fn try_decode_message<M>(buffer: &mut Vec<u8>) -> Result<Option<M>, Error>
where
M: Fragment,
{
let mut decoder = minicbor::Decoder::new(buffer);
let maybe_msg = decoder.decode();
match maybe_msg {
Ok(msg) => {
let pos = decoder.position();
buffer.drain(0..pos);
Ok(Some(msg))
}
Err(err) if err.is_end_of_input() => Ok(None),
Err(err) => Err(Error::Decoding(err.to_string())),
}
}
const RAW_REJECT_RESPONSE: &str =
"82028182059f820082018200820a81581c3b890fb5449baedf5342a48ee9c9ec6acbc995641be92ad21f08c686\
8200820183038158202628ce6ff8cc7ff0922072d930e4a693c17f991748dedece0be64819a2f9ef7782582031d\
54ce8d7e8cb262fc891282f44e9d24c3902dc38fac63fd469e8bf3006376b5820750852fdaf0f2dd724291ce007\
b8e76d74bcf28076ed0c494cd90c0cfe1c9ca582008201820782000000018200820183048158201a547638b4cf4\
a3cec386e2f898ac6bc987fadd04277e1d3c8dab5c505a5674e8158201457e4107607f83a80c3c4ffeb70910c2b\
a3a35cf1699a2a7375f50fcc54a931820082028201830500821a00636185a2581c6f1a1f0c7ccf632cc9ff4b796\
87ed13ffe5b624cce288b364ebdce50a144414749581b000000032a9f8800581c795ecedb09821cb922c13060c8\
f6377c3344fa7692551e865d86ac5da158205399c766fb7c494cddb2f7ae53cc01285474388757bc05bd575c14a\
713a432a901820082028201820085825820497fe6401e25733c073c01164c7f2a1a05de8c95e36580f9d1b05123\
70040def028258207911ba2b7d91ac56b05ea351282589fe30f4717a707a1b9defaf282afe5ba44200825820791\
1ba2b7d91ac56b05ea351282589fe30f4717a707a1b9defaf282afe5ba44201825820869bcb6f35e6b7912c25e5\
cb33fb9906b097980a83f2b8ef40b51c4ef52eccd402825820efc267ad2c15c34a117535eecc877241ed836eb3e\
643ec90de21ca1b12fd79c20282008202820181148200820283023a000f0f6d1a004944ce820082028201830d3a\
000f0f6d1a00106253820082028201830182811a02409e10811a024138c01a0255e528ff";
}

View file

@ -0,0 +1,7 @@
pub use client::*;
pub use codec::*;
pub use protocol::*;
mod client;
mod codec;
mod protocol;

View file

@ -0,0 +1,22 @@
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum State {
Idle,
Busy,
Done,
}
#[derive(Debug)]
pub enum Message<Tx, Reject> {
SubmitTx(Tx),
AcceptTx,
RejectTx(Reject),
Done,
}
// The bytes of a transaction with an era number.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct EraTx(pub u16, pub Vec<u8>);
// Raw reject reason.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct RejectReason(pub Vec<u8>);

View file

@ -6,6 +6,7 @@ pub mod blockfetch;
pub mod chainsync;
pub mod handshake;
pub mod localstate;
pub mod localtxsubmission;
pub mod txmonitor;
pub mod txsubmission;