From 187245a62f071d92de26d55f01df881fa993a97b Mon Sep 17 00:00:00 2001 From: Santiago Carmuega Date: Sun, 13 Mar 2022 09:37:08 -0300 Subject: [PATCH] feat: Introduce shared codec lib (#71) closes #65 --- Cargo.toml | 1 + examples/byron-decoder/src/main.rs | 14 +- pallas-codec/Cargo.toml | 17 ++ pallas-codec/README.md | 2 + pallas-codec/src/lib.rs | 30 +++ .../src/utils.rs | 1 - pallas-miniprotocols/Cargo.toml | 4 +- .../examples/chainsync-blocks.rs | 25 +-- pallas-miniprotocols/src/blockfetch/mod.rs | 34 ++-- pallas-miniprotocols/src/chainsync/clients.rs | 28 +-- pallas-miniprotocols/src/chainsync/codec.rs | 107 +++++------ pallas-miniprotocols/src/codec.rs | 29 --- pallas-miniprotocols/src/common.rs | 31 ++++ pallas-miniprotocols/src/handshake/common.rs | 25 ++- pallas-miniprotocols/src/handshake/n2c.rs | 39 ++-- pallas-miniprotocols/src/handshake/n2n.rs | 43 +++-- pallas-miniprotocols/src/lib.rs | 2 - pallas-miniprotocols/src/localstate/codec.rs | 63 ++++--- pallas-miniprotocols/src/localstate/mod.rs | 28 +-- .../src/localstate/queries.rs | 37 ++-- pallas-miniprotocols/src/machines.rs | 37 ++-- pallas-miniprotocols/src/payloads.rs | 173 +++--------------- pallas-miniprotocols/src/txsubmission/mod.rs | 35 ++-- pallas-primitives/Cargo.toml | 3 +- pallas-primitives/src/alonzo/model.rs | 12 +- pallas-primitives/src/byron/address.rs | 5 +- pallas-primitives/src/byron/fees.rs | 2 +- pallas-primitives/src/byron/model.rs | 10 +- pallas-primitives/src/framework.rs | 8 +- pallas-primitives/src/lib.rs | 1 - pallas-primitives/src/probing.rs | 2 +- pallas/Cargo.toml | 1 + pallas/src/lib.rs | 3 + 33 files changed, 402 insertions(+), 450 deletions(-) create mode 100644 pallas-codec/Cargo.toml create mode 100644 pallas-codec/README.md create mode 100644 pallas-codec/src/lib.rs rename {pallas-primitives => pallas-codec}/src/utils.rs (99%) delete mode 100644 pallas-miniprotocols/src/codec.rs diff --git a/Cargo.toml b/Cargo.toml index 57e4eef..4d8f706 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,7 @@ [workspace] members = [ + "pallas-codec", "pallas-multiplexer", "pallas-miniprotocols", "pallas-crypto", diff --git a/examples/byron-decoder/src/main.rs b/examples/byron-decoder/src/main.rs index d410a4a..e9b7412 100644 --- a/examples/byron-decoder/src/main.rs +++ b/examples/byron-decoder/src/main.rs @@ -6,7 +6,7 @@ use pallas::{ miniprotocols::{ blockfetch::{BatchClient, Observer}, handshake::n2n::{Client, VersionTable}, - run_agent, Point, MAINNET_MAGIC, + run_agent, Point, TESTNET_MAGIC, }, multiplexer::Multiplexer, }, @@ -33,25 +33,25 @@ impl Observer for BlockPrinter { fn main() { env_logger::init(); - let bearer = TcpStream::connect("relays-new.cardano-mainnet.iohk.io:3001").unwrap(); + let bearer = TcpStream::connect("relays-new.cardano-testnet.iohkdev.io:3001").unwrap(); bearer.set_nodelay(true).unwrap(); bearer.set_keepalive_ms(Some(30_000u32)).unwrap(); let mut muxer = Multiplexer::setup(bearer, &[0, 3]).unwrap(); let mut hs_channel = muxer.use_channel(0); - let versions = VersionTable::v4_and_above(MAINNET_MAGIC); + let versions = VersionTable::v4_and_above(TESTNET_MAGIC); let _last = run_agent(Client::initial(versions), &mut hs_channel).unwrap(); let range = ( Point::Specific( - 3240000, - hex::decode("b7096a881f77ced24bdd285758646c0e059545b54855bd3a2307ece518bd6317") + 23470073, + hex::decode("333b55ab6e013b8e4fdf19d05dbf33aa0d58a59a2b1b86d0c75f58ff76a9e565") .unwrap(), ), Point::Specific( - 4492794, - hex::decode("5c196e7394ace0449ba5a51c919369699b13896e97432894b4f0354dce8670b6") + 51278306, + hex::decode("936a8e8387d68e8497216d4cee8ec3810bae3902aba5c7b8ab911ad36984d6ad") .unwrap(), ), ); diff --git a/pallas-codec/Cargo.toml b/pallas-codec/Cargo.toml new file mode 100644 index 0000000..9fc4254 --- /dev/null +++ b/pallas-codec/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "pallas-codec" +description = "Pallas common CBOR encoding interface and utilities" +version = "0.7.0-alpha.0" +edition = "2021" +repository = "https://github.com/txpipe/pallas" +homepage = "https://github.com/txpipe/pallas" +documentation = "https://docs.rs/pallas-codec" +license = "Apache-2.0" +readme = "README.md" +authors = [ + "Santiago Carmuega " +] + +[dependencies] +minicbor = { version = "0.14", features = ["std", "half", "derive"] } + diff --git a/pallas-codec/README.md b/pallas-codec/README.md new file mode 100644 index 0000000..6f6cb36 --- /dev/null +++ b/pallas-codec/README.md @@ -0,0 +1,2 @@ +# Pallas Codec + diff --git a/pallas-codec/src/lib.rs b/pallas-codec/src/lib.rs new file mode 100644 index 0000000..c538542 --- /dev/null +++ b/pallas-codec/src/lib.rs @@ -0,0 +1,30 @@ +use minicbor::encode::Write; + +/// Shared re-export of minicbor lib across all Pallas +pub use minicbor; + +/// Round-trip friendly common helper structs +pub mod utils; + +pub trait Fragment: Sized { + fn read_cbor(buffer: &[u8]) -> Result; + fn write_cbor(&self, write: W) -> Result<(), minicbor::encode::Error>; +} + +#[macro_export] +macro_rules! impl_fragment { + ($Struct:ty) => { + impl $crate::Fragment for $Struct { + fn read_cbor(buffer: &[u8]) -> Result { + $crate::minicbor::decode(buffer) + } + + fn write_cbor( + &self, + write: W, + ) -> Result<(), encode::Error> { + $crate::minicbor::encode(self, write) + } + } + }; +} diff --git a/pallas-primitives/src/utils.rs b/pallas-codec/src/utils.rs similarity index 99% rename from pallas-primitives/src/utils.rs rename to pallas-codec/src/utils.rs index 7b57c6d..fcfe70d 100644 --- a/pallas-primitives/src/utils.rs +++ b/pallas-codec/src/utils.rs @@ -10,7 +10,6 @@ impl<'b, const N: usize> minicbor::Decode<'b> for SkipCbor { fn decode(d: &mut minicbor::Decoder<'b>) -> Result { { let probe = d.probe(); - log::warn!("skipped cbor value {}: {:?}", N, probe.datatype()?); println!("skipped cbor value {}: {:?}", N, probe.datatype()?); } diff --git a/pallas-miniprotocols/Cargo.toml b/pallas-miniprotocols/Cargo.toml index 178e97f..e196c4c 100644 --- a/pallas-miniprotocols/Cargo.toml +++ b/pallas-miniprotocols/Cargo.toml @@ -14,7 +14,7 @@ authors = [ [dependencies] pallas-multiplexer = { version = "0.7.0-alpha.0", path = "../pallas-multiplexer/" } -minicbor = { version="0.14", features=["half", "std"] } +pallas-codec = { version = "0.7.0-alpha.0", path = "../pallas-codec/" } log = "0.4.14" hex = "0.4.3" itertools = "0.10.3" @@ -22,4 +22,4 @@ net2 = "0.2.37" [dev-dependencies] env_logger = "0.9.0" -pallas-primitives = { version = "0.6.0", path = "../pallas-primitives/" } +pallas-primitives = { version = "0.7.0-alpha.0", path = "../pallas-primitives/" } diff --git a/pallas-miniprotocols/examples/chainsync-blocks.rs b/pallas-miniprotocols/examples/chainsync-blocks.rs index 79ec5a5..4d3f3ec 100644 --- a/pallas-miniprotocols/examples/chainsync-blocks.rs +++ b/pallas-miniprotocols/examples/chainsync-blocks.rs @@ -1,30 +1,9 @@ -use pallas_primitives::alonzo::{Block, BlockWrapper}; -use pallas_primitives::Fragment; - -use pallas_miniprotocols::chainsync::{Consumer, NoopObserver}; +use pallas_miniprotocols::chainsync::{BlockContent, Consumer, NoopObserver}; use pallas_miniprotocols::handshake::n2c::{Client, VersionTable}; use pallas_miniprotocols::{run_agent, Point, MAINNET_MAGIC}; -use pallas_miniprotocols::{DecodePayload, EncodePayload, PayloadDecoder, PayloadEncoder}; use pallas_multiplexer::Multiplexer; use std::os::unix::net::UnixStream; -#[derive(Debug)] -pub struct Content(Block); - -impl EncodePayload for Content { - fn encode_payload(&self, _e: &mut PayloadEncoder) -> Result<(), Box> { - todo!() - } -} - -impl DecodePayload for Content { - fn decode_payload(d: &mut PayloadDecoder) -> Result> { - d.tag()?; - let bytes = d.bytes()?; - let BlockWrapper(_, block) = BlockWrapper::decode_fragment(bytes)?; - Ok(Content(block)) - } -} fn main() { env_logger::init(); @@ -46,7 +25,7 @@ fn main() { )]; let mut cs_channel = muxer.use_channel(5); - let cs = Consumer::::initial(Some(known_points), NoopObserver {}); + let cs = Consumer::::initial(Some(known_points), NoopObserver {}); let cs = run_agent(cs, &mut cs_channel).unwrap(); println!("{:?}", cs); } diff --git a/pallas-miniprotocols/src/blockfetch/mod.rs b/pallas-miniprotocols/src/blockfetch/mod.rs index bb2d7d6..e35c984 100644 --- a/pallas-miniprotocols/src/blockfetch/mod.rs +++ b/pallas-miniprotocols/src/blockfetch/mod.rs @@ -1,13 +1,11 @@ use std::sync::mpsc::Receiver; -use crate::machines::{ - Agent, CodecError, DecodePayload, EncodePayload, MachineOutput, PayloadDecoder, PayloadEncoder, - Transition, -}; +use crate::machines::{Agent, MachineOutput, Transition}; use crate::common::Point; -use log::debug; +use pallas_codec::impl_fragment; +use pallas_codec::minicbor::{decode, encode, Decode, Decoder, Encode, Encoder}; #[derive(Debug, PartialEq, Clone)] pub enum State { @@ -27,13 +25,13 @@ pub enum Message { BatchDone, } -impl EncodePayload for Message { - fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box> { +impl Encode for Message { + fn encode(&self, e: &mut Encoder) -> Result<(), encode::Error> { match self { Message::RequestRange { range } => { e.array(3)?.u16(0)?; - range.0.encode_payload(e)?; - range.1.encode_payload(e)?; + range.0.encode(e)?; + range.1.encode(e)?; Ok(()) } Message::ClientDone => { @@ -61,15 +59,15 @@ impl EncodePayload for Message { } } -impl DecodePayload for Message { - fn decode_payload(d: &mut PayloadDecoder) -> Result> { +impl<'b> Decode<'b> for Message { + fn decode(d: &mut Decoder<'b>) -> Result { d.array()?; let label = d.u16()?; match label { 0 => { - let point1 = Point::decode_payload(d)?; - let point2 = Point::decode_payload(d)?; + let point1 = Point::decode(d)?; + let point2 = Point::decode(d)?; Ok(Message::RequestRange { range: (point1, point2), }) @@ -85,11 +83,15 @@ impl DecodePayload for Message { }) } 5 => Ok(Message::BatchDone), - x => Err(Box::new(CodecError::BadLabel(x))), + _ => Err(decode::Error::message( + "unknown variant for blockfetch message", + )), } } } +impl_fragment!(Message); + pub trait Observer { fn on_block_received(&self, body: Vec) -> Result<(), Box> { log::debug!("block received, sice: {}", body.len()); @@ -152,7 +154,7 @@ where } fn on_block(self, body: Vec) -> Transition { - debug!("received block body, size {}", body.len()); + log::debug!("received block body, size {}", body.len()); self.observer.on_block_received(body)?; @@ -244,7 +246,7 @@ where } fn on_block(self, body: Vec) -> Transition { - debug!("received block body, size {}", body.len()); + log::debug!("received block body, size {}", body.len()); self.observer.on_block_received(body)?; diff --git a/pallas-miniprotocols/src/chainsync/clients.rs b/pallas-miniprotocols/src/chainsync/clients.rs index 602239b..a17b74c 100644 --- a/pallas-miniprotocols/src/chainsync/clients.rs +++ b/pallas-miniprotocols/src/chainsync/clients.rs @@ -1,10 +1,9 @@ use std::fmt::Debug; use std::marker::PhantomData; -use log::debug; +use pallas_codec::Fragment; use crate::machines::{Agent, MachineError, MachineOutput, Transition}; -use crate::{DecodePayload, EncodePayload}; use crate::common::Point; @@ -64,7 +63,7 @@ where impl Consumer where O: Observer, - C: DecodePayload + EncodePayload, + Message: Fragment, { pub fn initial(known_points: Option>, observer: O) -> Self { Self { @@ -78,7 +77,7 @@ where } fn send_find_intersect(self, tx: &impl MachineOutput) -> Transition { - debug!("requesting find intersect"); + log::debug!("requesting find intersect"); let points = match &self.known_points { Some(x) => x.clone(), @@ -96,7 +95,7 @@ where } fn send_request_next(self, tx: &impl MachineOutput) -> Transition { - debug!("requesting next"); + log::debug!("requesting next"); let msg = Message::::RequestNext; @@ -109,7 +108,7 @@ where } fn on_intersect_found(mut self, point: Point, tip: Tip) -> Transition { - debug!("intersect found: {:?} (tip: {:?})", point, tip); + log::debug!("intersect found: {:?} (tip: {:?})", point, tip); self.observer.on_intersect_found(&point, &tip)?; @@ -122,7 +121,7 @@ where } fn on_intersect_not_found(self, tip: Tip) -> Transition { - debug!("intersect not found (tip: {:?})", tip); + log::debug!("intersect not found (tip: {:?})", tip); Ok(Self { tip: Some(tip), @@ -133,7 +132,7 @@ where } fn on_roll_forward(mut self, content: C, tip: Tip) -> Transition { - debug!("rolling forward"); + log::debug!("rolling forward"); self.observer.on_roll_forward(content, &tip)?; @@ -145,9 +144,9 @@ where } fn on_roll_backward(mut self, point: Point, tip: Tip) -> Transition { - debug!("rolling backward to point: {:?}", point); + log::debug!("rolling backward to point: {:?}", point); - debug!("reporting rollback to observer"); + log::debug!("reporting rollback to observer"); self.observer.on_rollback(&point)?; Ok(Self { @@ -159,7 +158,7 @@ where } fn on_await_reply(mut self) -> Transition { - debug!("reached tip, await reply"); + log::debug!("reached tip, await reply"); self.observer.on_tip_reached()?; @@ -172,8 +171,9 @@ where impl Agent for Consumer where - C: EncodePayload + DecodePayload + Debug + 'static, O: Observer, + C: Debug + 'static, + Message: Fragment, { type Message = Message; @@ -259,7 +259,7 @@ impl TipFinder { } fn on_intersect_found(self, tip: Tip) -> Transition { - debug!("intersect found with tip: {:?}", tip); + log::debug!("intersect found with tip: {:?}", tip); Ok(Self { state: State::Done, @@ -269,7 +269,7 @@ impl TipFinder { } fn on_intersect_not_found(self, tip: Tip) -> Transition { - debug!("intersect not found but still have a tip: {:?}", tip); + log::debug!("intersect not found but still have a tip: {:?}", tip); Ok(Self { state: State::Done, diff --git a/pallas-miniprotocols/src/chainsync/codec.rs b/pallas-miniprotocols/src/chainsync/codec.rs index 5770156..5a7923a 100644 --- a/pallas-miniprotocols/src/chainsync/codec.rs +++ b/pallas-miniprotocols/src/chainsync/codec.rs @@ -1,33 +1,36 @@ use crate::common::Point; -use crate::machines::{CodecError, DecodePayload, EncodePayload, PayloadDecoder, PayloadEncoder}; +use pallas_codec::{ + impl_fragment, + minicbor::{decode, encode, Decode, Decoder, Encode, Encoder}, +}; use super::{BlockContent, HeaderContent, Message, SkippedContent, Tip}; -impl EncodePayload for Tip { - fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box> { +impl Encode for Tip { + fn encode(&self, e: &mut Encoder) -> Result<(), encode::Error> { e.array(2)?; - self.0.encode_payload(e)?; + self.0.encode(e)?; e.u64(self.1)?; Ok(()) } } -impl DecodePayload for Tip { - fn decode_payload(d: &mut PayloadDecoder) -> Result> { +impl<'b> Decode<'b> for Tip { + fn decode(d: &mut Decoder<'b>) -> Result { d.array()?; - let point = Point::decode_payload(d)?; + let point = Point::decode(d)?; let block_num = d.u64()?; Ok(Tip(point, block_num)) } } -impl EncodePayload for Message +impl Encode for Message where - C: EncodePayload, + C: Encode, { - fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box> { + fn encode(&self, e: &mut Encoder) -> Result<(), encode::Error> { match self { Message::RequestNext => { e.array(1)?.u16(0)?; @@ -39,33 +42,33 @@ where } Message::RollForward(content, tip) => { e.array(3)?.u16(2)?; - content.encode_payload(e)?; - tip.encode_payload(e)?; + content.encode(e)?; + tip.encode(e)?; Ok(()) } Message::RollBackward(point, tip) => { e.array(3)?.u16(3)?; - point.encode_payload(e)?; - tip.encode_payload(e)?; + point.encode(e)?; + tip.encode(e)?; Ok(()) } Message::FindIntersect(points) => { e.array(2)?.u16(4)?; e.array(points.len() as u64)?; for point in points.iter() { - point.encode_payload(e)?; + point.encode(e)?; } Ok(()) } Message::IntersectFound(point, tip) => { e.array(3)?.u16(5)?; - point.encode_payload(e)?; - tip.encode_payload(e)?; + point.encode(e)?; + tip.encode(e)?; Ok(()) } Message::IntersectNotFound(tip) => { e.array(1)?.u16(6)?; - tip.encode_payload(e)?; + tip.encode(e)?; Ok(()) } Message::Done => { @@ -76,11 +79,11 @@ where } } -impl DecodePayload for Message +impl<'b, C> Decode<'b> for Message where - C: DecodePayload, + C: Decode<'b>, { - fn decode_payload(d: &mut PayloadDecoder) -> Result> { + fn decode(d: &mut Decoder<'b>) -> Result { d.array()?; let label = d.u16()?; @@ -88,36 +91,38 @@ where 0 => Ok(Message::RequestNext), 1 => Ok(Message::AwaitReply), 2 => { - let content = C::decode_payload(d)?; - let tip = Tip::decode_payload(d)?; + let content = C::decode(d)?; + let tip = Tip::decode(d)?; Ok(Message::RollForward(content, tip)) } 3 => { - let point = Point::decode_payload(d)?; - let tip = Tip::decode_payload(d)?; + let point = Point::decode(d)?; + let tip = Tip::decode(d)?; Ok(Message::RollBackward(point, tip)) } 4 => { - let points = Vec::::decode_payload(d)?; + let points = Vec::::decode(d)?; Ok(Message::FindIntersect(points)) } 5 => { - let point = Point::decode_payload(d)?; - let tip = Tip::decode_payload(d)?; + let point = Point::decode(d)?; + let tip = Tip::decode(d)?; Ok(Message::IntersectFound(point, tip)) } 6 => { - let tip = Tip::decode_payload(d)?; + let tip = Tip::decode(d)?; Ok(Message::IntersectNotFound(tip)) } 7 => Ok(Message::Done), - x => Err(Box::new(CodecError::BadLabel(x))), + _ => Err(decode::Error::message( + "unknown variant for chainsync message", + )), } } } -impl DecodePayload for HeaderContent { - fn decode_payload(d: &mut crate::PayloadDecoder) -> Result> { +impl<'b> Decode<'b> for HeaderContent { + fn decode(d: &mut Decoder<'b>) -> Result { d.array()?; let variant = d.u8()?; // era variant @@ -154,43 +159,41 @@ impl DecodePayload for HeaderContent { } } -impl EncodePayload for HeaderContent { - fn encode_payload( - &self, - _e: &mut crate::PayloadEncoder, - ) -> Result<(), Box> { +impl Encode for HeaderContent { + fn encode(&self, _e: &mut Encoder) -> Result<(), encode::Error> { todo!() } } -impl DecodePayload for BlockContent { - fn decode_payload(d: &mut crate::PayloadDecoder) -> Result> { +impl_fragment!(Message); + +impl<'b> Decode<'b> for BlockContent { + fn decode(d: &mut Decoder<'b>) -> Result { d.tag()?; let bytes = d.bytes()?; Ok(BlockContent(Vec::from(bytes))) } } -impl EncodePayload for BlockContent { - fn encode_payload( - &self, - _e: &mut crate::PayloadEncoder, - ) -> Result<(), Box> { +impl Encode for BlockContent { + fn encode(&self, _e: &mut Encoder) -> Result<(), encode::Error> { todo!() } } -impl DecodePayload for SkippedContent { - fn decode_payload(d: &mut crate::PayloadDecoder) -> Result> { + +impl_fragment!(Message); + +impl<'b> Decode<'b> for SkippedContent { + fn decode(d: &mut Decoder<'b>) -> Result { d.skip()?; Ok(SkippedContent) } } -impl EncodePayload for SkippedContent { - fn encode_payload( - &self, - _e: &mut crate::PayloadEncoder, - ) -> Result<(), Box> { - Ok(()) +impl Encode for SkippedContent { + fn encode(&self, _e: &mut Encoder) -> Result<(), encode::Error> { + todo!() } } + +impl_fragment!(Message); diff --git a/pallas-miniprotocols/src/codec.rs b/pallas-miniprotocols/src/codec.rs deleted file mode 100644 index 3c15347..0000000 --- a/pallas-miniprotocols/src/codec.rs +++ /dev/null @@ -1,29 +0,0 @@ -use super::common::*; -use super::payloads::*; - -impl EncodePayload for Point { - fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box> { - match self { - Point::Origin => e.array(0)?, - Point::Specific(slot, hash) => e.array(2)?.u64(*slot)?.bytes(hash)?, - }; - - Ok(()) - } -} - -impl DecodePayload for Point { - fn decode_payload(d: &mut PayloadDecoder) -> Result> { - let size = d.array()?; - - match size { - Some(0) => Ok(Point::Origin), - Some(2) => { - let slot = d.u64()?; - let hash = d.bytes()?; - Ok(Point::Specific(slot, Vec::from(hash))) - } - _ => Err("can't decode Point from array of size".into()), - } - } -} diff --git a/pallas-miniprotocols/src/common.rs b/pallas-miniprotocols/src/common.rs index 39c5964..3b05467 100644 --- a/pallas-miniprotocols/src/common.rs +++ b/pallas-miniprotocols/src/common.rs @@ -1,5 +1,7 @@ use std::fmt::Debug; +use pallas_codec::minicbor::{decode, encode, Decode, Decoder, Encode, Encoder}; + /// Well-known magic for testnet pub const TESTNET_MAGIC: u64 = 1097911063; @@ -36,3 +38,32 @@ impl Point { Point::Specific(slot, hash) } } + +impl Encode for Point { + fn encode(&self, e: &mut Encoder) -> Result<(), encode::Error> { + match self { + Point::Origin => e.array(0)?, + Point::Specific(slot, hash) => e.array(2)?.u64(*slot)?.bytes(hash)?, + }; + + Ok(()) + } +} + +impl<'b> Decode<'b> for Point { + fn decode(d: &mut Decoder<'b>) -> Result { + let size = d.array()?; + + match size { + Some(0) => Ok(Point::Origin), + Some(2) => { + let slot = d.u64()?; + let hash = d.bytes()?; + Ok(Point::Specific(slot, Vec::from(hash))) + } + _ => Err(decode::Error::message( + "can't decode Point from array of size", + )), + } + } +} diff --git a/pallas-miniprotocols/src/handshake/common.rs b/pallas-miniprotocols/src/handshake/common.rs index 489092d..af75921 100644 --- a/pallas-miniprotocols/src/handshake/common.rs +++ b/pallas-miniprotocols/src/handshake/common.rs @@ -1,28 +1,25 @@ -use crate::{ - machines::{CodecError, DecodePayload, EncodePayload, PayloadEncoder}, - payloads::PayloadDecoder, -}; use itertools::Itertools; +use pallas_codec::minicbor::{decode, encode, Decode, Decoder, Encode, Encoder}; use std::{collections::HashMap, fmt::Debug}; #[derive(Debug, Clone)] pub struct VersionTable where - T: Debug + Clone + EncodePayload + DecodePayload, + T: Debug + Clone, { pub values: HashMap, } -impl EncodePayload for VersionTable +impl<'b, T> Encode for VersionTable where - T: Debug + Clone + EncodePayload + DecodePayload, + T: Debug + Clone + Encode + Decode<'b>, { - fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box> { + fn encode(&self, e: &mut Encoder) -> Result<(), encode::Error> { e.map(self.values.len() as u64)?; for key in self.values.keys().sorted() { e.u64(*key)?; - self.values[key].encode_payload(e)?; + self.values[key].encode(e)?; } Ok(()) @@ -40,8 +37,8 @@ pub enum RefuseReason { Refused(VersionNumber, String), } -impl EncodePayload for RefuseReason { - fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box> { +impl Encode for RefuseReason { + fn encode(&self, e: &mut Encoder) -> Result<(), encode::Error> { match self { RefuseReason::VersionMismatch(versions) => { e.array(2)?; @@ -73,8 +70,8 @@ impl EncodePayload for RefuseReason { } } -impl DecodePayload for RefuseReason { - fn decode_payload(d: &mut PayloadDecoder) -> Result> { +impl<'b> Decode<'b> for RefuseReason { + fn decode(d: &mut Decoder<'b>) -> Result { d.array()?; match d.u16()? { @@ -95,7 +92,7 @@ impl DecodePayload for RefuseReason { Ok(RefuseReason::Refused(version, msg.to_string())) } - x => Err(Box::new(CodecError::BadLabel(x))), + _ => Err(decode::Error::message("unknown variant for refusereason")), } } } diff --git a/pallas-miniprotocols/src/handshake/n2c.rs b/pallas-miniprotocols/src/handshake/n2c.rs index 66679db..2f001b5 100644 --- a/pallas-miniprotocols/src/handshake/n2c.rs +++ b/pallas-miniprotocols/src/handshake/n2c.rs @@ -1,10 +1,13 @@ use core::panic; use std::collections::HashMap; -use crate::machines::{ - Agent, CodecError, DecodePayload, EncodePayload, MachineOutput, PayloadDecoder, PayloadEncoder, +use pallas_codec::{ + impl_fragment, + minicbor::{decode, encode, Decode, Decoder, Encode, Encoder}, }; +use crate::machines::{Agent, MachineOutput}; + use super::common::{NetworkMagic, RefuseReason, VersionNumber}; pub type VersionTable = super::common::VersionTable; @@ -52,16 +55,16 @@ impl VersionTable { #[derive(Debug, Clone)] pub struct VersionData(NetworkMagic); -impl EncodePayload for VersionData { - fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box> { +impl Encode for VersionData { + fn encode(&self, e: &mut Encoder) -> Result<(), encode::Error> { e.u64(self.0)?; Ok(()) } } -impl DecodePayload for VersionData { - fn decode_payload(d: &mut PayloadDecoder) -> Result> { +impl<'b> Decode<'b> for VersionData { + fn decode(d: &mut Decoder<'b>) -> Result { let network_magic = d.u64()?; Ok(Self(network_magic)) @@ -75,21 +78,21 @@ pub enum Message { Refuse(RefuseReason), } -impl EncodePayload for Message { - fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box> { +impl Encode for Message { + fn encode(&self, e: &mut Encoder) -> Result<(), encode::Error> { match self { Message::Propose(version_table) => { e.array(2)?.u16(0)?; - version_table.encode_payload(e)?; + version_table.encode(e)?; } Message::Accept(version_number, version_data) => { e.array(3)?.u16(1)?; e.u64(*version_number)?; - version_data.encode_payload(e)?; + version_data.encode(e)?; } Message::Refuse(reason) => { e.array(2)?.u16(2)?; - reason.encode_payload(e)?; + reason.encode(e)?; } }; @@ -97,26 +100,30 @@ impl EncodePayload for Message { } } -impl DecodePayload for Message { - fn decode_payload(d: &mut PayloadDecoder) -> Result> { +impl<'b> Decode<'b> for Message { + fn decode(d: &mut Decoder<'b>) -> Result { d.array()?; match d.u16()? { 0 => todo!(), 1 => { let version_number = d.u64()?; - let version_data = VersionData::decode_payload(d)?; + let version_data = VersionData::decode(d)?; Ok(Message::Accept(version_number, version_data)) } 2 => { - let reason = RefuseReason::decode_payload(d)?; + let reason = RefuseReason::decode(d)?; Ok(Message::Refuse(reason)) } - x => Err(Box::new(CodecError::BadLabel(x))), + _ => Err(decode::Error::message( + "unkown variant for handshake message", + )), } } } +impl_fragment!(Message); + #[derive(Debug, PartialEq, Eq)] pub enum State { Propose, diff --git a/pallas-miniprotocols/src/handshake/n2n.rs b/pallas-miniprotocols/src/handshake/n2n.rs index 2402ce5..1820abb 100644 --- a/pallas-miniprotocols/src/handshake/n2n.rs +++ b/pallas-miniprotocols/src/handshake/n2n.rs @@ -1,10 +1,13 @@ use core::panic; use std::collections::HashMap; -use crate::machines::{ - Agent, CodecError, DecodePayload, EncodePayload, MachineOutput, PayloadDecoder, PayloadEncoder, +use pallas_codec::{ + impl_fragment, + minicbor::{decode, encode, Decode, Encode, Encoder}, }; +use crate::machines::{Agent, MachineOutput}; + use super::common::{RefuseReason, VersionNumber}; pub type VersionTable = super::common::VersionTable; @@ -55,8 +58,8 @@ impl VersionData { } } -impl EncodePayload for VersionData { - fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box> { +impl Encode for VersionData { + fn encode(&self, e: &mut Encoder) -> Result<(), encode::Error> { e.array(2)? .u64(self.network_magic)? .bool(self.initiator_and_responder_diffusion_mode)?; @@ -65,8 +68,10 @@ impl EncodePayload for VersionData { } } -impl DecodePayload for VersionData { - fn decode_payload(d: &mut PayloadDecoder) -> Result> { +impl<'b> Decode<'b> for VersionData { + fn decode( + d: &mut pallas_codec::minicbor::Decoder<'b>, + ) -> Result { d.array()?; let network_magic = d.u64()?; let initiator_and_responder_diffusion_mode = d.bool()?; @@ -85,21 +90,21 @@ pub enum Message { Refuse(RefuseReason), } -impl EncodePayload for Message { - fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box> { +impl Encode for Message { + fn encode(&self, e: &mut Encoder) -> Result<(), encode::Error> { match self { Message::Propose(version_table) => { e.array(2)?.u16(0)?; - version_table.encode_payload(e)?; + version_table.encode(e)?; } Message::Accept(version_number, version_data) => { e.array(3)?.u16(1)?; e.u64(*version_number)?; - version_data.encode_payload(e)?; + version_data.encode(e)?; } Message::Refuse(reason) => { e.array(2)?.u16(2)?; - reason.encode_payload(e)?; + reason.encode(e)?; } }; @@ -107,26 +112,32 @@ impl EncodePayload for Message { } } -impl DecodePayload for Message { - fn decode_payload(d: &mut PayloadDecoder) -> Result> { +impl<'b> Decode<'b> for Message { + fn decode( + d: &mut pallas_codec::minicbor::Decoder<'b>, + ) -> Result { d.array()?; match d.u16()? { 0 => todo!(), 1 => { let version_number = d.u64()?; - let version_data = VersionData::decode_payload(d)?; + let version_data = VersionData::decode(d)?; Ok(Message::Accept(version_number, version_data)) } 2 => { - let reason = RefuseReason::decode_payload(d)?; + let reason = RefuseReason::decode(d)?; Ok(Message::Refuse(reason)) } - x => Err(Box::new(CodecError::BadLabel(x))), + _ => Err(decode::Error::message( + "unknown variant for handshake message", + )), } } } +impl_fragment!(Message); + #[derive(Debug, PartialEq, Eq)] pub enum State { Propose, diff --git a/pallas-miniprotocols/src/lib.rs b/pallas-miniprotocols/src/lib.rs index ae42887..00ea91f 100644 --- a/pallas-miniprotocols/src/lib.rs +++ b/pallas-miniprotocols/src/lib.rs @@ -1,4 +1,3 @@ -mod codec; mod common; mod machines; mod payloads; @@ -9,7 +8,6 @@ pub mod handshake; pub mod localstate; pub mod txsubmission; -pub use codec::*; pub use common::*; pub use machines::*; pub use payloads::*; diff --git a/pallas-miniprotocols/src/localstate/codec.rs b/pallas-miniprotocols/src/localstate/codec.rs index 287c3e3..543d9c7 100644 --- a/pallas-miniprotocols/src/localstate/codec.rs +++ b/pallas-miniprotocols/src/localstate/codec.rs @@ -1,8 +1,9 @@ -use super::*; -use crate::machines::*; +use pallas_codec::minicbor::{decode, encode, Decode, Encode, Encoder}; -impl EncodePayload for AcquireFailure { - fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box> { +use super::{AcquireFailure, Message, Query}; + +impl Encode for AcquireFailure { + fn encode(&self, e: &mut Encoder) -> Result<(), encode::Error> { let code = match self { AcquireFailure::PointTooOld => 0, AcquireFailure::PointNotInChain => 1, @@ -14,26 +15,33 @@ impl EncodePayload for AcquireFailure { } } -impl DecodePayload for AcquireFailure { - fn decode_payload(d: &mut PayloadDecoder) -> Result> { +impl<'b> Decode<'b> for AcquireFailure { + fn decode( + d: &mut pallas_codec::minicbor::Decoder<'b>, + ) -> Result { let code = d.u16()?; match code { 0 => Ok(AcquireFailure::PointTooOld), 1 => Ok(AcquireFailure::PointNotInChain), - _ => Err(Box::new(CodecError::UnexpectedCbor( + _ => Err(decode::Error::message( "can't infer acquire failure from variant id", - ))), + )), } } } -impl EncodePayload for Message { - fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box> { +impl Encode for Message +where + Q: Query, + Q::Request: Encode, + Q::Response: Encode, +{ + fn encode(&self, e: &mut Encoder) -> Result<(), encode::Error> { match self { Message::Acquire(Some(point)) => { e.array(2)?.u16(0)?; - e.encode_payload(point)?; + e.encode(point)?; Ok(()) } Message::Acquire(None) => { @@ -46,24 +54,24 @@ impl EncodePayload for Message { } Message::Failure(failure) => { e.array(2)?.u16(2)?; - e.encode_payload(failure)?; + e.encode(failure)?; Ok(()) } Message::Query(query) => { e.array(2)?.u16(3)?; e.array(1)?; - e.encode_payload(query)?; + e.encode(query)?; Ok(()) } Message::Result(result) => { e.array(2)?.u16(4)?; e.array(1)?; - e.encode_payload(result)?; + e.encode(result)?; Ok(()) } Message::ReAcquire(Some(point)) => { e.array(2)?.u16(6)?; - e.encode_payload(point)?; + e.encode(point)?; Ok(()) } Message::ReAcquire(None) => { @@ -82,38 +90,47 @@ impl EncodePayload for Message { } } -impl DecodePayload for Message { - fn decode_payload(d: &mut PayloadDecoder) -> Result> { +impl<'b, Q> Decode<'b> for Message +where + Q: Query, + Q::Request: Decode<'b>, + Q::Response: Decode<'b>, +{ + fn decode( + d: &mut pallas_codec::minicbor::Decoder<'b>, + ) -> Result { d.array()?; let label = d.u16()?; match label { 0 => { - let point = d.decode_payload()?; + let point = d.decode()?; Ok(Message::Acquire(Some(point))) } 8 => Ok(Message::Acquire(None)), 1 => Ok(Message::Acquired), 2 => { - let failure = d.decode_payload()?; + let failure = d.decode()?; Ok(Message::Failure(failure)) } 3 => { - let query = d.decode_payload()?; + let query = d.decode()?; Ok(Message::Query(query)) } 4 => { - let response = d.decode_payload()?; + let response = d.decode()?; Ok(Message::Result(response)) } 5 => Ok(Message::Release), 6 => { - let point = d.decode_payload()?; + let point = d.decode()?; Ok(Message::ReAcquire(point)) } 9 => Ok(Message::ReAcquire(None)), 7 => Ok(Message::Done), - x => Err(Box::new(CodecError::BadLabel(x))), + _ => Err(decode::Error::message( + "unknown variant for localstate message", + )), } } } diff --git a/pallas-miniprotocols/src/localstate/mod.rs b/pallas-miniprotocols/src/localstate/mod.rs index 62b9e5d..95a38e9 100644 --- a/pallas-miniprotocols/src/localstate/mod.rs +++ b/pallas-miniprotocols/src/localstate/mod.rs @@ -3,11 +3,9 @@ pub mod queries; use std::fmt::Debug; -use log::debug; +use pallas_codec::Fragment; -use crate::machines::{ - Agent, DecodePayload, EncodePayload, MachineError, MachineOutput, Transition, -}; +use crate::machines::{Agent, MachineError, MachineOutput, Transition}; use crate::common::Point; @@ -26,8 +24,8 @@ pub enum AcquireFailure { PointNotInChain, } pub trait Query: Debug { - type Request: EncodePayload + DecodePayload + Clone + Debug; - type Response: EncodePayload + DecodePayload + Clone + Debug; + type Request: Clone + Debug; + type Response: Clone + Debug; } #[derive(Debug)] @@ -52,7 +50,11 @@ pub struct OneShotClient { pub output: Option>, } -impl OneShotClient { +impl OneShotClient +where + Q: Query, + Message: Fragment, +{ pub fn initial(check_point: Option, request: Q::Request) -> Self { Self { state: State::Idle, @@ -96,7 +98,7 @@ impl OneShotClient { } fn on_acquired(self) -> Transition { - debug!("acquired check point for chain state"); + log::debug!("acquired check point for chain state"); Ok(Self { state: State::Acquired, @@ -105,7 +107,7 @@ impl OneShotClient { } fn on_result(self, response: Q::Response) -> Transition { - debug!("query result received: {:?}", response); + log::debug!("query result received: {:?}", response); Ok(Self { state: State::Acquired, @@ -115,7 +117,7 @@ impl OneShotClient { } fn on_failure(self, failure: AcquireFailure) -> Transition { - debug!("acquire failure: {:?}", failure); + log::debug!("acquire failure: {:?}", failure); Ok(Self { state: State::Idle, @@ -132,7 +134,11 @@ impl OneShotClient { } } -impl Agent for OneShotClient { +impl Agent for OneShotClient +where + Q: Query + 'static, + Message: Fragment, +{ type Message = Message; fn is_done(&self) -> bool { diff --git a/pallas-miniprotocols/src/localstate/queries.rs b/pallas-miniprotocols/src/localstate/queries.rs index 8ddad8f..d79027c 100644 --- a/pallas-miniprotocols/src/localstate/queries.rs +++ b/pallas-miniprotocols/src/localstate/queries.rs @@ -1,9 +1,9 @@ -use crate::common::Point; -use crate::machines::{DecodePayload, EncodePayload, PayloadDecoder}; -use crate::payloads::PayloadEncoder; -use minicbor::{data::Cbor, Decoder}; +use pallas_codec::{ + impl_fragment, + minicbor::{data::Cbor, decode, encode, Decode, Decoder, Encode, Encoder}, +}; -use super::Query; +use super::{Message, Query}; #[derive(Debug, Clone)] pub struct BlockQuery {} @@ -16,8 +16,8 @@ pub enum RequestV10 { GetChainPoint, } -impl EncodePayload for RequestV10 { - fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box> { +impl Encode for RequestV10 { + fn encode(&self, e: &mut Encoder) -> Result<(), encode::Error> { match self { Self::BlockQuery(..) => { todo!() @@ -38,8 +38,8 @@ impl EncodePayload for RequestV10 { } } -impl DecodePayload for RequestV10 { - fn decode_payload(_d: &mut PayloadDecoder) -> Result> { +impl<'b> Decode<'b> for RequestV10 { + fn decode(_d: &mut Decoder<'b>) -> Result { todo!() } } @@ -47,14 +47,14 @@ impl DecodePayload for RequestV10 { #[derive(Debug, Clone)] pub struct GenericResponse(Vec); -impl EncodePayload for GenericResponse { - fn encode_payload(&self, _e: &mut PayloadEncoder) -> Result<(), Box> { +impl Encode for GenericResponse { + fn encode(&self, _e: &mut Encoder) -> Result<(), encode::Error> { todo!() } } -impl DecodePayload for GenericResponse { - fn decode_payload(d: &mut PayloadDecoder) -> Result> { +impl<'b> Decode<'b> for GenericResponse { + fn decode(d: &mut Decoder<'b>) -> Result { let cbor: Cbor = d.decode()?; let slice = cbor.as_ref(); let vec = slice.to_vec(); @@ -62,15 +62,6 @@ impl DecodePayload for GenericResponse { } } -impl TryInto for GenericResponse { - type Error = Box; - - fn try_into(self) -> Result { - let mut d = PayloadDecoder(Decoder::new(self.0.as_slice())); - Point::decode_payload(&mut d) - } -} - #[derive(Debug, Clone)] pub struct QueryV10 {} @@ -78,3 +69,5 @@ impl Query for QueryV10 { type Request = RequestV10; type Response = GenericResponse; } + +impl_fragment!(Message); diff --git a/pallas-miniprotocols/src/machines.rs b/pallas-miniprotocols/src/machines.rs index cd8e543..15f8c69 100644 --- a/pallas-miniprotocols/src/machines.rs +++ b/pallas-miniprotocols/src/machines.rs @@ -1,11 +1,9 @@ -use log::{debug, trace}; +pub use crate::payloads::*; +use pallas_codec::Fragment; use pallas_multiplexer::{Channel, Payload}; -use std::borrow::Borrow; use std::fmt::{Debug, Display}; use std::sync::mpsc::Sender; -pub use crate::payloads::*; - #[derive(Debug)] pub enum MachineError where @@ -62,12 +60,13 @@ impl Display for CodecError { } pub trait MachineOutput { - fn send_msg(&self, data: &impl EncodePayload) -> Result<(), Box>; + fn send_msg(&self, data: &impl Fragment) -> Result<(), Box>; } impl MachineOutput for Sender { - fn send_msg(&self, data: &impl EncodePayload) -> Result<(), Box> { - let payload = to_payload(data.borrow())?; + fn send_msg(&self, data: &impl Fragment) -> Result<(), Box> { + let mut payload = Vec::new(); + data.write_cbor(&mut payload)?; self.send(payload)?; Ok(()) @@ -77,7 +76,7 @@ impl MachineOutput for Sender { pub type Transition = Result>; pub trait Agent: Sized { - type Message: DecodePayload + Debug; + type Message; fn is_done(&self) -> bool; fn has_agency(&self) -> bool; @@ -85,29 +84,27 @@ pub trait Agent: Sized { fn receive_next(self, msg: Self::Message) -> Transition; } -pub fn run_agent( - agent: T, - channel: &mut Channel, -) -> Result> { +pub fn run_agent(agent: T, channel: &mut Channel) -> Result> +where + T: Agent + Debug, + T::Message: Fragment + Debug, +{ let Channel(tx, rx) = channel; - let mut input = PayloadDeconstructor { - rx, - remaining: Vec::new(), - }; - let mut agent = agent; while !agent.is_done() { - debug!("evaluating agent {:?}", agent); + log::debug!("evaluating agent {:?}", agent); match agent.has_agency() { true => { agent = agent.send_next(tx)?; } false => { - let msg = input.consume_next_message::()?; - trace!("procesing inbound msg: {:?}", msg); + let mut buffer = Vec::new(); + + let msg = read_until_full_msg::(&mut buffer, rx).unwrap(); + log::trace!("procesing inbound msg: {:?}", msg); agent = agent.receive_next(msg)?; } } diff --git a/pallas-miniprotocols/src/payloads.rs b/pallas-miniprotocols/src/payloads.rs index 99662c9..08f437b 100644 --- a/pallas-miniprotocols/src/payloads.rs +++ b/pallas-miniprotocols/src/payloads.rs @@ -1,164 +1,43 @@ -use crate::machines::CodecError; - -use log::debug; -use minicbor::{Decoder, Encoder}; +use pallas_codec::Fragment; use pallas_multiplexer::Payload; -use std::{ - ops::{Deref, DerefMut}, - sync::mpsc::Receiver, -}; +use std::sync::mpsc::Receiver; -pub struct PayloadEncoder<'a>(Encoder<&'a mut Vec>); +pub type Error = Box; -impl<'a> Deref for PayloadEncoder<'a> { - type Target = Encoder<&'a mut Vec>; - - fn deref(&self) -> &Self::Target { - &self.0 - } +enum Decoding { + Done(M), + NotEnoughData, + UnexpectedError(Error), } -impl<'a> DerefMut for PayloadEncoder<'a> { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 - } -} - -impl<'a> PayloadEncoder<'a> { - pub fn encode_payload( - &mut self, - t: &T, - ) -> Result<(), Box> { - t.encode_payload(self) - } -} - -pub struct PayloadDecoder<'a>(pub Decoder<'a>); - -impl<'a> Deref for PayloadDecoder<'a> { - type Target = Decoder<'a>; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl<'a> DerefMut for PayloadDecoder<'a> { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 - } -} - -impl<'a> PayloadDecoder<'a> { - pub fn decode_payload(&mut self) -> Result> { - T::decode_payload(self) - } -} - -pub trait EncodePayload { - fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box>; -} - -pub fn to_payload(data: &dyn EncodePayload) -> Result> { - let mut payload = Vec::new(); - let mut encoder = PayloadEncoder(minicbor::encode::Encoder::new(&mut payload)); - data.encode_payload(&mut encoder)?; - - Ok(payload) -} - -impl EncodePayload for Vec +fn try_decode_message(buffer: &[u8]) -> Decoding where - D: EncodePayload, + M: Fragment, { - fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box> { - e.array(self.len() as u64)?; + let maybe_msg: Result = M::read_cbor(buffer); - for item in self { - item.encode_payload(e)?; - } - - Ok(()) + match maybe_msg { + Ok(msg) => Decoding::Done(msg), + Err(err) if err.is_end_of_input() => Decoding::NotEnoughData, + Err(err) => Decoding::UnexpectedError(Box::new(err)), } } -impl DecodePayload for Vec +pub fn read_until_full_msg( + buffer: &mut Vec, + receiver: &mut Receiver, +) -> Result where - D: DecodePayload, + M: Fragment, { - fn decode_payload(d: &mut PayloadDecoder) -> Result> { - let len = d.array()?.ok_or(CodecError::UnexpectedCbor( - "expecting definite-length array", - ))? as usize; + let chunk = receiver.recv()?; + buffer.extend(chunk); - let mut output = Vec::::with_capacity(len); + let decoding = try_decode_message::(buffer); - #[allow(clippy::needless_range_loop)] - for i in 0..(len - 1) { - output[i] = D::decode_payload(d)?; - } - - Ok(output) - } -} - -pub trait DecodePayload: Sized { - fn decode_payload(d: &mut PayloadDecoder) -> Result>; -} - -impl DecodePayload for Option { - fn decode_payload(d: &mut PayloadDecoder) -> Result> { - match d.datatype()? { - minicbor::data::Type::Undefined => Ok(None), - _ => { - let value = d.decode_payload()?; - Ok(Some(value)) - } - } - } -} - -pub struct PayloadDeconstructor<'a> { - pub(crate) rx: &'a mut Receiver, - pub(crate) remaining: Vec, -} - -impl<'a> PayloadDeconstructor<'a> { - pub fn consume_next_message( - &mut self, - ) -> Result> { - if self.remaining.is_empty() { - debug!("no remaining payload, fetching next segment"); - let payload = self.rx.recv()?; - self.remaining.extend(payload); - } - - let mut decoder = PayloadDecoder(minicbor::Decoder::new(&self.remaining)); - - match T::decode_payload(&mut decoder) { - Ok(t) => { - let new_pos = decoder.position(); - self.remaining.drain(0..new_pos); - debug!("consumed {} from payload buffer", new_pos); - Ok(t) - } - Err(err) => { - let downcast = err.downcast::(); - - match downcast { - Ok(err) => match err.deref().is_end_of_input() { - true => { - debug!("payload incomplete, fetching next segment"); - let payload = self.rx.recv()?; - self.remaining.extend(payload); - - self.consume_next_message::() - } - _ => Err(err), - }, - Err(err) => Err(err), - } - } - } + match decoding { + Decoding::Done(msg) => Ok(msg), + Decoding::UnexpectedError(err) => Err(err), + Decoding::NotEnoughData => read_until_full_msg::(buffer, receiver), } } diff --git a/pallas-miniprotocols/src/txsubmission/mod.rs b/pallas-miniprotocols/src/txsubmission/mod.rs index 6c4dc4a..2b823e4 100644 --- a/pallas-miniprotocols/src/txsubmission/mod.rs +++ b/pallas-miniprotocols/src/txsubmission/mod.rs @@ -2,12 +2,13 @@ use std::fmt::Debug; use itertools::Itertools; use log::debug; - -use crate::machines::{ - Agent, CodecError, DecodePayload, EncodePayload, MachineError, MachineOutput, PayloadDecoder, - PayloadEncoder, Transition, +use pallas_codec::{ + impl_fragment, + minicbor::{decode, encode, Decode, Decoder, Encode, Encoder}, }; +use crate::machines::{Agent, MachineError, MachineOutput, Transition}; + #[derive(Debug, PartialEq, Clone)] pub enum State { Idle, @@ -28,8 +29,8 @@ pub type TxId = u64; #[derive(Debug)] pub struct TxIdAndSize(TxId, TxSizeInBytes); -impl EncodePayload for TxIdAndSize { - fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box> { +impl Encode for TxIdAndSize { + fn encode(&self, e: &mut Encoder) -> Result<(), encode::Error> { e.array(2)?; e.u64(self.0)?; e.u32(self.1)?; @@ -38,8 +39,8 @@ impl EncodePayload for TxIdAndSize { } } -impl DecodePayload for TxIdAndSize { - fn decode_payload(d: &mut PayloadDecoder) -> Result> { +impl<'b> Decode<'b> for TxIdAndSize { + fn decode(d: &mut Decoder<'b>) -> Result { d.array()?; let id = d.u64()?; let size = d.u32()?; @@ -68,8 +69,8 @@ pub enum Message { Done, } -impl EncodePayload for Message { - fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box> { +impl Encode for Message { + fn encode(&self, e: &mut Encoder) -> Result<(), encode::Error> { match self { Message::RequestTxIds(blocking, ack, req) => { e.array(4)?.u16(0)?; @@ -82,7 +83,7 @@ impl EncodePayload for Message { e.array(2)?.u16(1)?; e.array(ids.len() as u64)?; for id in ids { - id.encode_payload(e)?; + id.encode(e)?; } Ok(()) } @@ -110,8 +111,8 @@ impl EncodePayload for Message { } } -impl DecodePayload for Message { - fn decode_payload(d: &mut PayloadDecoder) -> Result> { +impl<'b> Decode<'b> for Message { + fn decode(d: &mut Decoder<'b>) -> Result { d.array()?; let label = d.u16()?; @@ -123,7 +124,7 @@ impl DecodePayload for Message { Ok(Message::RequestTxIds(blocking, ack, req)) } 1 => { - let items = Vec::::decode_payload(d)?; + let items = Vec::::decode(d)?; Ok(Message::ReplyTxIds(items)) } 2 => { @@ -134,11 +135,15 @@ impl DecodePayload for Message { todo!() } 4 => Ok(Message::Done), - x => Err(Box::new(CodecError::BadLabel(x))), + _ => Err(decode::Error::message( + "unknown variant for txsubmission message", + )), } } } +impl_fragment!(Message); + /// A very basic tx provider agent with a fixed set of tx to submit /// /// This provider takes a set of tx from a vec as the single, static source of diff --git a/pallas-primitives/Cargo.toml b/pallas-primitives/Cargo.toml index e1fa41d..a673361 100644 --- a/pallas-primitives/Cargo.toml +++ b/pallas-primitives/Cargo.toml @@ -14,9 +14,8 @@ authors = [ ] [dependencies] -minicbor = { version = "0.14", features = ["std", "half"] } -minicbor-derive = "0.9.0" hex = "0.4.3" log = "0.4.14" pallas-crypto = { version = "0.7.0-alpha.0", path = "../pallas-crypto" } +pallas-codec = { version = "0.7.0-alpha.0", path = "../pallas-codec" } base58 = "0.2.0" diff --git a/pallas-primitives/src/alonzo/model.rs b/pallas-primitives/src/alonzo/model.rs index 0abfe90..66defb8 100644 --- a/pallas-primitives/src/alonzo/model.rs +++ b/pallas-primitives/src/alonzo/model.rs @@ -2,12 +2,14 @@ //! //! Handcrafted, idiomatic rust artifacts based on based on the [Alonzo CDDL](https://github.com/input-output-hk/cardano-ledger/blob/master/eras/alonzo/test-suite/cddl-files/alonzo.cddl) file in IOHK repo. -use minicbor::{bytes::ByteVec, data::Tag}; -use minicbor_derive::{Decode, Encode}; +use pallas_codec::minicbor::{bytes::ByteVec, data::Tag, Decode, Encode}; use pallas_crypto::hash::Hash; use std::ops::Deref; -use crate::utils::{KeyValuePairs, MaybeIndefArray}; +use pallas_codec::utils::{KeyValuePairs, MaybeIndefArray}; + +// required for derive attrs to work +use pallas_codec::minicbor; #[derive(Encode, Decode, Debug, PartialEq, Clone)] pub struct VrfCert(#[n(0)] pub ByteVec, #[n(1)] pub ByteVec); @@ -1263,7 +1265,7 @@ impl<'b> minicbor::Decode<'b> for Metadatum { } minicbor::data::Type::I64 => { let i = d.i64()?; - Ok(Metadatum::Int(i as i64)) + Ok(Metadatum::Int(i)) } minicbor::data::Type::Bytes => Ok(Metadatum::Bytes(d.decode()?)), minicbor::data::Type::String => Ok(Metadatum::Text(d.decode()?)), @@ -1396,7 +1398,7 @@ pub struct BlockWrapper(#[n(0)] pub u16, #[n(1)] pub Block); mod tests { use super::BlockWrapper; use crate::Fragment; - use minicbor::{self, to_vec}; + use pallas_codec::minicbor::to_vec; #[test] fn block_isomorphic_decoding_encoding() { diff --git a/pallas-primitives/src/byron/address.rs b/pallas-primitives/src/byron/address.rs index 9688c3a..0e86074 100644 --- a/pallas-primitives/src/byron/address.rs +++ b/pallas-primitives/src/byron/address.rs @@ -2,7 +2,7 @@ use crate::Error; use super::Address; use base58::ToBase58; -use minicbor::to_vec; +use pallas_codec::minicbor::to_vec; impl Address { pub fn to_addr_string(&self) -> Result { @@ -13,10 +13,9 @@ impl Address { #[cfg(test)] mod tests { - use std::ops::Deref; - use crate::byron::Block; use crate::Fragment; + use std::ops::Deref; const KNOWN_ADDRESSES: &[&str] = &[ "DdzFFzCqrht8QHTQXbWy2qoyPaqTN8BjyfKygGmpy9dtot1tvkBfCaVTnR22XCaaDVn3M1U6aiMShoCLzw6VWSwzQKhhJrM3YjYp3wyy", diff --git a/pallas-primitives/src/byron/fees.rs b/pallas-primitives/src/byron/fees.rs index aee131d..0eb463a 100644 --- a/pallas-primitives/src/byron/fees.rs +++ b/pallas-primitives/src/byron/fees.rs @@ -1,7 +1,7 @@ use crate::Error; use super::TxPayload; -use minicbor::to_vec; +use pallas_codec::minicbor::to_vec; pub struct PolicyParams { constant: u64, diff --git a/pallas-primitives/src/byron/model.rs b/pallas-primitives/src/byron/model.rs index 559cd00..ac680b9 100644 --- a/pallas-primitives/src/byron/model.rs +++ b/pallas-primitives/src/byron/model.rs @@ -2,15 +2,17 @@ //! //! Handcrafted, idiomatic rust artifacts based on based on the [Byron CDDL](https://github.com/input-output-hk/cardano-ledger/blob/master/eras/byron/cddl-spec/byron.cddl) file in IOHK repo. -use minicbor::bytes::ByteVec; -use minicbor_derive::{Decode, Encode}; +use pallas_codec::minicbor::{bytes::ByteVec, Decode, Encode}; use pallas_crypto::hash::Hash; -use crate::utils::{ +use pallas_codec::utils::{ CborWrap, EmptyMap, KeyValuePairs, MaybeIndefArray, OrderPreservingProperties, TagWrap, ZeroOrOneArray, }; +// required for derive attrs to work +use pallas_codec::minicbor; + // Basic Cardano Types pub type Blake2b256 = Hash<32>; @@ -957,7 +959,7 @@ mod tests { use crate::byron::{Block, BlockHead}; use crate::Fragment; - use minicbor::{self, to_vec}; + use pallas_codec::minicbor::to_vec; #[test] fn block_isomorphic_decoding_encoding() { diff --git a/pallas-primitives/src/framework.rs b/pallas-primitives/src/framework.rs index 2f1886d..a160855 100644 --- a/pallas-primitives/src/framework.rs +++ b/pallas-primitives/src/framework.rs @@ -1,5 +1,7 @@ pub type Error = Box; +use pallas_codec::minicbor::{decode, to_vec, Decode, Encode}; + pub trait Fragment<'a> where Self: Sized, @@ -10,14 +12,14 @@ where impl<'a, T> Fragment<'a> for T where - T: minicbor::Encode + minicbor::Decode<'a> + Sized, + T: Encode + Decode<'a> + Sized, { fn encode_fragment(&self) -> Result, Error> { - minicbor::to_vec(self).map_err(|e| e.into()) + to_vec(self).map_err(|e| e.into()) } fn decode_fragment(bytes: &'a [u8]) -> Result { - minicbor::decode(bytes).map_err(|e| e.into()) + decode(bytes).map_err(|e| e.into()) } } diff --git a/pallas-primitives/src/lib.rs b/pallas-primitives/src/lib.rs index fe32a29..a623304 100644 --- a/pallas-primitives/src/lib.rs +++ b/pallas-primitives/src/lib.rs @@ -5,6 +5,5 @@ mod framework; pub mod alonzo; pub mod byron; pub mod probing; -pub mod utils; pub use framework::*; diff --git a/pallas-primitives/src/probing.rs b/pallas-primitives/src/probing.rs index 02f69ca..423ba21 100644 --- a/pallas-primitives/src/probing.rs +++ b/pallas-primitives/src/probing.rs @@ -1,6 +1,6 @@ //! Heuristics for detecting cbor content without decoding -use minicbor::decode::{Token, Tokenizer}; +use pallas_codec::minicbor::decode::{Token, Tokenizer}; use crate::Era; diff --git a/pallas/Cargo.toml b/pallas/Cargo.toml index ad40786..31387ec 100644 --- a/pallas/Cargo.toml +++ b/pallas/Cargo.toml @@ -17,3 +17,4 @@ pallas-multiplexer = { version = "0.7.0-alpha.0", path = "../pallas-multiplexer/ pallas-miniprotocols = { version = "0.7.0-alpha.0", path = "../pallas-miniprotocols/" } pallas-primitives = { version = "0.7.0-alpha.0", path = "../pallas-primitives/" } pallas-crypto = { version = "0.7.0-alpha.0", path = "../pallas-crypto/" } +pallas-codec = { version = "0.7.0-alpha.0", path = "../pallas-codec/" } diff --git a/pallas/src/lib.rs b/pallas/src/lib.rs index c6115f3..067f723 100644 --- a/pallas/src/lib.rs +++ b/pallas/src/lib.rs @@ -15,3 +15,6 @@ pub mod ledger; #[doc(inline)] pub use pallas_crypto as crypto; + +#[doc(inline)] +pub use pallas_codec as codec;