feat(miniprotocols): Introduce tracing (#214)
This commit is contained in:
parent
6fa936a998
commit
c51580d042
9 changed files with 47 additions and 71 deletions
|
|
@ -559,7 +559,7 @@ impl From<AnyUInt> for u64 {
|
|||
AnyUInt::U8(x) => x as u64,
|
||||
AnyUInt::U16(x) => x as u64,
|
||||
AnyUInt::U32(x) => x as u64,
|
||||
AnyUInt::U64(x) => x as u64,
|
||||
AnyUInt::U64(x) => x,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -13,11 +13,7 @@ authors = ["Santiago Carmuega <santiago@carmuega.me>"]
|
|||
[dependencies]
|
||||
pallas-codec = { version = "0.15.0", path = "../pallas-codec/" }
|
||||
pallas-multiplexer = { version = "0.15.0", path = "../pallas-multiplexer/" }
|
||||
log = "0.4.14"
|
||||
hex = "0.4.3"
|
||||
itertools = "0.10.3"
|
||||
thiserror = "1.0.31"
|
||||
|
||||
[dev-dependencies]
|
||||
env_logger = "0.9.0"
|
||||
log = "0.4.16"
|
||||
tracing = "0.1.37"
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@ use pallas_codec::Fragment;
|
|||
use pallas_multiplexer::agents::{Channel, ChannelBuffer, ChannelError};
|
||||
use std::marker::PhantomData;
|
||||
use thiserror::Error;
|
||||
use tracing::{debug, instrument};
|
||||
|
||||
use crate::common::Point;
|
||||
|
||||
|
|
@ -110,6 +111,7 @@ where
|
|||
pub fn send_message(&mut self, msg: &Message<O>) -> Result<(), Error> {
|
||||
self.assert_agency_is_ours()?;
|
||||
self.assert_outbound_state(msg)?;
|
||||
|
||||
self.1.send_msg_chunks(msg).map_err(Error::ChannelError)?;
|
||||
|
||||
Ok(())
|
||||
|
|
@ -117,7 +119,9 @@ where
|
|||
|
||||
pub fn recv_message(&mut self) -> Result<Message<O>, Error> {
|
||||
self.assert_agency_is_theirs()?;
|
||||
|
||||
let msg = self.1.recv_full_msg().map_err(Error::ChannelError)?;
|
||||
|
||||
self.assert_inbound_state(&msg)?;
|
||||
|
||||
Ok(msg)
|
||||
|
|
@ -145,6 +149,7 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
pub fn find_intersect(&mut self, points: Vec<Point>) -> Result<IntersectResponse, Error> {
|
||||
self.send_find_intersect(points)?;
|
||||
self.recv_intersect_response()
|
||||
|
|
@ -190,19 +195,29 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
pub fn request_next(&mut self) -> Result<NextResponse<O>, Error> {
|
||||
debug!("requesting next block");
|
||||
|
||||
self.send_request_next()?;
|
||||
|
||||
self.recv_while_can_await()
|
||||
}
|
||||
|
||||
pub fn intersect_origin(&mut self) -> Result<Point, Error> {
|
||||
debug!("intersecting origin");
|
||||
|
||||
let (point, _) = self.find_intersect(vec![Point::Origin])?;
|
||||
|
||||
point.ok_or(Error::IntersectionNotFound)
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
pub fn intersect_tip(&mut self) -> Result<Point, Error> {
|
||||
let (_, Tip(point, _)) = self.find_intersect(vec![Point::Origin])?;
|
||||
|
||||
debug!(?point, "found tip value");
|
||||
|
||||
let (point, _) = self.find_intersect(vec![point])?;
|
||||
|
||||
point.ok_or(Error::IntersectionNotFound)
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@ use pallas_codec::Fragment;
|
|||
use pallas_multiplexer::agents::{Channel, ChannelBuffer, ChannelError};
|
||||
use std::{cell::Cell, fmt::Debug};
|
||||
use thiserror::Error;
|
||||
use tracing::trace;
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum MachineError {
|
||||
|
|
@ -101,7 +102,7 @@ where
|
|||
match agent.has_agency() {
|
||||
true => {
|
||||
let msg = agent.build_next();
|
||||
log::trace!("processing outbound msg: {:?}", msg);
|
||||
trace!(?msg, "processing outbound msg");
|
||||
|
||||
channel
|
||||
.send_msg_chunks(&msg)
|
||||
|
|
@ -112,7 +113,7 @@ where
|
|||
false => {
|
||||
let msg = channel.recv_full_msg().map_err(MachineError::channel)?;
|
||||
|
||||
log::trace!("procesing inbound msg: {:?}", msg);
|
||||
trace!(?msg, "processing inbound msg");
|
||||
|
||||
agent.apply_inbound(msg)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -25,7 +25,7 @@ impl Encode<()> for Message {
|
|||
response.encode(e, ctx)?;
|
||||
}
|
||||
}
|
||||
log::debug!("encode message: {:?}", self);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
|
@ -37,7 +37,7 @@ impl<'b> Decode<'b, ()> for Message {
|
|||
) -> Result<Self, decode::Error> {
|
||||
d.array()?;
|
||||
let label = d.u16()?;
|
||||
log::debug!("decode message: {:?}", label);
|
||||
|
||||
match label {
|
||||
0 => Ok(Message::MsgDone),
|
||||
1 => Ok(Message::MsgAcquire),
|
||||
|
|
@ -48,18 +48,14 @@ impl<'b> Decode<'b, ()> for Message {
|
|||
3 => Ok(Message::MsgQuery(MsgRequest::MsgRelease)),
|
||||
5 => Ok(Message::MsgQuery(MsgRequest::MsgNextTx)),
|
||||
6 => {
|
||||
log::trace!("Decoding 6, 1. Array: {:?}", d);
|
||||
let de: Result<Option<u64>, pallas_codec::minicbor::decode::Error> = d.array();
|
||||
log::trace!("Decoding 6, 2. Array: {:?}", de);
|
||||
d.array()?;
|
||||
let tag: Result<u8, pallas_codec::minicbor::decode::Error> = d.u8();
|
||||
let mut tx = None;
|
||||
|
||||
if tag.is_ok() {
|
||||
log::trace!("Decoding 6, Tag: {:?}", tag);
|
||||
let det = d.tag();
|
||||
log::trace!("Decoding 6, Bytes: {:?}", det);
|
||||
d.tag()?;
|
||||
let cbor = d.bytes()?;
|
||||
tx = Some(hex::encode(cbor));
|
||||
log::trace!("Decoding 6, Tx: {:?}", tx);
|
||||
}
|
||||
Ok(Message::MsgResponse(MsgResponse::MsgReplyNextTx(tx)))
|
||||
}
|
||||
|
|
@ -119,7 +115,7 @@ impl Encode<()> for MsgRequest {
|
|||
e.array(1)?.u16(3)?;
|
||||
}
|
||||
}
|
||||
log::debug!("encode message: {:?}", self);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -76,19 +76,16 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
fn on_acquired(self, s: Slot) -> Transition<Self> {
|
||||
log::debug!("acquired Slot: '{:?}' ", s);
|
||||
|
||||
fn on_acquired(self, slot: Slot) -> Transition<Self> {
|
||||
Ok(Self {
|
||||
state: State::StAcquired,
|
||||
snapshot: Some(s),
|
||||
snapshot: Some(slot),
|
||||
output: None,
|
||||
..self
|
||||
})
|
||||
}
|
||||
|
||||
fn on_reply_next_tx(self, tx: Option<Tx>) -> Transition<Self> {
|
||||
log::debug!("Next Transaction: {:?}", tx);
|
||||
Ok(Self {
|
||||
output: Some(MsgResponse::MsgReplyNextTx(tx)),
|
||||
..self
|
||||
|
|
@ -96,18 +93,15 @@ where
|
|||
}
|
||||
|
||||
fn on_reply_has_tx(self, arg: bool) -> Transition<Self> {
|
||||
log::debug!("Mempool has transaction: {:?}", arg);
|
||||
Ok(Self {
|
||||
output: Some(MsgResponse::MsgReplyHasTx(arg)),
|
||||
..self
|
||||
})
|
||||
}
|
||||
|
||||
fn on_reply_get_size(self, msc: MempoolSizeAndCapacity) -> Transition<Self> {
|
||||
log::debug!("Mempool Status: {:?}", msc);
|
||||
|
||||
fn on_reply_get_size(self, status: MempoolSizeAndCapacity) -> Transition<Self> {
|
||||
Ok(Self {
|
||||
output: Some(MsgResponse::MsgReplyGetSizes(msc)),
|
||||
output: Some(MsgResponse::MsgReplyGetSizes(status)),
|
||||
..self
|
||||
})
|
||||
}
|
||||
|
|
@ -121,23 +115,14 @@ where
|
|||
type State = State;
|
||||
|
||||
fn state(&self) -> &Self::State {
|
||||
log::debug!("State: {:?}", &self.state);
|
||||
&self.state
|
||||
}
|
||||
|
||||
fn is_done(&self) -> bool {
|
||||
let done = self.state == State::StDone;
|
||||
log::debug!("is_done: {:?}", done);
|
||||
done
|
||||
self.state == State::StDone
|
||||
}
|
||||
|
||||
fn has_agency(&self) -> bool {
|
||||
log::trace!(
|
||||
"Hase Agency: State: {:?}, Request: {:?}, Response: {:?}",
|
||||
self.state,
|
||||
self.request,
|
||||
self.output
|
||||
);
|
||||
match &self.state {
|
||||
State::StIdle => true,
|
||||
State::StAcquiring => false,
|
||||
|
|
@ -148,12 +133,6 @@ where
|
|||
}
|
||||
|
||||
fn build_next(&self) -> Self::Message {
|
||||
log::debug!(
|
||||
"build next; State: {:?}, request: {:?}, output: {:?}",
|
||||
&self.state,
|
||||
&self.request,
|
||||
&self.output
|
||||
);
|
||||
match (&self.state, &self.request, &self.output) {
|
||||
(State::StIdle, None, None) => Message::MsgAcquire,
|
||||
(State::StAcquired, None, None) => Message::MsgAcquire,
|
||||
|
|
@ -174,20 +153,15 @@ where
|
|||
}
|
||||
|
||||
fn apply_start(self) -> Transition<Self> {
|
||||
log::debug!("apply start");
|
||||
Ok(self)
|
||||
}
|
||||
|
||||
fn apply_outbound(self, msg: Self::Message) -> Transition<Self> {
|
||||
log::debug!("apply outbound");
|
||||
match (self.state, msg) {
|
||||
(State::StIdle, Message::MsgAcquire) => {
|
||||
log::debug!("apply outbound : MsgAcquire");
|
||||
Ok(Self {
|
||||
state: State::StAcquiring,
|
||||
..self
|
||||
})
|
||||
}
|
||||
(State::StIdle, Message::MsgAcquire) => Ok(Self {
|
||||
state: State::StAcquiring,
|
||||
..self
|
||||
}),
|
||||
(State::StAcquired, Message::MsgQuery(MsgRequest::MsgNextTx)) => Ok(Self {
|
||||
state: State::StBusy(StBusyKind::NextTx),
|
||||
..self
|
||||
|
|
@ -219,7 +193,6 @@ where
|
|||
}
|
||||
|
||||
fn apply_inbound(self, msg: Self::Message) -> Transition<Self> {
|
||||
log::debug!("apply inbound");
|
||||
match (&self.state, msg) {
|
||||
(State::StAcquiring, Message::MsgAcquired(s)) => self.on_acquired(s),
|
||||
(
|
||||
|
|
|
|||
|
|
@ -1,8 +1,8 @@
|
|||
use std::fmt::Debug;
|
||||
|
||||
use itertools::Itertools;
|
||||
use log::debug;
|
||||
use pallas_codec::minicbor::{decode, encode, Decode, Decoder, Encode, Encoder};
|
||||
use tracing::debug;
|
||||
|
||||
use crate::machines::{Agent, MachineError, Transition};
|
||||
|
||||
|
|
|
|||
|
|
@ -17,9 +17,7 @@ byteorder = "1.4.3"
|
|||
hex = "0.4.3"
|
||||
rand = "0.8.4"
|
||||
thiserror = "1.0.31"
|
||||
|
||||
[dev-dependencies]
|
||||
env_logger = "0.9.0"
|
||||
tracing = "0.1.37"
|
||||
|
||||
[features]
|
||||
std = []
|
||||
|
|
|
|||
|
|
@ -1,8 +1,8 @@
|
|||
use byteorder::{ByteOrder, NetworkEndian, WriteBytesExt};
|
||||
use log::{debug, log_enabled, trace};
|
||||
use std::io::{Read, Write};
|
||||
use std::net::{SocketAddr, TcpListener, ToSocketAddrs};
|
||||
use std::{net::TcpStream, time::Instant};
|
||||
use tracing::{debug, event_enabled, trace};
|
||||
|
||||
use crate::Payload;
|
||||
|
||||
|
|
@ -37,12 +37,12 @@ fn write_segment(writer: &mut impl Write, segment: Segment) -> Result<(), std::i
|
|||
msg.write_u16::<NetworkEndian>(protocol)?;
|
||||
msg.write_u16::<NetworkEndian>(payload.len() as u16)?;
|
||||
|
||||
if log_enabled!(log::Level::Trace) {
|
||||
if event_enabled!(tracing::Level::TRACE) {
|
||||
trace!(
|
||||
"sending segment, header {:?}, protocol id: {}, payload length: {}",
|
||||
hex::encode(&msg),
|
||||
protocol,
|
||||
payload.len()
|
||||
length = payload.len(),
|
||||
message = hex::encode(&msg),
|
||||
"writing segment"
|
||||
);
|
||||
}
|
||||
|
||||
|
|
@ -57,24 +57,21 @@ fn read_segment(reader: &mut impl Read) -> Result<Segment, std::io::Error> {
|
|||
|
||||
reader.read_exact(&mut header)?;
|
||||
|
||||
if log_enabled!(log::Level::Trace) {
|
||||
trace!("read segment header: {:?}", hex::encode(header));
|
||||
if event_enabled!(tracing::Level::TRACE) {
|
||||
trace!(header = hex::encode(header), "segment header read");
|
||||
}
|
||||
|
||||
let length = NetworkEndian::read_u16(&header[6..]) as usize;
|
||||
let protocol = NetworkEndian::read_u16(&header[4..6]) as usize ^ 0x8000;
|
||||
let timestamp = NetworkEndian::read_u32(&header[0..4]);
|
||||
|
||||
debug!(
|
||||
"parsed inbound msg, protocol id: {}, ts: {}, payload length: {}",
|
||||
protocol, timestamp, length
|
||||
);
|
||||
debug!(protocol, timestamp, length, "parsed inbound msg");
|
||||
|
||||
let mut payload = vec![0u8; length];
|
||||
reader.read_exact(&mut payload)?;
|
||||
|
||||
if log_enabled!(log::Level::Trace) {
|
||||
trace!("read segment payload: {:?}", hex::encode(&payload));
|
||||
if event_enabled!(tracing::Level::TRACE) {
|
||||
trace!(payload = hex::encode(&payload), "segment payload read");
|
||||
}
|
||||
|
||||
Ok(Segment {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue