From 1dc87174bd8c11704797bb08dab66db696ac92dc Mon Sep 17 00:00:00 2001 From: Santiago Carmuega Date: Tue, 11 Apr 2023 14:33:45 +0200 Subject: [PATCH] fix: Make upstream worker easy to connect (#246) --- pallas-upstream/Cargo.toml | 6 +++--- pallas-upstream/src/worker.rs | 29 +++++++++++++++------------- pallas-upstream/tests/integration.rs | 15 +++++--------- 3 files changed, 24 insertions(+), 26 deletions(-) diff --git a/pallas-upstream/Cargo.toml b/pallas-upstream/Cargo.toml index 16aba42..f535d13 100644 --- a/pallas-upstream/Cargo.toml +++ b/pallas-upstream/Cargo.toml @@ -13,10 +13,7 @@ authors = ["Santiago Carmuega "] [dependencies] async-trait = "0.1.68" byteorder = "1.4.3" -gasket = { git = "https://github.com/construkts/gasket-rs" } -# gasket = { path = "../../../construkts/gasket-rs" } hex = "0.4.3" -# gasket = { version = "0.1.0", path = "../../../construkts/gasket-rs" } pallas-codec = { version = "0.18.0", path = "../pallas-codec" } pallas-crypto = { version = "0.18.0", path = "../pallas-crypto" } pallas-network = { version = "0.18.0", path = "../pallas-network" } @@ -26,5 +23,8 @@ thiserror = "1.0.31" tokio = { version = "1", features = ["net", "macros", "io-util"] } tracing = "0.1.37" +gasket = { git = "https://github.com/construkts/gasket-rs" } +# gasket = { path = "../../../construkts/gasket-rs" } + [dev-dependencies] tracing-subscriber = "0.3.16" diff --git a/pallas-upstream/src/worker.rs b/pallas-upstream/src/worker.rs index a050cdf..9eff3cd 100644 --- a/pallas-upstream/src/worker.rs +++ b/pallas-upstream/src/worker.rs @@ -1,4 +1,5 @@ use gasket::error::AsWorkError; +use gasket::messaging::SendAdapter; use tracing::{debug, info}; use pallas_network::facades::PeerClient; @@ -17,42 +18,43 @@ fn to_traverse(header: &HeaderContent) -> Result, gasket::err out.or_panic() } -pub type DownstreamPort = gasket::messaging::tokio::OutputPort; +pub type DownstreamPort = gasket::messaging::OutputPort; -pub struct Worker +pub struct Worker where C: Cursor, + A: SendAdapter, { peer_address: String, network_magic: u64, chain_cursor: C, peer_session: Option, - downstream: DownstreamPort, + downstream: DownstreamPort, block_count: gasket::metrics::Counter, chain_tip: gasket::metrics::Gauge, } -impl Worker +impl Worker where C: Cursor, + A: SendAdapter, { - pub fn new( - peer_address: String, - network_magic: u64, - chain_cursor: C, - downstream: DownstreamPort, - ) -> Self { + pub fn new(peer_address: String, network_magic: u64, chain_cursor: C) -> Self { Self { peer_address, network_magic, chain_cursor, - downstream, + downstream: Default::default(), peer_session: None, block_count: Default::default(), chain_tip: Default::default(), } } + pub fn downstream_port(&mut self) -> &mut DownstreamPort { + &mut self.downstream + } + fn notify_tip(&self, tip: &Tip) { self.chain_tip.set(tip.0.slot_or_default() as i64); } @@ -140,10 +142,11 @@ where } } -#[async_trait::async_trait] -impl gasket::runtime::Worker for Worker +#[async_trait::async_trait(?Send)] +impl gasket::runtime::Worker for Worker where C: Cursor + Sync + Send, + A: SendAdapter, { type WorkUnit = NextResponse; diff --git a/pallas-upstream/tests/integration.rs b/pallas-upstream/tests/integration.rs index 89734e0..6eb4f31 100644 --- a/pallas-upstream/tests/integration.rs +++ b/pallas-upstream/tests/integration.rs @@ -1,8 +1,5 @@ use gasket::{ - messaging::{ - tokio::{InputPort, OutputPort}, - RecvPort, SendPort, - }, + messaging::{tokio::InputPort, RecvPort, SendPort}, runtime::{WorkSchedule, Worker}, }; @@ -13,7 +10,7 @@ struct Witness { input: InputPort, } -#[async_trait::async_trait] +#[async_trait::async_trait(?Send)] impl Worker for Witness { type WorkUnit = UpstreamEvent; @@ -54,16 +51,14 @@ fn test_mainnet_upstream() { let (send, receive) = gasket::messaging::tokio::channel(200); - let mut output_port = OutputPort::default(); - output_port.connect(send); - - let upstream = pallas_upstream::n2n::Worker::new( + let mut upstream = pallas_upstream::n2n::Worker::new( "relays-new.cardano-mainnet.iohk.io:3001".into(), 764824073, StaticCursor, - output_port, ); + upstream.downstream_port().connect(send); + let mut witness = Witness { input: Default::default(), };