chore: upgrade to gasket v0.4 (#256)

This commit is contained in:
Santiago Carmuega 2023-05-03 20:43:42 +02:00 committed by GitHub
parent 6dc7f06964
commit ef9dbfad64
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 30 additions and 40 deletions

View file

@ -23,9 +23,9 @@ thiserror = "1.0.31"
tokio = { version = "1", features = ["net", "macros", "io-util"] }
tracing = "0.1.37"
gasket = "0.3.0"
gasket = "^0.4"
# gasket = { git = "https://github.com/construkts/gasket-rs" }
# gasket = { path = "../../../construkts/gasket-rs" }
# gasket = { path = "../../../construkts/gasket-rs/gasket" }
[dev-dependencies]
tracing-subscriber = "0.3.16"

View file

@ -119,15 +119,12 @@ where
}
#[async_trait::async_trait(?Send)]
impl<C, A> gasket::framework::Worker for Worker<C, A>
impl<C, A> gasket::framework::Worker<Stage<C, A>> for Worker<C, A>
where
C: Cursor + Sync + Send,
A: SendAdapter<UpstreamEvent>,
{
type Unit = NextResponse<HeaderContent>;
type Stage = Stage<C, A>;
async fn bootstrap(stage: &Self::Stage) -> Result<Self, WorkerError> {
async fn bootstrap(stage: &Stage<C, A>) -> Result<Self, WorkerError> {
debug!("connecting");
let intersection = stage.chain_cursor.intersection();
@ -149,8 +146,8 @@ where
async fn schedule(
&mut self,
_stage: &mut Self::Stage,
) -> Result<WorkSchedule<Self::Unit>, WorkerError> {
_stage: &mut Stage<C, A>,
) -> Result<WorkSchedule<NextResponse<HeaderContent>>, WorkerError> {
let client = self.peer_session.chainsync();
let next = match client.has_agency() {
@ -169,8 +166,8 @@ where
async fn execute(
&mut self,
unit: &Self::Unit,
stage: &mut Self::Stage,
unit: &NextResponse<HeaderContent>,
stage: &mut Stage<C, A>,
) -> Result<(), WorkerError> {
self.process_next(stage, unit).await
}
@ -190,7 +187,6 @@ where
peer_address: String,
network_magic: u64,
chain_cursor: C,
policy: gasket::runtime::Policy,
downstream: DownstreamPort<A>,
block_count: gasket::metrics::Counter,
chain_tip: gasket::metrics::Gauge,
@ -201,17 +197,20 @@ where
C: Cursor,
A: SendAdapter<UpstreamEvent>,
{
type Unit = NextResponse<HeaderContent>;
type Worker = Worker<C, A>;
fn name(&self) -> &str {
"upstream"
}
fn policy(&self) -> gasket::runtime::Policy {
self.policy.clone()
}
fn metrics(&self) -> gasket::metrics::Registry {
let mut registry = gasket::metrics::Registry::default();
fn register_metrics(&self, registry: &mut gasket::metrics::Registry) {
registry.track_counter("received_blocks", &self.block_count);
registry.track_gauge("chain_tip", &self.chain_tip);
registry
}
}
@ -220,12 +219,7 @@ where
C: Cursor,
A: SendAdapter<UpstreamEvent>,
{
pub fn new(
peer_address: String,
network_magic: u64,
chain_cursor: C,
policy: gasket::runtime::Policy,
) -> Self {
pub fn new(peer_address: String, network_magic: u64, chain_cursor: C) -> Self {
Self {
peer_address,
network_magic,
@ -233,7 +227,6 @@ where
downstream: Default::default(),
block_count: Default::default(),
chain_tip: Default::default(),
policy,
}
}

View file

@ -8,38 +8,36 @@ struct WitnessStage {
}
impl gasket::framework::Stage for WitnessStage {
type Unit = UpstreamEvent;
type Worker = WitnessWorker;
fn name(&self) -> &str {
"witness"
}
fn policy(&self) -> gasket::runtime::Policy {
Policy::default()
}
fn register_metrics(&self, _: &mut gasket::metrics::Registry) {}
}
struct WitnessWorker;
#[async_trait::async_trait(?Send)]
impl Worker for WitnessWorker {
type Unit = UpstreamEvent;
type Stage = WitnessStage;
async fn bootstrap(_: &Self::Stage) -> Result<Self, WorkerError> {
impl Worker<WitnessStage> for WitnessWorker {
async fn bootstrap(_: &WitnessStage) -> Result<Self, WorkerError> {
Ok(Self)
}
async fn schedule(
&mut self,
stage: &mut Self::Stage,
) -> Result<WorkSchedule<Self::Unit>, WorkerError> {
stage: &mut WitnessStage,
) -> Result<WorkSchedule<UpstreamEvent>, WorkerError> {
error!("dequeing form witness");
let msg = stage.input.recv().await.or_panic()?;
Ok(WorkSchedule::Unit(msg.payload))
}
async fn execute(&mut self, _: &Self::Unit, _: &mut Self::Stage) -> Result<(), WorkerError> {
async fn execute(
&mut self,
_: &UpstreamEvent,
_: &mut WitnessStage,
) -> Result<(), WorkerError> {
info!("witnessing block event");
Ok(())
@ -70,7 +68,6 @@ fn test_mainnet_upstream() {
"relays-new.cardano-mainnet.iohk.io:3001".into(),
764824073,
StaticCursor,
Policy::default(),
);
upstream.downstream_port().connect(send);
@ -81,8 +78,8 @@ fn test_mainnet_upstream() {
witness.input.connect(receive);
let upstream = gasket::runtime::spawn_stage::<pallas_upstream::n2n::Worker<_, _>>(upstream);
let witness = gasket::runtime::spawn_stage::<WitnessWorker>(witness);
let upstream = gasket::runtime::spawn_stage(upstream, Policy::default());
let witness = gasket::runtime::spawn_stage(witness, Policy::default());
let daemon = gasket::daemon::Daemon(vec![upstream, witness]);