diff --git a/pallas-alonzo/Cargo.toml b/pallas-alonzo/Cargo.toml index 55ff46b..20574a7 100644 --- a/pallas-alonzo/Cargo.toml +++ b/pallas-alonzo/Cargo.toml @@ -16,7 +16,7 @@ authors = [ crypto = ["cryptoxide"] [dependencies] -minicbor = { version = "0.11.5", features = ["std"] } +minicbor = { version = "0.12", features = ["std"] } minicbor-derive = "0.7.2" hex = "0.4.3" log = "0.4.14" diff --git a/pallas-alonzo/src/crypto.rs b/pallas-alonzo/src/crypto.rs index 3f298b6..f9d3e94 100644 --- a/pallas-alonzo/src/crypto.rs +++ b/pallas-alonzo/src/crypto.rs @@ -1,4 +1,4 @@ -use crate::{AuxiliaryData, PlutusData, TransactionBody}; +use crate::{AuxiliaryData, Header, PlutusData, TransactionBody}; use cryptoxide::blake2b::Blake2b; use minicbor::{to_vec, Encode}; @@ -15,6 +15,10 @@ fn hash_cbor_encodable(data: &impl Encode) -> Result { Ok(hash) } +pub fn hash_block_header(data: &Header) -> Result { + hash_cbor_encodable(data) +} + pub fn hash_auxiliary_data(data: &AuxiliaryData) -> Result { hash_cbor_encodable(data) } diff --git a/pallas-alonzo/src/utils.rs b/pallas-alonzo/src/utils.rs index abfc38f..c400a7c 100644 --- a/pallas-alonzo/src/utils.rs +++ b/pallas-alonzo/src/utils.rs @@ -71,7 +71,7 @@ where k.encode(e)?; v.encode(e)?; } - + e.end()?; } } @@ -92,8 +92,8 @@ impl Deref for MaybeIndefArray { fn deref(&self) -> &Self::Target { match self { - MaybeIndefArray::Def(x) => &x, - MaybeIndefArray::Indef(x) => &x, + MaybeIndefArray::Def(x) => x, + MaybeIndefArray::Indef(x) => x, } } } diff --git a/pallas-blockfetch/Cargo.toml b/pallas-blockfetch/Cargo.toml index 3fc087b..63caa91 100644 --- a/pallas-blockfetch/Cargo.toml +++ b/pallas-blockfetch/Cargo.toml @@ -15,7 +15,7 @@ authors = [ [dependencies] pallas-multiplexer = { version = "0.3.0", path = "../pallas-multiplexer/" } pallas-machines = { version = "0.3.0", path = "../pallas-machines/" } -minicbor = { version="0.11.4", features=["half", "std"] } +minicbor = { version="0.12", features=["half", "std"] } log = "0.4.14" [dev-dependencies] diff --git a/pallas-blockfetch/src/lib.rs b/pallas-blockfetch/src/lib.rs index 3dfcb10..5472af3 100644 --- a/pallas-blockfetch/src/lib.rs +++ b/pallas-blockfetch/src/lib.rs @@ -1,6 +1,6 @@ use std::sync::mpsc::Receiver; -use log::{debug, info}; +use log::debug; use pallas_machines::{ primitives::Point, Agent, CodecError, DecodePayload, EncodePayload, MachineOutput, PayloadDecoder, PayloadEncoder, Transition, @@ -47,7 +47,7 @@ impl EncodePayload for Message { } Message::Block { body } => { e.array(2)?.u16(4)?; - e.bytes(&body)?; + e.bytes(body)?; Ok(()) } Message::BatchDone => { diff --git a/pallas-chainsync/Cargo.toml b/pallas-chainsync/Cargo.toml index 3fd98e9..ec6b2c8 100644 --- a/pallas-chainsync/Cargo.toml +++ b/pallas-chainsync/Cargo.toml @@ -15,12 +15,14 @@ authors = [ [dependencies] pallas-multiplexer = { version = "0.3.0", path = "../pallas-multiplexer/" } pallas-machines = { version = "0.3.0", path = "../pallas-machines/" } -minicbor = { version="0.11.4", features=["half", "std"] } +minicbor = { version = "0.12.0", features = ["half", "std"] } log = "0.4.14" hex = "0.4.3" [dev-dependencies] net2 = "0.2.37" +cryptoxide = "0.3.6" env_logger = "0.9.0" pallas-handshake = { version = "0.3.0", path = "../pallas-handshake/" } pallas-txsubmission = { version = "0.3.0", path = "../pallas-txsubmission/" } +pallas-alonzo = { version = "0.3.0", path = "../pallas-alonzo/", features = ["crypto"] } diff --git a/pallas-chainsync/examples/blocks.rs b/pallas-chainsync/examples/blocks.rs index 768b5be..7e9fc74 100644 --- a/pallas-chainsync/examples/blocks.rs +++ b/pallas-chainsync/examples/blocks.rs @@ -1,13 +1,39 @@ -use pallas_chainsync::{ClientConsumer, NoopObserver}; -use pallas_handshake::{ - n2c::{Client, VersionTable}, - MAINNET_MAGIC, -}; -use pallas_machines::primitives::Point; +use pallas_alonzo::{crypto, Block, BlockWrapper, Fragment}; +use pallas_chainsync::{BlockLike, Consumer, NoopObserver}; +use pallas_handshake::n2c::{Client, VersionTable}; +use pallas_handshake::MAINNET_MAGIC; use pallas_machines::run_agent; +use pallas_machines::{ + primitives::Point, DecodePayload, EncodePayload, PayloadDecoder, PayloadEncoder, +}; use pallas_multiplexer::Multiplexer; use std::os::unix::net::UnixStream; +#[derive(Debug)] +pub struct Content(Block); + +impl EncodePayload for Content { + fn encode_payload(&self, _e: &mut PayloadEncoder) -> Result<(), Box> { + todo!() + } +} + +impl DecodePayload for Content { + fn decode_payload(d: &mut PayloadDecoder) -> Result> { + d.tag()?; + let bytes = d.bytes()?; + let BlockWrapper(_, block) = BlockWrapper::decode_fragment(bytes)?; + Ok(Content(block)) + } +} + +impl BlockLike for Content { + fn block_point(&self) -> Result> { + let hash = crypto::hash_block_header(&self.0.header)?; + Ok(Point(self.0.header.header_body.slot, Vec::from(hash))) + } +} + fn main() { env_logger::init(); @@ -24,12 +50,12 @@ fn main() { // some random known-point in the chain to use as starting point for the sync let known_points = vec![Point( - 43847831u64, - hex::decode("15b9eeee849dd6386d3770b0745e0450190f7560e5159b1b3ab13b14b2684a45").unwrap(), + 45147459, + hex::decode("bee16ef28ac02abb50c340a7deff085a77f3a7b84c66250b3318dcb125c19a10").unwrap(), )]; let mut cs_channel = muxer.use_channel(5); - let cs = ClientConsumer::initial(known_points, NoopObserver {}); + let cs = Consumer::::initial(known_points, NoopObserver {}); let cs = run_agent(cs, &mut cs_channel).unwrap(); println!("{:?}", cs); } diff --git a/pallas-chainsync/examples/headers.rs b/pallas-chainsync/examples/headers.rs index d27e184..572b075 100644 --- a/pallas-chainsync/examples/headers.rs +++ b/pallas-chainsync/examples/headers.rs @@ -1,13 +1,47 @@ +use minicbor::data::Tag; use net2::TcpStreamExt; +use pallas_alonzo::{crypto, Fragment, Header}; use pallas_machines::primitives::Point; use std::net::TcpStream; -use pallas_chainsync::{ClientConsumer, NoopObserver}; +use pallas_chainsync::{BlockLike, Consumer, NoopObserver}; use pallas_handshake::n2n::{Client, VersionTable}; use pallas_handshake::MAINNET_MAGIC; -use pallas_machines::run_agent; +use pallas_machines::{run_agent, DecodePayload, EncodePayload, PayloadDecoder, PayloadEncoder}; use pallas_multiplexer::Multiplexer; +#[derive(Debug)] +pub struct Content(u32, Header); + +impl EncodePayload for Content { + fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box> { + e.array(2)?; + e.u32(self.0)?; + e.tag(Tag::Cbor)?; + e.bytes(&self.1.encode_fragment()?)?; + + Ok(()) + } +} + +impl DecodePayload for Content { + fn decode_payload(d: &mut PayloadDecoder) -> Result> { + d.array()?; + let unknown = d.u32()?; // WTF is this value? + d.tag()?; + let bytes = d.bytes()?; + let header = Header::decode_fragment(bytes)?; + Ok(Content(unknown, header)) + } +} + +impl BlockLike for Content { + fn block_point(&self) -> Result> { + let hash = crypto::hash_block_header(&self.1)?; + Ok(Point(self.1.header_body.slot, Vec::from(hash))) + } +} + fn main() { env_logger::init(); @@ -29,7 +63,7 @@ fn main() { let mut cs_channel = muxer.use_channel(2); - let cs = ClientConsumer::initial(known_points, NoopObserver {}); + let cs = Consumer::::initial(known_points, NoopObserver {}); let cs = run_agent(cs, &mut cs_channel).unwrap(); println!("{:?}", cs); diff --git a/pallas-chainsync/src/clients.rs b/pallas-chainsync/src/clients.rs index 39f9bb8..9ed4862 100644 --- a/pallas-chainsync/src/clients.rs +++ b/pallas-chainsync/src/clients.rs @@ -1,4 +1,4 @@ -use std::fmt::Debug; +use std::{fmt::Debug, marker::PhantomData}; use log::{debug, log_enabled, trace}; @@ -6,7 +6,13 @@ use pallas_machines::{ primitives::Point, Agent, DecodePayload, EncodePayload, MachineError, MachineOutput, Transition, }; -use crate::{BlockBody, Message, State, Tip, WrappedHeader}; +use crate::{Message, State, Tip}; + +/// A trait to deal with polymorphic payloads in the ChainSync protocol +/// (WrappedHeader vs BlockBody) +pub trait BlockLike: EncodePayload + DecodePayload + Debug { + fn block_point(&self) -> Result>; +} /// An observer of chain-sync events sent by the state-machine pub trait Observer @@ -69,7 +75,7 @@ where impl Consumer where - C: EncodePayload + DecodePayload + Debug, + C: BlockLike + EncodePayload + DecodePayload + Debug, O: Observer, { pub fn initial(known_points: Vec, observer: O) -> Self { @@ -133,6 +139,8 @@ where fn on_roll_forward(self, content: C, tip: Tip) -> Transition { debug!("rolling forward"); + let point = content.block_point()?; + if log_enabled!(log::Level::Trace) { trace!("content: {:?}", content); } @@ -141,6 +149,7 @@ where self.observer.on_block(&self.cursor, &content)?; Ok(Self { + cursor: Some(point), tip: Some(tip), state: State::Idle, ..self @@ -176,7 +185,7 @@ where impl Agent for Consumer where - C: EncodePayload + DecodePayload + Debug + 'static, + C: BlockLike + EncodePayload + DecodePayload + Debug + 'static, O: Observer, { type Message = Message; @@ -229,10 +238,6 @@ where } } -pub type NodeConsumer = Consumer; - -pub type ClientConsumer = Consumer; - #[derive(Debug)] pub struct TipFinder { pub state: State, @@ -250,7 +255,7 @@ impl TipFinder { } fn send_find_intersect(self, tx: &impl MachineOutput) -> Transition { - let msg = Message::::FindIntersect(vec![self.wellknown_point.clone()]); + let msg = Message::::FindIntersect(vec![self.wellknown_point.clone()]); tx.send_msg(&msg)?; @@ -281,8 +286,34 @@ impl TipFinder { } } +#[derive(Debug)] +pub struct NoopContent {} + +impl EncodePayload for NoopContent { + fn encode_payload( + &self, + _e: &mut pallas_machines::PayloadEncoder, + ) -> Result<(), Box> { + todo!() + } +} + +impl DecodePayload for NoopContent { + fn decode_payload( + _d: &mut pallas_machines::PayloadDecoder, + ) -> Result> { + todo!() + } +} + +impl BlockLike for NoopContent { + fn block_point(&self) -> Result> { + todo!() + } +} + impl Agent for TipFinder { - type Message = Message; + type Message = Message; fn is_done(&self) -> bool { self.state == State::Done diff --git a/pallas-chainsync/src/codec.rs b/pallas-chainsync/src/codec.rs index 516b430..208bdc0 100644 --- a/pallas-chainsync/src/codec.rs +++ b/pallas-chainsync/src/codec.rs @@ -1,47 +1,8 @@ -use minicbor::data::Tag; - use pallas_machines::{ primitives::Point, CodecError, DecodePayload, EncodePayload, PayloadDecoder, PayloadEncoder, }; -use crate::{BlockBody, Message, Tip, WrappedHeader}; - -impl EncodePayload for WrappedHeader { - fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box> { - e.array(2)?; - e.u64(self.0)?; - e.tag(Tag::Cbor)?; - e.bytes(&self.1)?; - - Ok(()) - } -} - -impl DecodePayload for WrappedHeader { - fn decode_payload(d: &mut PayloadDecoder) -> Result> { - d.array()?; - let unknown = d.u64()?; // WTF is this value? - d.tag()?; - let bytes = Vec::from(d.bytes()?); - - Ok(WrappedHeader(unknown, bytes)) - } -} - -impl EncodePayload for BlockBody { - fn encode_payload(&self, _e: &mut PayloadEncoder) -> Result<(), Box> { - todo!() - } -} - -impl DecodePayload for BlockBody { - fn decode_payload(d: &mut PayloadDecoder) -> Result> { - d.tag()?; - let bytes = Vec::from(d.bytes()?); - - Ok(BlockBody(bytes)) - } -} +use crate::{Message, Tip}; impl EncodePayload for Tip { fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box> { diff --git a/pallas-chainsync/src/protocol.rs b/pallas-chainsync/src/protocol.rs index e981c0b..ddad9fd 100644 --- a/pallas-chainsync/src/protocol.rs +++ b/pallas-chainsync/src/protocol.rs @@ -2,12 +2,6 @@ use std::fmt::Debug; use pallas_machines::{primitives::Point, DecodePayload, EncodePayload}; -#[derive(Debug)] -pub struct WrappedHeader(pub u64, pub Vec); - -#[derive(Debug)] -pub struct BlockBody(pub Vec); - #[derive(Debug)] pub struct Tip(pub Point, pub u64); diff --git a/pallas-handshake/Cargo.toml b/pallas-handshake/Cargo.toml index 4229813..1d93820 100644 --- a/pallas-handshake/Cargo.toml +++ b/pallas-handshake/Cargo.toml @@ -15,7 +15,7 @@ authors = [ [dependencies] pallas-multiplexer = { version = "0.3.0", path = "../pallas-multiplexer/" } pallas-machines = { version = "0.3.0", path = "../pallas-machines/" } -minicbor = { version="0.11.4", features=["half", "std"] } +minicbor = { version="0.12", features=["half", "std"] } itertools = "0.10.1" log = "0.4.14" diff --git a/pallas-localstate/Cargo.toml b/pallas-localstate/Cargo.toml index 48aecdb..2cac9fc 100644 --- a/pallas-localstate/Cargo.toml +++ b/pallas-localstate/Cargo.toml @@ -15,7 +15,7 @@ authors = [ [dependencies] pallas-multiplexer = { version = "0.3.0", path = "../pallas-multiplexer/" } pallas-machines = { version = "0.3.0", path = "../pallas-machines/" } -minicbor = { version="0.11.4", features=["half", "std"] } +minicbor = { version="0.12", features=["half", "std"] } log = "0.4.14" hex = "0.4.3" diff --git a/pallas-machines/Cargo.toml b/pallas-machines/Cargo.toml index 2cb945d..940643b 100644 --- a/pallas-machines/Cargo.toml +++ b/pallas-machines/Cargo.toml @@ -14,6 +14,6 @@ authors = [ [dependencies] pallas-multiplexer = { version = "0.3.0", path = "../pallas-multiplexer/" } -minicbor = { version="0.11.4", features=["half", "std"] } +minicbor = { version="0.12", features=["half", "std"] } log = "0.4.14" hex = "0.4.3" diff --git a/pallas-machines/src/payloads.rs b/pallas-machines/src/payloads.rs index 0ea4285..28b1fed 100644 --- a/pallas-machines/src/payloads.rs +++ b/pallas-machines/src/payloads.rs @@ -126,7 +126,7 @@ impl<'a> PayloadDeconstructor<'a> { pub fn consume_next_message( &mut self, ) -> Result> { - if self.remaining.len() == 0 { + if self.remaining.is_empty() { debug!("no remaining payload, fetching next segment"); let payload = self.rx.recv()?; self.remaining.extend(payload); diff --git a/pallas-multiplexer/examples/listener.rs b/pallas-multiplexer/examples/listener.rs index 1da8694..b89ba27 100644 --- a/pallas-multiplexer/examples/listener.rs +++ b/pallas-multiplexer/examples/listener.rs @@ -25,7 +25,11 @@ fn main() { loop { let payload = rx.recv().unwrap(); - info!("got message within thread, id:{}, length:{}", protocol, payload.len()); + info!( + "got message within thread, id:{}, length:{}", + protocol, + payload.len() + ); } }); } diff --git a/pallas-multiplexer/src/lib.rs b/pallas-multiplexer/src/lib.rs index fc05382..e5b7561 100644 --- a/pallas-multiplexer/src/lib.rs +++ b/pallas-multiplexer/src/lib.rs @@ -128,20 +128,18 @@ impl Multiplexer { where TBearer: Bearer + 'static, { - let handles = protocols - .iter() - .map(|id| { - let (demux_tx, demux_rx) = mpsc::channel::(); - let (mux_tx, mux_rx) = mpsc::channel::(); + let handles = protocols.iter().map(|id| { + let (demux_tx, demux_rx) = mpsc::channel::(); + let (mux_tx, mux_rx) = mpsc::channel::(); - let channel = Channel(mux_tx, demux_rx); + let channel = Channel(mux_tx, demux_rx); - let protocol_handle: ChannelProtocolHandle = (*id, channel); - let ingress_handle: ChannelIngressHandle = (*id, mux_rx); - let egress_handle: ChannelEgressHandle = (*id, demux_tx); + let protocol_handle: ChannelProtocolHandle = (*id, channel); + let ingress_handle: ChannelIngressHandle = (*id, mux_rx); + let egress_handle: ChannelEgressHandle = (*id, demux_tx); - (protocol_handle, (ingress_handle, egress_handle)) - }); + (protocol_handle, (ingress_handle, egress_handle)) + }); let (protocol_handles, multiplex_handles): (Vec<_>, Vec<_>) = handles.into_iter().unzip(); diff --git a/pallas-txsubmission/Cargo.toml b/pallas-txsubmission/Cargo.toml index 539fff9..e0087e9 100644 --- a/pallas-txsubmission/Cargo.toml +++ b/pallas-txsubmission/Cargo.toml @@ -15,7 +15,7 @@ authors = [ [dependencies] pallas-multiplexer = { version = "0.3.0", path = "../pallas-multiplexer/" } pallas-machines = { version = "0.3.0", path = "../pallas-machines/" } -minicbor = { version="0.11.4", features=["half", "std"] } +minicbor = { version="0.12", features=["half", "std"] } log = "0.4.14" hex = "0.4.3" itertools = "0.10.1"