diff --git a/Cargo.toml b/Cargo.toml index 3f93671..063ff1e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,6 @@ members = [ "pallas-crypto", "pallas-primitives", "pallas-traverse", - "pallas-upstream", "pallas", "examples/block-download", "examples/block-decode", diff --git a/pallas-upstream/Cargo.toml b/pallas-upstream/Cargo.toml deleted file mode 100644 index 394550d..0000000 --- a/pallas-upstream/Cargo.toml +++ /dev/null @@ -1,31 +0,0 @@ -[package] -name = "pallas-upstream" -description = "Opinionated implementation of component that pulls chain data from an upstream node" -version = "0.19.0-alpha.0" -edition = "2021" -repository = "https://github.com/txpipe/pallas" -homepage = "https://github.com/txpipe/pallas" -documentation = "https://docs.rs/pallas-upstream" -license = "Apache-2.0" -readme = "README.md" -authors = ["Santiago Carmuega "] - -[dependencies] -async-trait = "0.1.68" -byteorder = "1.4.3" -hex = "0.4.3" -pallas-codec = { version = "0.19.0-alpha.0", path = "../pallas-codec" } -pallas-crypto = { version = "0.19.0-alpha.0", path = "../pallas-crypto" } -pallas-network = { version = "0.19.0-alpha.0", path = "../pallas-network" } -pallas-traverse = { version = "0.19.0-alpha.0", path = "../pallas-traverse" } -serde = { version = "1.0.154", features = ["derive"] } -thiserror = "1.0.31" -tokio = { version = "1", features = ["net", "macros", "io-util"] } -tracing = "0.1.37" - -gasket = "^0.4" -# gasket = { git = "https://github.com/construkts/gasket-rs" } -# gasket = { path = "../../../construkts/gasket-rs/gasket" } - -[dev-dependencies] -tracing-subscriber = "0.3.16" diff --git a/pallas-upstream/README.md b/pallas-upstream/README.md deleted file mode 100644 index 4e77a32..0000000 --- a/pallas-upstream/README.md +++ /dev/null @@ -1,3 +0,0 @@ -# Pallas Upstream - -An opinionated implementation of component that pulls chain data from an upstream node. It's implemented as a pipeline using SEDA (staged event-driven architecture), heavily coupled with the [Gasket](https://github.com/construkts/gasket-rs) framework. \ No newline at end of file diff --git a/pallas-upstream/src/framework.rs b/pallas-upstream/src/framework.rs deleted file mode 100644 index a81d229..0000000 --- a/pallas-upstream/src/framework.rs +++ /dev/null @@ -1,26 +0,0 @@ -use pallas_crypto::hash::Hash; -use pallas_network::miniprotocols::Point; - -pub type BlockSlot = u64; -pub type BlockHash = Hash<32>; -pub type RawBlock = Vec; - -#[derive(Clone)] -pub enum Intersection { - Tip, - Origin, - Breadcrumbs(Vec), -} - -pub trait Cursor: Send + Sync { - fn intersection(&self) -> Intersection; -} - -#[derive(Debug, Clone)] -pub enum UpstreamEvent { - RollForward(BlockSlot, BlockHash, RawBlock), - Rollback(Point), -} - -// final output port -pub type DownstreamPort = gasket::messaging::OutputPort; diff --git a/pallas-upstream/src/lib.rs b/pallas-upstream/src/lib.rs deleted file mode 100644 index e2d9457..0000000 --- a/pallas-upstream/src/lib.rs +++ /dev/null @@ -1,8 +0,0 @@ -pub(crate) mod framework; -pub(crate) mod worker; - -pub use crate::framework::{Cursor, DownstreamPort, Intersection, UpstreamEvent}; - -pub mod n2n { - pub use crate::worker::*; -} diff --git a/pallas-upstream/src/worker.rs b/pallas-upstream/src/worker.rs deleted file mode 100644 index 7f2b8f8..0000000 --- a/pallas-upstream/src/worker.rs +++ /dev/null @@ -1,236 +0,0 @@ -use std::marker::PhantomData; - -use gasket::framework::*; -use gasket::messaging::*; -use tracing::{debug, info}; - -use pallas_network::facades::PeerClient; -use pallas_network::miniprotocols::chainsync::{self, HeaderContent, NextResponse}; -use pallas_network::miniprotocols::Point; -use pallas_traverse::MultiEraHeader; - -use crate::framework::*; - -fn to_traverse(header: &HeaderContent) -> Result, WorkerError> { - let out = match header.byron_prefix { - Some((subtag, _)) => MultiEraHeader::decode(header.variant, Some(subtag), &header.cbor), - None => MultiEraHeader::decode(header.variant, None, &header.cbor), - }; - - out.or_panic() -} - -pub type DownstreamPort = gasket::messaging::OutputPort; - -async fn intersect(peer: &mut PeerClient, intersection: Intersection) -> Result<(), WorkerError> { - let chainsync = peer.chainsync(); - - let intersect = match intersection { - Intersection::Origin => { - info!("intersecting origin"); - chainsync.intersect_origin().await.or_restart()?.into() - } - Intersection::Tip => { - info!("intersecting tip"); - chainsync.intersect_tip().await.or_restart()?.into() - } - Intersection::Breadcrumbs(points) => { - info!("intersecting breadcrumbs"); - let (point, _) = chainsync.find_intersect(points).await.or_restart()?; - point - } - }; - - info!(?intersect, "intersected"); - - Ok(()) -} - -pub struct Worker -where - C: Cursor, - A: SendAdapter, -{ - peer_session: PeerClient, - _panthom_c: PhantomData, - _panthom_a: PhantomData, -} - -impl Worker -where - C: Cursor, - A: SendAdapter, -{ - async fn process_next( - &mut self, - stage: &mut Stage, - next: &NextResponse, - ) -> Result<(), WorkerError> { - match next { - NextResponse::RollForward(header, tip) => { - let header = to_traverse(header).or_panic()?; - let slot = header.slot(); - let hash = header.hash(); - - debug!(slot, %hash, "chain sync roll forward"); - - let block = self - .peer_session - .blockfetch() - .fetch_single(pallas_network::miniprotocols::Point::Specific( - slot, - hash.to_vec(), - )) - .await - .or_retry()?; - - stage - .downstream - .send(UpstreamEvent::RollForward(slot, hash, block).into()) - .await - .or_panic()?; - - stage.chain_tip.set(tip.0.slot_or_default() as i64); - - Ok(()) - } - chainsync::NextResponse::RollBackward(point, tip) => { - match &point { - Point::Origin => debug!("rollback to origin"), - Point::Specific(slot, _) => debug!(slot, "rollback"), - }; - - stage - .downstream - .send(UpstreamEvent::Rollback(point.clone()).into()) - .await - .or_panic()?; - - stage.chain_tip.set(tip.0.slot_or_default() as i64); - - Ok(()) - } - chainsync::NextResponse::Await => { - info!("chain-sync reached the tip of the chain"); - Ok(()) - } - } - } -} - -#[async_trait::async_trait(?Send)] -impl gasket::framework::Worker> for Worker -where - C: Cursor + Sync + Send, - A: SendAdapter, -{ - async fn bootstrap(stage: &Stage) -> Result { - debug!("connecting"); - - let intersection = stage.chain_cursor.intersection(); - - let mut peer_session = PeerClient::connect(&stage.peer_address, stage.network_magic) - .await - .or_retry()?; - - intersect(&mut peer_session, intersection).await?; - - let worker = Self { - peer_session, - _panthom_a: Default::default(), - _panthom_c: Default::default(), - }; - - Ok(worker) - } - - async fn schedule( - &mut self, - _stage: &mut Stage, - ) -> Result>, WorkerError> { - let client = self.peer_session.chainsync(); - - let next = match client.has_agency() { - true => { - info!("requesting next block"); - client.request_next().await.or_restart()? - } - false => { - info!("awaiting next block (blocking)"); - client.recv_while_must_reply().await.or_restart()? - } - }; - - Ok(WorkSchedule::Unit(next)) - } - - async fn execute( - &mut self, - unit: &NextResponse, - stage: &mut Stage, - ) -> Result<(), WorkerError> { - self.process_next(stage, unit).await - } - - async fn teardown(&mut self) -> Result<(), WorkerError> { - self.peer_session.abort(); - - Ok(()) - } -} - -pub struct Stage -where - C: Cursor, - A: SendAdapter, -{ - peer_address: String, - network_magic: u64, - chain_cursor: C, - downstream: DownstreamPort, - block_count: gasket::metrics::Counter, - chain_tip: gasket::metrics::Gauge, -} - -impl gasket::framework::Stage for Stage -where - C: Cursor, - A: SendAdapter, -{ - type Unit = NextResponse; - type Worker = Worker; - - fn name(&self) -> &str { - "upstream" - } - - fn metrics(&self) -> gasket::metrics::Registry { - let mut registry = gasket::metrics::Registry::default(); - - registry.track_counter("received_blocks", &self.block_count); - registry.track_gauge("chain_tip", &self.chain_tip); - - registry - } -} - -impl Stage -where - C: Cursor, - A: SendAdapter, -{ - pub fn new(peer_address: String, network_magic: u64, chain_cursor: C) -> Self { - Self { - peer_address, - network_magic, - chain_cursor, - downstream: Default::default(), - block_count: Default::default(), - chain_tip: Default::default(), - } - } - - pub fn downstream_port(&mut self) -> &mut DownstreamPort { - &mut self.downstream - } -} diff --git a/pallas-upstream/tests/integration.rs b/pallas-upstream/tests/integration.rs deleted file mode 100644 index 3429ee2..0000000 --- a/pallas-upstream/tests/integration.rs +++ /dev/null @@ -1,87 +0,0 @@ -use gasket::{framework::*, messaging::*, runtime::Policy}; - -use pallas_upstream::{Cursor, UpstreamEvent}; -use tracing::{error, info}; - -struct WitnessStage { - input: gasket::messaging::tokio::InputPort, -} - -impl gasket::framework::Stage for WitnessStage { - type Unit = UpstreamEvent; - type Worker = WitnessWorker; - - fn name(&self) -> &str { - "witness" - } -} - -struct WitnessWorker; - -#[async_trait::async_trait(?Send)] -impl Worker for WitnessWorker { - async fn bootstrap(_: &WitnessStage) -> Result { - Ok(Self) - } - - async fn schedule( - &mut self, - stage: &mut WitnessStage, - ) -> Result, WorkerError> { - error!("dequeing form witness"); - let msg = stage.input.recv().await.or_panic()?; - Ok(WorkSchedule::Unit(msg.payload)) - } - - async fn execute( - &mut self, - _: &UpstreamEvent, - _: &mut WitnessStage, - ) -> Result<(), WorkerError> { - info!("witnessing block event"); - - Ok(()) - } -} - -struct StaticCursor; - -impl Cursor for StaticCursor { - fn intersection(&self) -> pallas_upstream::Intersection { - pallas_upstream::Intersection::Origin - } -} - -#[test] -#[ignore] -fn test_mainnet_upstream() { - tracing::subscriber::set_global_default( - tracing_subscriber::FmtSubscriber::builder() - .with_max_level(tracing::Level::TRACE) - .finish(), - ) - .unwrap(); - - let (send, receive) = gasket::messaging::tokio::channel(200); - - let mut upstream = pallas_upstream::n2n::Stage::new( - "relays-new.cardano-mainnet.iohk.io:3001".into(), - 764824073, - StaticCursor, - ); - - upstream.downstream_port().connect(send); - - let mut witness = WitnessStage { - input: Default::default(), - }; - - witness.input.connect(receive); - - let upstream = gasket::runtime::spawn_stage(upstream, Policy::default()); - let witness = gasket::runtime::spawn_stage(witness, Policy::default()); - - let daemon = gasket::daemon::Daemon(vec![upstream, witness]); - - daemon.block(); -} diff --git a/pallas/Cargo.toml b/pallas/Cargo.toml index 14a2c1f..9eb5d44 100644 --- a/pallas/Cargo.toml +++ b/pallas/Cargo.toml @@ -17,4 +17,3 @@ pallas-traverse = { version = "0.19.0-alpha.0", path = "../pallas-traverse/" } pallas-addresses = { version = "0.19.0-alpha.0", path = "../pallas-addresses/" } pallas-crypto = { version = "0.19.0-alpha.0", path = "../pallas-crypto/" } pallas-codec = { version = "0.19.0-alpha.0", path = "../pallas-codec/" } -pallas-upstream = { version = "0.19.0-alpha.0", path = "../pallas-upstream/" } diff --git a/pallas/src/lib.rs b/pallas/src/lib.rs index de88328..84bfa16 100644 --- a/pallas/src/lib.rs +++ b/pallas/src/lib.rs @@ -30,6 +30,3 @@ pub use pallas_crypto as crypto; #[doc(inline)] pub use pallas_codec as codec; - -#[doc(inline)] -pub use pallas_upstream as upstream;