chore: upgrade gasket to v0.3.0 (#255)

This commit is contained in:
Santiago Carmuega 2023-04-28 02:52:34 +02:00 committed by GitHub
parent c9119e4925
commit 6dc7f06964
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 177 additions and 118 deletions

View file

@ -23,7 +23,7 @@ thiserror = "1.0.31"
tokio = { version = "1", features = ["net", "macros", "io-util"] }
tracing = "0.1.37"
gasket = "0.2.0"
gasket = "0.3.0"
# gasket = { git = "https://github.com/construkts/gasket-rs" }
# gasket = { path = "../../../construkts/gasket-rs" }

View file

@ -4,5 +4,5 @@ pub(crate) mod worker;
pub use crate::framework::{Cursor, DownstreamPort, Intersection, UpstreamEvent};
pub mod n2n {
pub use crate::worker::Worker;
pub use crate::worker::*;
}

View file

@ -1,15 +1,17 @@
use gasket::error::AsWorkError;
use gasket::messaging::SendAdapter;
use std::marker::PhantomData;
use gasket::framework::*;
use gasket::messaging::*;
use tracing::{debug, info};
use pallas_network::facades::PeerClient;
use pallas_network::miniprotocols::chainsync::{self, HeaderContent, NextResponse, Tip};
use pallas_network::miniprotocols::chainsync::{self, HeaderContent, NextResponse};
use pallas_network::miniprotocols::Point;
use pallas_traverse::MultiEraHeader;
use crate::framework::*;
fn to_traverse(header: &HeaderContent) -> Result<MultiEraHeader<'_>, gasket::error::Error> {
fn to_traverse(header: &HeaderContent) -> Result<MultiEraHeader<'_>, WorkerError> {
let out = match header.byron_prefix {
Some((subtag, _)) => MultiEraHeader::decode(header.variant, Some(subtag), &header.cbor),
None => MultiEraHeader::decode(header.variant, None, &header.cbor),
@ -20,18 +22,38 @@ fn to_traverse(header: &HeaderContent) -> Result<MultiEraHeader<'_>, gasket::err
pub type DownstreamPort<A> = gasket::messaging::OutputPort<A, UpstreamEvent>;
async fn intersect(peer: &mut PeerClient, intersection: Intersection) -> Result<(), WorkerError> {
let chainsync = peer.chainsync();
let intersect = match intersection {
Intersection::Origin => {
info!("intersecting origin");
chainsync.intersect_origin().await.or_restart()?.into()
}
Intersection::Tip => {
info!("intersecting tip");
chainsync.intersect_tip().await.or_restart()?.into()
}
Intersection::Breadcrumbs(points) => {
info!("intersecting breadcrumbs");
let (point, _) = chainsync.find_intersect(points).await.or_restart()?;
point
}
};
info!(?intersect, "intersected");
Ok(())
}
pub struct Worker<C, A>
where
C: Cursor,
A: SendAdapter<UpstreamEvent>,
{
peer_address: String,
network_magic: u64,
chain_cursor: C,
peer_session: Option<PeerClient>,
downstream: DownstreamPort<A>,
block_count: gasket::metrics::Counter,
chain_tip: gasket::metrics::Gauge,
peer_session: PeerClient,
_panthom_c: PhantomData<C>,
_panthom_a: PhantomData<A>,
}
impl<C, A> Worker<C, A>
@ -39,59 +61,11 @@ where
C: Cursor,
A: SendAdapter<UpstreamEvent>,
{
pub fn new(peer_address: String, network_magic: u64, chain_cursor: C) -> Self {
Self {
peer_address,
network_magic,
chain_cursor,
downstream: Default::default(),
peer_session: None,
block_count: Default::default(),
chain_tip: Default::default(),
}
}
pub fn downstream_port(&mut self) -> &mut DownstreamPort<A> {
&mut self.downstream
}
fn notify_tip(&self, tip: &Tip) {
self.chain_tip.set(tip.0.slot_or_default() as i64);
}
async fn intersect(&mut self) -> Result<(), gasket::error::Error> {
let value = self.chain_cursor.intersection();
let chainsync = self.peer_session.as_mut().unwrap().chainsync();
let intersect = match value {
Intersection::Origin => {
info!("intersecting origin");
chainsync.intersect_origin().await.or_restart()?.into()
}
Intersection::Tip => {
info!("intersecting tip");
chainsync.intersect_tip().await.or_restart()?.into()
}
Intersection::Breadcrumbs(points) => {
info!("intersecting breadcrumbs");
let (point, tip) = chainsync.find_intersect(points).await.or_restart()?;
self.notify_tip(&tip);
point
}
};
info!(?intersect, "intersected");
Ok(())
}
async fn process_next(
&mut self,
stage: &mut Stage<C, A>,
next: &NextResponse<HeaderContent>,
) -> Result<(), gasket::error::Error> {
) -> Result<(), WorkerError> {
match next {
NextResponse::RollForward(header, tip) => {
let header = to_traverse(header).or_panic()?;
@ -102,8 +76,6 @@ where
let block = self
.peer_session
.as_mut()
.unwrap()
.blockfetch()
.fetch_single(pallas_network::miniprotocols::Point::Specific(
slot,
@ -112,11 +84,13 @@ where
.await
.or_retry()?;
self.downstream
stage
.downstream
.send(UpstreamEvent::RollForward(slot, hash, block).into())
.await?;
.await
.or_panic()?;
self.notify_tip(tip);
stage.chain_tip.set(tip.0.slot_or_default() as i64);
Ok(())
}
@ -126,11 +100,13 @@ where
Point::Specific(slot, _) => debug!(slot, "rollback"),
};
self.downstream
stage
.downstream
.send(UpstreamEvent::Rollback(point.clone()).into())
.await?;
.await
.or_panic()?;
self.notify_tip(tip);
stage.chain_tip.set(tip.0.slot_or_default() as i64);
Ok(())
}
@ -143,42 +119,39 @@ where
}
#[async_trait::async_trait(?Send)]
impl<C, A> gasket::runtime::Worker for Worker<C, A>
impl<C, A> gasket::framework::Worker for Worker<C, A>
where
C: Cursor + Sync + Send,
A: SendAdapter<UpstreamEvent>,
{
type WorkUnit = NextResponse<HeaderContent>;
type Unit = NextResponse<HeaderContent>;
type Stage = Stage<C, A>;
fn metrics(&self) -> gasket::metrics::Registry {
gasket::metrics::Builder::new()
.with_counter("received_blocks", &self.block_count)
.with_gauge("chain_tip", &self.chain_tip)
.build()
}
async fn bootstrap(&mut self) -> Result<(), gasket::error::Error> {
async fn bootstrap(stage: &Self::Stage) -> Result<Self, WorkerError> {
debug!("connecting");
let peer = PeerClient::connect(&self.peer_address, self.network_magic)
let intersection = stage.chain_cursor.intersection();
let mut peer_session = PeerClient::connect(&stage.peer_address, stage.network_magic)
.await
.or_restart()?;
.or_retry()?;
self.peer_session = Some(peer);
intersect(&mut peer_session, intersection).await?;
self.intersect().await?;
let worker = Self {
peer_session,
_panthom_a: Default::default(),
_panthom_c: Default::default(),
};
Ok(())
Ok(worker)
}
async fn teardown(&mut self) -> Result<(), gasket::error::Error> {
self.peer_session.as_mut().unwrap().abort();
Ok(())
}
async fn schedule(&mut self) -> gasket::runtime::ScheduleResult<Self::WorkUnit> {
let client = self.peer_session.as_mut().unwrap().chainsync();
async fn schedule(
&mut self,
_stage: &mut Self::Stage,
) -> Result<WorkSchedule<Self::Unit>, WorkerError> {
let client = self.peer_session.chainsync();
let next = match client.has_agency() {
true => {
@ -191,10 +164,80 @@ where
}
};
Ok(gasket::runtime::WorkSchedule::Unit(next))
Ok(WorkSchedule::Unit(next))
}
async fn execute(&mut self, unit: &Self::WorkUnit) -> Result<(), gasket::error::Error> {
self.process_next(unit).await
async fn execute(
&mut self,
unit: &Self::Unit,
stage: &mut Self::Stage,
) -> Result<(), WorkerError> {
self.process_next(stage, unit).await
}
async fn teardown(&mut self) -> Result<(), WorkerError> {
self.peer_session.abort();
Ok(())
}
}
pub struct Stage<C, A>
where
C: Cursor,
A: SendAdapter<UpstreamEvent>,
{
peer_address: String,
network_magic: u64,
chain_cursor: C,
policy: gasket::runtime::Policy,
downstream: DownstreamPort<A>,
block_count: gasket::metrics::Counter,
chain_tip: gasket::metrics::Gauge,
}
impl<C, A> gasket::framework::Stage for Stage<C, A>
where
C: Cursor,
A: SendAdapter<UpstreamEvent>,
{
fn name(&self) -> &str {
"upstream"
}
fn policy(&self) -> gasket::runtime::Policy {
self.policy.clone()
}
fn register_metrics(&self, registry: &mut gasket::metrics::Registry) {
registry.track_counter("received_blocks", &self.block_count);
registry.track_gauge("chain_tip", &self.chain_tip);
}
}
impl<C, A> Stage<C, A>
where
C: Cursor,
A: SendAdapter<UpstreamEvent>,
{
pub fn new(
peer_address: String,
network_magic: u64,
chain_cursor: C,
policy: gasket::runtime::Policy,
) -> Self {
Self {
peer_address,
network_magic,
chain_cursor,
downstream: Default::default(),
block_count: Default::default(),
chain_tip: Default::default(),
policy,
}
}
pub fn downstream_port(&mut self) -> &mut DownstreamPort<A> {
&mut self.downstream
}
}

View file

@ -1,31 +1,46 @@
use gasket::{
messaging::{tokio::InputPort, RecvPort, SendPort},
runtime::{WorkSchedule, Worker},
};
use gasket::{framework::*, messaging::*, runtime::Policy};
use pallas_upstream::{Cursor, UpstreamEvent};
use tracing::error;
use tracing::{error, info};
struct Witness {
input: InputPort<UpstreamEvent>,
struct WitnessStage {
input: gasket::messaging::tokio::InputPort<UpstreamEvent>,
}
#[async_trait::async_trait(?Send)]
impl Worker for Witness {
type WorkUnit = UpstreamEvent;
fn metrics(&self) -> gasket::metrics::Registry {
gasket::metrics::Registry::new()
impl gasket::framework::Stage for WitnessStage {
fn name(&self) -> &str {
"witness"
}
async fn schedule(&mut self) -> gasket::runtime::ScheduleResult<Self::WorkUnit> {
fn policy(&self) -> gasket::runtime::Policy {
Policy::default()
}
fn register_metrics(&self, _: &mut gasket::metrics::Registry) {}
}
struct WitnessWorker;
#[async_trait::async_trait(?Send)]
impl Worker for WitnessWorker {
type Unit = UpstreamEvent;
type Stage = WitnessStage;
async fn bootstrap(_: &Self::Stage) -> Result<Self, WorkerError> {
Ok(Self)
}
async fn schedule(
&mut self,
stage: &mut Self::Stage,
) -> Result<WorkSchedule<Self::Unit>, WorkerError> {
error!("dequeing form witness");
let msg = self.input.recv().await?;
let msg = stage.input.recv().await.or_panic()?;
Ok(WorkSchedule::Unit(msg.payload))
}
async fn execute(&mut self, _: &Self::WorkUnit) -> Result<(), gasket::error::Error> {
error!("witnessing block event");
async fn execute(&mut self, _: &Self::Unit, _: &mut Self::Stage) -> Result<(), WorkerError> {
info!("witnessing block event");
Ok(())
}
@ -51,22 +66,23 @@ fn test_mainnet_upstream() {
let (send, receive) = gasket::messaging::tokio::channel(200);
let mut upstream = pallas_upstream::n2n::Worker::new(
let mut upstream = pallas_upstream::n2n::Stage::new(
"relays-new.cardano-mainnet.iohk.io:3001".into(),
764824073,
StaticCursor,
Policy::default(),
);
upstream.downstream_port().connect(send);
let mut witness = Witness {
let mut witness = WitnessStage {
input: Default::default(),
};
witness.input.connect(receive);
let upstream = gasket::runtime::spawn_stage(upstream, Default::default(), Some("upstream"));
let witness = gasket::runtime::spawn_stage(witness, Default::default(), Some("witness"));
let upstream = gasket::runtime::spawn_stage::<pallas_upstream::n2n::Worker<_, _>>(upstream);
let witness = gasket::runtime::spawn_stage::<WitnessWorker>(witness);
let daemon = gasket::daemon::Daemon(vec![upstream, witness]);