feat(upstream): Make output generic by adapter (#236)

This commit is contained in:
Santiago Carmuega 2023-03-16 20:21:06 +01:00 committed by GitHub
parent 61f50b2cca
commit 2243acfff9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 63 additions and 42 deletions

View file

@ -7,13 +7,10 @@ use crate::MultiEraTx;
impl<'b> MultiEraTx<'b> {
pub fn aux_plutus_v1_scripts(&self) -> &[alonzo::PlutusScript] {
if let Some(aux_data) = self.aux_data() {
match aux_data.deref() {
alonzo::AuxiliaryData::PostAlonzo(x) => {
if let Some(plutus) = &x.plutus_scripts {
return plutus.as_ref();
}
if let alonzo::AuxiliaryData::PostAlonzo(x) = aux_data.deref() {
if let Some(plutus) = &x.plutus_scripts {
return plutus.as_ref();
}
_ => (),
}
}

View file

@ -1,4 +1,4 @@
use std::{borrow::Cow, ops::Deref};
use std::borrow::Cow;
use pallas_codec::minicbor;
use pallas_crypto::hash::Hash;

View file

@ -11,7 +11,8 @@ readme = "README.md"
authors = ["Santiago Carmuega <santiago@carmuega.me>"]
[dependencies]
gasket = { git = "https://github.com/construkts/gasket-rs", version = "0.1.0" }
gasket = { git = "https://github.com/construkts/gasket-rs" }
# gasket = { version = "0.1.0", path = "../../../construkts/gasket-rs" }
pallas-codec = { version = "0.18.0", path = "../pallas-codec" }
pallas-crypto = { version = "0.18.0", path = "../pallas-crypto" }
pallas-miniprotocols = { version = "0.18.0", path = "../pallas-miniprotocols" }

View file

@ -1,8 +1,15 @@
pub use crate::cursor;
pub use crate::framework::BlockFetchEvent;
pub use crate::framework::DownstreamPort;
pub mod n2n {
use crate::{blockfetch, chainsync, cursor::Cursor, framework::*, plexer};
use gasket::runtime::Tether;
use gasket::{
messaging::{SendAdapter, SendPort},
runtime::Tether,
};
pub struct Runtime {
pub plexer_tether: Tether,
@ -10,25 +17,31 @@ pub mod n2n {
pub blockfetch_tether: Tether,
}
pub struct Bootstrapper {
pub struct Bootstrapper<A>
where
A: SendAdapter<BlockFetchEvent>,
{
cursor: Cursor,
peer_address: String,
network_magic: u64,
output: blockfetch::DownstreamPort,
output: super::DownstreamPort<A>,
}
impl Bootstrapper {
impl<A> Bootstrapper<A>
where
A: SendAdapter<BlockFetchEvent> + 'static,
{
pub fn new(cursor: Cursor, peer_address: String, network_magic: u64) -> Self {
Bootstrapper {
cursor,
peer_address,
network_magic,
output: blockfetch::DownstreamPort::default(),
output: Default::default(),
}
}
pub fn borrow_output_port(&mut self) -> &mut blockfetch::DownstreamPort {
&mut self.output
pub fn connect_output(&mut self, adapter: A) {
self.output.connect(adapter);
}
pub fn spawn(self) -> Result<Runtime, Error> {
@ -54,15 +67,15 @@ pub mod n2n {
let mut demux2_out = DemuxOutputPort::default();
let mut demux2_in = DemuxInputPort::default();
gasket::messaging::connect_ports(&mut demux2_out, &mut demux2_in, 1000);
gasket::messaging::crossbeam::connect_ports(&mut demux2_out, &mut demux2_in, 1000);
let mut demux3_out = DemuxOutputPort::default();
let mut demux3_in = DemuxInputPort::default();
gasket::messaging::connect_ports(&mut demux3_out, &mut demux3_in, 1000);
gasket::messaging::crossbeam::connect_ports(&mut demux3_out, &mut demux3_in, 1000);
let mut mux2_out = MuxOutputPort::default();
let mut mux3_out = MuxOutputPort::default();
gasket::messaging::funnel_ports(
gasket::messaging::crossbeam::funnel_ports(
vec![&mut mux2_out, &mut mux3_out],
&mut mux_input,
1000,
@ -70,7 +83,7 @@ pub mod n2n {
let mut chainsync_downstream = chainsync::DownstreamPort::default();
let mut blockfetch_upstream = blockfetch::UpstreamPort::default();
gasket::messaging::connect_ports(
gasket::messaging::crossbeam::connect_ports(
&mut chainsync_downstream,
&mut blockfetch_upstream,
20,

View file

@ -1,3 +1,4 @@
use gasket::messaging::SendAdapter;
use tracing::{debug, error, instrument};
use pallas_crypto::hash::Hash;
@ -6,23 +7,27 @@ use pallas_miniprotocols::Point;
use crate::framework::*;
pub type UpstreamPort = gasket::messaging::TwoPhaseInputPort<ChainSyncEvent>;
pub type DownstreamPort = gasket::messaging::OutputPort<BlockFetchEvent>;
pub type UpstreamPort = gasket::messaging::crossbeam::TwoPhaseInputPort<ChainSyncEvent>;
pub type OuroborosClient = blockfetch::Client<ProtocolChannel>;
pub struct Worker {
pub struct Worker<T>
where
T: Send + Sync,
{
client: OuroborosClient,
upstream: UpstreamPort,
downstream: DownstreamPort,
downstream: DownstreamPort<T>,
block_count: gasket::metrics::Counter,
}
impl Worker {
impl<T> Worker<T>
where
T: Send + Sync,
{
pub fn new(
plexer: ProtocolChannel,
upstream: UpstreamPort,
downstream: DownstreamPort,
downstream: DownstreamPort<T>,
) -> Self {
let client = OuroborosClient::new(plexer);
@ -56,7 +61,10 @@ impl Worker {
}
}
impl gasket::runtime::Worker for Worker {
impl<A> gasket::runtime::Worker for Worker<A>
where
A: SendAdapter<BlockFetchEvent>,
{
fn metrics(&self) -> gasket::metrics::Registry {
gasket::metrics::Builder::new()
.with_counter("fetched_blocks", &self.block_count)

View file

@ -17,7 +17,7 @@ fn to_traverse(header: &chainsync::HeaderContent) -> Result<MultiEraHeader<'_>,
out.map_err(Error::parse)
}
pub type DownstreamPort = gasket::messaging::OutputPort<ChainSyncEvent>;
pub type DownstreamPort = gasket::messaging::crossbeam::OutputPort<ChainSyncEvent>;
pub type OuroborosClient = chainsync::N2NClient<ProtocolChannel>;

View file

@ -21,14 +21,16 @@ pub enum BlockFetchEvent {
}
// ports used by plexer
pub type MuxOutputPort = gasket::messaging::OutputPort<(u16, multiplexer::Payload)>;
pub type DemuxInputPort = gasket::messaging::InputPort<multiplexer::Payload>;
pub type MuxOutputPort = gasket::messaging::crossbeam::OutputPort<(u16, multiplexer::Payload)>;
pub type DemuxInputPort = gasket::messaging::crossbeam::InputPort<multiplexer::Payload>;
// ports used by mini-protocols
pub type MuxInputPort = gasket::messaging::InputPort<(u16, multiplexer::Payload)>;
pub type DemuxOutputPort = gasket::messaging::OutputPort<multiplexer::Payload>;
pub type MuxInputPort = gasket::messaging::crossbeam::InputPort<(u16, multiplexer::Payload)>;
pub type DemuxOutputPort = gasket::messaging::crossbeam::OutputPort<multiplexer::Payload>;
// final output port
pub type DownstreamPort<A> = gasket::messaging::OutputPort<A, BlockFetchEvent>;
#[derive(Debug)]
pub struct ProtocolChannel(pub u16, pub MuxOutputPort, pub DemuxInputPort);
impl multiplexer::agents::Channel for ProtocolChannel {
@ -61,14 +63,14 @@ impl multiplexer::agents::Channel for ProtocolChannel {
#[derive(Error, Debug)]
pub enum Error {
#[error("client error: {0}")]
ClientError(String),
#[error("{0}")]
Client(String),
#[error("parse error: {0}")]
ParseError(String),
#[error("{0}")]
Parse(String),
#[error("server error: {0}")]
ServerError(String),
#[error("{0}")]
Server(String),
#[error("{0}")]
Message(String),
@ -79,15 +81,15 @@ pub enum Error {
impl Error {
pub fn client(error: impl ToString) -> Error {
Error::ClientError(error.to_string())
Error::Client(error.to_string())
}
pub fn parse(error: impl ToString) -> Error {
Error::ParseError(error.to_string())
Error::Parse(error.to_string())
}
pub fn server(error: impl ToString) -> Error {
Error::ServerError(error.to_string())
Error::Server(error.to_string())
}
pub fn message(error: impl ToString) -> Error {