diff --git a/pallas-traverse/src/aux.rs b/pallas-traverse/src/aux.rs index 7d35e95..3df5599 100644 --- a/pallas-traverse/src/aux.rs +++ b/pallas-traverse/src/aux.rs @@ -7,13 +7,10 @@ use crate::MultiEraTx; impl<'b> MultiEraTx<'b> { pub fn aux_plutus_v1_scripts(&self) -> &[alonzo::PlutusScript] { if let Some(aux_data) = self.aux_data() { - match aux_data.deref() { - alonzo::AuxiliaryData::PostAlonzo(x) => { - if let Some(plutus) = &x.plutus_scripts { - return plutus.as_ref(); - } + if let alonzo::AuxiliaryData::PostAlonzo(x) = aux_data.deref() { + if let Some(plutus) = &x.plutus_scripts { + return plutus.as_ref(); } - _ => (), } } diff --git a/pallas-traverse/src/block.rs b/pallas-traverse/src/block.rs index 73affaa..f65e0c0 100644 --- a/pallas-traverse/src/block.rs +++ b/pallas-traverse/src/block.rs @@ -1,4 +1,4 @@ -use std::{borrow::Cow, ops::Deref}; +use std::borrow::Cow; use pallas_codec::minicbor; use pallas_crypto::hash::Hash; diff --git a/pallas-upstream/Cargo.toml b/pallas-upstream/Cargo.toml index 68d2012..9fefe42 100644 --- a/pallas-upstream/Cargo.toml +++ b/pallas-upstream/Cargo.toml @@ -11,7 +11,8 @@ readme = "README.md" authors = ["Santiago Carmuega "] [dependencies] -gasket = { git = "https://github.com/construkts/gasket-rs", version = "0.1.0" } +gasket = { git = "https://github.com/construkts/gasket-rs" } +# gasket = { version = "0.1.0", path = "../../../construkts/gasket-rs" } pallas-codec = { version = "0.18.0", path = "../pallas-codec" } pallas-crypto = { version = "0.18.0", path = "../pallas-crypto" } pallas-miniprotocols = { version = "0.18.0", path = "../pallas-miniprotocols" } diff --git a/pallas-upstream/src/api.rs b/pallas-upstream/src/api.rs index 3f97f94..e68d2a4 100644 --- a/pallas-upstream/src/api.rs +++ b/pallas-upstream/src/api.rs @@ -1,8 +1,15 @@ pub use crate::cursor; +pub use crate::framework::BlockFetchEvent; + +pub use crate::framework::DownstreamPort; + pub mod n2n { use crate::{blockfetch, chainsync, cursor::Cursor, framework::*, plexer}; - use gasket::runtime::Tether; + use gasket::{ + messaging::{SendAdapter, SendPort}, + runtime::Tether, + }; pub struct Runtime { pub plexer_tether: Tether, @@ -10,25 +17,31 @@ pub mod n2n { pub blockfetch_tether: Tether, } - pub struct Bootstrapper { + pub struct Bootstrapper + where + A: SendAdapter, + { cursor: Cursor, peer_address: String, network_magic: u64, - output: blockfetch::DownstreamPort, + output: super::DownstreamPort, } - impl Bootstrapper { + impl Bootstrapper + where + A: SendAdapter + 'static, + { pub fn new(cursor: Cursor, peer_address: String, network_magic: u64) -> Self { Bootstrapper { cursor, peer_address, network_magic, - output: blockfetch::DownstreamPort::default(), + output: Default::default(), } } - pub fn borrow_output_port(&mut self) -> &mut blockfetch::DownstreamPort { - &mut self.output + pub fn connect_output(&mut self, adapter: A) { + self.output.connect(adapter); } pub fn spawn(self) -> Result { @@ -54,15 +67,15 @@ pub mod n2n { let mut demux2_out = DemuxOutputPort::default(); let mut demux2_in = DemuxInputPort::default(); - gasket::messaging::connect_ports(&mut demux2_out, &mut demux2_in, 1000); + gasket::messaging::crossbeam::connect_ports(&mut demux2_out, &mut demux2_in, 1000); let mut demux3_out = DemuxOutputPort::default(); let mut demux3_in = DemuxInputPort::default(); - gasket::messaging::connect_ports(&mut demux3_out, &mut demux3_in, 1000); + gasket::messaging::crossbeam::connect_ports(&mut demux3_out, &mut demux3_in, 1000); let mut mux2_out = MuxOutputPort::default(); let mut mux3_out = MuxOutputPort::default(); - gasket::messaging::funnel_ports( + gasket::messaging::crossbeam::funnel_ports( vec![&mut mux2_out, &mut mux3_out], &mut mux_input, 1000, @@ -70,7 +83,7 @@ pub mod n2n { let mut chainsync_downstream = chainsync::DownstreamPort::default(); let mut blockfetch_upstream = blockfetch::UpstreamPort::default(); - gasket::messaging::connect_ports( + gasket::messaging::crossbeam::connect_ports( &mut chainsync_downstream, &mut blockfetch_upstream, 20, diff --git a/pallas-upstream/src/blockfetch.rs b/pallas-upstream/src/blockfetch.rs index 0e0d02e..1097f64 100644 --- a/pallas-upstream/src/blockfetch.rs +++ b/pallas-upstream/src/blockfetch.rs @@ -1,3 +1,4 @@ +use gasket::messaging::SendAdapter; use tracing::{debug, error, instrument}; use pallas_crypto::hash::Hash; @@ -6,23 +7,27 @@ use pallas_miniprotocols::Point; use crate::framework::*; -pub type UpstreamPort = gasket::messaging::TwoPhaseInputPort; -pub type DownstreamPort = gasket::messaging::OutputPort; - +pub type UpstreamPort = gasket::messaging::crossbeam::TwoPhaseInputPort; pub type OuroborosClient = blockfetch::Client; -pub struct Worker { +pub struct Worker +where + T: Send + Sync, +{ client: OuroborosClient, upstream: UpstreamPort, - downstream: DownstreamPort, + downstream: DownstreamPort, block_count: gasket::metrics::Counter, } -impl Worker { +impl Worker +where + T: Send + Sync, +{ pub fn new( plexer: ProtocolChannel, upstream: UpstreamPort, - downstream: DownstreamPort, + downstream: DownstreamPort, ) -> Self { let client = OuroborosClient::new(plexer); @@ -56,7 +61,10 @@ impl Worker { } } -impl gasket::runtime::Worker for Worker { +impl gasket::runtime::Worker for Worker +where + A: SendAdapter, +{ fn metrics(&self) -> gasket::metrics::Registry { gasket::metrics::Builder::new() .with_counter("fetched_blocks", &self.block_count) diff --git a/pallas-upstream/src/chainsync.rs b/pallas-upstream/src/chainsync.rs index 6ff9bfc..91e23be 100644 --- a/pallas-upstream/src/chainsync.rs +++ b/pallas-upstream/src/chainsync.rs @@ -17,7 +17,7 @@ fn to_traverse(header: &chainsync::HeaderContent) -> Result, out.map_err(Error::parse) } -pub type DownstreamPort = gasket::messaging::OutputPort; +pub type DownstreamPort = gasket::messaging::crossbeam::OutputPort; pub type OuroborosClient = chainsync::N2NClient; diff --git a/pallas-upstream/src/framework.rs b/pallas-upstream/src/framework.rs index 44f4fbf..0c63f06 100644 --- a/pallas-upstream/src/framework.rs +++ b/pallas-upstream/src/framework.rs @@ -21,14 +21,16 @@ pub enum BlockFetchEvent { } // ports used by plexer -pub type MuxOutputPort = gasket::messaging::OutputPort<(u16, multiplexer::Payload)>; -pub type DemuxInputPort = gasket::messaging::InputPort; +pub type MuxOutputPort = gasket::messaging::crossbeam::OutputPort<(u16, multiplexer::Payload)>; +pub type DemuxInputPort = gasket::messaging::crossbeam::InputPort; // ports used by mini-protocols -pub type MuxInputPort = gasket::messaging::InputPort<(u16, multiplexer::Payload)>; -pub type DemuxOutputPort = gasket::messaging::OutputPort; +pub type MuxInputPort = gasket::messaging::crossbeam::InputPort<(u16, multiplexer::Payload)>; +pub type DemuxOutputPort = gasket::messaging::crossbeam::OutputPort; + +// final output port +pub type DownstreamPort = gasket::messaging::OutputPort; -#[derive(Debug)] pub struct ProtocolChannel(pub u16, pub MuxOutputPort, pub DemuxInputPort); impl multiplexer::agents::Channel for ProtocolChannel { @@ -61,14 +63,14 @@ impl multiplexer::agents::Channel for ProtocolChannel { #[derive(Error, Debug)] pub enum Error { - #[error("client error: {0}")] - ClientError(String), + #[error("{0}")] + Client(String), - #[error("parse error: {0}")] - ParseError(String), + #[error("{0}")] + Parse(String), - #[error("server error: {0}")] - ServerError(String), + #[error("{0}")] + Server(String), #[error("{0}")] Message(String), @@ -79,15 +81,15 @@ pub enum Error { impl Error { pub fn client(error: impl ToString) -> Error { - Error::ClientError(error.to_string()) + Error::Client(error.to_string()) } pub fn parse(error: impl ToString) -> Error { - Error::ParseError(error.to_string()) + Error::Parse(error.to_string()) } pub fn server(error: impl ToString) -> Error { - Error::ServerError(error.to_string()) + Error::Server(error.to_string()) } pub fn message(error: impl ToString) -> Error {