fix: Make upstream worker easy to connect (#246)

This commit is contained in:
Santiago Carmuega 2023-04-11 14:33:45 +02:00 committed by GitHub
parent e46b152786
commit 1dc87174bd
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 24 additions and 26 deletions

View file

@ -13,10 +13,7 @@ authors = ["Santiago Carmuega <santiago@carmuega.me>"]
[dependencies]
async-trait = "0.1.68"
byteorder = "1.4.3"
gasket = { git = "https://github.com/construkts/gasket-rs" }
# gasket = { path = "../../../construkts/gasket-rs" }
hex = "0.4.3"
# gasket = { version = "0.1.0", path = "../../../construkts/gasket-rs" }
pallas-codec = { version = "0.18.0", path = "../pallas-codec" }
pallas-crypto = { version = "0.18.0", path = "../pallas-crypto" }
pallas-network = { version = "0.18.0", path = "../pallas-network" }
@ -26,5 +23,8 @@ thiserror = "1.0.31"
tokio = { version = "1", features = ["net", "macros", "io-util"] }
tracing = "0.1.37"
gasket = { git = "https://github.com/construkts/gasket-rs" }
# gasket = { path = "../../../construkts/gasket-rs" }
[dev-dependencies]
tracing-subscriber = "0.3.16"

View file

@ -1,4 +1,5 @@
use gasket::error::AsWorkError;
use gasket::messaging::SendAdapter;
use tracing::{debug, info};
use pallas_network::facades::PeerClient;
@ -17,42 +18,43 @@ fn to_traverse(header: &HeaderContent) -> Result<MultiEraHeader<'_>, gasket::err
out.or_panic()
}
pub type DownstreamPort = gasket::messaging::tokio::OutputPort<UpstreamEvent>;
pub type DownstreamPort<A> = gasket::messaging::OutputPort<A, UpstreamEvent>;
pub struct Worker<C>
pub struct Worker<C, A>
where
C: Cursor,
A: SendAdapter<UpstreamEvent>,
{
peer_address: String,
network_magic: u64,
chain_cursor: C,
peer_session: Option<PeerClient>,
downstream: DownstreamPort,
downstream: DownstreamPort<A>,
block_count: gasket::metrics::Counter,
chain_tip: gasket::metrics::Gauge,
}
impl<C> Worker<C>
impl<C, A> Worker<C, A>
where
C: Cursor,
A: SendAdapter<UpstreamEvent>,
{
pub fn new(
peer_address: String,
network_magic: u64,
chain_cursor: C,
downstream: DownstreamPort,
) -> Self {
pub fn new(peer_address: String, network_magic: u64, chain_cursor: C) -> Self {
Self {
peer_address,
network_magic,
chain_cursor,
downstream,
downstream: Default::default(),
peer_session: None,
block_count: Default::default(),
chain_tip: Default::default(),
}
}
pub fn downstream_port(&mut self) -> &mut DownstreamPort<A> {
&mut self.downstream
}
fn notify_tip(&self, tip: &Tip) {
self.chain_tip.set(tip.0.slot_or_default() as i64);
}
@ -140,10 +142,11 @@ where
}
}
#[async_trait::async_trait]
impl<C> gasket::runtime::Worker for Worker<C>
#[async_trait::async_trait(?Send)]
impl<C, A> gasket::runtime::Worker for Worker<C, A>
where
C: Cursor + Sync + Send,
A: SendAdapter<UpstreamEvent>,
{
type WorkUnit = NextResponse<HeaderContent>;

View file

@ -1,8 +1,5 @@
use gasket::{
messaging::{
tokio::{InputPort, OutputPort},
RecvPort, SendPort,
},
messaging::{tokio::InputPort, RecvPort, SendPort},
runtime::{WorkSchedule, Worker},
};
@ -13,7 +10,7 @@ struct Witness {
input: InputPort<UpstreamEvent>,
}
#[async_trait::async_trait]
#[async_trait::async_trait(?Send)]
impl Worker for Witness {
type WorkUnit = UpstreamEvent;
@ -54,16 +51,14 @@ fn test_mainnet_upstream() {
let (send, receive) = gasket::messaging::tokio::channel(200);
let mut output_port = OutputPort::default();
output_port.connect(send);
let upstream = pallas_upstream::n2n::Worker::new(
let mut upstream = pallas_upstream::n2n::Worker::new(
"relays-new.cardano-mainnet.iohk.io:3001".into(),
764824073,
StaticCursor,
output_port,
);
upstream.downstream_port().connect(send);
let mut witness = Witness {
input: Default::default(),
};