refactor: make chainsync machine agnostic of content

This commit is contained in:
Santiago Carmuega 2021-12-14 21:43:29 -03:00
parent 7706dd7f5b
commit 2a880af828
18 changed files with 148 additions and 94 deletions

View file

@ -16,7 +16,7 @@ authors = [
crypto = ["cryptoxide"]
[dependencies]
minicbor = { version = "0.11.5", features = ["std"] }
minicbor = { version = "0.12", features = ["std"] }
minicbor-derive = "0.7.2"
hex = "0.4.3"
log = "0.4.14"

View file

@ -1,4 +1,4 @@
use crate::{AuxiliaryData, PlutusData, TransactionBody};
use crate::{AuxiliaryData, Header, PlutusData, TransactionBody};
use cryptoxide::blake2b::Blake2b;
use minicbor::{to_vec, Encode};
@ -15,6 +15,10 @@ fn hash_cbor_encodable(data: &impl Encode) -> Result<Hash32, Error> {
Ok(hash)
}
pub fn hash_block_header(data: &Header) -> Result<Hash32, Error> {
hash_cbor_encodable(data)
}
pub fn hash_auxiliary_data(data: &AuxiliaryData) -> Result<Hash32, Error> {
hash_cbor_encodable(data)
}

View file

@ -71,7 +71,7 @@ where
k.encode(e)?;
v.encode(e)?;
}
e.end()?;
}
}
@ -92,8 +92,8 @@ impl<A> Deref for MaybeIndefArray<A> {
fn deref(&self) -> &Self::Target {
match self {
MaybeIndefArray::Def(x) => &x,
MaybeIndefArray::Indef(x) => &x,
MaybeIndefArray::Def(x) => x,
MaybeIndefArray::Indef(x) => x,
}
}
}

View file

@ -15,7 +15,7 @@ authors = [
[dependencies]
pallas-multiplexer = { version = "0.3.0", path = "../pallas-multiplexer/" }
pallas-machines = { version = "0.3.0", path = "../pallas-machines/" }
minicbor = { version="0.11.4", features=["half", "std"] }
minicbor = { version="0.12", features=["half", "std"] }
log = "0.4.14"
[dev-dependencies]

View file

@ -1,6 +1,6 @@
use std::sync::mpsc::Receiver;
use log::{debug, info};
use log::debug;
use pallas_machines::{
primitives::Point, Agent, CodecError, DecodePayload, EncodePayload, MachineOutput,
PayloadDecoder, PayloadEncoder, Transition,
@ -47,7 +47,7 @@ impl EncodePayload for Message {
}
Message::Block { body } => {
e.array(2)?.u16(4)?;
e.bytes(&body)?;
e.bytes(body)?;
Ok(())
}
Message::BatchDone => {

View file

@ -15,12 +15,14 @@ authors = [
[dependencies]
pallas-multiplexer = { version = "0.3.0", path = "../pallas-multiplexer/" }
pallas-machines = { version = "0.3.0", path = "../pallas-machines/" }
minicbor = { version="0.11.4", features=["half", "std"] }
minicbor = { version = "0.12.0", features = ["half", "std"] }
log = "0.4.14"
hex = "0.4.3"
[dev-dependencies]
net2 = "0.2.37"
cryptoxide = "0.3.6"
env_logger = "0.9.0"
pallas-handshake = { version = "0.3.0", path = "../pallas-handshake/" }
pallas-txsubmission = { version = "0.3.0", path = "../pallas-txsubmission/" }
pallas-alonzo = { version = "0.3.0", path = "../pallas-alonzo/", features = ["crypto"] }

View file

@ -1,13 +1,39 @@
use pallas_chainsync::{ClientConsumer, NoopObserver};
use pallas_handshake::{
n2c::{Client, VersionTable},
MAINNET_MAGIC,
};
use pallas_machines::primitives::Point;
use pallas_alonzo::{crypto, Block, BlockWrapper, Fragment};
use pallas_chainsync::{BlockLike, Consumer, NoopObserver};
use pallas_handshake::n2c::{Client, VersionTable};
use pallas_handshake::MAINNET_MAGIC;
use pallas_machines::run_agent;
use pallas_machines::{
primitives::Point, DecodePayload, EncodePayload, PayloadDecoder, PayloadEncoder,
};
use pallas_multiplexer::Multiplexer;
use std::os::unix::net::UnixStream;
#[derive(Debug)]
pub struct Content(Block);
impl EncodePayload for Content {
fn encode_payload(&self, _e: &mut PayloadEncoder) -> Result<(), Box<dyn std::error::Error>> {
todo!()
}
}
impl DecodePayload for Content {
fn decode_payload(d: &mut PayloadDecoder) -> Result<Self, Box<dyn std::error::Error>> {
d.tag()?;
let bytes = d.bytes()?;
let BlockWrapper(_, block) = BlockWrapper::decode_fragment(bytes)?;
Ok(Content(block))
}
}
impl BlockLike for Content {
fn block_point(&self) -> Result<Point, Box<dyn std::error::Error>> {
let hash = crypto::hash_block_header(&self.0.header)?;
Ok(Point(self.0.header.header_body.slot, Vec::from(hash)))
}
}
fn main() {
env_logger::init();
@ -24,12 +50,12 @@ fn main() {
// some random known-point in the chain to use as starting point for the sync
let known_points = vec![Point(
43847831u64,
hex::decode("15b9eeee849dd6386d3770b0745e0450190f7560e5159b1b3ab13b14b2684a45").unwrap(),
45147459,
hex::decode("bee16ef28ac02abb50c340a7deff085a77f3a7b84c66250b3318dcb125c19a10").unwrap(),
)];
let mut cs_channel = muxer.use_channel(5);
let cs = ClientConsumer::initial(known_points, NoopObserver {});
let cs = Consumer::<Content, _>::initial(known_points, NoopObserver {});
let cs = run_agent(cs, &mut cs_channel).unwrap();
println!("{:?}", cs);
}

View file

@ -1,13 +1,47 @@
use minicbor::data::Tag;
use net2::TcpStreamExt;
use pallas_alonzo::{crypto, Fragment, Header};
use pallas_machines::primitives::Point;
use std::net::TcpStream;
use pallas_chainsync::{ClientConsumer, NoopObserver};
use pallas_chainsync::{BlockLike, Consumer, NoopObserver};
use pallas_handshake::n2n::{Client, VersionTable};
use pallas_handshake::MAINNET_MAGIC;
use pallas_machines::run_agent;
use pallas_machines::{run_agent, DecodePayload, EncodePayload, PayloadDecoder, PayloadEncoder};
use pallas_multiplexer::Multiplexer;
#[derive(Debug)]
pub struct Content(u32, Header);
impl EncodePayload for Content {
fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box<dyn std::error::Error>> {
e.array(2)?;
e.u32(self.0)?;
e.tag(Tag::Cbor)?;
e.bytes(&self.1.encode_fragment()?)?;
Ok(())
}
}
impl DecodePayload for Content {
fn decode_payload(d: &mut PayloadDecoder) -> Result<Self, Box<dyn std::error::Error>> {
d.array()?;
let unknown = d.u32()?; // WTF is this value?
d.tag()?;
let bytes = d.bytes()?;
let header = Header::decode_fragment(bytes)?;
Ok(Content(unknown, header))
}
}
impl BlockLike for Content {
fn block_point(&self) -> Result<Point, Box<dyn std::error::Error>> {
let hash = crypto::hash_block_header(&self.1)?;
Ok(Point(self.1.header_body.slot, Vec::from(hash)))
}
}
fn main() {
env_logger::init();
@ -29,7 +63,7 @@ fn main() {
let mut cs_channel = muxer.use_channel(2);
let cs = ClientConsumer::initial(known_points, NoopObserver {});
let cs = Consumer::<Content, _>::initial(known_points, NoopObserver {});
let cs = run_agent(cs, &mut cs_channel).unwrap();
println!("{:?}", cs);

View file

@ -1,4 +1,4 @@
use std::fmt::Debug;
use std::{fmt::Debug, marker::PhantomData};
use log::{debug, log_enabled, trace};
@ -6,7 +6,13 @@ use pallas_machines::{
primitives::Point, Agent, DecodePayload, EncodePayload, MachineError, MachineOutput, Transition,
};
use crate::{BlockBody, Message, State, Tip, WrappedHeader};
use crate::{Message, State, Tip};
/// A trait to deal with polymorphic payloads in the ChainSync protocol
/// (WrappedHeader vs BlockBody)
pub trait BlockLike: EncodePayload + DecodePayload + Debug {
fn block_point(&self) -> Result<Point, Box<dyn std::error::Error>>;
}
/// An observer of chain-sync events sent by the state-machine
pub trait Observer<C>
@ -69,7 +75,7 @@ where
impl<C, O> Consumer<C, O>
where
C: EncodePayload + DecodePayload + Debug,
C: BlockLike + EncodePayload + DecodePayload + Debug,
O: Observer<C>,
{
pub fn initial(known_points: Vec<Point>, observer: O) -> Self {
@ -133,6 +139,8 @@ where
fn on_roll_forward(self, content: C, tip: Tip) -> Transition<Self> {
debug!("rolling forward");
let point = content.block_point()?;
if log_enabled!(log::Level::Trace) {
trace!("content: {:?}", content);
}
@ -141,6 +149,7 @@ where
self.observer.on_block(&self.cursor, &content)?;
Ok(Self {
cursor: Some(point),
tip: Some(tip),
state: State::Idle,
..self
@ -176,7 +185,7 @@ where
impl<C, O> Agent for Consumer<C, O>
where
C: EncodePayload + DecodePayload + Debug + 'static,
C: BlockLike + EncodePayload + DecodePayload + Debug + 'static,
O: Observer<C>,
{
type Message = Message<C>;
@ -229,10 +238,6 @@ where
}
}
pub type NodeConsumer<S> = Consumer<WrappedHeader, S>;
pub type ClientConsumer<S> = Consumer<BlockBody, S>;
#[derive(Debug)]
pub struct TipFinder {
pub state: State,
@ -250,7 +255,7 @@ impl TipFinder {
}
fn send_find_intersect(self, tx: &impl MachineOutput) -> Transition<Self> {
let msg = Message::<WrappedHeader>::FindIntersect(vec![self.wellknown_point.clone()]);
let msg = Message::<NoopContent>::FindIntersect(vec![self.wellknown_point.clone()]);
tx.send_msg(&msg)?;
@ -281,8 +286,34 @@ impl TipFinder {
}
}
#[derive(Debug)]
pub struct NoopContent {}
impl EncodePayload for NoopContent {
fn encode_payload(
&self,
_e: &mut pallas_machines::PayloadEncoder,
) -> Result<(), Box<dyn std::error::Error>> {
todo!()
}
}
impl DecodePayload for NoopContent {
fn decode_payload(
_d: &mut pallas_machines::PayloadDecoder,
) -> Result<Self, Box<dyn std::error::Error>> {
todo!()
}
}
impl BlockLike for NoopContent {
fn block_point(&self) -> Result<Point, Box<dyn std::error::Error>> {
todo!()
}
}
impl Agent for TipFinder {
type Message = Message<WrappedHeader>;
type Message = Message<NoopContent>;
fn is_done(&self) -> bool {
self.state == State::Done

View file

@ -1,47 +1,8 @@
use minicbor::data::Tag;
use pallas_machines::{
primitives::Point, CodecError, DecodePayload, EncodePayload, PayloadDecoder, PayloadEncoder,
};
use crate::{BlockBody, Message, Tip, WrappedHeader};
impl EncodePayload for WrappedHeader {
fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box<dyn std::error::Error>> {
e.array(2)?;
e.u64(self.0)?;
e.tag(Tag::Cbor)?;
e.bytes(&self.1)?;
Ok(())
}
}
impl DecodePayload for WrappedHeader {
fn decode_payload(d: &mut PayloadDecoder) -> Result<Self, Box<dyn std::error::Error>> {
d.array()?;
let unknown = d.u64()?; // WTF is this value?
d.tag()?;
let bytes = Vec::from(d.bytes()?);
Ok(WrappedHeader(unknown, bytes))
}
}
impl EncodePayload for BlockBody {
fn encode_payload(&self, _e: &mut PayloadEncoder) -> Result<(), Box<dyn std::error::Error>> {
todo!()
}
}
impl DecodePayload for BlockBody {
fn decode_payload(d: &mut PayloadDecoder) -> Result<Self, Box<dyn std::error::Error>> {
d.tag()?;
let bytes = Vec::from(d.bytes()?);
Ok(BlockBody(bytes))
}
}
use crate::{Message, Tip};
impl EncodePayload for Tip {
fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box<dyn std::error::Error>> {

View file

@ -2,12 +2,6 @@ use std::fmt::Debug;
use pallas_machines::{primitives::Point, DecodePayload, EncodePayload};
#[derive(Debug)]
pub struct WrappedHeader(pub u64, pub Vec<u8>);
#[derive(Debug)]
pub struct BlockBody(pub Vec<u8>);
#[derive(Debug)]
pub struct Tip(pub Point, pub u64);

View file

@ -15,7 +15,7 @@ authors = [
[dependencies]
pallas-multiplexer = { version = "0.3.0", path = "../pallas-multiplexer/" }
pallas-machines = { version = "0.3.0", path = "../pallas-machines/" }
minicbor = { version="0.11.4", features=["half", "std"] }
minicbor = { version="0.12", features=["half", "std"] }
itertools = "0.10.1"
log = "0.4.14"

View file

@ -15,7 +15,7 @@ authors = [
[dependencies]
pallas-multiplexer = { version = "0.3.0", path = "../pallas-multiplexer/" }
pallas-machines = { version = "0.3.0", path = "../pallas-machines/" }
minicbor = { version="0.11.4", features=["half", "std"] }
minicbor = { version="0.12", features=["half", "std"] }
log = "0.4.14"
hex = "0.4.3"

View file

@ -14,6 +14,6 @@ authors = [
[dependencies]
pallas-multiplexer = { version = "0.3.0", path = "../pallas-multiplexer/" }
minicbor = { version="0.11.4", features=["half", "std"] }
minicbor = { version="0.12", features=["half", "std"] }
log = "0.4.14"
hex = "0.4.3"

View file

@ -126,7 +126,7 @@ impl<'a> PayloadDeconstructor<'a> {
pub fn consume_next_message<T: DecodePayload>(
&mut self,
) -> Result<T, Box<dyn std::error::Error>> {
if self.remaining.len() == 0 {
if self.remaining.is_empty() {
debug!("no remaining payload, fetching next segment");
let payload = self.rx.recv()?;
self.remaining.extend(payload);

View file

@ -25,7 +25,11 @@ fn main() {
loop {
let payload = rx.recv().unwrap();
info!("got message within thread, id:{}, length:{}", protocol, payload.len());
info!(
"got message within thread, id:{}, length:{}",
protocol,
payload.len()
);
}
});
}

View file

@ -128,20 +128,18 @@ impl Multiplexer {
where
TBearer: Bearer + 'static,
{
let handles = protocols
.iter()
.map(|id| {
let (demux_tx, demux_rx) = mpsc::channel::<Payload>();
let (mux_tx, mux_rx) = mpsc::channel::<Payload>();
let handles = protocols.iter().map(|id| {
let (demux_tx, demux_rx) = mpsc::channel::<Payload>();
let (mux_tx, mux_rx) = mpsc::channel::<Payload>();
let channel = Channel(mux_tx, demux_rx);
let channel = Channel(mux_tx, demux_rx);
let protocol_handle: ChannelProtocolHandle = (*id, channel);
let ingress_handle: ChannelIngressHandle = (*id, mux_rx);
let egress_handle: ChannelEgressHandle = (*id, demux_tx);
let protocol_handle: ChannelProtocolHandle = (*id, channel);
let ingress_handle: ChannelIngressHandle = (*id, mux_rx);
let egress_handle: ChannelEgressHandle = (*id, demux_tx);
(protocol_handle, (ingress_handle, egress_handle))
});
(protocol_handle, (ingress_handle, egress_handle))
});
let (protocol_handles, multiplex_handles): (Vec<_>, Vec<_>) = handles.into_iter().unzip();

View file

@ -15,7 +15,7 @@ authors = [
[dependencies]
pallas-multiplexer = { version = "0.3.0", path = "../pallas-multiplexer/" }
pallas-machines = { version = "0.3.0", path = "../pallas-machines/" }
minicbor = { version="0.11.4", features=["half", "std"] }
minicbor = { version="0.12", features=["half", "std"] }
log = "0.4.14"
hex = "0.4.3"
itertools = "0.10.1"