From f1017ccb3790eda80aff533d329be9691ffb4c54 Mon Sep 17 00:00:00 2001 From: Santiago Carmuega Date: Fri, 10 Mar 2023 19:57:51 +0100 Subject: [PATCH] feat: Introduce Upstream crate (#230) --- Cargo.toml | 1 + pallas-upstream/Cargo.toml | 23 ++++ pallas-upstream/README.md | 3 + pallas-upstream/src/api.rs | 116 +++++++++++++++++ pallas-upstream/src/blockfetch.rs | 84 +++++++++++++ pallas-upstream/src/chainsync.rs | 129 +++++++++++++++++++ pallas-upstream/src/cursor.rs | 56 +++++++++ pallas-upstream/src/framework.rs | 106 ++++++++++++++++ pallas-upstream/src/lib.rs | 11 ++ pallas-upstream/src/plexer.rs | 201 ++++++++++++++++++++++++++++++ pallas-upstream/src/wellknown.rs | 125 +++++++++++++++++++ pallas/Cargo.toml | 1 + pallas/src/network.rs | 3 + 13 files changed, 859 insertions(+) create mode 100644 pallas-upstream/Cargo.toml create mode 100644 pallas-upstream/README.md create mode 100644 pallas-upstream/src/api.rs create mode 100644 pallas-upstream/src/blockfetch.rs create mode 100644 pallas-upstream/src/chainsync.rs create mode 100644 pallas-upstream/src/cursor.rs create mode 100644 pallas-upstream/src/framework.rs create mode 100644 pallas-upstream/src/lib.rs create mode 100644 pallas-upstream/src/plexer.rs create mode 100644 pallas-upstream/src/wellknown.rs diff --git a/Cargo.toml b/Cargo.toml index b77138f..2112f4d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ members = [ "pallas-crypto", "pallas-primitives", "pallas-traverse", + "pallas-upstream", "pallas", "examples/block-download", "examples/block-decode", diff --git a/pallas-upstream/Cargo.toml b/pallas-upstream/Cargo.toml new file mode 100644 index 0000000..68d2012 --- /dev/null +++ b/pallas-upstream/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "pallas-upstream" +description = "Opinionated implementation of component that pulls chain data from an upstream node" +version = "0.18.0" +edition = "2021" +repository = "https://github.com/txpipe/pallas" +homepage = "https://github.com/txpipe/pallas" +documentation = "https://docs.rs/pallas-upstream" +license = "Apache-2.0" +readme = "README.md" +authors = ["Santiago Carmuega "] + +[dependencies] +gasket = { git = "https://github.com/construkts/gasket-rs", version = "0.1.0" } +pallas-codec = { version = "0.18.0", path = "../pallas-codec" } +pallas-crypto = { version = "0.18.0", path = "../pallas-crypto" } +pallas-miniprotocols = { version = "0.18.0", path = "../pallas-miniprotocols" } +pallas-multiplexer = { version = "0.18.0", path = "../pallas-multiplexer" } +pallas-traverse = { version = "0.18.0", path = "../pallas-traverse" } +rayon = "1.7.0" +serde = { version = "1.0.154", features = ["derive"] } +thiserror = "1.0.31" +tracing = "0.1.37" diff --git a/pallas-upstream/README.md b/pallas-upstream/README.md new file mode 100644 index 0000000..4e77a32 --- /dev/null +++ b/pallas-upstream/README.md @@ -0,0 +1,3 @@ +# Pallas Upstream + +An opinionated implementation of component that pulls chain data from an upstream node. It's implemented as a pipeline using SEDA (staged event-driven architecture), heavily coupled with the [Gasket](https://github.com/construkts/gasket-rs) framework. \ No newline at end of file diff --git a/pallas-upstream/src/api.rs b/pallas-upstream/src/api.rs new file mode 100644 index 0000000..bc192a1 --- /dev/null +++ b/pallas-upstream/src/api.rs @@ -0,0 +1,116 @@ +pub use crate::wellknown as chains; + +pub use crate::cursor; + +pub mod n2n { + use crate::{blockfetch, chainsync, cursor::Cursor, framework::*, plexer}; + use gasket::runtime::Tether; + + pub struct Runtime { + pub plexer_tether: Tether, + pub chainsync_tether: Tether, + pub blockfetch_tether: Tether, + } + + pub struct Bootstrapper { + cursor: Cursor, + peer_address: String, + network_magic: u64, + output: blockfetch::DownstreamPort, + } + + impl Bootstrapper { + pub fn new(cursor: Cursor, peer_address: String, network_magic: u64) -> Self { + Bootstrapper { + cursor, + peer_address, + network_magic, + output: blockfetch::DownstreamPort::default(), + } + } + + pub fn borrow_output_port(&mut self) -> &mut blockfetch::DownstreamPort { + &mut self.output + } + + pub fn spawn(self) -> Result { + /* + TODO: this is how we envision the setup of complex pipelines leveraging Rust macros: + + pipeline!( + plexer = plexer::Worker::new(xx), + chainsync = chainsync::Worker::new(yy), + blockfetch = blockfetch::Worker::new(yy), + reducer = reducer::Worker::new(yy), + plexer.demux2 => chainsync.demux2, + plexer.demux3 => blockfetch.demux3, + chainsync.mux2 + blockfetch.mux3 => plexer.mux, + chainsync.downstream => blockfetch.upstream, + blockfetch.downstream => reducer.upstream, + ); + + The above snippet would replace the rest of the code in this function, which is just a more verbose, manual way of saying the same thing. + */ + + let mut mux_input = MuxInputPort::default(); + + let mut demux2_out = DemuxOutputPort::default(); + let mut demux2_in = DemuxInputPort::default(); + gasket::messaging::connect_ports(&mut demux2_out, &mut demux2_in, 1000); + + let mut demux3_out = DemuxOutputPort::default(); + let mut demux3_in = DemuxInputPort::default(); + gasket::messaging::connect_ports(&mut demux3_out, &mut demux3_in, 1000); + + let mut mux2_out = MuxOutputPort::default(); + let mut mux3_out = MuxOutputPort::default(); + gasket::messaging::funnel_ports( + vec![&mut mux2_out, &mut mux3_out], + &mut mux_input, + 1000, + ); + + let mut chainsync_downstream = chainsync::DownstreamPort::default(); + let mut blockfetch_upstream = blockfetch::UpstreamPort::default(); + gasket::messaging::connect_ports( + &mut chainsync_downstream, + &mut blockfetch_upstream, + 20, + ); + + let plexer_tether = gasket::runtime::spawn_stage( + plexer::Worker::new( + self.peer_address, + self.network_magic, + mux_input, + Some(demux2_out), + Some(demux3_out), + ), + gasket::runtime::Policy::default(), + Some("plexer"), + ); + + let channel2 = ProtocolChannel(2, mux2_out, demux2_in); + + let chainsync_tether = gasket::runtime::spawn_stage( + chainsync::Worker::new(self.cursor, channel2, chainsync_downstream), + gasket::runtime::Policy::default(), + Some("chainsync"), + ); + + let channel3 = ProtocolChannel(3, mux3_out, demux3_in); + + let blockfetch_tether = gasket::runtime::spawn_stage( + blockfetch::Worker::new(channel3, blockfetch_upstream, self.output), + gasket::runtime::Policy::default(), + Some("blockfetch"), + ); + + Ok(Runtime { + plexer_tether, + chainsync_tether, + blockfetch_tether, + }) + } + } +} diff --git a/pallas-upstream/src/blockfetch.rs b/pallas-upstream/src/blockfetch.rs new file mode 100644 index 0000000..0e0d02e --- /dev/null +++ b/pallas-upstream/src/blockfetch.rs @@ -0,0 +1,84 @@ +use tracing::{debug, error, instrument}; + +use pallas_crypto::hash::Hash; +use pallas_miniprotocols::blockfetch; +use pallas_miniprotocols::Point; + +use crate::framework::*; + +pub type UpstreamPort = gasket::messaging::TwoPhaseInputPort; +pub type DownstreamPort = gasket::messaging::OutputPort; + +pub type OuroborosClient = blockfetch::Client; + +pub struct Worker { + client: OuroborosClient, + upstream: UpstreamPort, + downstream: DownstreamPort, + block_count: gasket::metrics::Counter, +} + +impl Worker { + pub fn new( + plexer: ProtocolChannel, + upstream: UpstreamPort, + downstream: DownstreamPort, + ) -> Self { + let client = OuroborosClient::new(plexer); + + Self { + client, + upstream, + downstream, + block_count: Default::default(), + } + } + + #[instrument(skip(self))] + fn fetch_block(&mut self, slot: u64, hash: Hash<32>) -> Result, gasket::error::Error> { + match self + .client + .fetch_single(Point::Specific(slot, hash.to_vec())) + { + Ok(x) => { + debug!("block fetch succeded"); + Ok(x) + } + Err(blockfetch::Error::ChannelError(x)) => { + error!("plexer channel error: {}", x); + Err(gasket::error::Error::RetryableError) + } + Err(x) => { + error!("unrecoverable block fetch error: {}", x); + Err(gasket::error::Error::WorkPanic) + } + } + } +} + +impl gasket::runtime::Worker for Worker { + fn metrics(&self) -> gasket::metrics::Registry { + gasket::metrics::Builder::new() + .with_counter("fetched_blocks", &self.block_count) + .build() + } + + fn work(&mut self) -> gasket::runtime::WorkResult { + let msg = self.upstream.recv_or_idle()?; + + let msg = match msg.payload { + ChainSyncEvent::RollForward(s, h) => { + let body = self.fetch_block(s, h)?; + BlockFetchEvent::RollForward(s, h, body) + } + ChainSyncEvent::Rollback(x) => BlockFetchEvent::Rollback(x), + }; + + self.downstream.send(msg.into())?; + + // remove the processed event from the queue + self.upstream.commit(); + + Ok(gasket::runtime::WorkOutcome::Partial) + } +} diff --git a/pallas-upstream/src/chainsync.rs b/pallas-upstream/src/chainsync.rs new file mode 100644 index 0000000..6ff9bfc --- /dev/null +++ b/pallas-upstream/src/chainsync.rs @@ -0,0 +1,129 @@ +use gasket::error::AsWorkError; +use tracing::{debug, info}; + +use pallas_miniprotocols::chainsync::{HeaderContent, NextResponse}; +use pallas_miniprotocols::{chainsync, Point}; +use pallas_traverse::MultiEraHeader; + +use crate::cursor::{Cursor, Intersection}; +use crate::framework::*; + +fn to_traverse(header: &chainsync::HeaderContent) -> Result, Error> { + let out = match header.byron_prefix { + Some((subtag, _)) => MultiEraHeader::decode(header.variant, Some(subtag), &header.cbor), + None => MultiEraHeader::decode(header.variant, None, &header.cbor), + }; + + out.map_err(Error::parse) +} + +pub type DownstreamPort = gasket::messaging::OutputPort; + +pub type OuroborosClient = chainsync::N2NClient; + +pub struct Worker { + chain_cursor: Cursor, + client: OuroborosClient, + downstream: DownstreamPort, + block_count: gasket::metrics::Counter, + chain_tip: gasket::metrics::Gauge, +} + +impl Worker { + pub fn new(chain_cursor: Cursor, plexer: ProtocolChannel, downstream: DownstreamPort) -> Self { + let client = OuroborosClient::new(plexer); + + Self { + chain_cursor, + client, + downstream, + block_count: Default::default(), + chain_tip: Default::default(), + } + } + + fn intersect(&mut self) -> Result, gasket::error::Error> { + let value = self.chain_cursor.read(); + + match value { + Intersection::Origin => { + let point = self.client.intersect_origin().or_restart()?; + + Ok(Some(point)) + } + Intersection::Tip => { + let point = self.client.intersect_tip().or_restart()?; + + Ok(Some(point)) + } + Intersection::Breadcrumbs(points) => { + let (point, _) = self.client.find_intersect(Vec::from(points)).or_restart()?; + + Ok(point) + } + } + } + + fn process_next( + &mut self, + next: NextResponse, + ) -> Result<(), gasket::error::Error> { + match next { + chainsync::NextResponse::RollForward(h, t) => { + let h = to_traverse(&h).or_panic()?; + self.downstream + .send(ChainSyncEvent::RollForward(h.slot(), h.hash()).into())?; + + debug!(slot = h.slot(), hash = %h.hash(), "chain sync roll forward"); + self.chain_tip.set(t.1 as i64); + Ok(()) + } + chainsync::NextResponse::RollBackward(p, t) => { + self.downstream.send(ChainSyncEvent::Rollback(p).into())?; + self.chain_tip.set(t.1 as i64); + Ok(()) + } + chainsync::NextResponse::Await => { + info!("chain-sync reached the tip of the chain"); + Ok(()) + } + } + } + + fn request_next(&mut self) -> Result<(), gasket::error::Error> { + info!("requesting next block"); + let next = self.client.request_next().or_restart()?; + self.process_next(next) + } + + fn await_next(&mut self) -> Result<(), gasket::error::Error> { + info!("awaiting next block (blocking)"); + let next = self.client.recv_while_must_reply().or_restart()?; + self.process_next(next) + } +} + +impl gasket::runtime::Worker for Worker { + fn metrics(&self) -> gasket::metrics::Registry { + gasket::metrics::Builder::new() + .with_counter("received_blocks", &self.block_count) + .with_gauge("chain_tip", &self.chain_tip) + .build() + } + + fn bootstrap(&mut self) -> Result<(), gasket::error::Error> { + let intersect = self.intersect()?; + info!(?intersect, "chain-sync intersected"); + + Ok(()) + } + + fn work(&mut self) -> gasket::runtime::WorkResult { + match self.client.has_agency() { + true => self.request_next()?, + false => self.await_next()?, + }; + + Ok(gasket::runtime::WorkOutcome::Partial) + } +} diff --git a/pallas-upstream/src/cursor.rs b/pallas-upstream/src/cursor.rs new file mode 100644 index 0000000..d77c21e --- /dev/null +++ b/pallas-upstream/src/cursor.rs @@ -0,0 +1,56 @@ +use std::{ + collections::VecDeque, + sync::{Arc, RwLock}, +}; + +use pallas_miniprotocols::Point; + +#[derive(Clone)] +pub enum Intersection { + Tip, + Origin, + Breadcrumbs(VecDeque), +} + +const HARDCODED_BREADCRUMBS: usize = 20; + +// TODO: include exponential breadcrumbs logic here +#[derive(Clone)] +pub struct Cursor(Arc>); + +impl Cursor { + pub fn new(value: Intersection) -> Self { + Self(Arc::new(RwLock::new(value))) + } + + pub fn read(&self) -> Intersection { + let v = self.0.read().unwrap(); + v.clone() + } + + pub fn latest_known_point(&self) -> Option { + let guard = self.0.read().unwrap(); + + match &*guard { + Intersection::Breadcrumbs(v) => v.front().cloned(), + _ => None, + } + } + + pub fn add_breadcrumb(&self, value: Point) { + let mut guard = self.0.write().unwrap(); + + match &mut *guard { + Intersection::Tip | Intersection::Origin => { + *guard = Intersection::Breadcrumbs(VecDeque::from(vec![value])); + } + Intersection::Breadcrumbs(crumbs) => { + crumbs.push_front(value); + + if crumbs.len() > HARDCODED_BREADCRUMBS { + crumbs.pop_back(); + } + } + } + } +} diff --git a/pallas-upstream/src/framework.rs b/pallas-upstream/src/framework.rs new file mode 100644 index 0000000..44f4fbf --- /dev/null +++ b/pallas-upstream/src/framework.rs @@ -0,0 +1,106 @@ +use pallas_crypto::hash::Hash; +use pallas_miniprotocols::Point; +use pallas_multiplexer as multiplexer; +use thiserror::Error; +use tracing::error; + +pub type BlockSlot = u64; +pub type BlockHash = Hash<32>; +pub type RawBlock = Vec; + +#[derive(Debug, Clone)] +pub enum ChainSyncEvent { + RollForward(BlockSlot, BlockHash), + Rollback(Point), +} + +#[derive(Debug, Clone)] +pub enum BlockFetchEvent { + RollForward(BlockSlot, BlockHash, RawBlock), + Rollback(Point), +} + +// ports used by plexer +pub type MuxOutputPort = gasket::messaging::OutputPort<(u16, multiplexer::Payload)>; +pub type DemuxInputPort = gasket::messaging::InputPort; + +// ports used by mini-protocols +pub type MuxInputPort = gasket::messaging::InputPort<(u16, multiplexer::Payload)>; +pub type DemuxOutputPort = gasket::messaging::OutputPort; + +#[derive(Debug)] +pub struct ProtocolChannel(pub u16, pub MuxOutputPort, pub DemuxInputPort); + +impl multiplexer::agents::Channel for ProtocolChannel { + fn enqueue_chunk( + &mut self, + payload: multiplexer::Payload, + ) -> Result<(), multiplexer::agents::ChannelError> { + match self + .1 + .send(gasket::messaging::Message::from((self.0, payload))) + { + Ok(_) => Ok(()), + Err(error) => { + error!(?error, "enqueue chunk failed"); + Err(multiplexer::agents::ChannelError::NotConnected(None)) + } + } + } + + fn dequeue_chunk(&mut self) -> Result { + match self.2.recv_or_idle() { + Ok(msg) => Ok(msg.payload), + Err(error) => { + error!(?error, "dequeue chunk failed"); + Err(multiplexer::agents::ChannelError::NotConnected(None)) + } + } + } +} + +#[derive(Error, Debug)] +pub enum Error { + #[error("client error: {0}")] + ClientError(String), + + #[error("parse error: {0}")] + ParseError(String), + + #[error("server error: {0}")] + ServerError(String), + + #[error("{0}")] + Message(String), + + #[error("{0}")] + Custom(String), +} + +impl Error { + pub fn client(error: impl ToString) -> Error { + Error::ClientError(error.to_string()) + } + + pub fn parse(error: impl ToString) -> Error { + Error::ParseError(error.to_string()) + } + + pub fn server(error: impl ToString) -> Error { + Error::ServerError(error.to_string()) + } + + pub fn message(error: impl ToString) -> Error { + Error::Message(error.to_string()) + } + + pub fn custom(error: Box) -> Error { + Error::Custom(format!("{}", error)) + } +} + +impl From> for Error { + fn from(err: Box) -> Self { + Error::custom(err) + } +} diff --git a/pallas-upstream/src/lib.rs b/pallas-upstream/src/lib.rs new file mode 100644 index 0000000..9f0cd36 --- /dev/null +++ b/pallas-upstream/src/lib.rs @@ -0,0 +1,11 @@ +pub(crate) mod blockfetch; +pub(crate) mod chainsync; +pub(crate) mod framework; +pub(crate) mod plexer; + +pub mod cursor; +pub mod wellknown; + +mod api; + +pub use api::*; diff --git a/pallas-upstream/src/plexer.rs b/pallas-upstream/src/plexer.rs new file mode 100644 index 0000000..2570544 --- /dev/null +++ b/pallas-upstream/src/plexer.rs @@ -0,0 +1,201 @@ +use gasket::error::AsWorkError; +use tracing::{debug, error, info, warn}; + +use pallas_miniprotocols::handshake; +use pallas_multiplexer as multiplexer; +use pallas_multiplexer::bearers::Bearer; +use pallas_multiplexer::demux::{Demuxer, Egress}; +use pallas_multiplexer::mux::{Ingress, Muxer}; +use pallas_multiplexer::sync::SyncPlexer; + +use crate::framework::*; + +struct GasketEgress(DemuxOutputPort); + +impl Egress for GasketEgress { + fn send( + &mut self, + payload: multiplexer::Payload, + ) -> Result<(), multiplexer::demux::EgressError> { + self.0 + .send(gasket::messaging::Message::from(payload)) + .map_err(|_| multiplexer::demux::EgressError(vec![])) + } +} + +struct GasketIngress(MuxInputPort); + +impl Ingress for GasketIngress { + fn recv_timeout( + &mut self, + duration: std::time::Duration, + ) -> Result { + self.0 + .recv_timeout(duration) + .map(|msg| msg.payload) + .map_err(|err| match err { + gasket::error::Error::RecvIdle => multiplexer::mux::IngressError::Empty, + _ => multiplexer::mux::IngressError::Disconnected, + }) + } +} + +type IsBusy = bool; + +fn handle_demux_outcome( + outcome: Result, +) -> Result { + match outcome { + Ok(x) => match x { + multiplexer::demux::TickOutcome::Busy => Ok(true), + multiplexer::demux::TickOutcome::Idle => Ok(false), + }, + Err(err) => match err { + multiplexer::demux::DemuxError::BearerError(err) => { + error!("{}", err.kind()); + Err(gasket::error::Error::ShouldRestart) + } + multiplexer::demux::DemuxError::EgressDisconnected(x, _) => { + error!(protocol = x, "egress disconnected"); + Err(gasket::error::Error::WorkPanic) + } + multiplexer::demux::DemuxError::EgressUnknown(x, _) => { + error!(protocol = x, "unknown egress"); + Err(gasket::error::Error::WorkPanic) + } + }, + } +} + +fn handle_mux_outcome( + outcome: multiplexer::mux::TickOutcome, +) -> Result { + match outcome { + multiplexer::mux::TickOutcome::Busy => Ok(true), + multiplexer::mux::TickOutcome::Idle => Ok(false), + multiplexer::mux::TickOutcome::BearerError(err) => { + warn!(%err); + Err(gasket::error::Error::ShouldRestart) + } + multiplexer::mux::TickOutcome::IngressDisconnected => { + error!("ingress disconnected"); + Err(gasket::error::Error::WorkPanic) + } + } +} + +pub struct Worker { + peer_address: String, + network_magic: u64, + input: MuxInputPort, + channel2_out: Option, + channel3_out: Option, + demuxer: Option>, + muxer: Option>, +} + +impl Worker { + pub fn new( + peer_address: String, + network_magic: u64, + input: MuxInputPort, + channel2_out: Option, + channel3_out: Option, + ) -> Self { + Self { + peer_address, + network_magic, + input, + channel2_out, + channel3_out, + demuxer: None, + muxer: None, + } + } + + fn handshake(&self, bearer: Bearer) -> Result { + info!("excuting handshake"); + + let plexer = SyncPlexer::new(bearer, 0); + let versions = handshake::n2n::VersionTable::v7_and_above(self.network_magic); + let mut client = handshake::Client::new(plexer); + + let output = client.handshake(versions).or_panic()?; + debug!("handshake output: {:?}", output); + + let bearer = client.unwrap().unwrap(); + + match output { + handshake::Confirmation::Accepted(version, _) => { + info!(version, "connected to upstream peer"); + Ok(bearer) + } + _ => { + error!("couldn't agree on handshake version"); + Err(gasket::error::Error::WorkPanic) + } + } + } +} + +impl gasket::runtime::Worker for Worker { + fn metrics(&self) -> gasket::metrics::Registry { + // TODO: define networking metrics (bytes in / out, etc) + gasket::metrics::Builder::new().build() + } + + fn bootstrap(&mut self) -> Result<(), gasket::error::Error> { + debug!("connecting muxer"); + + let bearer = multiplexer::bearers::Bearer::connect_tcp(&self.peer_address).or_restart()?; + + let bearer = self.handshake(bearer)?; + + let mut demuxer = Demuxer::new(bearer.clone()); + + if let Some(c2) = &self.channel2_out { + demuxer.register(2, GasketEgress(c2.clone())); + } + + if let Some(c3) = &self.channel3_out { + demuxer.register(3, GasketEgress(c3.clone())); + } + + self.demuxer = Some(demuxer); + + let muxer = Muxer::new(bearer, GasketIngress(self.input.clone())); + self.muxer = Some(muxer); + + Ok(()) + } + + fn work(&mut self) -> gasket::runtime::WorkResult { + let muxer = self.muxer.as_mut().unwrap(); + let demuxer = self.demuxer.as_mut().unwrap(); + + let span = tracing::span::Span::current(); + + let mut mux_res = None; + let mut demux_res = None; + + rayon::scope(|s| { + s.spawn(|_| { + let _guard = span.enter(); + info!("mux ticking"); + let outcome = muxer.tick(); + mux_res = Some(handle_mux_outcome(outcome)); + }); + s.spawn(|_| { + let _guard = span.enter(); + info!("demux ticking"); + let outcome = demuxer.tick(); + demux_res = Some(handle_demux_outcome(outcome)); + }); + }); + + mux_res.unwrap()?; + demux_res.unwrap()?; + + Ok(gasket::runtime::WorkOutcome::Partial) + } +} diff --git a/pallas-upstream/src/wellknown.rs b/pallas-upstream/src/wellknown.rs new file mode 100644 index 0000000..448c406 --- /dev/null +++ b/pallas-upstream/src/wellknown.rs @@ -0,0 +1,125 @@ +use serde::{Deserialize, Serialize}; + +use pallas_miniprotocols::{ + MAINNET_MAGIC, + TESTNET_MAGIC, + // PREVIEW_MAGIC, PRE_PRODUCTION_MAGIC, +}; + +use crate::framework::*; + +// TODO: use from pallas once available +pub const PRE_PRODUCTION_MAGIC: u64 = 1; +pub const PREVIEW_MAGIC: u64 = 2; + +/// Well-known information about specific networks +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct WellKnownChainInfo { + pub magic: u64, + pub byron_epoch_length: u32, + pub byron_slot_length: u32, + pub byron_known_slot: u64, + pub byron_known_hash: String, + pub byron_known_time: u64, + pub shelley_epoch_length: u32, + pub shelley_slot_length: u32, + pub shelley_known_slot: u64, + pub shelley_known_hash: String, + pub shelley_known_time: u64, +} + +impl WellKnownChainInfo { + /// Hardcoded values for mainnet + pub fn mainnet() -> Self { + WellKnownChainInfo { + magic: MAINNET_MAGIC, + byron_epoch_length: 432000, + byron_slot_length: 20, + byron_known_slot: 0, + byron_known_time: 1506203091, + byron_known_hash: "f0f7892b5c333cffc4b3c4344de48af4cc63f55e44936196f365a9ef2244134f" + .to_string(), + shelley_epoch_length: 432000, + shelley_slot_length: 1, + shelley_known_slot: 4492800, + shelley_known_hash: "aa83acbf5904c0edfe4d79b3689d3d00fcfc553cf360fd2229b98d464c28e9de" + .to_string(), + shelley_known_time: 1596059091, + } + } + + /// Hardcoded values for testnet + pub fn testnet() -> Self { + WellKnownChainInfo { + magic: TESTNET_MAGIC, + byron_epoch_length: 432000, + byron_slot_length: 20, + byron_known_slot: 0, + byron_known_time: 1564010416, + byron_known_hash: "8f8602837f7c6f8b8867dd1cbc1842cf51a27eaed2c70ef48325d00f8efb320f" + .to_string(), + shelley_epoch_length: 432000, + shelley_slot_length: 1, + shelley_known_slot: 1598400, + shelley_known_hash: "02b1c561715da9e540411123a6135ee319b02f60b9a11a603d3305556c04329f" + .to_string(), + shelley_known_time: 1595967616, + } + } + + pub fn preview() -> Self { + WellKnownChainInfo { + magic: PREVIEW_MAGIC, + byron_epoch_length: 432000, + byron_slot_length: 20, + byron_known_slot: 0, + byron_known_hash: "".to_string(), + byron_known_time: 1660003200, + shelley_epoch_length: 432000, + shelley_slot_length: 1, + shelley_known_slot: 25260, + shelley_known_hash: "cac921895ef5f2e85f7e6e6b51b663ab81b3605cd47d6b6d66e8e785e5c65011" + .to_string(), + shelley_known_time: 1660003200, + } + } + + /// Hardcoded values for the "pre-prod" testnet + pub fn preprod() -> Self { + WellKnownChainInfo { + magic: PRE_PRODUCTION_MAGIC, + byron_epoch_length: 432000, + byron_slot_length: 20, + byron_known_slot: 0, + byron_known_hash: "9ad7ff320c9cf74e0f5ee78d22a85ce42bb0a487d0506bf60cfb5a91ea4497d2" + .to_string(), + byron_known_time: 1654041600, + shelley_epoch_length: 432000, + shelley_slot_length: 1, + shelley_known_slot: 86400, + shelley_known_hash: "c4a1595c5cc7a31eda9e544986fe9387af4e3491afe0ca9a80714f01951bbd5c" + .to_string(), + shelley_known_time: 1654041600, + } + } + + /// Uses the value of the magic to return either mainnet or testnet + /// hardcoded values. + pub fn try_from_magic(magic: u64) -> Result { + match magic { + MAINNET_MAGIC => Ok(Self::mainnet()), + TESTNET_MAGIC => Ok(Self::testnet()), + PREVIEW_MAGIC => Ok(Self::preview()), + PRE_PRODUCTION_MAGIC => Ok(Self::preprod()), + _ => Err(Error::message( + "can't infer well-known chain from specified magic", + )), + } + } +} + +impl Default for WellKnownChainInfo { + fn default() -> Self { + Self::mainnet() + } +} diff --git a/pallas/Cargo.toml b/pallas/Cargo.toml index d12330a..002351d 100644 --- a/pallas/Cargo.toml +++ b/pallas/Cargo.toml @@ -18,3 +18,4 @@ pallas-traverse = { version = "0.18.0", path = "../pallas-traverse/" } pallas-addresses = { version = "0.18.0", path = "../pallas-addresses/" } pallas-crypto = { version = "0.18.0", path = "../pallas-crypto/" } pallas-codec = { version = "0.18.0", path = "../pallas-codec/" } +pallas-upstream = { version = "0.18.0", path = "../pallas-upstream/" } diff --git a/pallas/src/network.rs b/pallas/src/network.rs index 18fd21c..696884d 100644 --- a/pallas/src/network.rs +++ b/pallas/src/network.rs @@ -5,3 +5,6 @@ pub use pallas_multiplexer as multiplexer; #[doc(inline)] pub use pallas_miniprotocols as miniprotocols; + +#[doc(inline)] +pub use pallas_upstream as upstream;