diff --git a/pallas-upstream/Cargo.toml b/pallas-upstream/Cargo.toml index d95abb4..394550d 100644 --- a/pallas-upstream/Cargo.toml +++ b/pallas-upstream/Cargo.toml @@ -23,9 +23,9 @@ thiserror = "1.0.31" tokio = { version = "1", features = ["net", "macros", "io-util"] } tracing = "0.1.37" -gasket = "0.3.0" +gasket = "^0.4" # gasket = { git = "https://github.com/construkts/gasket-rs" } -# gasket = { path = "../../../construkts/gasket-rs" } +# gasket = { path = "../../../construkts/gasket-rs/gasket" } [dev-dependencies] tracing-subscriber = "0.3.16" diff --git a/pallas-upstream/src/worker.rs b/pallas-upstream/src/worker.rs index f9ff96c..7f2b8f8 100644 --- a/pallas-upstream/src/worker.rs +++ b/pallas-upstream/src/worker.rs @@ -119,15 +119,12 @@ where } #[async_trait::async_trait(?Send)] -impl gasket::framework::Worker for Worker +impl gasket::framework::Worker> for Worker where C: Cursor + Sync + Send, A: SendAdapter, { - type Unit = NextResponse; - type Stage = Stage; - - async fn bootstrap(stage: &Self::Stage) -> Result { + async fn bootstrap(stage: &Stage) -> Result { debug!("connecting"); let intersection = stage.chain_cursor.intersection(); @@ -149,8 +146,8 @@ where async fn schedule( &mut self, - _stage: &mut Self::Stage, - ) -> Result, WorkerError> { + _stage: &mut Stage, + ) -> Result>, WorkerError> { let client = self.peer_session.chainsync(); let next = match client.has_agency() { @@ -169,8 +166,8 @@ where async fn execute( &mut self, - unit: &Self::Unit, - stage: &mut Self::Stage, + unit: &NextResponse, + stage: &mut Stage, ) -> Result<(), WorkerError> { self.process_next(stage, unit).await } @@ -190,7 +187,6 @@ where peer_address: String, network_magic: u64, chain_cursor: C, - policy: gasket::runtime::Policy, downstream: DownstreamPort, block_count: gasket::metrics::Counter, chain_tip: gasket::metrics::Gauge, @@ -201,17 +197,20 @@ where C: Cursor, A: SendAdapter, { + type Unit = NextResponse; + type Worker = Worker; + fn name(&self) -> &str { "upstream" } - fn policy(&self) -> gasket::runtime::Policy { - self.policy.clone() - } + fn metrics(&self) -> gasket::metrics::Registry { + let mut registry = gasket::metrics::Registry::default(); - fn register_metrics(&self, registry: &mut gasket::metrics::Registry) { registry.track_counter("received_blocks", &self.block_count); registry.track_gauge("chain_tip", &self.chain_tip); + + registry } } @@ -220,12 +219,7 @@ where C: Cursor, A: SendAdapter, { - pub fn new( - peer_address: String, - network_magic: u64, - chain_cursor: C, - policy: gasket::runtime::Policy, - ) -> Self { + pub fn new(peer_address: String, network_magic: u64, chain_cursor: C) -> Self { Self { peer_address, network_magic, @@ -233,7 +227,6 @@ where downstream: Default::default(), block_count: Default::default(), chain_tip: Default::default(), - policy, } } diff --git a/pallas-upstream/tests/integration.rs b/pallas-upstream/tests/integration.rs index 0b2f619..3429ee2 100644 --- a/pallas-upstream/tests/integration.rs +++ b/pallas-upstream/tests/integration.rs @@ -8,38 +8,36 @@ struct WitnessStage { } impl gasket::framework::Stage for WitnessStage { + type Unit = UpstreamEvent; + type Worker = WitnessWorker; + fn name(&self) -> &str { "witness" } - - fn policy(&self) -> gasket::runtime::Policy { - Policy::default() - } - - fn register_metrics(&self, _: &mut gasket::metrics::Registry) {} } struct WitnessWorker; #[async_trait::async_trait(?Send)] -impl Worker for WitnessWorker { - type Unit = UpstreamEvent; - type Stage = WitnessStage; - - async fn bootstrap(_: &Self::Stage) -> Result { +impl Worker for WitnessWorker { + async fn bootstrap(_: &WitnessStage) -> Result { Ok(Self) } async fn schedule( &mut self, - stage: &mut Self::Stage, - ) -> Result, WorkerError> { + stage: &mut WitnessStage, + ) -> Result, WorkerError> { error!("dequeing form witness"); let msg = stage.input.recv().await.or_panic()?; Ok(WorkSchedule::Unit(msg.payload)) } - async fn execute(&mut self, _: &Self::Unit, _: &mut Self::Stage) -> Result<(), WorkerError> { + async fn execute( + &mut self, + _: &UpstreamEvent, + _: &mut WitnessStage, + ) -> Result<(), WorkerError> { info!("witnessing block event"); Ok(()) @@ -70,7 +68,6 @@ fn test_mainnet_upstream() { "relays-new.cardano-mainnet.iohk.io:3001".into(), 764824073, StaticCursor, - Policy::default(), ); upstream.downstream_port().connect(send); @@ -81,8 +78,8 @@ fn test_mainnet_upstream() { witness.input.connect(receive); - let upstream = gasket::runtime::spawn_stage::>(upstream); - let witness = gasket::runtime::spawn_stage::(witness); + let upstream = gasket::runtime::spawn_stage(upstream, Policy::default()); + let witness = gasket::runtime::spawn_stage(witness, Policy::default()); let daemon = gasket::daemon::Daemon(vec![upstream, witness]);