diff --git a/pallas-codec/src/utils.rs b/pallas-codec/src/utils.rs index b0b6077..1d02a0e 100644 --- a/pallas-codec/src/utils.rs +++ b/pallas-codec/src/utils.rs @@ -559,7 +559,7 @@ impl From for u64 { AnyUInt::U8(x) => x as u64, AnyUInt::U16(x) => x as u64, AnyUInt::U32(x) => x as u64, - AnyUInt::U64(x) => x as u64, + AnyUInt::U64(x) => x, } } } diff --git a/pallas-miniprotocols/Cargo.toml b/pallas-miniprotocols/Cargo.toml index 569cf83..8e7c0ac 100644 --- a/pallas-miniprotocols/Cargo.toml +++ b/pallas-miniprotocols/Cargo.toml @@ -13,11 +13,7 @@ authors = ["Santiago Carmuega "] [dependencies] pallas-codec = { version = "0.15.0", path = "../pallas-codec/" } pallas-multiplexer = { version = "0.15.0", path = "../pallas-multiplexer/" } -log = "0.4.14" hex = "0.4.3" itertools = "0.10.3" thiserror = "1.0.31" - -[dev-dependencies] -env_logger = "0.9.0" -log = "0.4.16" +tracing = "0.1.37" diff --git a/pallas-miniprotocols/src/chainsync/client.rs b/pallas-miniprotocols/src/chainsync/client.rs index 9d804b2..8a93e57 100644 --- a/pallas-miniprotocols/src/chainsync/client.rs +++ b/pallas-miniprotocols/src/chainsync/client.rs @@ -2,6 +2,7 @@ use pallas_codec::Fragment; use pallas_multiplexer::agents::{Channel, ChannelBuffer, ChannelError}; use std::marker::PhantomData; use thiserror::Error; +use tracing::{debug, instrument}; use crate::common::Point; @@ -110,6 +111,7 @@ where pub fn send_message(&mut self, msg: &Message) -> Result<(), Error> { self.assert_agency_is_ours()?; self.assert_outbound_state(msg)?; + self.1.send_msg_chunks(msg).map_err(Error::ChannelError)?; Ok(()) @@ -117,7 +119,9 @@ where pub fn recv_message(&mut self) -> Result, Error> { self.assert_agency_is_theirs()?; + let msg = self.1.recv_full_msg().map_err(Error::ChannelError)?; + self.assert_inbound_state(&msg)?; Ok(msg) @@ -145,6 +149,7 @@ where } } + #[instrument(skip_all)] pub fn find_intersect(&mut self, points: Vec) -> Result { self.send_find_intersect(points)?; self.recv_intersect_response() @@ -190,19 +195,29 @@ where } } + #[instrument(skip_all)] pub fn request_next(&mut self) -> Result, Error> { + debug!("requesting next block"); + self.send_request_next()?; + self.recv_while_can_await() } pub fn intersect_origin(&mut self) -> Result { + debug!("intersecting origin"); + let (point, _) = self.find_intersect(vec![Point::Origin])?; point.ok_or(Error::IntersectionNotFound) } + #[instrument(skip_all)] pub fn intersect_tip(&mut self) -> Result { let (_, Tip(point, _)) = self.find_intersect(vec![Point::Origin])?; + + debug!(?point, "found tip value"); + let (point, _) = self.find_intersect(vec![point])?; point.ok_or(Error::IntersectionNotFound) diff --git a/pallas-miniprotocols/src/machines.rs b/pallas-miniprotocols/src/machines.rs index 91d0cce..6d7f261 100644 --- a/pallas-miniprotocols/src/machines.rs +++ b/pallas-miniprotocols/src/machines.rs @@ -2,6 +2,7 @@ use pallas_codec::Fragment; use pallas_multiplexer::agents::{Channel, ChannelBuffer, ChannelError}; use std::{cell::Cell, fmt::Debug}; use thiserror::Error; +use tracing::trace; #[derive(Debug, Error)] pub enum MachineError { @@ -101,7 +102,7 @@ where match agent.has_agency() { true => { let msg = agent.build_next(); - log::trace!("processing outbound msg: {:?}", msg); + trace!(?msg, "processing outbound msg"); channel .send_msg_chunks(&msg) @@ -112,7 +113,7 @@ where false => { let msg = channel.recv_full_msg().map_err(MachineError::channel)?; - log::trace!("procesing inbound msg: {:?}", msg); + trace!(?msg, "processing inbound msg"); agent.apply_inbound(msg) } diff --git a/pallas-miniprotocols/src/txmonitor/codec.rs b/pallas-miniprotocols/src/txmonitor/codec.rs index 10f9cdf..424b857 100644 --- a/pallas-miniprotocols/src/txmonitor/codec.rs +++ b/pallas-miniprotocols/src/txmonitor/codec.rs @@ -25,7 +25,7 @@ impl Encode<()> for Message { response.encode(e, ctx)?; } } - log::debug!("encode message: {:?}", self); + Ok(()) } } @@ -37,7 +37,7 @@ impl<'b> Decode<'b, ()> for Message { ) -> Result { d.array()?; let label = d.u16()?; - log::debug!("decode message: {:?}", label); + match label { 0 => Ok(Message::MsgDone), 1 => Ok(Message::MsgAcquire), @@ -48,18 +48,14 @@ impl<'b> Decode<'b, ()> for Message { 3 => Ok(Message::MsgQuery(MsgRequest::MsgRelease)), 5 => Ok(Message::MsgQuery(MsgRequest::MsgNextTx)), 6 => { - log::trace!("Decoding 6, 1. Array: {:?}", d); - let de: Result, pallas_codec::minicbor::decode::Error> = d.array(); - log::trace!("Decoding 6, 2. Array: {:?}", de); + d.array()?; let tag: Result = d.u8(); let mut tx = None; + if tag.is_ok() { - log::trace!("Decoding 6, Tag: {:?}", tag); - let det = d.tag(); - log::trace!("Decoding 6, Bytes: {:?}", det); + d.tag()?; let cbor = d.bytes()?; tx = Some(hex::encode(cbor)); - log::trace!("Decoding 6, Tx: {:?}", tx); } Ok(Message::MsgResponse(MsgResponse::MsgReplyNextTx(tx))) } @@ -119,7 +115,7 @@ impl Encode<()> for MsgRequest { e.array(1)?.u16(3)?; } } - log::debug!("encode message: {:?}", self); + Ok(()) } } diff --git a/pallas-miniprotocols/src/txmonitor/mod.rs b/pallas-miniprotocols/src/txmonitor/mod.rs index 4510f90..a08645a 100644 --- a/pallas-miniprotocols/src/txmonitor/mod.rs +++ b/pallas-miniprotocols/src/txmonitor/mod.rs @@ -76,19 +76,16 @@ where } } - fn on_acquired(self, s: Slot) -> Transition { - log::debug!("acquired Slot: '{:?}' ", s); - + fn on_acquired(self, slot: Slot) -> Transition { Ok(Self { state: State::StAcquired, - snapshot: Some(s), + snapshot: Some(slot), output: None, ..self }) } fn on_reply_next_tx(self, tx: Option) -> Transition { - log::debug!("Next Transaction: {:?}", tx); Ok(Self { output: Some(MsgResponse::MsgReplyNextTx(tx)), ..self @@ -96,18 +93,15 @@ where } fn on_reply_has_tx(self, arg: bool) -> Transition { - log::debug!("Mempool has transaction: {:?}", arg); Ok(Self { output: Some(MsgResponse::MsgReplyHasTx(arg)), ..self }) } - fn on_reply_get_size(self, msc: MempoolSizeAndCapacity) -> Transition { - log::debug!("Mempool Status: {:?}", msc); - + fn on_reply_get_size(self, status: MempoolSizeAndCapacity) -> Transition { Ok(Self { - output: Some(MsgResponse::MsgReplyGetSizes(msc)), + output: Some(MsgResponse::MsgReplyGetSizes(status)), ..self }) } @@ -121,23 +115,14 @@ where type State = State; fn state(&self) -> &Self::State { - log::debug!("State: {:?}", &self.state); &self.state } fn is_done(&self) -> bool { - let done = self.state == State::StDone; - log::debug!("is_done: {:?}", done); - done + self.state == State::StDone } fn has_agency(&self) -> bool { - log::trace!( - "Hase Agency: State: {:?}, Request: {:?}, Response: {:?}", - self.state, - self.request, - self.output - ); match &self.state { State::StIdle => true, State::StAcquiring => false, @@ -148,12 +133,6 @@ where } fn build_next(&self) -> Self::Message { - log::debug!( - "build next; State: {:?}, request: {:?}, output: {:?}", - &self.state, - &self.request, - &self.output - ); match (&self.state, &self.request, &self.output) { (State::StIdle, None, None) => Message::MsgAcquire, (State::StAcquired, None, None) => Message::MsgAcquire, @@ -174,20 +153,15 @@ where } fn apply_start(self) -> Transition { - log::debug!("apply start"); Ok(self) } fn apply_outbound(self, msg: Self::Message) -> Transition { - log::debug!("apply outbound"); match (self.state, msg) { - (State::StIdle, Message::MsgAcquire) => { - log::debug!("apply outbound : MsgAcquire"); - Ok(Self { - state: State::StAcquiring, - ..self - }) - } + (State::StIdle, Message::MsgAcquire) => Ok(Self { + state: State::StAcquiring, + ..self + }), (State::StAcquired, Message::MsgQuery(MsgRequest::MsgNextTx)) => Ok(Self { state: State::StBusy(StBusyKind::NextTx), ..self @@ -219,7 +193,6 @@ where } fn apply_inbound(self, msg: Self::Message) -> Transition { - log::debug!("apply inbound"); match (&self.state, msg) { (State::StAcquiring, Message::MsgAcquired(s)) => self.on_acquired(s), ( diff --git a/pallas-miniprotocols/src/txsubmission/mod.rs b/pallas-miniprotocols/src/txsubmission/mod.rs index c14fdbc..a3dd6c9 100644 --- a/pallas-miniprotocols/src/txsubmission/mod.rs +++ b/pallas-miniprotocols/src/txsubmission/mod.rs @@ -1,8 +1,8 @@ use std::fmt::Debug; use itertools::Itertools; -use log::debug; use pallas_codec::minicbor::{decode, encode, Decode, Decoder, Encode, Encoder}; +use tracing::debug; use crate::machines::{Agent, MachineError, Transition}; diff --git a/pallas-multiplexer/Cargo.toml b/pallas-multiplexer/Cargo.toml index bbaec52..03b3eed 100644 --- a/pallas-multiplexer/Cargo.toml +++ b/pallas-multiplexer/Cargo.toml @@ -17,9 +17,7 @@ byteorder = "1.4.3" hex = "0.4.3" rand = "0.8.4" thiserror = "1.0.31" - -[dev-dependencies] -env_logger = "0.9.0" +tracing = "0.1.37" [features] std = [] diff --git a/pallas-multiplexer/src/bearers.rs b/pallas-multiplexer/src/bearers.rs index 1d69e5a..7705adb 100644 --- a/pallas-multiplexer/src/bearers.rs +++ b/pallas-multiplexer/src/bearers.rs @@ -1,8 +1,8 @@ use byteorder::{ByteOrder, NetworkEndian, WriteBytesExt}; -use log::{debug, log_enabled, trace}; use std::io::{Read, Write}; use std::net::{SocketAddr, TcpListener, ToSocketAddrs}; use std::{net::TcpStream, time::Instant}; +use tracing::{debug, event_enabled, trace}; use crate::Payload; @@ -37,12 +37,12 @@ fn write_segment(writer: &mut impl Write, segment: Segment) -> Result<(), std::i msg.write_u16::(protocol)?; msg.write_u16::(payload.len() as u16)?; - if log_enabled!(log::Level::Trace) { + if event_enabled!(tracing::Level::TRACE) { trace!( - "sending segment, header {:?}, protocol id: {}, payload length: {}", - hex::encode(&msg), protocol, - payload.len() + length = payload.len(), + message = hex::encode(&msg), + "writing segment" ); } @@ -57,24 +57,21 @@ fn read_segment(reader: &mut impl Read) -> Result { reader.read_exact(&mut header)?; - if log_enabled!(log::Level::Trace) { - trace!("read segment header: {:?}", hex::encode(header)); + if event_enabled!(tracing::Level::TRACE) { + trace!(header = hex::encode(header), "segment header read"); } let length = NetworkEndian::read_u16(&header[6..]) as usize; let protocol = NetworkEndian::read_u16(&header[4..6]) as usize ^ 0x8000; let timestamp = NetworkEndian::read_u32(&header[0..4]); - debug!( - "parsed inbound msg, protocol id: {}, ts: {}, payload length: {}", - protocol, timestamp, length - ); + debug!(protocol, timestamp, length, "parsed inbound msg"); let mut payload = vec![0u8; length]; reader.read_exact(&mut payload)?; - if log_enabled!(log::Level::Trace) { - trace!("read segment payload: {:?}", hex::encode(&payload)); + if event_enabled!(tracing::Level::TRACE) { + trace!(payload = hex::encode(&payload), "segment payload read"); } Ok(Segment {