feat: Make the underlying TxBody type generic

* WIP

* Fix compilation on windows machines

* Make TxBody generic

Technically we should be generic over TxBody for arbitrary ouroboros implementations; however, that makes things awkward.  So, we introduce GenericClient and GenericServer, with concrete types that instantiate them to Cardano specific types.  We could have done this with default type arguments, but this pushes the type system to it's limits and it often can't infer the correct type

* More examples tweaks; clippy and fmt

* Remove unneccesary defaults

* Tag 24 is no longer mysterious

It means raw CBOR

* Cargo fmt

One day I'll configure vscode to do this on safe
This commit is contained in:
Pi Lanningham 2023-02-20 09:19:06 -06:00 committed by GitHub
parent c8f08fe94c
commit fc2728639f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 121 additions and 71 deletions

View file

@ -1,14 +1,12 @@
use pallas::network::{
miniprotocols::{
chainsync, handshake, localstate, Point, MAINNET_MAGIC, PROTOCOL_N2C_CHAIN_SYNC,
PROTOCOL_N2C_HANDSHAKE, PROTOCOL_N2C_STATE_QUERY,
},
multiplexer::{self, bearers::Bearer},
miniprotocols::{chainsync, handshake, localstate, Point, MAINNET_MAGIC},
multiplexer,
};
#[derive(Debug)]
struct LoggingObserver;
#[allow(dead_code)]
fn do_handshake(channel: multiplexer::StdChannel) {
let mut client = handshake::N2CClient::new(channel);
@ -26,6 +24,7 @@ fn do_handshake(channel: multiplexer::StdChannel) {
}
}
#[allow(dead_code)]
fn do_localstate_query(channel: multiplexer::StdChannel) {
let mut client = localstate::ClientV10::new(channel);
client.acquire(None).unwrap();
@ -37,6 +36,7 @@ fn do_localstate_query(channel: multiplexer::StdChannel) {
log::info!("system start result: {:?}", result);
}
#[allow(dead_code)]
fn do_chainsync(channel: multiplexer::StdChannel) {
let known_points = vec![Point::Specific(
43847831u64,
@ -67,14 +67,22 @@ fn main() {
.filter_level(log::LevelFilter::Trace)
.init();
#[cfg(not(target_family = "unix"))]
{
panic!("can't use n2c unix socket on non-unix systems");
}
// we connect to the unix socket of the local node. Make sure you have the right
// path for your environment
#[cfg(target_family = "unix")]
{
use pallas::network::{
miniprotocols::{
PROTOCOL_N2C_CHAIN_SYNC, PROTOCOL_N2C_HANDSHAKE, PROTOCOL_N2C_STATE_QUERY,
},
multiplexer::bearers::Bearer,
};
let bearer = Bearer::connect_unix("/tmp/node.socket").unwrap();
#[cfg(not(target_family = "unix"))]
panic!("can't use n2c unix socket on non-unix systems");
// setup the multiplexer by specifying the bearer and the IDs of the
// miniprotocols to use
let mut plexer = multiplexer::StdPlexer::new(bearer);
@ -94,3 +102,4 @@ fn main() {
// execute the chainsync flow from an arbitrary point in the chain
do_chainsync(chainsync);
}
}

View file

@ -3,7 +3,10 @@ use std::marker::PhantomData;
use pallas_codec::Fragment;
use pallas_multiplexer::agents::{Channel, ChannelBuffer};
use super::protocol::{Error, Message, State, TxBody, TxIdAndSize};
use super::{
protocol::{Error, Message, State, TxIdAndSize},
EraTxBody, EraTxId,
};
pub enum Request<TxId> {
TxIds(u16, u16),
@ -11,18 +14,32 @@ pub enum Request<TxId> {
Txs(Vec<TxId>),
}
pub struct Client<H, TxId>(State, ChannelBuffer<H>, PhantomData<TxId>)
/// A generic Ouroboros client for submitting a generic notion of "transactions" to another server
pub struct GenericClient<H, TxId, TxBody>(
State,
ChannelBuffer<H>,
PhantomData<TxId>,
PhantomData<TxBody>,
)
where
H: Channel,
Message<TxId>: Fragment;
Message<TxId, TxBody>: Fragment;
impl<H, TxId> Client<H, TxId>
/// A cardano specific instantiation of the ouroboros protocol
pub type Client<H> = GenericClient<H, EraTxId, EraTxBody>;
impl<H, TxId, TxBody> GenericClient<H, TxId, TxBody>
where
H: Channel,
Message<TxId>: Fragment,
Message<TxId, TxBody>: Fragment,
{
pub fn new(channel: H) -> Self {
Self(State::Init, ChannelBuffer::new(channel), PhantomData {})
Self(
State::Init,
ChannelBuffer::new(channel),
PhantomData {},
PhantomData {},
)
}
pub fn state(&self) -> &State {
@ -54,7 +71,7 @@ where
}
/// As a client in a specific state, am I allowed to send this message?
fn assert_outbound_state(&self, msg: &Message<TxId>) -> Result<(), Error> {
fn assert_outbound_state(&self, msg: &Message<TxId, TxBody>) -> Result<(), Error> {
match (&self.0, msg) {
(State::Init, Message::Init) => Ok(()),
(State::TxIdsBlocking, Message::ReplyTxIds(..)) => Ok(()),
@ -66,7 +83,7 @@ where
}
/// As a client in a specific state, am I allowed to receive this message?
fn assert_inbound_state(&self, msg: &Message<TxId>) -> Result<(), Error> {
fn assert_inbound_state(&self, msg: &Message<TxId, TxBody>) -> Result<(), Error> {
match (&self.0, msg) {
(State::Idle, Message::RequestTxIds(..)) => Ok(()),
(State::Idle, Message::RequestTxs(..)) => Ok(()),
@ -74,7 +91,7 @@ where
}
}
pub fn send_message(&mut self, msg: &Message<TxId>) -> Result<(), Error> {
pub fn send_message(&mut self, msg: &Message<TxId, TxBody>) -> Result<(), Error> {
self.assert_agency_is_ours()?;
self.assert_outbound_state(msg)?;
self.1.send_msg_chunks(msg).map_err(Error::ChannelError)?;
@ -82,7 +99,7 @@ where
Ok(())
}
pub fn recv_message(&mut self) -> Result<Message<TxId>, Error> {
pub fn recv_message(&mut self) -> Result<Message<TxId, TxBody>, Error> {
self.assert_agency_is_theirs()?;
let msg = self.1.recv_full_msg().map_err(Error::ChannelError)?;
self.assert_inbound_state(&msg)?;

View file

@ -1,8 +1,8 @@
use pallas_codec::minicbor::{decode, encode, Decode, Decoder, Encode, Encoder};
use pallas_codec::minicbor::{data::Tag, decode, encode, Decode, Decoder, Encode, Encoder};
use super::{
protocol::{Message, TxIdAndSize},
EraTxId, TxBody,
EraTxBody, EraTxId,
};
impl<TxId: Encode<()>> Encode<()> for TxIdAndSize<TxId> {
@ -31,7 +31,7 @@ impl<'b, TxId: Decode<'b, ()>> Decode<'b, ()> for TxIdAndSize<TxId> {
}
}
impl<TxId: Encode<()>> Encode<()> for Message<TxId> {
impl<TxId: Encode<()>, TxBody: Encode<()>> Encode<()> for Message<TxId, TxBody> {
fn encode<W: encode::Write>(
&self,
e: &mut Encoder<W>,
@ -71,7 +71,7 @@ impl<TxId: Encode<()>> Encode<()> for Message<TxId> {
e.array(2)?.u16(3)?;
e.begin_array()?;
for tx in txs {
e.bytes(&tx.0)?;
e.encode(tx)?;
}
e.end()?;
Ok(())
@ -84,18 +84,33 @@ impl<TxId: Encode<()>> Encode<()> for Message<TxId> {
}
}
impl<'b> Decode<'b, ()> for TxBody {
impl<'b> Decode<'b, ()> for EraTxBody {
fn decode(d: &mut Decoder<'b>, _ctx: &mut ()) -> Result<Self, decode::Error> {
d.array()?;
// TODO: the TxBody encoding here needs to be pinned down and parameterized, the
// same way we did TxId!
d.u16()?; // Era?
d.tag()?; // tag 24?
Ok(TxBody(d.bytes()?.to_vec()))
let era = d.u16()?;
let tag = d.tag()?;
if tag != Tag::Cbor {
return Err(decode::Error::message("Expected encoded CBOR data item"));
}
Ok(EraTxBody(era, d.bytes()?.to_vec()))
}
}
impl<'b, TxId: Decode<'b, ()>> Decode<'b, ()> for Message<TxId> {
impl Encode<()> for EraTxBody {
fn encode<W: encode::Write>(
&self,
e: &mut Encoder<W>,
_ctx: &mut (),
) -> Result<(), encode::Error<W::Error>> {
e.array(2)?;
e.u16(self.0)?;
e.tag(Tag::Cbor)?;
e.bytes(&self.1)?;
Ok(())
}
}
impl<'b, TxId: Decode<'b, ()>, TxBody: Decode<'b, ()>> Decode<'b, ()> for Message<TxId, TxBody> {
fn decode(d: &mut Decoder<'b>, _ctx: &mut ()) -> Result<Self, decode::Error> {
d.array()?;
let label = d.u16()?;

View file

@ -21,21 +21,13 @@ pub type TxSizeInBytes = u32;
#[derive(Debug, Clone)]
pub struct EraTxId(pub u16, pub Vec<u8>);
// The bytes of a transaction, with an era number and some raw CBOR
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct EraTxBody(pub u16, pub Vec<u8>);
#[derive(Debug)]
pub struct TxIdAndSize<TxID>(pub TxID, pub TxSizeInBytes);
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TxBody(pub Vec<u8>);
#[derive(Debug, Clone)]
pub struct Tx<TxId>(pub TxId, pub TxBody);
impl<TxId> From<Tx<TxId>> for TxIdAndSize<TxId> {
fn from(other: Tx<TxId>) -> Self {
TxIdAndSize(other.0, other.1 .0.len() as u32)
}
}
#[derive(Error, Debug)]
pub enum Error {
#[error("attempted to receive message while agency is ours")]
@ -58,7 +50,7 @@ pub enum Error {
}
#[derive(Debug)]
pub enum Message<TxId> {
pub enum Message<TxId, TxBody> {
Init,
RequestTxIds(Blocking, TxCount, TxCount),
ReplyTxIds(Vec<TxIdAndSize<TxId>>),

View file

@ -3,26 +3,43 @@ use std::marker::PhantomData;
use pallas_codec::Fragment;
use pallas_multiplexer::agents::{Channel, ChannelBuffer};
use super::protocol::{Blocking, Error, Message, State, TxBody, TxCount, TxIdAndSize};
use super::{
protocol::{Blocking, Error, Message, State, TxCount, TxIdAndSize},
EraTxBody, EraTxId,
};
pub enum Reply<TxId> {
pub enum Reply<TxId, TxBody> {
TxIds(Vec<TxIdAndSize<TxId>>),
Txs(Vec<TxBody>),
Done,
}
pub struct Server<H, TxId>(State, ChannelBuffer<H>, PhantomData<TxId>)
/// A generic implementation of an ouroboros server protocol ready to request and receive transactions from a client
pub struct GenericServer<H, TxId, TxBody>(
State,
ChannelBuffer<H>,
PhantomData<TxId>,
PhantomData<TxBody>,
)
where
H: Channel,
Message<TxId>: Fragment;
Message<TxId, TxBody>: Fragment;
impl<H, TxId> Server<H, TxId>
/// A Cardano specific server for the ouroboros TxSubmission protocol
pub type Server<H> = GenericServer<H, EraTxId, EraTxBody>;
impl<H, TxId, TxBody> GenericServer<H, TxId, TxBody>
where
H: Channel,
Message<TxId>: Fragment,
Message<TxId, TxBody>: Fragment,
{
pub fn new(channel: H) -> Self {
Self(State::Init, ChannelBuffer::new(channel), PhantomData {})
Self(
State::Init,
ChannelBuffer::new(channel),
PhantomData {},
PhantomData {},
)
}
pub fn state(&self) -> &State {
@ -54,7 +71,7 @@ where
}
/// As a server in a specific state, am I allowed to send this message?
fn assert_outbound_state(&self, msg: &Message<TxId>) -> Result<(), Error> {
fn assert_outbound_state(&self, msg: &Message<TxId, TxBody>) -> Result<(), Error> {
match (&self.0, msg) {
(State::Idle, Message::RequestTxIds(..)) => Ok(()),
(State::Idle, Message::RequestTxs(..)) => Ok(()),
@ -63,7 +80,7 @@ where
}
/// As a server in a specific state, am I allowed to receive this message?
fn assert_inbound_state(&self, msg: &Message<TxId>) -> Result<(), Error> {
fn assert_inbound_state(&self, msg: &Message<TxId, TxBody>) -> Result<(), Error> {
match (&self.0, msg) {
(State::Init, Message::Init) => Ok(()),
(State::TxIdsBlocking, Message::ReplyTxIds(..)) => Ok(()),
@ -74,7 +91,7 @@ where
}
}
pub fn send_message(&mut self, msg: &Message<TxId>) -> Result<(), Error> {
pub fn send_message(&mut self, msg: &Message<TxId, TxBody>) -> Result<(), Error> {
self.assert_agency_is_ours()?;
self.assert_outbound_state(msg)?;
self.1.send_msg_chunks(msg).map_err(Error::ChannelError)?;
@ -82,7 +99,7 @@ where
Ok(())
}
pub fn recv_message(&mut self) -> Result<Message<TxId>, Error> {
pub fn recv_message(&mut self) -> Result<Message<TxId, TxBody>, Error> {
self.assert_agency_is_theirs()?;
let msg = self.1.recv_full_msg().map_err(Error::ChannelError)?;
self.assert_inbound_state(&msg)?;
@ -126,7 +143,7 @@ where
Ok(())
}
pub fn receive_next_reply(&mut self) -> Result<Reply<TxId>, Error> {
pub fn receive_next_reply(&mut self) -> Result<Reply<TxId, TxBody>, Error> {
match self.recv_message()? {
Message::ReplyTxIds(ids_and_sizes) => {
self.0 = State::Idle;

View file

@ -2,7 +2,7 @@ use pallas_miniprotocols::{
blockfetch,
chainsync::{self, NextResponse},
handshake::{self, Confirmation},
txsubmission::{self, EraTxId, Reply, TxIdAndSize},
txsubmission::{self, EraTxBody, EraTxId, Reply, Server, TxIdAndSize},
Point, PROTOCOL_N2N_BLOCK_FETCH, PROTOCOL_N2N_CHAIN_SYNC, PROTOCOL_N2N_HANDSHAKE,
PROTOCOL_N2N_TX_SUBMISSION,
};
@ -187,7 +187,7 @@ pub fn txsubmission_server_happy_path() {
Ok(_)
));
let reply = server.receive_next_reply();
let reply: Result<_, _> = server.receive_next_reply();
assert!(matches!(reply, Ok(Reply::TxIds(_))));
let Ok(Reply::TxIds(tx_ids)) = reply else { unreachable!() };