chore: Apply code formatting
This commit is contained in:
parent
45a0889036
commit
44ba2247a5
4 changed files with 142 additions and 137 deletions
|
|
@ -54,10 +54,9 @@ impl VersionTable {
|
|||
(PROTOCOL_V10, VersionData(network_magic)),
|
||||
(PROTOCOL_V11, VersionData(network_magic)),
|
||||
(PROTOCOL_V12, VersionData(network_magic)),
|
||||
|
||||
]
|
||||
.into_iter()
|
||||
.collect::<HashMap<u64, VersionData>>();
|
||||
]
|
||||
.into_iter()
|
||||
.collect::<HashMap<u64, VersionData>>();
|
||||
|
||||
VersionTable { values }
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,8 +5,8 @@ pub mod blockfetch;
|
|||
pub mod chainsync;
|
||||
pub mod handshake;
|
||||
pub mod localstate;
|
||||
pub mod txsubmission;
|
||||
pub mod txmonitor;
|
||||
pub mod txsubmission;
|
||||
|
||||
pub use common::*;
|
||||
pub use machines::*;
|
||||
|
|
|
|||
|
|
@ -1,77 +1,68 @@
|
|||
use super::{Message, MsgRequest, MsgResponse, MempoolSizeAndCapacity};
|
||||
use super::{MempoolSizeAndCapacity, Message, MsgRequest, MsgResponse};
|
||||
use pallas_codec::minicbor::{decode, encode, Decode, Encode, Encoder};
|
||||
|
||||
impl Encode<()> for Message
|
||||
{
|
||||
|
||||
impl Encode<()> for Message {
|
||||
fn encode<W: encode::Write>(
|
||||
&self,
|
||||
e: &mut Encoder<W>,
|
||||
&self,
|
||||
e: &mut Encoder<W>,
|
||||
ctx: &mut (),
|
||||
) -> Result<(), encode::Error<W::Error>> {
|
||||
match self {
|
||||
Message::MsgDone => {
|
||||
e.array(1)?.u16(0)?;
|
||||
},
|
||||
}
|
||||
Message::MsgAcquire => {
|
||||
e.array(1)?.u16(1)?;
|
||||
|
||||
},
|
||||
}
|
||||
Message::MsgAcquired(slot) => {
|
||||
e.array(2)?.u16(2)?;
|
||||
e.encode(slot)?;
|
||||
},
|
||||
}
|
||||
Message::MsgQuery(query) => {
|
||||
query.encode(e, ctx)?;
|
||||
},
|
||||
}
|
||||
Message::MsgResponse(response) => {
|
||||
response.encode(e, ctx)?;
|
||||
}
|
||||
}
|
||||
log::debug!("encode message: {:?}",self);
|
||||
log::debug!("encode message: {:?}", self);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl<'b> Decode<'b,()> for Message {
|
||||
fn decode(d: &mut pallas_codec::minicbor::Decoder<'b>, _ctx: &mut ()) -> Result<Self, decode::Error> {
|
||||
d.array()?;
|
||||
let label = d.u16()?;
|
||||
log::debug!("decode message: {:?}",label);
|
||||
match label {
|
||||
0 => {
|
||||
Ok(Message::MsgDone)
|
||||
},
|
||||
1 => {
|
||||
Ok(Message::MsgAcquire)
|
||||
},
|
||||
impl<'b> Decode<'b, ()> for Message {
|
||||
fn decode(
|
||||
d: &mut pallas_codec::minicbor::Decoder<'b>,
|
||||
_ctx: &mut (),
|
||||
) -> Result<Self, decode::Error> {
|
||||
d.array()?;
|
||||
let label = d.u16()?;
|
||||
log::debug!("decode message: {:?}", label);
|
||||
match label {
|
||||
0 => Ok(Message::MsgDone),
|
||||
1 => Ok(Message::MsgAcquire),
|
||||
2 => {
|
||||
let slot = d.decode()?;
|
||||
Ok(Message::MsgAcquired(slot))
|
||||
},
|
||||
3 => {
|
||||
Ok(Message::MsgQuery(MsgRequest::MsgRelease))
|
||||
},
|
||||
5 => {
|
||||
Ok(Message::MsgQuery(MsgRequest::MsgNextTx))
|
||||
},
|
||||
}
|
||||
3 => Ok(Message::MsgQuery(MsgRequest::MsgRelease)),
|
||||
5 => Ok(Message::MsgQuery(MsgRequest::MsgNextTx)),
|
||||
6 => {
|
||||
log::trace!("Decoding 6, 1. Array: {:?}",d);
|
||||
let de : Result<Option<u64>,pallas_codec::minicbor::decode::Error> = d.array();
|
||||
log::trace!("Decoding 6, 2. Array: {:?}",de);
|
||||
let tag : Result<u8,pallas_codec::minicbor::decode::Error> = d.u8();
|
||||
log::trace!("Decoding 6, 1. Array: {:?}", d);
|
||||
let de: Result<Option<u64>, pallas_codec::minicbor::decode::Error> = d.array();
|
||||
log::trace!("Decoding 6, 2. Array: {:?}", de);
|
||||
let tag: Result<u8, pallas_codec::minicbor::decode::Error> = d.u8();
|
||||
let mut tx = None;
|
||||
if let Ok(_) = tag {
|
||||
log::trace!("Decoding 6, Tag: {:?}",tag);
|
||||
log::trace!("Decoding 6, Tag: {:?}", tag);
|
||||
let det = d.tag();
|
||||
log::trace!("Decoding 6, Bytes: {:?}",det);
|
||||
log::trace!("Decoding 6, Bytes: {:?}", det);
|
||||
let cbor = d.bytes()?;
|
||||
tx = Some(hex::encode(cbor));
|
||||
log::trace!("Decoding 6, Tx: {:?}",tx);
|
||||
|
||||
log::trace!("Decoding 6, Tx: {:?}", tx);
|
||||
}
|
||||
Ok(Message::MsgResponse(MsgResponse::MsgReplyNextTx(tx)))
|
||||
},
|
||||
}
|
||||
7 => {
|
||||
let txid = d.decode()?;
|
||||
Ok(Message::MsgQuery(MsgRequest::MsgHasTx(txid)))
|
||||
|
|
@ -80,67 +71,62 @@ fn decode(d: &mut pallas_codec::minicbor::Decoder<'b>, _ctx: &mut ()) -> Result<
|
|||
let has = d.decode()?;
|
||||
Ok(Message::MsgResponse(MsgResponse::MsgReplyHasTx(has)))
|
||||
}
|
||||
9 => {
|
||||
Ok(Message::MsgQuery(MsgRequest::MsgGetSizes))
|
||||
}
|
||||
9 => Ok(Message::MsgQuery(MsgRequest::MsgGetSizes)),
|
||||
10 => {
|
||||
d.array()?;
|
||||
let capacity = d.decode()?;
|
||||
let size_in_bytes = d.decode()?;
|
||||
let number_of_tx = d.decode()?;
|
||||
Ok(
|
||||
Message::MsgResponse(MsgResponse::MsgReplyGetSizes(MempoolSizeAndCapacity {
|
||||
capacity_in_bytes : capacity,
|
||||
size_in_bytes : size_in_bytes,
|
||||
number_of_txs : number_of_tx,
|
||||
}))
|
||||
)
|
||||
Ok(Message::MsgResponse(MsgResponse::MsgReplyGetSizes(
|
||||
MempoolSizeAndCapacity {
|
||||
capacity_in_bytes: capacity,
|
||||
size_in_bytes: size_in_bytes,
|
||||
number_of_txs: number_of_tx,
|
||||
},
|
||||
)))
|
||||
}
|
||||
_ => Err(decode::Error::message(
|
||||
"can't decode Message",
|
||||
))
|
||||
_ => Err(decode::Error::message("can't decode Message")),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn nil() -> Option<Self> {
|
||||
fn nil() -> Option<Self> {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
impl Encode<()> for MsgRequest {
|
||||
fn encode<W: encode::Write>(
|
||||
&self,
|
||||
e: &mut Encoder<W>,
|
||||
&self,
|
||||
e: &mut Encoder<W>,
|
||||
_ctx: &mut (),
|
||||
) -> Result<(), encode::Error<W::Error>> {
|
||||
match self {
|
||||
MsgRequest::MsgAwaitAcquire => {
|
||||
e.array(1)?.u16(1)?;
|
||||
},
|
||||
}
|
||||
MsgRequest::MsgGetSizes => {
|
||||
e.array(1)?.u16(9)?;
|
||||
},
|
||||
}
|
||||
MsgRequest::MsgHasTx(tx) => {
|
||||
e.array(2)?.u16(7)?;
|
||||
e.array(2)?.u16(7)?;
|
||||
e.encode(tx)?;
|
||||
},
|
||||
}
|
||||
MsgRequest::MsgNextTx => {
|
||||
e.array(1)?.u16(5)?;
|
||||
},
|
||||
}
|
||||
MsgRequest::MsgRelease => {
|
||||
e.array(1)?.u16(3)?;
|
||||
},
|
||||
|
||||
e.array(1)?.u16(3)?;
|
||||
}
|
||||
}
|
||||
log::debug!("encode message: {:?}",self);
|
||||
log::debug!("encode message: {:?}", self);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Encode<()> for MsgResponse {
|
||||
fn encode<W: encode::Write>(
|
||||
&self,
|
||||
e: &mut Encoder<W>,
|
||||
&self,
|
||||
e: &mut Encoder<W>,
|
||||
_ctx: &mut (),
|
||||
) -> Result<(), encode::Error<W::Error>> {
|
||||
match self {
|
||||
|
|
@ -150,18 +136,18 @@ impl Encode<()> for MsgResponse {
|
|||
e.encode(sz.capacity_in_bytes)?;
|
||||
e.encode(sz.size_in_bytes)?;
|
||||
e.encode(sz.number_of_txs)?;
|
||||
},
|
||||
}
|
||||
MsgResponse::MsgReplyHasTx(tx) => {
|
||||
e.array(2)?.u16(8)?;
|
||||
e.encode(tx)?;
|
||||
},
|
||||
}
|
||||
MsgResponse::MsgReplyNextTx(None) => {
|
||||
e.array(1)?.u16(6)?;
|
||||
},
|
||||
}
|
||||
MsgResponse::MsgReplyNextTx(Some(tx)) => {
|
||||
e.array(2)?.u16(6)?;
|
||||
e.encode(tx.to_string())?;
|
||||
},
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,8 +1,8 @@
|
|||
mod codec;
|
||||
|
||||
use std::{fmt::{Debug}};
|
||||
use pallas_codec::Fragment;
|
||||
use crate::machines::{Agent, MachineError, Transition};
|
||||
use pallas_codec::Fragment;
|
||||
use std::fmt::Debug;
|
||||
|
||||
type Slot = u64;
|
||||
type TxId = String;
|
||||
|
|
@ -26,9 +26,9 @@ pub enum State {
|
|||
|
||||
#[derive(Debug, PartialEq, Clone)]
|
||||
pub struct MempoolSizeAndCapacity {
|
||||
pub capacity_in_bytes : u32,
|
||||
pub size_in_bytes : u32,
|
||||
pub number_of_txs : u32,
|
||||
pub capacity_in_bytes: u32,
|
||||
pub size_in_bytes: u32,
|
||||
pub number_of_txs: u32,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
|
|
@ -40,12 +40,11 @@ pub enum Message {
|
|||
MsgDone,
|
||||
}
|
||||
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum MsgRequest {
|
||||
MsgAwaitAcquire,
|
||||
MsgNextTx,
|
||||
MsgHasTx(TxId),
|
||||
MsgHasTx(TxId),
|
||||
MsgGetSizes,
|
||||
MsgRelease,
|
||||
}
|
||||
|
|
@ -53,52 +52,50 @@ pub enum MsgRequest {
|
|||
pub enum MsgResponse {
|
||||
MsgReplyNextTx(Option<Tx>),
|
||||
MsgReplyHasTx(bool),
|
||||
MsgReplyGetSizes(MempoolSizeAndCapacity),
|
||||
MsgReplyGetSizes(MempoolSizeAndCapacity),
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct LocalTxMonitor {
|
||||
pub state : State,
|
||||
pub snapshot : Option<Slot>,
|
||||
pub request : Option<MsgRequest>,
|
||||
pub output : Option<MsgResponse>,
|
||||
pub state: State,
|
||||
pub snapshot: Option<Slot>,
|
||||
pub request: Option<MsgRequest>,
|
||||
pub output: Option<MsgResponse>,
|
||||
}
|
||||
|
||||
impl LocalTxMonitor
|
||||
where
|
||||
Message : Fragment,
|
||||
where
|
||||
Message: Fragment,
|
||||
{
|
||||
|
||||
pub fn initial(state : State) -> Self {
|
||||
pub fn initial(state: State) -> Self {
|
||||
Self {
|
||||
state : state,
|
||||
snapshot : None,
|
||||
request : None,
|
||||
output : None,
|
||||
state: state,
|
||||
snapshot: None,
|
||||
request: None,
|
||||
output: None,
|
||||
}
|
||||
}
|
||||
|
||||
fn on_acquired(self, s: Slot) -> Transition<Self> {
|
||||
log::debug!("acquired Slot: '{:?}' ",s);
|
||||
log::debug!("acquired Slot: '{:?}' ", s);
|
||||
|
||||
Ok(Self {
|
||||
state : State::StAcquired,
|
||||
snapshot : Some(s),
|
||||
output : None,
|
||||
state: State::StAcquired,
|
||||
snapshot: Some(s),
|
||||
output: None,
|
||||
..self
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
fn on_reply_next_tx(self, tx: Option<Tx>) -> Transition<Self> {
|
||||
fn on_reply_next_tx(self, tx: Option<Tx>) -> Transition<Self> {
|
||||
log::debug!("Next Transaction: {:?}", tx);
|
||||
Ok(Self {
|
||||
output: Some(MsgResponse::MsgReplyNextTx(tx)),
|
||||
..self
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
fn on_reply_has_tx(self, arg: bool) -> Transition<Self> {
|
||||
|
||||
fn on_reply_has_tx(self, arg: bool) -> Transition<Self> {
|
||||
log::debug!("Mempool has transaction: {:?}", arg);
|
||||
Ok(Self {
|
||||
output: Some(MsgResponse::MsgReplyHasTx(arg)),
|
||||
|
|
@ -106,7 +103,7 @@ impl LocalTxMonitor
|
|||
})
|
||||
}
|
||||
|
||||
fn on_reply_get_size(self, msc: MempoolSizeAndCapacity) -> Transition<Self> {
|
||||
fn on_reply_get_size(self, msc: MempoolSizeAndCapacity) -> Transition<Self> {
|
||||
log::debug!("Mempool Status: {:?}", msc);
|
||||
|
||||
Ok(Self {
|
||||
|
|
@ -117,8 +114,9 @@ impl LocalTxMonitor
|
|||
}
|
||||
|
||||
impl Agent for LocalTxMonitor
|
||||
where
|
||||
Message: Fragment, {
|
||||
where
|
||||
Message: Fragment,
|
||||
{
|
||||
type Message = Message;
|
||||
type State = State;
|
||||
|
||||
|
|
@ -128,14 +126,18 @@ impl Agent for LocalTxMonitor
|
|||
}
|
||||
|
||||
fn is_done(&self) -> bool {
|
||||
|
||||
let done = self.state == State::StDone;
|
||||
log::debug!("is_done: {:?}",done);
|
||||
log::debug!("is_done: {:?}", done);
|
||||
done
|
||||
}
|
||||
|
||||
fn has_agency(&self) -> bool{
|
||||
log::trace!("Hase Agency: State: {:?}, Request: {:?}, Response: {:?}",self.state,self.request,self.output);
|
||||
fn has_agency(&self) -> bool {
|
||||
log::trace!(
|
||||
"Hase Agency: State: {:?}, Request: {:?}, Response: {:?}",
|
||||
self.state,
|
||||
self.request,
|
||||
self.output
|
||||
);
|
||||
match &self.state {
|
||||
State::StIdle => true,
|
||||
State::StAcquiring => false,
|
||||
|
|
@ -146,17 +148,28 @@ impl Agent for LocalTxMonitor
|
|||
}
|
||||
|
||||
fn build_next(&self) -> Self::Message {
|
||||
log::debug!("build next; State: {:?}, request: {:?}, output: {:?}",&self.state, &self.request, &self.output);
|
||||
log::debug!(
|
||||
"build next; State: {:?}, request: {:?}, output: {:?}",
|
||||
&self.state,
|
||||
&self.request,
|
||||
&self.output
|
||||
);
|
||||
match (&self.state, &self.request, &self.output) {
|
||||
(State::StIdle, None ,None) => Message::MsgAcquire,
|
||||
(State::StAcquired, None , None) => Message::MsgAcquire,
|
||||
(State::StAcquired, Some(MsgRequest::MsgAwaitAcquire), None) => Message::MsgAcquire,
|
||||
(State::StAcquired, Some(MsgRequest::MsgNextTx),None) => Message::MsgQuery(MsgRequest::MsgNextTx),
|
||||
(State::StAcquired, Some(MsgRequest::MsgHasTx(tx)),None) => Message::MsgQuery(MsgRequest::MsgHasTx(tx.clone())),
|
||||
(State::StAcquired, Some(MsgRequest::MsgGetSizes),None) => Message::MsgQuery(MsgRequest::MsgGetSizes),
|
||||
(State::StAcquired, None, Some(_)) => Message::MsgAcquire,
|
||||
(State::StAcquired, Some(req), Some(_)) => Message::MsgQuery(req.to_owned()),
|
||||
_ => panic!("I do not have agency, don't know what to do")
|
||||
(State::StIdle, None, None) => Message::MsgAcquire,
|
||||
(State::StAcquired, None, None) => Message::MsgAcquire,
|
||||
(State::StAcquired, Some(MsgRequest::MsgAwaitAcquire), None) => Message::MsgAcquire,
|
||||
(State::StAcquired, Some(MsgRequest::MsgNextTx), None) => {
|
||||
Message::MsgQuery(MsgRequest::MsgNextTx)
|
||||
}
|
||||
(State::StAcquired, Some(MsgRequest::MsgHasTx(tx)), None) => {
|
||||
Message::MsgQuery(MsgRequest::MsgHasTx(tx.clone()))
|
||||
}
|
||||
(State::StAcquired, Some(MsgRequest::MsgGetSizes), None) => {
|
||||
Message::MsgQuery(MsgRequest::MsgGetSizes)
|
||||
}
|
||||
(State::StAcquired, None, Some(_)) => Message::MsgAcquire,
|
||||
(State::StAcquired, Some(req), Some(_)) => Message::MsgQuery(req.to_owned()),
|
||||
_ => panic!("I do not have agency, don't know what to do"),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -168,13 +181,13 @@ impl Agent for LocalTxMonitor
|
|||
fn apply_outbound(self, msg: Self::Message) -> Transition<Self> {
|
||||
log::debug!("apply outbound");
|
||||
match (self.state, msg) {
|
||||
|
||||
(State::StIdle, Message::MsgAcquire) => {
|
||||
log::debug!("apply outbound : MsgAcquire");
|
||||
Ok(Self {
|
||||
state: State::StAcquiring,
|
||||
..self
|
||||
})},
|
||||
state: State::StAcquiring,
|
||||
..self
|
||||
})
|
||||
}
|
||||
(State::StAcquired, Message::MsgQuery(MsgRequest::MsgNextTx)) => Ok(Self {
|
||||
state: State::StBusy(StBusyKind::NextTx),
|
||||
..self
|
||||
|
|
@ -183,7 +196,7 @@ impl Agent for LocalTxMonitor
|
|||
state: State::StBusy(StBusyKind::HasTx),
|
||||
..self
|
||||
}),
|
||||
|
||||
|
||||
(State::StAcquired, Message::MsgQuery(MsgRequest::MsgGetSizes)) => Ok(Self {
|
||||
state: State::StBusy(StBusyKind::GetSizes),
|
||||
..self
|
||||
|
|
@ -201,20 +214,27 @@ impl Agent for LocalTxMonitor
|
|||
..self
|
||||
}),
|
||||
|
||||
_ => panic!("PANIC! Cannot match outbound")
|
||||
_ => panic!("PANIC! Cannot match outbound"),
|
||||
}
|
||||
}
|
||||
|
||||
fn apply_inbound(self, msg: Self::Message) -> Transition<Self> {
|
||||
log::debug!("apply inbound");
|
||||
match (&self.state , msg) {
|
||||
(State::StAcquiring, Message::MsgAcquired(s)) => self.on_acquired(s),
|
||||
(State::StBusy(StBusyKind::NextTx), Message::MsgResponse(MsgResponse::MsgReplyNextTx(tx))) => self.on_reply_next_tx(tx),
|
||||
(State::StBusy(StBusyKind::HasTx), Message::MsgResponse(MsgResponse::MsgReplyHasTx(arg))) => self.on_reply_has_tx(arg),
|
||||
(State::StBusy(StBusyKind::GetSizes), Message::MsgResponse(MsgResponse::MsgReplyGetSizes(msc))) => self.on_reply_get_size(msc),
|
||||
match (&self.state, msg) {
|
||||
(State::StAcquiring, Message::MsgAcquired(s)) => self.on_acquired(s),
|
||||
(
|
||||
State::StBusy(StBusyKind::NextTx),
|
||||
Message::MsgResponse(MsgResponse::MsgReplyNextTx(tx)),
|
||||
) => self.on_reply_next_tx(tx),
|
||||
(
|
||||
State::StBusy(StBusyKind::HasTx),
|
||||
Message::MsgResponse(MsgResponse::MsgReplyHasTx(arg)),
|
||||
) => self.on_reply_has_tx(arg),
|
||||
(
|
||||
State::StBusy(StBusyKind::GetSizes),
|
||||
Message::MsgResponse(MsgResponse::MsgReplyGetSizes(msc)),
|
||||
) => self.on_reply_get_size(msc),
|
||||
(state, msg) => Err(MachineError::invalid_msg::<Self>(&state, &msg)),
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue