Add local state query mini-protocol naive implementation
This commit is contained in:
parent
df41d7cbd8
commit
b07a1fa7e6
13 changed files with 617 additions and 97 deletions
|
|
@ -7,6 +7,7 @@ members = [
|
|||
"pallas-blockfetch",
|
||||
"pallas-chainsync",
|
||||
"pallas-txsubmission",
|
||||
"pallas-localstate",
|
||||
"pallas-alonzo",
|
||||
"pallas",
|
||||
]
|
||||
]
|
||||
|
|
|
|||
|
|
@ -25,6 +25,7 @@ As already explained, _Pallas_ aims at being an expanding set of components. The
|
|||
| [pallas-handshake](/pallas-handshake) | Implementation of the Ouroboros network handshake mini-protocol |
|
||||
| [pallas-blockfetch](/pallas-blockfetch) | Implementation of the Ouroboros network blockfetch mini-protocol |
|
||||
| [pallas-chainsync](/pallas-chainsync) | Implementation of the Ouroboros network chainsync mini-protocol |
|
||||
| [pallas-localstate](/pallas-localstate) | Implementation of the Ouroboros network local state query mini-protocol |
|
||||
| [pallas-txsubmission](/pallas-txsubmission) | Implementation of the Ouroboros network txsubmission mini-protocol |
|
||||
|
||||
### Ouroboros Consensus
|
||||
|
|
|
|||
|
|
@ -2,8 +2,8 @@
|
|||
//!
|
||||
//! Handcrafted, idiomatic rust artifacts based on based on the [Alonzo CDDL](https://github.com/input-output-hk/cardano-ledger/blob/master/eras/alonzo/test-suite/cddl-files/alonzo.cddl) file in IOHK repo.
|
||||
|
||||
use log::{log_enabled, warn};
|
||||
use minicbor::{bytes::ByteVec, data::Tag, Decode, Encode};
|
||||
use log::warn;
|
||||
use minicbor::{bytes::ByteVec, data::Tag};
|
||||
use minicbor_derive::{Decode, Encode};
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
|
||||
|
|
|
|||
|
|
@ -16,7 +16,7 @@ const PROTOCOL_V6: u64 = 32774;
|
|||
const PROTOCOL_V7: u64 = 32775;
|
||||
const PROTOCOL_V8: u64 = 32776;
|
||||
const PROTOCOL_V9: u64 = 32777;
|
||||
// const PROTOCOL_V10: u64 = 32778;
|
||||
const PROTOCOL_V10: u64 = 32778;
|
||||
|
||||
impl VersionTable {
|
||||
pub fn v1_and_above(network_magic: u64) -> VersionTable {
|
||||
|
|
@ -30,6 +30,17 @@ impl VersionTable {
|
|||
(PROTOCOL_V7, VersionData(network_magic)),
|
||||
(PROTOCOL_V8, VersionData(network_magic)),
|
||||
(PROTOCOL_V9, VersionData(network_magic)),
|
||||
(PROTOCOL_V10, VersionData(network_magic)),
|
||||
]
|
||||
.into_iter()
|
||||
.collect::<HashMap<u64, VersionData>>();
|
||||
|
||||
VersionTable { values }
|
||||
}
|
||||
|
||||
pub fn only_v10(network_magic: u64) -> VersionTable {
|
||||
let values = vec![
|
||||
(PROTOCOL_V10, VersionData(network_magic)),
|
||||
]
|
||||
.into_iter()
|
||||
.collect::<HashMap<u64, VersionData>>();
|
||||
|
|
|
|||
2
pallas-localstate/.gitignore
vendored
Normal file
2
pallas-localstate/.gitignore
vendored
Normal file
|
|
@ -0,0 +1,2 @@
|
|||
/target
|
||||
Cargo.lock
|
||||
25
pallas-localstate/Cargo.toml
Normal file
25
pallas-localstate/Cargo.toml
Normal file
|
|
@ -0,0 +1,25 @@
|
|||
[package]
|
||||
name = "pallas-localstate"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
repository = "https://github.com/txpipe/pallas"
|
||||
license = "Apache-2.0"
|
||||
authors = [
|
||||
"Santiago Carmuega <santiago@carmuega.me>"
|
||||
]
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
pallas-multiplexer = { path = "../pallas-multiplexer/" }
|
||||
pallas-machines = { path = "../pallas-machines/" }
|
||||
minicbor = { version="0.11.4", features=["half"] }
|
||||
minicbor-io = "0.6.0"
|
||||
log = "0.4.14"
|
||||
hex = "0.4.3"
|
||||
|
||||
[dev-dependencies]
|
||||
net2 = "0.2.37"
|
||||
env_logger = "0.9.0"
|
||||
pallas-handshake = { path = "../pallas-handshake/" }
|
||||
pallas-txsubmission = { path = "../pallas-txsubmission/" }
|
||||
99
pallas-localstate/examples/chainpoint.rs
Normal file
99
pallas-localstate/examples/chainpoint.rs
Normal file
|
|
@ -0,0 +1,99 @@
|
|||
use minicbor::data::Cbor;
|
||||
use pallas_localstate::{OneShotClient, Point, Query};
|
||||
use pallas_handshake::n2c::{Client, VersionTable};
|
||||
use pallas_handshake::{MAINNET_MAGIC};
|
||||
use pallas_machines::{DecodePayload, EncodePayload, run_agent};
|
||||
use pallas_multiplexer::Multiplexer;
|
||||
use std::net::TcpStream;
|
||||
use std::os::unix::net::UnixStream;
|
||||
use net2::*;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct BlockQuery {}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
enum Request {
|
||||
BlockQuery(BlockQuery),
|
||||
GetSystemStart,
|
||||
GetChainBlockNo,
|
||||
GetChainPoint,
|
||||
}
|
||||
|
||||
|
||||
impl EncodePayload for Request {
|
||||
fn encode_payload(&self, e: &mut pallas_machines::PayloadEncoder) -> Result<(), Box<dyn std::error::Error>> {
|
||||
match self {
|
||||
Request::BlockQuery(block_query) => {
|
||||
e.u16(0)?;
|
||||
e.array(0)?;
|
||||
Ok(())
|
||||
}
|
||||
Request::GetSystemStart => {
|
||||
e.u16(1)?;
|
||||
Ok(())
|
||||
}
|
||||
Request::GetChainBlockNo => {
|
||||
e.u16(2)?;
|
||||
Ok(())
|
||||
}
|
||||
Request::GetChainPoint => {
|
||||
e.u16(3)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl DecodePayload for Request {
|
||||
fn decode_payload(d: &mut pallas_machines::PayloadDecoder) -> Result<Self, Box<dyn std::error::Error>> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
enum Response {
|
||||
Generic(Vec<u8>),
|
||||
}
|
||||
|
||||
impl EncodePayload for Response {
|
||||
fn encode_payload(&self, e: &mut pallas_machines::PayloadEncoder) -> Result<(), Box<dyn std::error::Error>> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
impl DecodePayload for Response {
|
||||
fn decode_payload(d: &mut pallas_machines::PayloadDecoder) -> Result<Self, Box<dyn std::error::Error>> {
|
||||
let cbor: Cbor = d.decode()?;
|
||||
let slice = cbor.as_ref();
|
||||
let vec = slice.to_vec();
|
||||
Ok(Response::Generic(vec))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct ShelleyQuery {}
|
||||
|
||||
impl Query for ShelleyQuery {
|
||||
type Request = Request;
|
||||
type Response = Response;
|
||||
}
|
||||
|
||||
fn main() {
|
||||
env_logger::init();
|
||||
|
||||
// we connect to the unix socket of the local node. Make sure you have the right
|
||||
// path for your environment
|
||||
let bearer = UnixStream::connect("/tmp/node.socket").unwrap();
|
||||
|
||||
let mut muxer = Multiplexer::try_setup(bearer, &vec![0, 7]).unwrap();
|
||||
|
||||
let (rx, tx) = muxer.use_channel(0);
|
||||
let versions = VersionTable::only_v10(MAINNET_MAGIC);
|
||||
let last = run_agent(Client::initial(versions), rx, &tx).unwrap();
|
||||
println!("last hanshake state: {:?}", last);
|
||||
|
||||
let (cs_rx, cs_tx) = muxer.use_channel(7);
|
||||
let cs = OneShotClient::<ShelleyQuery>::initial(None, Request::GetChainPoint);
|
||||
let cs = run_agent(cs, cs_rx, &cs_tx).unwrap();
|
||||
println!("{:?}", cs);
|
||||
}
|
||||
134
pallas-localstate/src/codec.rs
Normal file
134
pallas-localstate/src/codec.rs
Normal file
|
|
@ -0,0 +1,134 @@
|
|||
use super::*;
|
||||
use pallas_machines::*;
|
||||
|
||||
impl EncodePayload for Point {
|
||||
fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box<dyn std::error::Error>> {
|
||||
e.array(2)?.u64(self.0)?.bytes(&self.1)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl DecodePayload for Point {
|
||||
fn decode_payload(d: &mut PayloadDecoder) -> Result<Self, Box<dyn std::error::Error>> {
|
||||
d.array()?;
|
||||
let slot = d.u64()?;
|
||||
let hash = d.bytes()?;
|
||||
|
||||
Ok(Point(slot, Vec::from(hash)))
|
||||
}
|
||||
}
|
||||
|
||||
impl EncodePayload for AcquireFailure {
|
||||
fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box<dyn std::error::Error>> {
|
||||
let code = match self {
|
||||
AcquireFailure::PointTooOld => 0,
|
||||
AcquireFailure::PointNotInChain => 1,
|
||||
};
|
||||
|
||||
e.u16(code)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl DecodePayload for AcquireFailure {
|
||||
fn decode_payload(d: &mut PayloadDecoder) -> Result<Self, Box<dyn std::error::Error>> {
|
||||
let code = d.u16()?;
|
||||
|
||||
match code {
|
||||
0 => Ok(AcquireFailure::PointTooOld),
|
||||
1 => Ok(AcquireFailure::PointNotInChain),
|
||||
_ => Err(Box::new(CodecError::UnexpectedCbor("can't infer acquire failure from variant id"))),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<Q: Query> EncodePayload for Message<Q> {
|
||||
fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box<dyn std::error::Error>> {
|
||||
match self {
|
||||
Message::Acquire(Some(point)) => {
|
||||
e.array(2)?.u16(0)?;
|
||||
e.encode_payload(point)?;
|
||||
Ok(())
|
||||
}
|
||||
Message::Acquire(None) => {
|
||||
e.array(1)?.u16(8)?;
|
||||
Ok(())
|
||||
}
|
||||
Message::Acquired => {
|
||||
e.array(1)?.u16(1)?;
|
||||
Ok(())
|
||||
}
|
||||
Message::Failure(failure) => {
|
||||
e.array(2)?.u16(2)?;
|
||||
e.encode_payload(failure)?;
|
||||
Ok(())
|
||||
}
|
||||
Message::Query(query) => {
|
||||
e.array(2)?.u16(3)?;
|
||||
e.array(1)?;
|
||||
e.encode_payload(query)?;
|
||||
Ok(())
|
||||
}
|
||||
Message::Result(result) => {
|
||||
e.array(2)?.u16(4)?;
|
||||
e.array(1)?;
|
||||
e.encode_payload(result)?;
|
||||
Ok(())
|
||||
}
|
||||
Message::ReAcquire(Some(point)) => {
|
||||
e.array(2)?.u16(6)?;
|
||||
e.encode_payload(point)?;
|
||||
Ok(())
|
||||
}
|
||||
Message::ReAcquire(None) => {
|
||||
e.array(1)?.u16(9)?;
|
||||
Ok(())
|
||||
}
|
||||
Message::Release => {
|
||||
e.array(1)?.u16(5)?;
|
||||
Ok(())
|
||||
}
|
||||
Message::Done => {
|
||||
e.array(1)?.u16(7)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<Q: Query> DecodePayload for Message<Q> {
|
||||
fn decode_payload(d: &mut PayloadDecoder) -> Result<Self, Box<dyn std::error::Error>> {
|
||||
d.array()?;
|
||||
let label = d.u16()?;
|
||||
|
||||
match label {
|
||||
0 => {
|
||||
let point = d.decode_payload()?;
|
||||
Ok(Message::Acquire(Some(point)))
|
||||
}
|
||||
8 => Ok(Message::Acquire(None)),
|
||||
1 => Ok(Message::Acquired),
|
||||
2 => {
|
||||
let failure = d.decode_payload()?;
|
||||
Ok(Message::Failure(failure))
|
||||
}
|
||||
3 => {
|
||||
let query = d.decode_payload()?;
|
||||
Ok(Message::Query(query))
|
||||
}
|
||||
4 => {
|
||||
let response = d.decode_payload()?;
|
||||
Ok(Message::Result(response))
|
||||
}
|
||||
5 => Ok(Message::Release),
|
||||
6 => {
|
||||
let point = d.decode_payload()?;
|
||||
Ok(Message::ReAcquire(point))
|
||||
}
|
||||
9 => Ok(Message::ReAcquire(None)),
|
||||
7 => Ok(Message::Done),
|
||||
x => Err(Box::new(CodecError::BadLabel(x))),
|
||||
}
|
||||
}
|
||||
}
|
||||
181
pallas-localstate/src/lib.rs
Normal file
181
pallas-localstate/src/lib.rs
Normal file
|
|
@ -0,0 +1,181 @@
|
|||
mod codec;
|
||||
|
||||
use std::fmt::Debug;
|
||||
|
||||
use log::debug;
|
||||
|
||||
use pallas_machines::{
|
||||
Agent, DecodePayload, EncodePayload, MachineError, MachineOutput, Transition,
|
||||
};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Point(pub u64, pub Vec<u8>);
|
||||
|
||||
impl Debug for Point {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_tuple("Point")
|
||||
.field(&self.0)
|
||||
.field(&hex::encode(&self.1))
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Clone)]
|
||||
pub enum State {
|
||||
Idle,
|
||||
Acquiring,
|
||||
Acquired,
|
||||
Querying,
|
||||
Done,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum AcquireFailure {
|
||||
PointTooOld,
|
||||
PointNotInChain,
|
||||
}
|
||||
pub trait Query: Debug {
|
||||
type Request: EncodePayload + DecodePayload + Clone + Debug;
|
||||
type Response: EncodePayload + DecodePayload + Clone + Debug;
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum Message<Q: Query> {
|
||||
Acquire(Option<Point>),
|
||||
Failure(AcquireFailure),
|
||||
Acquired,
|
||||
Query(Q::Request),
|
||||
Result(Q::Response),
|
||||
ReAcquire(Option<Point>),
|
||||
Release,
|
||||
Done,
|
||||
}
|
||||
|
||||
pub type Output<QR> = Result<QR, AcquireFailure>;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct OneShotClient<Q: Query> {
|
||||
pub state: State,
|
||||
pub check_point: Option<Point>,
|
||||
pub request: Q::Request,
|
||||
pub output: Option<Output<Q::Response>>,
|
||||
}
|
||||
|
||||
impl<Q: Query> OneShotClient<Q> {
|
||||
pub fn initial(check_point: Option<Point>, request: Q::Request) -> Self {
|
||||
Self {
|
||||
state: State::Idle,
|
||||
output: None,
|
||||
check_point,
|
||||
request,
|
||||
}
|
||||
}
|
||||
|
||||
fn send_acquire(self, tx: &impl MachineOutput) -> Transition<Self> {
|
||||
let msg = Message::<Q>::Acquire(self.check_point.clone());
|
||||
|
||||
tx.send_msg(&msg)?;
|
||||
|
||||
Ok(Self {
|
||||
state: State::Acquiring,
|
||||
..self
|
||||
})
|
||||
}
|
||||
|
||||
fn send_query(self, tx: &impl MachineOutput) -> Transition<Self> {
|
||||
let msg = Message::<Q>::Query(self.request.clone());
|
||||
|
||||
tx.send_msg(&msg)?;
|
||||
|
||||
Ok(Self {
|
||||
state: State::Querying,
|
||||
..self
|
||||
})
|
||||
}
|
||||
|
||||
fn send_release(self, tx: &impl MachineOutput) -> Transition<Self> {
|
||||
let msg = Message::<Q>::Release;
|
||||
|
||||
tx.send_msg(&msg)?;
|
||||
|
||||
Ok(Self {
|
||||
state: State::Idle,
|
||||
..self
|
||||
})
|
||||
}
|
||||
|
||||
fn on_acquired(self) -> Transition<Self> {
|
||||
debug!("acquired check point for chain state");
|
||||
|
||||
Ok(Self {
|
||||
state: State::Acquired,
|
||||
..self
|
||||
})
|
||||
}
|
||||
|
||||
fn on_result(self, response: Q::Response) -> Transition<Self> {
|
||||
debug!("query result received: {:?}", response);
|
||||
|
||||
Ok(Self {
|
||||
state: State::Acquired,
|
||||
output: Some(Ok(response)),
|
||||
..self
|
||||
})
|
||||
}
|
||||
|
||||
fn on_failure(self, failure: AcquireFailure) -> Transition<Self> {
|
||||
debug!("acquire failure: {:?}", failure);
|
||||
|
||||
Ok(Self {
|
||||
state: State::Idle,
|
||||
output: Some(Err(failure)),
|
||||
..self
|
||||
})
|
||||
}
|
||||
|
||||
fn done(self) -> Transition<Self> {
|
||||
Ok(Self {
|
||||
state: State::Done,
|
||||
..self
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<Q: Query + 'static> Agent for OneShotClient<Q> {
|
||||
type Message = Message<Q>;
|
||||
|
||||
fn is_done(&self) -> bool {
|
||||
self.state == State::Done
|
||||
}
|
||||
|
||||
fn has_agency(&self) -> bool {
|
||||
match self.state {
|
||||
State::Idle => true,
|
||||
State::Acquired => true,
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
fn send_next(self, tx: &impl MachineOutput) -> Transition<Self> {
|
||||
match (&self.state, &self.output) {
|
||||
// if we're idle and without a result, assume start of flow
|
||||
(State::Idle, None) => self.send_acquire(tx),
|
||||
// if we're idle and with a result, assume end of flow
|
||||
(State::Idle, Some(_)) => self.done(),
|
||||
// if we don't have an output, assume start of query
|
||||
(State::Acquired, None) => self.send_query(tx),
|
||||
// if we have an output but still acquired, release the server
|
||||
(State::Acquired, Some(_)) => self.send_release(tx),
|
||||
_ => panic!("I don't have agency, don't know what to do"),
|
||||
}
|
||||
}
|
||||
|
||||
fn receive_next(self, msg: Self::Message) -> Transition<Self> {
|
||||
match (&self.state, msg) {
|
||||
(State::Acquiring, Message::Acquired) => self.on_acquired(),
|
||||
(State::Acquiring, Message::Failure(failure)) => self.on_failure(failure),
|
||||
(State::Querying, Message::Result(result)) => self.on_result(result),
|
||||
(_, msg) => Err(MachineError::InvalidMsgForState(self.state, msg).into()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1,10 +1,13 @@
|
|||
mod payload;
|
||||
|
||||
use log::{debug, trace, warn};
|
||||
use minicbor::{Decoder, Encoder};
|
||||
use pallas_multiplexer::Payload;
|
||||
use std::borrow::Borrow;
|
||||
use std::fmt::{Debug, Display};
|
||||
use std::sync::mpsc::{Receiver, Sender};
|
||||
|
||||
pub use payload::*;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum MachineError<State, Msg>
|
||||
where
|
||||
|
|
@ -60,56 +63,6 @@ impl Display for CodecError {
|
|||
}
|
||||
}
|
||||
|
||||
pub type PayloadEncoder<'a> = Encoder<&'a mut Vec<u8>>;
|
||||
|
||||
pub type PayloadDecoder<'a> = Decoder<'a>;
|
||||
|
||||
pub trait EncodePayload {
|
||||
fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box<dyn std::error::Error>>;
|
||||
}
|
||||
|
||||
pub fn to_payload(data: &dyn EncodePayload) -> Result<Payload, Box<dyn std::error::Error>> {
|
||||
let mut payload = Vec::new();
|
||||
let mut encoder = minicbor::encode::Encoder::new(&mut payload);
|
||||
data.encode_payload(&mut encoder)?;
|
||||
|
||||
Ok(payload)
|
||||
}
|
||||
|
||||
impl<D> EncodePayload for Vec<D>
|
||||
where
|
||||
D: EncodePayload,
|
||||
{
|
||||
fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box<dyn std::error::Error>> {
|
||||
e.array(self.len() as u64)?;
|
||||
|
||||
for item in self {
|
||||
item.encode_payload(e)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl<D> DecodePayload for Vec<D>
|
||||
where
|
||||
D: DecodePayload,
|
||||
{
|
||||
fn decode_payload(d: &mut PayloadDecoder) -> Result<Self, Box<dyn std::error::Error>> {
|
||||
let len = d.array()?.ok_or(CodecError::UnexpectedCbor(
|
||||
"expecting definite-length array",
|
||||
))? as usize;
|
||||
|
||||
let mut output = Vec::<D>::with_capacity(len);
|
||||
|
||||
for i in 0..(len - 1) {
|
||||
output[i] = D::decode_payload(d)?;
|
||||
}
|
||||
|
||||
Ok(output)
|
||||
}
|
||||
}
|
||||
|
||||
pub trait MachineOutput {
|
||||
fn send_msg(&self, data: &impl EncodePayload) -> Result<(), Box<dyn std::error::Error>>;
|
||||
}
|
||||
|
|
@ -123,48 +76,6 @@ impl MachineOutput for Sender<Payload> {
|
|||
}
|
||||
}
|
||||
|
||||
pub trait DecodePayload: Sized {
|
||||
fn decode_payload(d: &mut PayloadDecoder) -> Result<Self, Box<dyn std::error::Error>>;
|
||||
}
|
||||
|
||||
pub struct PayloadDeconstructor {
|
||||
rx: Receiver<Payload>,
|
||||
remaining: Vec<u8>,
|
||||
}
|
||||
|
||||
impl PayloadDeconstructor {
|
||||
pub fn consume_next_message<T: DecodePayload>(
|
||||
&mut self,
|
||||
) -> Result<T, Box<dyn std::error::Error>> {
|
||||
if self.remaining.len() == 0 {
|
||||
debug!("no remaining payload, fetching next segment");
|
||||
let payload = self.rx.recv()?;
|
||||
self.remaining.extend(payload);
|
||||
}
|
||||
|
||||
let mut decoder = minicbor::Decoder::new(&self.remaining);
|
||||
|
||||
match T::decode_payload(&mut decoder) {
|
||||
Ok(t) => {
|
||||
let new_pos = decoder.position();
|
||||
self.remaining.drain(0..new_pos);
|
||||
debug!("consumed {} from payload buffer", new_pos);
|
||||
Ok(t)
|
||||
}
|
||||
Err(err) => {
|
||||
//TODO: we need to filter this only for correct errors
|
||||
warn!("{:?}", err);
|
||||
|
||||
debug!("payload incomplete, fetching next segment");
|
||||
let payload = self.rx.recv()?;
|
||||
self.remaining.extend(payload);
|
||||
|
||||
self.consume_next_message::<T>()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub type Transition<T> = Result<T, Box<dyn std::error::Error>>;
|
||||
|
||||
pub trait Agent: Sized {
|
||||
|
|
|
|||
151
pallas-machines/src/payload.rs
Normal file
151
pallas-machines/src/payload.rs
Normal file
|
|
@ -0,0 +1,151 @@
|
|||
use super::*;
|
||||
|
||||
use log::{debug, warn};
|
||||
use minicbor::{Decoder, Encoder};
|
||||
use std::{ops::{Deref, DerefMut}, sync::mpsc::Receiver};
|
||||
use pallas_multiplexer::Payload;
|
||||
|
||||
pub struct PayloadEncoder<'a>(Encoder<&'a mut Vec<u8>>);
|
||||
|
||||
impl<'a> Deref for PayloadEncoder<'a> {
|
||||
type Target = Encoder<&'a mut Vec<u8>>;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> DerefMut for PayloadEncoder<'a> {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
&mut self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> PayloadEncoder<'a> {
|
||||
pub fn encode_payload<T: EncodePayload>(&mut self, t: &T)->Result<(), Box<dyn std::error::Error>> {
|
||||
t.encode_payload(self)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct PayloadDecoder<'a>(Decoder<'a>);
|
||||
|
||||
impl<'a> Deref for PayloadDecoder<'a> {
|
||||
type Target = Decoder<'a>;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> DerefMut for PayloadDecoder<'a> {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
&mut self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> PayloadDecoder<'a> {
|
||||
pub fn decode_payload<T: DecodePayload>(&mut self) -> Result<T, Box<dyn std::error::Error>> {
|
||||
T::decode_payload(self)
|
||||
}
|
||||
}
|
||||
|
||||
pub trait EncodePayload {
|
||||
fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box<dyn std::error::Error>>;
|
||||
}
|
||||
|
||||
pub fn to_payload(data: &dyn EncodePayload) -> Result<Payload, Box<dyn std::error::Error>> {
|
||||
let mut payload = Vec::new();
|
||||
let mut encoder = PayloadEncoder(minicbor::encode::Encoder::new(&mut payload));
|
||||
data.encode_payload(&mut encoder)?;
|
||||
|
||||
Ok(payload)
|
||||
}
|
||||
|
||||
impl<D> EncodePayload for Vec<D>
|
||||
where
|
||||
D: EncodePayload,
|
||||
{
|
||||
fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box<dyn std::error::Error>> {
|
||||
e.array(self.len() as u64)?;
|
||||
|
||||
for item in self {
|
||||
item.encode_payload(e)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl<D> DecodePayload for Vec<D>
|
||||
where
|
||||
D: DecodePayload,
|
||||
{
|
||||
fn decode_payload(d: &mut PayloadDecoder) -> Result<Self, Box<dyn std::error::Error>> {
|
||||
let len = d.array()?.ok_or(CodecError::UnexpectedCbor(
|
||||
"expecting definite-length array",
|
||||
))? as usize;
|
||||
|
||||
let mut output = Vec::<D>::with_capacity(len);
|
||||
|
||||
for i in 0..(len - 1) {
|
||||
output[i] = D::decode_payload(d)?;
|
||||
}
|
||||
|
||||
Ok(output)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
pub trait DecodePayload: Sized {
|
||||
fn decode_payload(d: &mut PayloadDecoder) -> Result<Self, Box<dyn std::error::Error>>;
|
||||
}
|
||||
|
||||
impl<T: DecodePayload> DecodePayload for Option<T> {
|
||||
fn decode_payload(d: &mut PayloadDecoder) -> Result<Self, Box<dyn std::error::Error>> {
|
||||
match d.datatype()? {
|
||||
minicbor::data::Type::Undefined => Ok(None),
|
||||
_ => {
|
||||
let value = d.decode_payload()?;
|
||||
Ok(Some(value))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct PayloadDeconstructor {
|
||||
pub(crate) rx: Receiver<Payload>,
|
||||
pub(crate) remaining: Vec<u8>,
|
||||
}
|
||||
|
||||
impl PayloadDeconstructor {
|
||||
pub fn consume_next_message<T: DecodePayload>(
|
||||
&mut self,
|
||||
) -> Result<T, Box<dyn std::error::Error>> {
|
||||
if self.remaining.len() == 0 {
|
||||
debug!("no remaining payload, fetching next segment");
|
||||
let payload = self.rx.recv()?;
|
||||
self.remaining.extend(payload);
|
||||
}
|
||||
|
||||
let mut decoder = PayloadDecoder(minicbor::Decoder::new(&self.remaining));
|
||||
|
||||
match T::decode_payload(&mut decoder) {
|
||||
Ok(t) => {
|
||||
let new_pos = decoder.position();
|
||||
self.remaining.drain(0..new_pos);
|
||||
debug!("consumed {} from payload buffer", new_pos);
|
||||
Ok(t)
|
||||
}
|
||||
Err(err) => {
|
||||
//TODO: we need to filter this only for correct errors
|
||||
warn!("{:?}", err);
|
||||
|
||||
debug!("payload incomplete, fetching next segment");
|
||||
let payload = self.rx.recv()?;
|
||||
self.remaining.extend(payload);
|
||||
|
||||
self.consume_next_message::<T>()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -16,5 +16,6 @@ pallas-machines = { path = "../pallas-machines/" }
|
|||
pallas-handshake = { path = "../pallas-handshake/" }
|
||||
pallas-chainsync = { path = "../pallas-chainsync/" }
|
||||
pallas-blockfetch = { path = "../pallas-blockfetch/" }
|
||||
pallas-localstate = { path = "../pallas-localstate/" }
|
||||
pallas-txsubmission = { path = "../pallas-txsubmission/" }
|
||||
pallas-alonzo = { path = "../pallas-alonzo/" }
|
||||
|
|
@ -16,3 +16,6 @@ pub use pallas_blockfetch as blockfetch;
|
|||
|
||||
#[doc(inline)]
|
||||
pub use pallas_txsubmission as txsubmission;
|
||||
|
||||
#[doc(inline)]
|
||||
pub use pallas_localstate as localstate;
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue