Tidy up mini protocols
This commit is contained in:
parent
7f50bebafd
commit
b78dc89319
9 changed files with 115 additions and 59 deletions
|
|
@ -1,8 +1,5 @@
|
|||
use log::info;
|
||||
use pallas_machines::{
|
||||
Agent, DecodePayload, EncodePayload, MachineError, MachineOutput,
|
||||
PayloadDecoder, PayloadEncoder, Transition,
|
||||
};
|
||||
use pallas_machines::{Agent, CodecError, DecodePayload, EncodePayload, MachineOutput, PayloadDecoder, PayloadEncoder, Transition};
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct Point(pub u64, pub Vec<u8>);
|
||||
|
|
@ -110,7 +107,7 @@ impl DecodePayload for Message {
|
|||
})
|
||||
}
|
||||
5 => Ok(Message::BatchDone),
|
||||
x => Err(Box::new(MachineError::BadLabel(x))),
|
||||
x => Err(Box::new(CodecError::BadLabel(x))),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,4 +1,4 @@
|
|||
use pallas_chainsync::{ClientConsumer, Point};
|
||||
use pallas_chainsync::{ClientConsumer, NoopStorage, Point};
|
||||
use pallas_handshake::n2c::{Client, VersionTable};
|
||||
use pallas_handshake::MAINNET_MAGIC;
|
||||
use pallas_machines::run_agent;
|
||||
|
|
@ -26,7 +26,7 @@ fn main() {
|
|||
)];
|
||||
|
||||
let (cs_rx, cs_tx) = muxer.use_channel(5);
|
||||
let cs = ClientConsumer::initial(known_points);
|
||||
let cs = ClientConsumer::initial(known_points, NoopStorage { });
|
||||
let cs = run_agent(cs, cs_rx, &cs_tx).unwrap();
|
||||
println!("{:?}", cs);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,7 +1,7 @@
|
|||
use net2::TcpStreamExt;
|
||||
use std::net::TcpStream;
|
||||
|
||||
use pallas_chainsync::{ClientConsumer, Point};
|
||||
use pallas_chainsync::{ClientConsumer, NoopStorage, Point};
|
||||
use pallas_handshake::n2n::{Client, VersionTable};
|
||||
use pallas_handshake::MAINNET_MAGIC;
|
||||
use pallas_machines::run_agent;
|
||||
|
|
@ -29,7 +29,7 @@ fn main() {
|
|||
|
||||
let (cs_rx, cs_tx) = muxer.use_channel(2);
|
||||
|
||||
let cs = ClientConsumer::initial(known_points);
|
||||
let cs = ClientConsumer::initial(known_points, NoopStorage {});
|
||||
let cs = run_agent(cs, cs_rx, &cs_tx).unwrap();
|
||||
|
||||
println!("{:?}", cs);
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@ use log::{debug, log_enabled, trace};
|
|||
|
||||
use minicbor::data::Tag;
|
||||
use pallas_machines::{
|
||||
Agent, DecodePayload, EncodePayload, MachineError, MachineOutput, PayloadDecoder,
|
||||
Agent, CodecError, DecodePayload, EncodePayload, MachineError, MachineOutput, PayloadDecoder,
|
||||
PayloadEncoder, Transition,
|
||||
};
|
||||
|
||||
|
|
@ -63,10 +63,10 @@ impl DecodePayload for WrappedHeader {
|
|||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct BlockBody(Vec<u8>);
|
||||
pub struct BlockBody(pub Vec<u8>);
|
||||
|
||||
impl EncodePayload for BlockBody {
|
||||
fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box<dyn std::error::Error>> {
|
||||
fn encode_payload(&self, _e: &mut PayloadEncoder) -> Result<(), Box<dyn std::error::Error>> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
|
@ -116,7 +116,7 @@ pub enum State {
|
|||
#[derive(Debug)]
|
||||
pub enum Message<C>
|
||||
where
|
||||
C: EncodePayload + DecodePayload,
|
||||
C: EncodePayload + DecodePayload + Sized,
|
||||
{
|
||||
RequestNext,
|
||||
AwaitReply,
|
||||
|
|
@ -216,32 +216,58 @@ where
|
|||
Ok(Message::IntersectNotFound(tip))
|
||||
}
|
||||
7 => Ok(Message::Done),
|
||||
x => Err(Box::new(MachineError::BadLabel(x))),
|
||||
x => Err(Box::new(CodecError::BadLabel(x))),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// An abstraction of a component in charge of block persistence
|
||||
pub trait Storage<C> {
|
||||
fn save_block(&self, content: &C) -> Result<(), Box<dyn std::error::Error>>;
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Consumer<C> {
|
||||
pub struct NoopStorage {}
|
||||
|
||||
impl<C> Storage<C> for NoopStorage
|
||||
where
|
||||
C: Debug,
|
||||
{
|
||||
fn save_block(&self, content: &C) -> Result<(), Box<dyn std::error::Error>> {
|
||||
log::warn!("asked to save block {:?}", content);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Consumer<C, S>
|
||||
where
|
||||
S: Storage<C>,
|
||||
{
|
||||
pub state: State,
|
||||
pub known_points: Vec<Point>,
|
||||
pub cursor: Option<Point>,
|
||||
pub tip: Option<Tip>,
|
||||
|
||||
storage: S,
|
||||
|
||||
// as recommended here: https://doc.rust-lang.org/error-index.html#E0207
|
||||
_phantom: Option<C>,
|
||||
}
|
||||
|
||||
impl<C> Consumer<C>
|
||||
impl<C, S> Consumer<C, S>
|
||||
where
|
||||
C: EncodePayload + DecodePayload + Debug,
|
||||
S: Storage<C>,
|
||||
{
|
||||
pub fn initial(known_points: Vec<Point>) -> Self {
|
||||
pub fn initial(known_points: Vec<Point>, storage: S) -> Self {
|
||||
Self {
|
||||
state: State::Idle,
|
||||
cursor: None,
|
||||
tip: None,
|
||||
known_points,
|
||||
storage,
|
||||
|
||||
_phantom: None,
|
||||
}
|
||||
}
|
||||
|
|
@ -294,9 +320,12 @@ where
|
|||
debug!("rolling forward");
|
||||
|
||||
if log_enabled!(log::Level::Trace) {
|
||||
trace!("header: {:?}", content);
|
||||
trace!("content: {:?}", content);
|
||||
}
|
||||
|
||||
debug!("saving block");
|
||||
self.storage.save_block(&content)?;
|
||||
|
||||
Ok(Self {
|
||||
tip: Some(tip),
|
||||
state: State::Idle,
|
||||
|
|
@ -314,11 +343,21 @@ where
|
|||
..self
|
||||
})
|
||||
}
|
||||
|
||||
fn on_await_reply(self) -> Transition<Self> {
|
||||
debug!("reached tip, await reply");
|
||||
|
||||
Ok(Self {
|
||||
state: State::MustReply,
|
||||
..self
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<C> Agent for Consumer<C>
|
||||
impl<C, S> Agent for Consumer<C, S>
|
||||
where
|
||||
C: EncodePayload + DecodePayload + Debug,
|
||||
C: EncodePayload + DecodePayload + Debug + 'static,
|
||||
S: Storage<C>,
|
||||
{
|
||||
type Message = Message<C>;
|
||||
|
||||
|
|
@ -354,6 +393,7 @@ where
|
|||
(State::CanAwait, Message::RollBackward(point, tip)) => {
|
||||
self.on_roll_backward(point, tip)
|
||||
}
|
||||
(State::CanAwait, Message::AwaitReply) => self.on_await_reply(),
|
||||
(State::MustReply, Message::RollForward(header, tip)) => {
|
||||
self.on_roll_forward(header, tip)
|
||||
}
|
||||
|
|
@ -364,11 +404,11 @@ where
|
|||
self.on_intersect_found(point, tip)
|
||||
}
|
||||
(State::Intersect, Message::IntersectNotFound(tip)) => self.on_intersect_not_found(tip),
|
||||
_ => Err(Box::new(MachineError::InvalidMsgForState)),
|
||||
(_, msg) => Err(MachineError::InvalidMsgForState(self.state, msg).into()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub type NodeConsumer = Consumer<WrappedHeader>;
|
||||
pub type NodeConsumer<S> = Consumer<WrappedHeader, S>;
|
||||
|
||||
pub type ClientConsumer = Consumer<BlockBody>;
|
||||
pub type ClientConsumer<S> = Consumer<BlockBody, S>;
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
use itertools::Itertools;
|
||||
use pallas_machines::{DecodePayload, EncodePayload, MachineError, PayloadEncoder};
|
||||
use pallas_machines::{CodecError, DecodePayload, EncodePayload, PayloadEncoder};
|
||||
use std::{collections::HashMap, fmt::Debug};
|
||||
|
||||
pub const TESTNET_MAGIC: u64 = 1097911063;
|
||||
|
|
@ -97,7 +97,7 @@ impl DecodePayload for RefuseReason {
|
|||
|
||||
Ok(RefuseReason::Refused(version, msg.to_string()))
|
||||
}
|
||||
x => Err(Box::new(MachineError::BadLabel(x))),
|
||||
x => Err(Box::new(CodecError::BadLabel(x))),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,10 +1,7 @@
|
|||
use core::panic;
|
||||
use std::collections::HashMap;
|
||||
|
||||
use pallas_machines::{
|
||||
Agent, DecodePayload, EncodePayload, MachineError, MachineOutput, PayloadDecoder,
|
||||
PayloadEncoder,
|
||||
};
|
||||
use pallas_machines::{Agent, CodecError, DecodePayload, EncodePayload, MachineOutput, PayloadDecoder, PayloadEncoder};
|
||||
|
||||
use crate::common::{NetworkMagic, RefuseReason, VersionNumber};
|
||||
|
||||
|
|
@ -104,7 +101,7 @@ impl DecodePayload for Message {
|
|||
let reason = RefuseReason::decode_payload(d)?;
|
||||
Ok(Message::Refuse(reason))
|
||||
}
|
||||
x => Err(Box::new(MachineError::BadLabel(x))),
|
||||
x => Err(Box::new(CodecError::BadLabel(x))),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,10 +1,7 @@
|
|||
use core::panic;
|
||||
use std::collections::HashMap;
|
||||
|
||||
use pallas_machines::{
|
||||
Agent, DecodePayload, EncodePayload, MachineError, MachineOutput, PayloadDecoder,
|
||||
PayloadEncoder,
|
||||
};
|
||||
use pallas_machines::{Agent, CodecError, DecodePayload, EncodePayload, MachineOutput, PayloadDecoder, PayloadEncoder};
|
||||
|
||||
use crate::common::{RefuseReason, VersionNumber};
|
||||
|
||||
|
|
@ -123,7 +120,7 @@ impl DecodePayload for Message {
|
|||
let reason = RefuseReason::decode_payload(d)?;
|
||||
Ok(Message::Refuse(reason))
|
||||
}
|
||||
x => Err(Box::new(MachineError::BadLabel(x))),
|
||||
x => Err(Box::new(CodecError::BadLabel(x))),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,34 +1,64 @@
|
|||
use log::{debug, trace, warn};
|
||||
use minicbor::{Decoder, Encode, Encoder};
|
||||
use minicbor::{Decoder, Encoder};
|
||||
use pallas_multiplexer::Payload;
|
||||
use std::borrow::Borrow;
|
||||
use std::fmt::{Debug, Display};
|
||||
use std::sync::mpsc::{Receiver, Sender};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum MachineError {
|
||||
BadLabel(u16),
|
||||
UnexpectedCbor(&'static str),
|
||||
InvalidMsgForState,
|
||||
pub enum MachineError<State, Msg>
|
||||
where
|
||||
State: Debug,
|
||||
Msg: Debug,
|
||||
{
|
||||
InvalidMsgForState(State, Msg),
|
||||
}
|
||||
|
||||
impl Display for MachineError {
|
||||
impl<S, M> Display for MachineError<S, M>
|
||||
where
|
||||
S: Debug,
|
||||
M: Debug,
|
||||
{
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
MachineError::BadLabel(label) => {
|
||||
write!(f, "unknown message label: {}", label)
|
||||
}
|
||||
MachineError::UnexpectedCbor(msg) => {
|
||||
write!(f, "unexpected cbor: {}", msg)
|
||||
}
|
||||
MachineError::InvalidMsgForState => {
|
||||
write!(f, "received invalid message for current state")
|
||||
MachineError::InvalidMsgForState(msg, state) => {
|
||||
write!(
|
||||
f,
|
||||
"received invalid message ({:?}) for current state ({:?})",
|
||||
msg, state
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::error::Error for MachineError {}
|
||||
impl<S, M> std::error::Error for MachineError<S, M>
|
||||
where
|
||||
S: Debug,
|
||||
M: Debug,
|
||||
{
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum CodecError {
|
||||
BadLabel(u16),
|
||||
UnexpectedCbor(&'static str),
|
||||
}
|
||||
|
||||
impl std::error::Error for CodecError {}
|
||||
|
||||
impl Display for CodecError {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
CodecError::BadLabel(label) => {
|
||||
write!(f, "unknown message label: {}", label)
|
||||
}
|
||||
CodecError::UnexpectedCbor(msg) => {
|
||||
write!(f, "unexpected cbor: {}", msg)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub type PayloadEncoder<'a> = Encoder<&'a mut Vec<u8>>;
|
||||
|
||||
|
|
@ -46,7 +76,6 @@ pub fn to_payload(data: &dyn EncodePayload) -> Result<Payload, Box<dyn std::erro
|
|||
Ok(payload)
|
||||
}
|
||||
|
||||
|
||||
impl<D> EncodePayload for Vec<D>
|
||||
where
|
||||
D: EncodePayload,
|
||||
|
|
@ -67,7 +96,7 @@ where
|
|||
D: DecodePayload,
|
||||
{
|
||||
fn decode_payload(d: &mut PayloadDecoder) -> Result<Self, Box<dyn std::error::Error>> {
|
||||
let len = d.array()?.ok_or(MachineError::UnexpectedCbor(
|
||||
let len = d.array()?.ok_or(CodecError::UnexpectedCbor(
|
||||
"expecting definite-length array",
|
||||
))? as usize;
|
||||
|
||||
|
|
@ -78,7 +107,6 @@ where
|
|||
}
|
||||
|
||||
Ok(output)
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -3,10 +3,7 @@ use std::fmt::Debug;
|
|||
use itertools::Itertools;
|
||||
use log::debug;
|
||||
|
||||
use pallas_machines::{
|
||||
Agent, DecodePayload, EncodePayload, MachineError, MachineOutput, PayloadDecoder,
|
||||
PayloadEncoder, Transition,
|
||||
};
|
||||
use pallas_machines::{Agent, CodecError, DecodePayload, EncodePayload, MachineError, MachineOutput, PayloadDecoder, PayloadEncoder, Transition};
|
||||
|
||||
#[derive(Debug, PartialEq, Clone)]
|
||||
pub enum State {
|
||||
|
|
@ -134,7 +131,7 @@ impl DecodePayload for Message {
|
|||
todo!()
|
||||
}
|
||||
4 => Ok(Message::Done),
|
||||
x => Err(Box::new(MachineError::BadLabel(x))),
|
||||
x => Err(Box::new(CodecError::BadLabel(x))),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -294,7 +291,7 @@ impl Agent for NaiveProvider {
|
|||
..self
|
||||
}),
|
||||
(State::Idle, Message::RequestTxs(ids)) => self.on_txs_request(ids),
|
||||
_ => Err(Box::new(MachineError::InvalidMsgForState)),
|
||||
(_, msg) => Err(MachineError::InvalidMsgForState(self.state, msg).into()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue