feat: Introduce Upstream crate (#230)
This commit is contained in:
parent
fc2728639f
commit
f1017ccb37
13 changed files with 859 additions and 0 deletions
|
|
@ -8,6 +8,7 @@ members = [
|
|||
"pallas-crypto",
|
||||
"pallas-primitives",
|
||||
"pallas-traverse",
|
||||
"pallas-upstream",
|
||||
"pallas",
|
||||
"examples/block-download",
|
||||
"examples/block-decode",
|
||||
|
|
|
|||
23
pallas-upstream/Cargo.toml
Normal file
23
pallas-upstream/Cargo.toml
Normal file
|
|
@ -0,0 +1,23 @@
|
|||
[package]
|
||||
name = "pallas-upstream"
|
||||
description = "Opinionated implementation of component that pulls chain data from an upstream node"
|
||||
version = "0.18.0"
|
||||
edition = "2021"
|
||||
repository = "https://github.com/txpipe/pallas"
|
||||
homepage = "https://github.com/txpipe/pallas"
|
||||
documentation = "https://docs.rs/pallas-upstream"
|
||||
license = "Apache-2.0"
|
||||
readme = "README.md"
|
||||
authors = ["Santiago Carmuega <santiago@carmuega.me>"]
|
||||
|
||||
[dependencies]
|
||||
gasket = { git = "https://github.com/construkts/gasket-rs", version = "0.1.0" }
|
||||
pallas-codec = { version = "0.18.0", path = "../pallas-codec" }
|
||||
pallas-crypto = { version = "0.18.0", path = "../pallas-crypto" }
|
||||
pallas-miniprotocols = { version = "0.18.0", path = "../pallas-miniprotocols" }
|
||||
pallas-multiplexer = { version = "0.18.0", path = "../pallas-multiplexer" }
|
||||
pallas-traverse = { version = "0.18.0", path = "../pallas-traverse" }
|
||||
rayon = "1.7.0"
|
||||
serde = { version = "1.0.154", features = ["derive"] }
|
||||
thiserror = "1.0.31"
|
||||
tracing = "0.1.37"
|
||||
3
pallas-upstream/README.md
Normal file
3
pallas-upstream/README.md
Normal file
|
|
@ -0,0 +1,3 @@
|
|||
# Pallas Upstream
|
||||
|
||||
An opinionated implementation of component that pulls chain data from an upstream node. It's implemented as a pipeline using SEDA (staged event-driven architecture), heavily coupled with the [Gasket](https://github.com/construkts/gasket-rs) framework.
|
||||
116
pallas-upstream/src/api.rs
Normal file
116
pallas-upstream/src/api.rs
Normal file
|
|
@ -0,0 +1,116 @@
|
|||
pub use crate::wellknown as chains;
|
||||
|
||||
pub use crate::cursor;
|
||||
|
||||
pub mod n2n {
|
||||
use crate::{blockfetch, chainsync, cursor::Cursor, framework::*, plexer};
|
||||
use gasket::runtime::Tether;
|
||||
|
||||
pub struct Runtime {
|
||||
pub plexer_tether: Tether,
|
||||
pub chainsync_tether: Tether,
|
||||
pub blockfetch_tether: Tether,
|
||||
}
|
||||
|
||||
pub struct Bootstrapper {
|
||||
cursor: Cursor,
|
||||
peer_address: String,
|
||||
network_magic: u64,
|
||||
output: blockfetch::DownstreamPort,
|
||||
}
|
||||
|
||||
impl Bootstrapper {
|
||||
pub fn new(cursor: Cursor, peer_address: String, network_magic: u64) -> Self {
|
||||
Bootstrapper {
|
||||
cursor,
|
||||
peer_address,
|
||||
network_magic,
|
||||
output: blockfetch::DownstreamPort::default(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn borrow_output_port(&mut self) -> &mut blockfetch::DownstreamPort {
|
||||
&mut self.output
|
||||
}
|
||||
|
||||
pub fn spawn(self) -> Result<Runtime, Error> {
|
||||
/*
|
||||
TODO: this is how we envision the setup of complex pipelines leveraging Rust macros:
|
||||
|
||||
pipeline!(
|
||||
plexer = plexer::Worker::new(xx),
|
||||
chainsync = chainsync::Worker::new(yy),
|
||||
blockfetch = blockfetch::Worker::new(yy),
|
||||
reducer = reducer::Worker::new(yy),
|
||||
plexer.demux2 => chainsync.demux2,
|
||||
plexer.demux3 => blockfetch.demux3,
|
||||
chainsync.mux2 + blockfetch.mux3 => plexer.mux,
|
||||
chainsync.downstream => blockfetch.upstream,
|
||||
blockfetch.downstream => reducer.upstream,
|
||||
);
|
||||
|
||||
The above snippet would replace the rest of the code in this function, which is just a more verbose, manual way of saying the same thing.
|
||||
*/
|
||||
|
||||
let mut mux_input = MuxInputPort::default();
|
||||
|
||||
let mut demux2_out = DemuxOutputPort::default();
|
||||
let mut demux2_in = DemuxInputPort::default();
|
||||
gasket::messaging::connect_ports(&mut demux2_out, &mut demux2_in, 1000);
|
||||
|
||||
let mut demux3_out = DemuxOutputPort::default();
|
||||
let mut demux3_in = DemuxInputPort::default();
|
||||
gasket::messaging::connect_ports(&mut demux3_out, &mut demux3_in, 1000);
|
||||
|
||||
let mut mux2_out = MuxOutputPort::default();
|
||||
let mut mux3_out = MuxOutputPort::default();
|
||||
gasket::messaging::funnel_ports(
|
||||
vec![&mut mux2_out, &mut mux3_out],
|
||||
&mut mux_input,
|
||||
1000,
|
||||
);
|
||||
|
||||
let mut chainsync_downstream = chainsync::DownstreamPort::default();
|
||||
let mut blockfetch_upstream = blockfetch::UpstreamPort::default();
|
||||
gasket::messaging::connect_ports(
|
||||
&mut chainsync_downstream,
|
||||
&mut blockfetch_upstream,
|
||||
20,
|
||||
);
|
||||
|
||||
let plexer_tether = gasket::runtime::spawn_stage(
|
||||
plexer::Worker::new(
|
||||
self.peer_address,
|
||||
self.network_magic,
|
||||
mux_input,
|
||||
Some(demux2_out),
|
||||
Some(demux3_out),
|
||||
),
|
||||
gasket::runtime::Policy::default(),
|
||||
Some("plexer"),
|
||||
);
|
||||
|
||||
let channel2 = ProtocolChannel(2, mux2_out, demux2_in);
|
||||
|
||||
let chainsync_tether = gasket::runtime::spawn_stage(
|
||||
chainsync::Worker::new(self.cursor, channel2, chainsync_downstream),
|
||||
gasket::runtime::Policy::default(),
|
||||
Some("chainsync"),
|
||||
);
|
||||
|
||||
let channel3 = ProtocolChannel(3, mux3_out, demux3_in);
|
||||
|
||||
let blockfetch_tether = gasket::runtime::spawn_stage(
|
||||
blockfetch::Worker::new(channel3, blockfetch_upstream, self.output),
|
||||
gasket::runtime::Policy::default(),
|
||||
Some("blockfetch"),
|
||||
);
|
||||
|
||||
Ok(Runtime {
|
||||
plexer_tether,
|
||||
chainsync_tether,
|
||||
blockfetch_tether,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
84
pallas-upstream/src/blockfetch.rs
Normal file
84
pallas-upstream/src/blockfetch.rs
Normal file
|
|
@ -0,0 +1,84 @@
|
|||
use tracing::{debug, error, instrument};
|
||||
|
||||
use pallas_crypto::hash::Hash;
|
||||
use pallas_miniprotocols::blockfetch;
|
||||
use pallas_miniprotocols::Point;
|
||||
|
||||
use crate::framework::*;
|
||||
|
||||
pub type UpstreamPort = gasket::messaging::TwoPhaseInputPort<ChainSyncEvent>;
|
||||
pub type DownstreamPort = gasket::messaging::OutputPort<BlockFetchEvent>;
|
||||
|
||||
pub type OuroborosClient = blockfetch::Client<ProtocolChannel>;
|
||||
|
||||
pub struct Worker {
|
||||
client: OuroborosClient,
|
||||
upstream: UpstreamPort,
|
||||
downstream: DownstreamPort,
|
||||
block_count: gasket::metrics::Counter,
|
||||
}
|
||||
|
||||
impl Worker {
|
||||
pub fn new(
|
||||
plexer: ProtocolChannel,
|
||||
upstream: UpstreamPort,
|
||||
downstream: DownstreamPort,
|
||||
) -> Self {
|
||||
let client = OuroborosClient::new(plexer);
|
||||
|
||||
Self {
|
||||
client,
|
||||
upstream,
|
||||
downstream,
|
||||
block_count: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(skip(self))]
|
||||
fn fetch_block(&mut self, slot: u64, hash: Hash<32>) -> Result<Vec<u8>, gasket::error::Error> {
|
||||
match self
|
||||
.client
|
||||
.fetch_single(Point::Specific(slot, hash.to_vec()))
|
||||
{
|
||||
Ok(x) => {
|
||||
debug!("block fetch succeded");
|
||||
Ok(x)
|
||||
}
|
||||
Err(blockfetch::Error::ChannelError(x)) => {
|
||||
error!("plexer channel error: {}", x);
|
||||
Err(gasket::error::Error::RetryableError)
|
||||
}
|
||||
Err(x) => {
|
||||
error!("unrecoverable block fetch error: {}", x);
|
||||
Err(gasket::error::Error::WorkPanic)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl gasket::runtime::Worker for Worker {
|
||||
fn metrics(&self) -> gasket::metrics::Registry {
|
||||
gasket::metrics::Builder::new()
|
||||
.with_counter("fetched_blocks", &self.block_count)
|
||||
.build()
|
||||
}
|
||||
|
||||
fn work(&mut self) -> gasket::runtime::WorkResult {
|
||||
let msg = self.upstream.recv_or_idle()?;
|
||||
|
||||
let msg = match msg.payload {
|
||||
ChainSyncEvent::RollForward(s, h) => {
|
||||
let body = self.fetch_block(s, h)?;
|
||||
BlockFetchEvent::RollForward(s, h, body)
|
||||
}
|
||||
ChainSyncEvent::Rollback(x) => BlockFetchEvent::Rollback(x),
|
||||
};
|
||||
|
||||
self.downstream.send(msg.into())?;
|
||||
|
||||
// remove the processed event from the queue
|
||||
self.upstream.commit();
|
||||
|
||||
Ok(gasket::runtime::WorkOutcome::Partial)
|
||||
}
|
||||
}
|
||||
129
pallas-upstream/src/chainsync.rs
Normal file
129
pallas-upstream/src/chainsync.rs
Normal file
|
|
@ -0,0 +1,129 @@
|
|||
use gasket::error::AsWorkError;
|
||||
use tracing::{debug, info};
|
||||
|
||||
use pallas_miniprotocols::chainsync::{HeaderContent, NextResponse};
|
||||
use pallas_miniprotocols::{chainsync, Point};
|
||||
use pallas_traverse::MultiEraHeader;
|
||||
|
||||
use crate::cursor::{Cursor, Intersection};
|
||||
use crate::framework::*;
|
||||
|
||||
fn to_traverse(header: &chainsync::HeaderContent) -> Result<MultiEraHeader<'_>, Error> {
|
||||
let out = match header.byron_prefix {
|
||||
Some((subtag, _)) => MultiEraHeader::decode(header.variant, Some(subtag), &header.cbor),
|
||||
None => MultiEraHeader::decode(header.variant, None, &header.cbor),
|
||||
};
|
||||
|
||||
out.map_err(Error::parse)
|
||||
}
|
||||
|
||||
pub type DownstreamPort = gasket::messaging::OutputPort<ChainSyncEvent>;
|
||||
|
||||
pub type OuroborosClient = chainsync::N2NClient<ProtocolChannel>;
|
||||
|
||||
pub struct Worker {
|
||||
chain_cursor: Cursor,
|
||||
client: OuroborosClient,
|
||||
downstream: DownstreamPort,
|
||||
block_count: gasket::metrics::Counter,
|
||||
chain_tip: gasket::metrics::Gauge,
|
||||
}
|
||||
|
||||
impl Worker {
|
||||
pub fn new(chain_cursor: Cursor, plexer: ProtocolChannel, downstream: DownstreamPort) -> Self {
|
||||
let client = OuroborosClient::new(plexer);
|
||||
|
||||
Self {
|
||||
chain_cursor,
|
||||
client,
|
||||
downstream,
|
||||
block_count: Default::default(),
|
||||
chain_tip: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
fn intersect(&mut self) -> Result<Option<Point>, gasket::error::Error> {
|
||||
let value = self.chain_cursor.read();
|
||||
|
||||
match value {
|
||||
Intersection::Origin => {
|
||||
let point = self.client.intersect_origin().or_restart()?;
|
||||
|
||||
Ok(Some(point))
|
||||
}
|
||||
Intersection::Tip => {
|
||||
let point = self.client.intersect_tip().or_restart()?;
|
||||
|
||||
Ok(Some(point))
|
||||
}
|
||||
Intersection::Breadcrumbs(points) => {
|
||||
let (point, _) = self.client.find_intersect(Vec::from(points)).or_restart()?;
|
||||
|
||||
Ok(point)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn process_next(
|
||||
&mut self,
|
||||
next: NextResponse<HeaderContent>,
|
||||
) -> Result<(), gasket::error::Error> {
|
||||
match next {
|
||||
chainsync::NextResponse::RollForward(h, t) => {
|
||||
let h = to_traverse(&h).or_panic()?;
|
||||
self.downstream
|
||||
.send(ChainSyncEvent::RollForward(h.slot(), h.hash()).into())?;
|
||||
|
||||
debug!(slot = h.slot(), hash = %h.hash(), "chain sync roll forward");
|
||||
self.chain_tip.set(t.1 as i64);
|
||||
Ok(())
|
||||
}
|
||||
chainsync::NextResponse::RollBackward(p, t) => {
|
||||
self.downstream.send(ChainSyncEvent::Rollback(p).into())?;
|
||||
self.chain_tip.set(t.1 as i64);
|
||||
Ok(())
|
||||
}
|
||||
chainsync::NextResponse::Await => {
|
||||
info!("chain-sync reached the tip of the chain");
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn request_next(&mut self) -> Result<(), gasket::error::Error> {
|
||||
info!("requesting next block");
|
||||
let next = self.client.request_next().or_restart()?;
|
||||
self.process_next(next)
|
||||
}
|
||||
|
||||
fn await_next(&mut self) -> Result<(), gasket::error::Error> {
|
||||
info!("awaiting next block (blocking)");
|
||||
let next = self.client.recv_while_must_reply().or_restart()?;
|
||||
self.process_next(next)
|
||||
}
|
||||
}
|
||||
|
||||
impl gasket::runtime::Worker for Worker {
|
||||
fn metrics(&self) -> gasket::metrics::Registry {
|
||||
gasket::metrics::Builder::new()
|
||||
.with_counter("received_blocks", &self.block_count)
|
||||
.with_gauge("chain_tip", &self.chain_tip)
|
||||
.build()
|
||||
}
|
||||
|
||||
fn bootstrap(&mut self) -> Result<(), gasket::error::Error> {
|
||||
let intersect = self.intersect()?;
|
||||
info!(?intersect, "chain-sync intersected");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn work(&mut self) -> gasket::runtime::WorkResult {
|
||||
match self.client.has_agency() {
|
||||
true => self.request_next()?,
|
||||
false => self.await_next()?,
|
||||
};
|
||||
|
||||
Ok(gasket::runtime::WorkOutcome::Partial)
|
||||
}
|
||||
}
|
||||
56
pallas-upstream/src/cursor.rs
Normal file
56
pallas-upstream/src/cursor.rs
Normal file
|
|
@ -0,0 +1,56 @@
|
|||
use std::{
|
||||
collections::VecDeque,
|
||||
sync::{Arc, RwLock},
|
||||
};
|
||||
|
||||
use pallas_miniprotocols::Point;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub enum Intersection {
|
||||
Tip,
|
||||
Origin,
|
||||
Breadcrumbs(VecDeque<Point>),
|
||||
}
|
||||
|
||||
const HARDCODED_BREADCRUMBS: usize = 20;
|
||||
|
||||
// TODO: include exponential breadcrumbs logic here
|
||||
#[derive(Clone)]
|
||||
pub struct Cursor(Arc<RwLock<Intersection>>);
|
||||
|
||||
impl Cursor {
|
||||
pub fn new(value: Intersection) -> Self {
|
||||
Self(Arc::new(RwLock::new(value)))
|
||||
}
|
||||
|
||||
pub fn read(&self) -> Intersection {
|
||||
let v = self.0.read().unwrap();
|
||||
v.clone()
|
||||
}
|
||||
|
||||
pub fn latest_known_point(&self) -> Option<Point> {
|
||||
let guard = self.0.read().unwrap();
|
||||
|
||||
match &*guard {
|
||||
Intersection::Breadcrumbs(v) => v.front().cloned(),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn add_breadcrumb(&self, value: Point) {
|
||||
let mut guard = self.0.write().unwrap();
|
||||
|
||||
match &mut *guard {
|
||||
Intersection::Tip | Intersection::Origin => {
|
||||
*guard = Intersection::Breadcrumbs(VecDeque::from(vec![value]));
|
||||
}
|
||||
Intersection::Breadcrumbs(crumbs) => {
|
||||
crumbs.push_front(value);
|
||||
|
||||
if crumbs.len() > HARDCODED_BREADCRUMBS {
|
||||
crumbs.pop_back();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
106
pallas-upstream/src/framework.rs
Normal file
106
pallas-upstream/src/framework.rs
Normal file
|
|
@ -0,0 +1,106 @@
|
|||
use pallas_crypto::hash::Hash;
|
||||
use pallas_miniprotocols::Point;
|
||||
use pallas_multiplexer as multiplexer;
|
||||
use thiserror::Error;
|
||||
use tracing::error;
|
||||
|
||||
pub type BlockSlot = u64;
|
||||
pub type BlockHash = Hash<32>;
|
||||
pub type RawBlock = Vec<u8>;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum ChainSyncEvent {
|
||||
RollForward(BlockSlot, BlockHash),
|
||||
Rollback(Point),
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum BlockFetchEvent {
|
||||
RollForward(BlockSlot, BlockHash, RawBlock),
|
||||
Rollback(Point),
|
||||
}
|
||||
|
||||
// ports used by plexer
|
||||
pub type MuxOutputPort = gasket::messaging::OutputPort<(u16, multiplexer::Payload)>;
|
||||
pub type DemuxInputPort = gasket::messaging::InputPort<multiplexer::Payload>;
|
||||
|
||||
// ports used by mini-protocols
|
||||
pub type MuxInputPort = gasket::messaging::InputPort<(u16, multiplexer::Payload)>;
|
||||
pub type DemuxOutputPort = gasket::messaging::OutputPort<multiplexer::Payload>;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct ProtocolChannel(pub u16, pub MuxOutputPort, pub DemuxInputPort);
|
||||
|
||||
impl multiplexer::agents::Channel for ProtocolChannel {
|
||||
fn enqueue_chunk(
|
||||
&mut self,
|
||||
payload: multiplexer::Payload,
|
||||
) -> Result<(), multiplexer::agents::ChannelError> {
|
||||
match self
|
||||
.1
|
||||
.send(gasket::messaging::Message::from((self.0, payload)))
|
||||
{
|
||||
Ok(_) => Ok(()),
|
||||
Err(error) => {
|
||||
error!(?error, "enqueue chunk failed");
|
||||
Err(multiplexer::agents::ChannelError::NotConnected(None))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn dequeue_chunk(&mut self) -> Result<multiplexer::Payload, multiplexer::agents::ChannelError> {
|
||||
match self.2.recv_or_idle() {
|
||||
Ok(msg) => Ok(msg.payload),
|
||||
Err(error) => {
|
||||
error!(?error, "dequeue chunk failed");
|
||||
Err(multiplexer::agents::ChannelError::NotConnected(None))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub enum Error {
|
||||
#[error("client error: {0}")]
|
||||
ClientError(String),
|
||||
|
||||
#[error("parse error: {0}")]
|
||||
ParseError(String),
|
||||
|
||||
#[error("server error: {0}")]
|
||||
ServerError(String),
|
||||
|
||||
#[error("{0}")]
|
||||
Message(String),
|
||||
|
||||
#[error("{0}")]
|
||||
Custom(String),
|
||||
}
|
||||
|
||||
impl Error {
|
||||
pub fn client(error: impl ToString) -> Error {
|
||||
Error::ClientError(error.to_string())
|
||||
}
|
||||
|
||||
pub fn parse(error: impl ToString) -> Error {
|
||||
Error::ParseError(error.to_string())
|
||||
}
|
||||
|
||||
pub fn server(error: impl ToString) -> Error {
|
||||
Error::ServerError(error.to_string())
|
||||
}
|
||||
|
||||
pub fn message(error: impl ToString) -> Error {
|
||||
Error::Message(error.to_string())
|
||||
}
|
||||
|
||||
pub fn custom(error: Box<dyn std::error::Error>) -> Error {
|
||||
Error::Custom(format!("{}", error))
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Box<dyn std::error::Error>> for Error {
|
||||
fn from(err: Box<dyn std::error::Error>) -> Self {
|
||||
Error::custom(err)
|
||||
}
|
||||
}
|
||||
11
pallas-upstream/src/lib.rs
Normal file
11
pallas-upstream/src/lib.rs
Normal file
|
|
@ -0,0 +1,11 @@
|
|||
pub(crate) mod blockfetch;
|
||||
pub(crate) mod chainsync;
|
||||
pub(crate) mod framework;
|
||||
pub(crate) mod plexer;
|
||||
|
||||
pub mod cursor;
|
||||
pub mod wellknown;
|
||||
|
||||
mod api;
|
||||
|
||||
pub use api::*;
|
||||
201
pallas-upstream/src/plexer.rs
Normal file
201
pallas-upstream/src/plexer.rs
Normal file
|
|
@ -0,0 +1,201 @@
|
|||
use gasket::error::AsWorkError;
|
||||
use tracing::{debug, error, info, warn};
|
||||
|
||||
use pallas_miniprotocols::handshake;
|
||||
use pallas_multiplexer as multiplexer;
|
||||
use pallas_multiplexer::bearers::Bearer;
|
||||
use pallas_multiplexer::demux::{Demuxer, Egress};
|
||||
use pallas_multiplexer::mux::{Ingress, Muxer};
|
||||
use pallas_multiplexer::sync::SyncPlexer;
|
||||
|
||||
use crate::framework::*;
|
||||
|
||||
struct GasketEgress(DemuxOutputPort);
|
||||
|
||||
impl Egress for GasketEgress {
|
||||
fn send(
|
||||
&mut self,
|
||||
payload: multiplexer::Payload,
|
||||
) -> Result<(), multiplexer::demux::EgressError> {
|
||||
self.0
|
||||
.send(gasket::messaging::Message::from(payload))
|
||||
.map_err(|_| multiplexer::demux::EgressError(vec![]))
|
||||
}
|
||||
}
|
||||
|
||||
struct GasketIngress(MuxInputPort);
|
||||
|
||||
impl Ingress for GasketIngress {
|
||||
fn recv_timeout(
|
||||
&mut self,
|
||||
duration: std::time::Duration,
|
||||
) -> Result<multiplexer::Message, multiplexer::mux::IngressError> {
|
||||
self.0
|
||||
.recv_timeout(duration)
|
||||
.map(|msg| msg.payload)
|
||||
.map_err(|err| match err {
|
||||
gasket::error::Error::RecvIdle => multiplexer::mux::IngressError::Empty,
|
||||
_ => multiplexer::mux::IngressError::Disconnected,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
type IsBusy = bool;
|
||||
|
||||
fn handle_demux_outcome(
|
||||
outcome: Result<multiplexer::demux::TickOutcome, multiplexer::demux::DemuxError>,
|
||||
) -> Result<IsBusy, gasket::error::Error> {
|
||||
match outcome {
|
||||
Ok(x) => match x {
|
||||
multiplexer::demux::TickOutcome::Busy => Ok(true),
|
||||
multiplexer::demux::TickOutcome::Idle => Ok(false),
|
||||
},
|
||||
Err(err) => match err {
|
||||
multiplexer::demux::DemuxError::BearerError(err) => {
|
||||
error!("{}", err.kind());
|
||||
Err(gasket::error::Error::ShouldRestart)
|
||||
}
|
||||
multiplexer::demux::DemuxError::EgressDisconnected(x, _) => {
|
||||
error!(protocol = x, "egress disconnected");
|
||||
Err(gasket::error::Error::WorkPanic)
|
||||
}
|
||||
multiplexer::demux::DemuxError::EgressUnknown(x, _) => {
|
||||
error!(protocol = x, "unknown egress");
|
||||
Err(gasket::error::Error::WorkPanic)
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_mux_outcome(
|
||||
outcome: multiplexer::mux::TickOutcome,
|
||||
) -> Result<IsBusy, gasket::error::Error> {
|
||||
match outcome {
|
||||
multiplexer::mux::TickOutcome::Busy => Ok(true),
|
||||
multiplexer::mux::TickOutcome::Idle => Ok(false),
|
||||
multiplexer::mux::TickOutcome::BearerError(err) => {
|
||||
warn!(%err);
|
||||
Err(gasket::error::Error::ShouldRestart)
|
||||
}
|
||||
multiplexer::mux::TickOutcome::IngressDisconnected => {
|
||||
error!("ingress disconnected");
|
||||
Err(gasket::error::Error::WorkPanic)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Worker {
|
||||
peer_address: String,
|
||||
network_magic: u64,
|
||||
input: MuxInputPort,
|
||||
channel2_out: Option<DemuxOutputPort>,
|
||||
channel3_out: Option<DemuxOutputPort>,
|
||||
demuxer: Option<Demuxer<GasketEgress>>,
|
||||
muxer: Option<Muxer<GasketIngress>>,
|
||||
}
|
||||
|
||||
impl Worker {
|
||||
pub fn new(
|
||||
peer_address: String,
|
||||
network_magic: u64,
|
||||
input: MuxInputPort,
|
||||
channel2_out: Option<DemuxOutputPort>,
|
||||
channel3_out: Option<DemuxOutputPort>,
|
||||
) -> Self {
|
||||
Self {
|
||||
peer_address,
|
||||
network_magic,
|
||||
input,
|
||||
channel2_out,
|
||||
channel3_out,
|
||||
demuxer: None,
|
||||
muxer: None,
|
||||
}
|
||||
}
|
||||
|
||||
fn handshake(&self, bearer: Bearer) -> Result<Bearer, gasket::error::Error> {
|
||||
info!("excuting handshake");
|
||||
|
||||
let plexer = SyncPlexer::new(bearer, 0);
|
||||
let versions = handshake::n2n::VersionTable::v7_and_above(self.network_magic);
|
||||
let mut client = handshake::Client::new(plexer);
|
||||
|
||||
let output = client.handshake(versions).or_panic()?;
|
||||
debug!("handshake output: {:?}", output);
|
||||
|
||||
let bearer = client.unwrap().unwrap();
|
||||
|
||||
match output {
|
||||
handshake::Confirmation::Accepted(version, _) => {
|
||||
info!(version, "connected to upstream peer");
|
||||
Ok(bearer)
|
||||
}
|
||||
_ => {
|
||||
error!("couldn't agree on handshake version");
|
||||
Err(gasket::error::Error::WorkPanic)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl gasket::runtime::Worker for Worker {
|
||||
fn metrics(&self) -> gasket::metrics::Registry {
|
||||
// TODO: define networking metrics (bytes in / out, etc)
|
||||
gasket::metrics::Builder::new().build()
|
||||
}
|
||||
|
||||
fn bootstrap(&mut self) -> Result<(), gasket::error::Error> {
|
||||
debug!("connecting muxer");
|
||||
|
||||
let bearer = multiplexer::bearers::Bearer::connect_tcp(&self.peer_address).or_restart()?;
|
||||
|
||||
let bearer = self.handshake(bearer)?;
|
||||
|
||||
let mut demuxer = Demuxer::new(bearer.clone());
|
||||
|
||||
if let Some(c2) = &self.channel2_out {
|
||||
demuxer.register(2, GasketEgress(c2.clone()));
|
||||
}
|
||||
|
||||
if let Some(c3) = &self.channel3_out {
|
||||
demuxer.register(3, GasketEgress(c3.clone()));
|
||||
}
|
||||
|
||||
self.demuxer = Some(demuxer);
|
||||
|
||||
let muxer = Muxer::new(bearer, GasketIngress(self.input.clone()));
|
||||
self.muxer = Some(muxer);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn work(&mut self) -> gasket::runtime::WorkResult {
|
||||
let muxer = self.muxer.as_mut().unwrap();
|
||||
let demuxer = self.demuxer.as_mut().unwrap();
|
||||
|
||||
let span = tracing::span::Span::current();
|
||||
|
||||
let mut mux_res = None;
|
||||
let mut demux_res = None;
|
||||
|
||||
rayon::scope(|s| {
|
||||
s.spawn(|_| {
|
||||
let _guard = span.enter();
|
||||
info!("mux ticking");
|
||||
let outcome = muxer.tick();
|
||||
mux_res = Some(handle_mux_outcome(outcome));
|
||||
});
|
||||
s.spawn(|_| {
|
||||
let _guard = span.enter();
|
||||
info!("demux ticking");
|
||||
let outcome = demuxer.tick();
|
||||
demux_res = Some(handle_demux_outcome(outcome));
|
||||
});
|
||||
});
|
||||
|
||||
mux_res.unwrap()?;
|
||||
demux_res.unwrap()?;
|
||||
|
||||
Ok(gasket::runtime::WorkOutcome::Partial)
|
||||
}
|
||||
}
|
||||
125
pallas-upstream/src/wellknown.rs
Normal file
125
pallas-upstream/src/wellknown.rs
Normal file
|
|
@ -0,0 +1,125 @@
|
|||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use pallas_miniprotocols::{
|
||||
MAINNET_MAGIC,
|
||||
TESTNET_MAGIC,
|
||||
// PREVIEW_MAGIC, PRE_PRODUCTION_MAGIC,
|
||||
};
|
||||
|
||||
use crate::framework::*;
|
||||
|
||||
// TODO: use from pallas once available
|
||||
pub const PRE_PRODUCTION_MAGIC: u64 = 1;
|
||||
pub const PREVIEW_MAGIC: u64 = 2;
|
||||
|
||||
/// Well-known information about specific networks
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||
pub struct WellKnownChainInfo {
|
||||
pub magic: u64,
|
||||
pub byron_epoch_length: u32,
|
||||
pub byron_slot_length: u32,
|
||||
pub byron_known_slot: u64,
|
||||
pub byron_known_hash: String,
|
||||
pub byron_known_time: u64,
|
||||
pub shelley_epoch_length: u32,
|
||||
pub shelley_slot_length: u32,
|
||||
pub shelley_known_slot: u64,
|
||||
pub shelley_known_hash: String,
|
||||
pub shelley_known_time: u64,
|
||||
}
|
||||
|
||||
impl WellKnownChainInfo {
|
||||
/// Hardcoded values for mainnet
|
||||
pub fn mainnet() -> Self {
|
||||
WellKnownChainInfo {
|
||||
magic: MAINNET_MAGIC,
|
||||
byron_epoch_length: 432000,
|
||||
byron_slot_length: 20,
|
||||
byron_known_slot: 0,
|
||||
byron_known_time: 1506203091,
|
||||
byron_known_hash: "f0f7892b5c333cffc4b3c4344de48af4cc63f55e44936196f365a9ef2244134f"
|
||||
.to_string(),
|
||||
shelley_epoch_length: 432000,
|
||||
shelley_slot_length: 1,
|
||||
shelley_known_slot: 4492800,
|
||||
shelley_known_hash: "aa83acbf5904c0edfe4d79b3689d3d00fcfc553cf360fd2229b98d464c28e9de"
|
||||
.to_string(),
|
||||
shelley_known_time: 1596059091,
|
||||
}
|
||||
}
|
||||
|
||||
/// Hardcoded values for testnet
|
||||
pub fn testnet() -> Self {
|
||||
WellKnownChainInfo {
|
||||
magic: TESTNET_MAGIC,
|
||||
byron_epoch_length: 432000,
|
||||
byron_slot_length: 20,
|
||||
byron_known_slot: 0,
|
||||
byron_known_time: 1564010416,
|
||||
byron_known_hash: "8f8602837f7c6f8b8867dd1cbc1842cf51a27eaed2c70ef48325d00f8efb320f"
|
||||
.to_string(),
|
||||
shelley_epoch_length: 432000,
|
||||
shelley_slot_length: 1,
|
||||
shelley_known_slot: 1598400,
|
||||
shelley_known_hash: "02b1c561715da9e540411123a6135ee319b02f60b9a11a603d3305556c04329f"
|
||||
.to_string(),
|
||||
shelley_known_time: 1595967616,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn preview() -> Self {
|
||||
WellKnownChainInfo {
|
||||
magic: PREVIEW_MAGIC,
|
||||
byron_epoch_length: 432000,
|
||||
byron_slot_length: 20,
|
||||
byron_known_slot: 0,
|
||||
byron_known_hash: "".to_string(),
|
||||
byron_known_time: 1660003200,
|
||||
shelley_epoch_length: 432000,
|
||||
shelley_slot_length: 1,
|
||||
shelley_known_slot: 25260,
|
||||
shelley_known_hash: "cac921895ef5f2e85f7e6e6b51b663ab81b3605cd47d6b6d66e8e785e5c65011"
|
||||
.to_string(),
|
||||
shelley_known_time: 1660003200,
|
||||
}
|
||||
}
|
||||
|
||||
/// Hardcoded values for the "pre-prod" testnet
|
||||
pub fn preprod() -> Self {
|
||||
WellKnownChainInfo {
|
||||
magic: PRE_PRODUCTION_MAGIC,
|
||||
byron_epoch_length: 432000,
|
||||
byron_slot_length: 20,
|
||||
byron_known_slot: 0,
|
||||
byron_known_hash: "9ad7ff320c9cf74e0f5ee78d22a85ce42bb0a487d0506bf60cfb5a91ea4497d2"
|
||||
.to_string(),
|
||||
byron_known_time: 1654041600,
|
||||
shelley_epoch_length: 432000,
|
||||
shelley_slot_length: 1,
|
||||
shelley_known_slot: 86400,
|
||||
shelley_known_hash: "c4a1595c5cc7a31eda9e544986fe9387af4e3491afe0ca9a80714f01951bbd5c"
|
||||
.to_string(),
|
||||
shelley_known_time: 1654041600,
|
||||
}
|
||||
}
|
||||
|
||||
/// Uses the value of the magic to return either mainnet or testnet
|
||||
/// hardcoded values.
|
||||
pub fn try_from_magic(magic: u64) -> Result<WellKnownChainInfo, Error> {
|
||||
match magic {
|
||||
MAINNET_MAGIC => Ok(Self::mainnet()),
|
||||
TESTNET_MAGIC => Ok(Self::testnet()),
|
||||
PREVIEW_MAGIC => Ok(Self::preview()),
|
||||
PRE_PRODUCTION_MAGIC => Ok(Self::preprod()),
|
||||
_ => Err(Error::message(
|
||||
"can't infer well-known chain from specified magic",
|
||||
)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for WellKnownChainInfo {
|
||||
fn default() -> Self {
|
||||
Self::mainnet()
|
||||
}
|
||||
}
|
||||
|
|
@ -18,3 +18,4 @@ pallas-traverse = { version = "0.18.0", path = "../pallas-traverse/" }
|
|||
pallas-addresses = { version = "0.18.0", path = "../pallas-addresses/" }
|
||||
pallas-crypto = { version = "0.18.0", path = "../pallas-crypto/" }
|
||||
pallas-codec = { version = "0.18.0", path = "../pallas-codec/" }
|
||||
pallas-upstream = { version = "0.18.0", path = "../pallas-upstream/" }
|
||||
|
|
|
|||
|
|
@ -5,3 +5,6 @@ pub use pallas_multiplexer as multiplexer;
|
|||
|
||||
#[doc(inline)]
|
||||
pub use pallas_miniprotocols as miniprotocols;
|
||||
|
||||
#[doc(inline)]
|
||||
pub use pallas_upstream as upstream;
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue