feat(miniprotocols): Add Tx-Mempool-Monitoring mini-Protocol (#150)

* implement txmonitor mini-protocol
* update README
* add Protocol Version V11,V12 and function V10_and_above for handshake protocol
This commit is contained in:
Torben 2022-07-12 17:41:53 +02:00 committed by GitHub
parent 0b23d7b638
commit 3b685e8b7f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 411 additions and 0 deletions

View file

@ -19,6 +19,7 @@ The following architectural decisions were made for this particular Rust impleme
| handshake | done | planned |
| local-state | done | planned |
| tx-submission | planned | minimal |
| local tx monitor | done | planned |
| local-tx-submission | ongoing | planned |
## Implementation Details

View file

@ -16,6 +16,8 @@ const PROTOCOL_V7: u64 = 32775;
const PROTOCOL_V8: u64 = 32776;
const PROTOCOL_V9: u64 = 32777;
const PROTOCOL_V10: u64 = 32778;
const PROTOCOL_V11: u64 = 32779;
const PROTOCOL_V12: u64 = 32780;
impl VersionTable {
pub fn v1_and_above(network_magic: u64) -> VersionTable {
@ -30,6 +32,8 @@ impl VersionTable {
(PROTOCOL_V8, VersionData(network_magic)),
(PROTOCOL_V9, VersionData(network_magic)),
(PROTOCOL_V10, VersionData(network_magic)),
(PROTOCOL_V11, VersionData(network_magic)),
(PROTOCOL_V12, VersionData(network_magic)),
]
.into_iter()
.collect::<HashMap<u64, VersionData>>();
@ -44,6 +48,19 @@ impl VersionTable {
VersionTable { values }
}
pub fn v10_and_above(network_magic: u64) -> VersionTable {
let values = vec![
(PROTOCOL_V10, VersionData(network_magic)),
(PROTOCOL_V11, VersionData(network_magic)),
(PROTOCOL_V12, VersionData(network_magic)),
]
.into_iter()
.collect::<HashMap<u64, VersionData>>();
VersionTable { values }
}
}
#[derive(Debug, Clone)]

View file

@ -6,6 +6,7 @@ pub mod chainsync;
pub mod handshake;
pub mod localstate;
pub mod txsubmission;
pub mod txmonitor;
pub use common::*;
pub use machines::*;

View file

@ -0,0 +1,172 @@
use super::{Message, MsgRequest, MsgResponse, MempoolSizeAndCapacity};
use pallas_codec::minicbor::{decode, encode, Decode, Encode, Encoder};
impl Encode<()> for Message
{
fn encode<W: encode::Write>(
&self,
e: &mut Encoder<W>,
ctx: &mut (),
) -> Result<(), encode::Error<W::Error>> {
match self {
Message::MsgDone => {
e.array(1)?.u16(0)?;
},
Message::MsgAcquire => {
e.array(1)?.u16(1)?;
},
Message::MsgAcquired(slot) => {
e.array(2)?.u16(2)?;
e.encode(slot)?;
},
Message::MsgQuery(query) => {
query.encode(e, ctx)?;
},
Message::MsgResponse(response) => {
response.encode(e, ctx)?;
}
}
log::debug!("encode message: {:?}",self);
Ok(())
}
}
impl<'b> Decode<'b,()> for Message {
fn decode(d: &mut pallas_codec::minicbor::Decoder<'b>, _ctx: &mut ()) -> Result<Self, decode::Error> {
d.array()?;
let label = d.u16()?;
log::debug!("decode message: {:?}",label);
match label {
0 => {
Ok(Message::MsgDone)
},
1 => {
Ok(Message::MsgAcquire)
},
2 => {
let slot = d.decode()?;
Ok(Message::MsgAcquired(slot))
},
3 => {
Ok(Message::MsgQuery(MsgRequest::MsgRelease))
},
5 => {
Ok(Message::MsgQuery(MsgRequest::MsgNextTx))
},
6 => {
log::trace!("Decoding 6, 1. Array: {:?}",d);
let de : Result<Option<u64>,pallas_codec::minicbor::decode::Error> = d.array();
log::trace!("Decoding 6, 2. Array: {:?}",de);
let tag : Result<u8,pallas_codec::minicbor::decode::Error> = d.u8();
let mut tx = None;
if let Ok(_) = tag {
log::trace!("Decoding 6, Tag: {:?}",tag);
let det = d.tag();
log::trace!("Decoding 6, Bytes: {:?}",det);
let cbor = d.bytes()?;
tx = Some(hex::encode(cbor));
log::trace!("Decoding 6, Tx: {:?}",tx);
}
Ok(Message::MsgResponse(MsgResponse::MsgReplyNextTx(tx)))
},
7 => {
let txid = d.decode()?;
Ok(Message::MsgQuery(MsgRequest::MsgHasTx(txid)))
}
8 => {
let has = d.decode()?;
Ok(Message::MsgResponse(MsgResponse::MsgReplyHasTx(has)))
}
9 => {
Ok(Message::MsgQuery(MsgRequest::MsgGetSizes))
}
10 => {
d.array()?;
let capacity = d.decode()?;
let size_in_bytes = d.decode()?;
let number_of_tx = d.decode()?;
Ok(
Message::MsgResponse(MsgResponse::MsgReplyGetSizes(MempoolSizeAndCapacity {
capacity_in_bytes : capacity,
size_in_bytes : size_in_bytes,
number_of_txs : number_of_tx,
}))
)
}
_ => Err(decode::Error::message(
"can't decode Message",
))
}
}
fn nil() -> Option<Self> {
None
}
}
impl Encode<()> for MsgRequest {
fn encode<W: encode::Write>(
&self,
e: &mut Encoder<W>,
_ctx: &mut (),
) -> Result<(), encode::Error<W::Error>> {
match self {
MsgRequest::MsgAwaitAcquire => {
e.array(1)?.u16(1)?;
},
MsgRequest::MsgGetSizes => {
e.array(1)?.u16(9)?;
},
MsgRequest::MsgHasTx(tx) => {
e.array(2)?.u16(7)?;
e.encode(tx)?;
},
MsgRequest::MsgNextTx => {
e.array(1)?.u16(5)?;
},
MsgRequest::MsgRelease => {
e.array(1)?.u16(3)?;
},
}
log::debug!("encode message: {:?}",self);
Ok(())
}
}
impl Encode<()> for MsgResponse {
fn encode<W: encode::Write>(
&self,
e: &mut Encoder<W>,
_ctx: &mut (),
) -> Result<(), encode::Error<W::Error>> {
match self {
MsgResponse::MsgReplyGetSizes(sz) => {
e.array(2)?.u16(10)?;
e.array(3)?;
e.encode(sz.capacity_in_bytes)?;
e.encode(sz.size_in_bytes)?;
e.encode(sz.number_of_txs)?;
},
MsgResponse::MsgReplyHasTx(tx) => {
e.array(2)?.u16(8)?;
e.encode(tx)?;
},
MsgResponse::MsgReplyNextTx(None) => {
e.array(1)?.u16(6)?;
},
MsgResponse::MsgReplyNextTx(Some(tx)) => {
e.array(2)?.u16(6)?;
e.encode(tx.to_string())?;
},
}
Ok(())
}
fn is_nil(&self) -> bool {
false
}
}

View file

@ -0,0 +1,220 @@
mod codec;
use std::{fmt::{Debug}};
use pallas_codec::Fragment;
use crate::machines::{Agent, MachineError, Transition};
type Slot = u64;
type TxId = String;
type Tx = String;
#[derive(Debug, PartialEq, Clone)]
pub enum StBusyKind {
NextTx,
HasTx,
GetSizes,
}
#[derive(Debug, PartialEq, Clone)]
pub enum State {
StIdle,
StAcquiring,
StAcquired,
StBusy(StBusyKind),
StDone,
}
#[derive(Debug, PartialEq, Clone)]
pub struct MempoolSizeAndCapacity {
pub capacity_in_bytes : u32,
pub size_in_bytes : u32,
pub number_of_txs : u32,
}
#[derive(Debug, Clone)]
pub enum Message {
MsgAcquire,
MsgAcquired(Slot),
MsgQuery(MsgRequest),
MsgResponse(MsgResponse),
MsgDone,
}
#[derive(Debug, Clone)]
pub enum MsgRequest {
MsgAwaitAcquire,
MsgNextTx,
MsgHasTx(TxId),
MsgGetSizes,
MsgRelease,
}
#[derive(Debug, Clone)]
pub enum MsgResponse {
MsgReplyNextTx(Option<Tx>),
MsgReplyHasTx(bool),
MsgReplyGetSizes(MempoolSizeAndCapacity),
}
#[derive(Debug, Clone)]
pub struct LocalTxMonitor {
pub state : State,
pub snapshot : Option<Slot>,
pub request : Option<MsgRequest>,
pub output : Option<MsgResponse>,
}
impl LocalTxMonitor
where
Message : Fragment,
{
pub fn initial(state : State) -> Self {
Self {
state : state,
snapshot : None,
request : None,
output : None,
}
}
fn on_acquired(self, s: Slot) -> Transition<Self> {
log::debug!("acquired Slot: '{:?}' ",s);
Ok(Self {
state : State::StAcquired,
snapshot : Some(s),
output : None,
..self
})
}
fn on_reply_next_tx(self, tx: Option<Tx>) -> Transition<Self> {
log::debug!("Next Transaction: {:?}", tx);
Ok(Self {
output: Some(MsgResponse::MsgReplyNextTx(tx)),
..self
})
}
fn on_reply_has_tx(self, arg: bool) -> Transition<Self> {
log::debug!("Mempool has transaction: {:?}", arg);
Ok(Self {
output: Some(MsgResponse::MsgReplyHasTx(arg)),
..self
})
}
fn on_reply_get_size(self, msc: MempoolSizeAndCapacity) -> Transition<Self> {
log::debug!("Mempool Status: {:?}", msc);
Ok(Self {
output: Some(MsgResponse::MsgReplyGetSizes(msc)),
..self
})
}
}
impl Agent for LocalTxMonitor
where
Message: Fragment, {
type Message = Message;
type State = State;
fn state(&self) -> &Self::State {
log::debug!("State: {:?}", &self.state);
&self.state
}
fn is_done(&self) -> bool {
let done = self.state == State::StDone;
log::debug!("is_done: {:?}",done);
done
}
fn has_agency(&self) -> bool{
log::trace!("Hase Agency: State: {:?}, Request: {:?}, Response: {:?}",self.state,self.request,self.output);
match &self.state {
State::StIdle => true,
State::StAcquiring => false,
State::StAcquired => true,
State::StBusy(..) => false,
State::StDone => false,
}
}
fn build_next(&self) -> Self::Message {
log::debug!("build next; State: {:?}, request: {:?}, output: {:?}",&self.state, &self.request, &self.output);
match (&self.state, &self.request, &self.output) {
(State::StIdle, None ,None) => Message::MsgAcquire,
(State::StAcquired, None , None) => Message::MsgAcquire,
(State::StAcquired, Some(MsgRequest::MsgAwaitAcquire), None) => Message::MsgAcquire,
(State::StAcquired, Some(MsgRequest::MsgNextTx),None) => Message::MsgQuery(MsgRequest::MsgNextTx),
(State::StAcquired, Some(MsgRequest::MsgHasTx(tx)),None) => Message::MsgQuery(MsgRequest::MsgHasTx(tx.clone())),
(State::StAcquired, Some(MsgRequest::MsgGetSizes),None) => Message::MsgQuery(MsgRequest::MsgGetSizes),
(State::StAcquired, None, Some(_)) => Message::MsgAcquire,
(State::StAcquired, Some(req), Some(_)) => Message::MsgQuery(req.to_owned()),
_ => panic!("I do not have agency, don't know what to do")
}
}
fn apply_start(self) -> Transition<Self> {
log::debug!("apply start");
Ok(self)
}
fn apply_outbound(self, msg: Self::Message) -> Transition<Self> {
log::debug!("apply outbound");
match (self.state, msg) {
(State::StIdle, Message::MsgAcquire) => {
log::debug!("apply outbound : MsgAcquire");
Ok(Self {
state: State::StAcquiring,
..self
})},
(State::StAcquired, Message::MsgQuery(MsgRequest::MsgNextTx)) => Ok(Self {
state: State::StBusy(StBusyKind::NextTx),
..self
}),
(State::StAcquired, Message::MsgQuery(MsgRequest::MsgHasTx(_))) => Ok(Self {
state: State::StBusy(StBusyKind::HasTx),
..self
}),
(State::StAcquired, Message::MsgQuery(MsgRequest::MsgGetSizes)) => Ok(Self {
state: State::StBusy(StBusyKind::GetSizes),
..self
}),
(State::StAcquired, Message::MsgAcquire) => Ok(Self {
state: State::StAcquiring,
..self
}),
(State::StAcquired, Message::MsgQuery(MsgRequest::MsgRelease)) => Ok(Self {
state: State::StIdle,
..self
}),
(State::StIdle, Message::MsgDone) => Ok(Self {
state: State::StDone,
..self
}),
_ => panic!("PANIC! Cannot match outbound")
}
}
fn apply_inbound(self, msg: Self::Message) -> Transition<Self> {
log::debug!("apply inbound");
match (&self.state , msg) {
(State::StAcquiring, Message::MsgAcquired(s)) => self.on_acquired(s),
(State::StBusy(StBusyKind::NextTx), Message::MsgResponse(MsgResponse::MsgReplyNextTx(tx))) => self.on_reply_next_tx(tx),
(State::StBusy(StBusyKind::HasTx), Message::MsgResponse(MsgResponse::MsgReplyHasTx(arg))) => self.on_reply_has_tx(arg),
(State::StBusy(StBusyKind::GetSizes), Message::MsgResponse(MsgResponse::MsgReplyGetSizes(msc))) => self.on_reply_get_size(msc),
(state, msg) => Err(MachineError::invalid_msg::<Self>(&state, &msg)),
}
}
}