diff --git a/pallas-upstream/Cargo.toml b/pallas-upstream/Cargo.toml index b26a63f..d95abb4 100644 --- a/pallas-upstream/Cargo.toml +++ b/pallas-upstream/Cargo.toml @@ -23,7 +23,7 @@ thiserror = "1.0.31" tokio = { version = "1", features = ["net", "macros", "io-util"] } tracing = "0.1.37" -gasket = "0.2.0" +gasket = "0.3.0" # gasket = { git = "https://github.com/construkts/gasket-rs" } # gasket = { path = "../../../construkts/gasket-rs" } diff --git a/pallas-upstream/src/lib.rs b/pallas-upstream/src/lib.rs index bf7234b..e2d9457 100644 --- a/pallas-upstream/src/lib.rs +++ b/pallas-upstream/src/lib.rs @@ -4,5 +4,5 @@ pub(crate) mod worker; pub use crate::framework::{Cursor, DownstreamPort, Intersection, UpstreamEvent}; pub mod n2n { - pub use crate::worker::Worker; + pub use crate::worker::*; } diff --git a/pallas-upstream/src/worker.rs b/pallas-upstream/src/worker.rs index 9eff3cd..f9ff96c 100644 --- a/pallas-upstream/src/worker.rs +++ b/pallas-upstream/src/worker.rs @@ -1,15 +1,17 @@ -use gasket::error::AsWorkError; -use gasket::messaging::SendAdapter; +use std::marker::PhantomData; + +use gasket::framework::*; +use gasket::messaging::*; use tracing::{debug, info}; use pallas_network::facades::PeerClient; -use pallas_network::miniprotocols::chainsync::{self, HeaderContent, NextResponse, Tip}; +use pallas_network::miniprotocols::chainsync::{self, HeaderContent, NextResponse}; use pallas_network::miniprotocols::Point; use pallas_traverse::MultiEraHeader; use crate::framework::*; -fn to_traverse(header: &HeaderContent) -> Result, gasket::error::Error> { +fn to_traverse(header: &HeaderContent) -> Result, WorkerError> { let out = match header.byron_prefix { Some((subtag, _)) => MultiEraHeader::decode(header.variant, Some(subtag), &header.cbor), None => MultiEraHeader::decode(header.variant, None, &header.cbor), @@ -20,18 +22,38 @@ fn to_traverse(header: &HeaderContent) -> Result, gasket::err pub type DownstreamPort = gasket::messaging::OutputPort; +async fn intersect(peer: &mut PeerClient, intersection: Intersection) -> Result<(), WorkerError> { + let chainsync = peer.chainsync(); + + let intersect = match intersection { + Intersection::Origin => { + info!("intersecting origin"); + chainsync.intersect_origin().await.or_restart()?.into() + } + Intersection::Tip => { + info!("intersecting tip"); + chainsync.intersect_tip().await.or_restart()?.into() + } + Intersection::Breadcrumbs(points) => { + info!("intersecting breadcrumbs"); + let (point, _) = chainsync.find_intersect(points).await.or_restart()?; + point + } + }; + + info!(?intersect, "intersected"); + + Ok(()) +} + pub struct Worker where C: Cursor, A: SendAdapter, { - peer_address: String, - network_magic: u64, - chain_cursor: C, - peer_session: Option, - downstream: DownstreamPort, - block_count: gasket::metrics::Counter, - chain_tip: gasket::metrics::Gauge, + peer_session: PeerClient, + _panthom_c: PhantomData, + _panthom_a: PhantomData, } impl Worker @@ -39,59 +61,11 @@ where C: Cursor, A: SendAdapter, { - pub fn new(peer_address: String, network_magic: u64, chain_cursor: C) -> Self { - Self { - peer_address, - network_magic, - chain_cursor, - downstream: Default::default(), - peer_session: None, - block_count: Default::default(), - chain_tip: Default::default(), - } - } - - pub fn downstream_port(&mut self) -> &mut DownstreamPort { - &mut self.downstream - } - - fn notify_tip(&self, tip: &Tip) { - self.chain_tip.set(tip.0.slot_or_default() as i64); - } - - async fn intersect(&mut self) -> Result<(), gasket::error::Error> { - let value = self.chain_cursor.intersection(); - - let chainsync = self.peer_session.as_mut().unwrap().chainsync(); - - let intersect = match value { - Intersection::Origin => { - info!("intersecting origin"); - chainsync.intersect_origin().await.or_restart()?.into() - } - Intersection::Tip => { - info!("intersecting tip"); - chainsync.intersect_tip().await.or_restart()?.into() - } - Intersection::Breadcrumbs(points) => { - info!("intersecting breadcrumbs"); - let (point, tip) = chainsync.find_intersect(points).await.or_restart()?; - - self.notify_tip(&tip); - - point - } - }; - - info!(?intersect, "intersected"); - - Ok(()) - } - async fn process_next( &mut self, + stage: &mut Stage, next: &NextResponse, - ) -> Result<(), gasket::error::Error> { + ) -> Result<(), WorkerError> { match next { NextResponse::RollForward(header, tip) => { let header = to_traverse(header).or_panic()?; @@ -102,8 +76,6 @@ where let block = self .peer_session - .as_mut() - .unwrap() .blockfetch() .fetch_single(pallas_network::miniprotocols::Point::Specific( slot, @@ -112,11 +84,13 @@ where .await .or_retry()?; - self.downstream + stage + .downstream .send(UpstreamEvent::RollForward(slot, hash, block).into()) - .await?; + .await + .or_panic()?; - self.notify_tip(tip); + stage.chain_tip.set(tip.0.slot_or_default() as i64); Ok(()) } @@ -126,11 +100,13 @@ where Point::Specific(slot, _) => debug!(slot, "rollback"), }; - self.downstream + stage + .downstream .send(UpstreamEvent::Rollback(point.clone()).into()) - .await?; + .await + .or_panic()?; - self.notify_tip(tip); + stage.chain_tip.set(tip.0.slot_or_default() as i64); Ok(()) } @@ -143,42 +119,39 @@ where } #[async_trait::async_trait(?Send)] -impl gasket::runtime::Worker for Worker +impl gasket::framework::Worker for Worker where C: Cursor + Sync + Send, A: SendAdapter, { - type WorkUnit = NextResponse; + type Unit = NextResponse; + type Stage = Stage; - fn metrics(&self) -> gasket::metrics::Registry { - gasket::metrics::Builder::new() - .with_counter("received_blocks", &self.block_count) - .with_gauge("chain_tip", &self.chain_tip) - .build() - } - - async fn bootstrap(&mut self) -> Result<(), gasket::error::Error> { + async fn bootstrap(stage: &Self::Stage) -> Result { debug!("connecting"); - let peer = PeerClient::connect(&self.peer_address, self.network_magic) + let intersection = stage.chain_cursor.intersection(); + + let mut peer_session = PeerClient::connect(&stage.peer_address, stage.network_magic) .await - .or_restart()?; + .or_retry()?; - self.peer_session = Some(peer); + intersect(&mut peer_session, intersection).await?; - self.intersect().await?; + let worker = Self { + peer_session, + _panthom_a: Default::default(), + _panthom_c: Default::default(), + }; - Ok(()) + Ok(worker) } - async fn teardown(&mut self) -> Result<(), gasket::error::Error> { - self.peer_session.as_mut().unwrap().abort(); - - Ok(()) - } - - async fn schedule(&mut self) -> gasket::runtime::ScheduleResult { - let client = self.peer_session.as_mut().unwrap().chainsync(); + async fn schedule( + &mut self, + _stage: &mut Self::Stage, + ) -> Result, WorkerError> { + let client = self.peer_session.chainsync(); let next = match client.has_agency() { true => { @@ -191,10 +164,80 @@ where } }; - Ok(gasket::runtime::WorkSchedule::Unit(next)) + Ok(WorkSchedule::Unit(next)) } - async fn execute(&mut self, unit: &Self::WorkUnit) -> Result<(), gasket::error::Error> { - self.process_next(unit).await + async fn execute( + &mut self, + unit: &Self::Unit, + stage: &mut Self::Stage, + ) -> Result<(), WorkerError> { + self.process_next(stage, unit).await + } + + async fn teardown(&mut self) -> Result<(), WorkerError> { + self.peer_session.abort(); + + Ok(()) + } +} + +pub struct Stage +where + C: Cursor, + A: SendAdapter, +{ + peer_address: String, + network_magic: u64, + chain_cursor: C, + policy: gasket::runtime::Policy, + downstream: DownstreamPort, + block_count: gasket::metrics::Counter, + chain_tip: gasket::metrics::Gauge, +} + +impl gasket::framework::Stage for Stage +where + C: Cursor, + A: SendAdapter, +{ + fn name(&self) -> &str { + "upstream" + } + + fn policy(&self) -> gasket::runtime::Policy { + self.policy.clone() + } + + fn register_metrics(&self, registry: &mut gasket::metrics::Registry) { + registry.track_counter("received_blocks", &self.block_count); + registry.track_gauge("chain_tip", &self.chain_tip); + } +} + +impl Stage +where + C: Cursor, + A: SendAdapter, +{ + pub fn new( + peer_address: String, + network_magic: u64, + chain_cursor: C, + policy: gasket::runtime::Policy, + ) -> Self { + Self { + peer_address, + network_magic, + chain_cursor, + downstream: Default::default(), + block_count: Default::default(), + chain_tip: Default::default(), + policy, + } + } + + pub fn downstream_port(&mut self) -> &mut DownstreamPort { + &mut self.downstream } } diff --git a/pallas-upstream/tests/integration.rs b/pallas-upstream/tests/integration.rs index 6eb4f31..0b2f619 100644 --- a/pallas-upstream/tests/integration.rs +++ b/pallas-upstream/tests/integration.rs @@ -1,31 +1,46 @@ -use gasket::{ - messaging::{tokio::InputPort, RecvPort, SendPort}, - runtime::{WorkSchedule, Worker}, -}; +use gasket::{framework::*, messaging::*, runtime::Policy}; use pallas_upstream::{Cursor, UpstreamEvent}; -use tracing::error; +use tracing::{error, info}; -struct Witness { - input: InputPort, +struct WitnessStage { + input: gasket::messaging::tokio::InputPort, } -#[async_trait::async_trait(?Send)] -impl Worker for Witness { - type WorkUnit = UpstreamEvent; - - fn metrics(&self) -> gasket::metrics::Registry { - gasket::metrics::Registry::new() +impl gasket::framework::Stage for WitnessStage { + fn name(&self) -> &str { + "witness" } - async fn schedule(&mut self) -> gasket::runtime::ScheduleResult { + fn policy(&self) -> gasket::runtime::Policy { + Policy::default() + } + + fn register_metrics(&self, _: &mut gasket::metrics::Registry) {} +} + +struct WitnessWorker; + +#[async_trait::async_trait(?Send)] +impl Worker for WitnessWorker { + type Unit = UpstreamEvent; + type Stage = WitnessStage; + + async fn bootstrap(_: &Self::Stage) -> Result { + Ok(Self) + } + + async fn schedule( + &mut self, + stage: &mut Self::Stage, + ) -> Result, WorkerError> { error!("dequeing form witness"); - let msg = self.input.recv().await?; + let msg = stage.input.recv().await.or_panic()?; Ok(WorkSchedule::Unit(msg.payload)) } - async fn execute(&mut self, _: &Self::WorkUnit) -> Result<(), gasket::error::Error> { - error!("witnessing block event"); + async fn execute(&mut self, _: &Self::Unit, _: &mut Self::Stage) -> Result<(), WorkerError> { + info!("witnessing block event"); Ok(()) } @@ -51,22 +66,23 @@ fn test_mainnet_upstream() { let (send, receive) = gasket::messaging::tokio::channel(200); - let mut upstream = pallas_upstream::n2n::Worker::new( + let mut upstream = pallas_upstream::n2n::Stage::new( "relays-new.cardano-mainnet.iohk.io:3001".into(), 764824073, StaticCursor, + Policy::default(), ); upstream.downstream_port().connect(send); - let mut witness = Witness { + let mut witness = WitnessStage { input: Default::default(), }; witness.input.connect(receive); - let upstream = gasket::runtime::spawn_stage(upstream, Default::default(), Some("upstream")); - let witness = gasket::runtime::spawn_stage(witness, Default::default(), Some("witness")); + let upstream = gasket::runtime::spawn_stage::>(upstream); + let witness = gasket::runtime::spawn_stage::(witness); let daemon = gasket::daemon::Daemon(vec![upstream, witness]);