chore: undo upstream crate experiment (#258)

This commit is contained in:
Santiago Carmuega 2023-05-20 20:20:01 +02:00 committed by GitHub
parent 36df92f8ad
commit 82a9eeb47d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 0 additions and 396 deletions

View file

@ -7,7 +7,6 @@ members = [
"pallas-crypto",
"pallas-primitives",
"pallas-traverse",
"pallas-upstream",
"pallas",
"examples/block-download",
"examples/block-decode",

View file

@ -1,31 +0,0 @@
[package]
name = "pallas-upstream"
description = "Opinionated implementation of component that pulls chain data from an upstream node"
version = "0.19.0-alpha.0"
edition = "2021"
repository = "https://github.com/txpipe/pallas"
homepage = "https://github.com/txpipe/pallas"
documentation = "https://docs.rs/pallas-upstream"
license = "Apache-2.0"
readme = "README.md"
authors = ["Santiago Carmuega <santiago@carmuega.me>"]
[dependencies]
async-trait = "0.1.68"
byteorder = "1.4.3"
hex = "0.4.3"
pallas-codec = { version = "0.19.0-alpha.0", path = "../pallas-codec" }
pallas-crypto = { version = "0.19.0-alpha.0", path = "../pallas-crypto" }
pallas-network = { version = "0.19.0-alpha.0", path = "../pallas-network" }
pallas-traverse = { version = "0.19.0-alpha.0", path = "../pallas-traverse" }
serde = { version = "1.0.154", features = ["derive"] }
thiserror = "1.0.31"
tokio = { version = "1", features = ["net", "macros", "io-util"] }
tracing = "0.1.37"
gasket = "^0.4"
# gasket = { git = "https://github.com/construkts/gasket-rs" }
# gasket = { path = "../../../construkts/gasket-rs/gasket" }
[dev-dependencies]
tracing-subscriber = "0.3.16"

View file

@ -1,3 +0,0 @@
# Pallas Upstream
An opinionated implementation of component that pulls chain data from an upstream node. It's implemented as a pipeline using SEDA (staged event-driven architecture), heavily coupled with the [Gasket](https://github.com/construkts/gasket-rs) framework.

View file

@ -1,26 +0,0 @@
use pallas_crypto::hash::Hash;
use pallas_network::miniprotocols::Point;
pub type BlockSlot = u64;
pub type BlockHash = Hash<32>;
pub type RawBlock = Vec<u8>;
#[derive(Clone)]
pub enum Intersection {
Tip,
Origin,
Breadcrumbs(Vec<Point>),
}
pub trait Cursor: Send + Sync {
fn intersection(&self) -> Intersection;
}
#[derive(Debug, Clone)]
pub enum UpstreamEvent {
RollForward(BlockSlot, BlockHash, RawBlock),
Rollback(Point),
}
// final output port
pub type DownstreamPort<A> = gasket::messaging::OutputPort<A, UpstreamEvent>;

View file

@ -1,8 +0,0 @@
pub(crate) mod framework;
pub(crate) mod worker;
pub use crate::framework::{Cursor, DownstreamPort, Intersection, UpstreamEvent};
pub mod n2n {
pub use crate::worker::*;
}

View file

@ -1,236 +0,0 @@
use std::marker::PhantomData;
use gasket::framework::*;
use gasket::messaging::*;
use tracing::{debug, info};
use pallas_network::facades::PeerClient;
use pallas_network::miniprotocols::chainsync::{self, HeaderContent, NextResponse};
use pallas_network::miniprotocols::Point;
use pallas_traverse::MultiEraHeader;
use crate::framework::*;
fn to_traverse(header: &HeaderContent) -> Result<MultiEraHeader<'_>, WorkerError> {
let out = match header.byron_prefix {
Some((subtag, _)) => MultiEraHeader::decode(header.variant, Some(subtag), &header.cbor),
None => MultiEraHeader::decode(header.variant, None, &header.cbor),
};
out.or_panic()
}
pub type DownstreamPort<A> = gasket::messaging::OutputPort<A, UpstreamEvent>;
async fn intersect(peer: &mut PeerClient, intersection: Intersection) -> Result<(), WorkerError> {
let chainsync = peer.chainsync();
let intersect = match intersection {
Intersection::Origin => {
info!("intersecting origin");
chainsync.intersect_origin().await.or_restart()?.into()
}
Intersection::Tip => {
info!("intersecting tip");
chainsync.intersect_tip().await.or_restart()?.into()
}
Intersection::Breadcrumbs(points) => {
info!("intersecting breadcrumbs");
let (point, _) = chainsync.find_intersect(points).await.or_restart()?;
point
}
};
info!(?intersect, "intersected");
Ok(())
}
pub struct Worker<C, A>
where
C: Cursor,
A: SendAdapter<UpstreamEvent>,
{
peer_session: PeerClient,
_panthom_c: PhantomData<C>,
_panthom_a: PhantomData<A>,
}
impl<C, A> Worker<C, A>
where
C: Cursor,
A: SendAdapter<UpstreamEvent>,
{
async fn process_next(
&mut self,
stage: &mut Stage<C, A>,
next: &NextResponse<HeaderContent>,
) -> Result<(), WorkerError> {
match next {
NextResponse::RollForward(header, tip) => {
let header = to_traverse(header).or_panic()?;
let slot = header.slot();
let hash = header.hash();
debug!(slot, %hash, "chain sync roll forward");
let block = self
.peer_session
.blockfetch()
.fetch_single(pallas_network::miniprotocols::Point::Specific(
slot,
hash.to_vec(),
))
.await
.or_retry()?;
stage
.downstream
.send(UpstreamEvent::RollForward(slot, hash, block).into())
.await
.or_panic()?;
stage.chain_tip.set(tip.0.slot_or_default() as i64);
Ok(())
}
chainsync::NextResponse::RollBackward(point, tip) => {
match &point {
Point::Origin => debug!("rollback to origin"),
Point::Specific(slot, _) => debug!(slot, "rollback"),
};
stage
.downstream
.send(UpstreamEvent::Rollback(point.clone()).into())
.await
.or_panic()?;
stage.chain_tip.set(tip.0.slot_or_default() as i64);
Ok(())
}
chainsync::NextResponse::Await => {
info!("chain-sync reached the tip of the chain");
Ok(())
}
}
}
}
#[async_trait::async_trait(?Send)]
impl<C, A> gasket::framework::Worker<Stage<C, A>> for Worker<C, A>
where
C: Cursor + Sync + Send,
A: SendAdapter<UpstreamEvent>,
{
async fn bootstrap(stage: &Stage<C, A>) -> Result<Self, WorkerError> {
debug!("connecting");
let intersection = stage.chain_cursor.intersection();
let mut peer_session = PeerClient::connect(&stage.peer_address, stage.network_magic)
.await
.or_retry()?;
intersect(&mut peer_session, intersection).await?;
let worker = Self {
peer_session,
_panthom_a: Default::default(),
_panthom_c: Default::default(),
};
Ok(worker)
}
async fn schedule(
&mut self,
_stage: &mut Stage<C, A>,
) -> Result<WorkSchedule<NextResponse<HeaderContent>>, WorkerError> {
let client = self.peer_session.chainsync();
let next = match client.has_agency() {
true => {
info!("requesting next block");
client.request_next().await.or_restart()?
}
false => {
info!("awaiting next block (blocking)");
client.recv_while_must_reply().await.or_restart()?
}
};
Ok(WorkSchedule::Unit(next))
}
async fn execute(
&mut self,
unit: &NextResponse<HeaderContent>,
stage: &mut Stage<C, A>,
) -> Result<(), WorkerError> {
self.process_next(stage, unit).await
}
async fn teardown(&mut self) -> Result<(), WorkerError> {
self.peer_session.abort();
Ok(())
}
}
pub struct Stage<C, A>
where
C: Cursor,
A: SendAdapter<UpstreamEvent>,
{
peer_address: String,
network_magic: u64,
chain_cursor: C,
downstream: DownstreamPort<A>,
block_count: gasket::metrics::Counter,
chain_tip: gasket::metrics::Gauge,
}
impl<C, A> gasket::framework::Stage for Stage<C, A>
where
C: Cursor,
A: SendAdapter<UpstreamEvent>,
{
type Unit = NextResponse<HeaderContent>;
type Worker = Worker<C, A>;
fn name(&self) -> &str {
"upstream"
}
fn metrics(&self) -> gasket::metrics::Registry {
let mut registry = gasket::metrics::Registry::default();
registry.track_counter("received_blocks", &self.block_count);
registry.track_gauge("chain_tip", &self.chain_tip);
registry
}
}
impl<C, A> Stage<C, A>
where
C: Cursor,
A: SendAdapter<UpstreamEvent>,
{
pub fn new(peer_address: String, network_magic: u64, chain_cursor: C) -> Self {
Self {
peer_address,
network_magic,
chain_cursor,
downstream: Default::default(),
block_count: Default::default(),
chain_tip: Default::default(),
}
}
pub fn downstream_port(&mut self) -> &mut DownstreamPort<A> {
&mut self.downstream
}
}

View file

@ -1,87 +0,0 @@
use gasket::{framework::*, messaging::*, runtime::Policy};
use pallas_upstream::{Cursor, UpstreamEvent};
use tracing::{error, info};
struct WitnessStage {
input: gasket::messaging::tokio::InputPort<UpstreamEvent>,
}
impl gasket::framework::Stage for WitnessStage {
type Unit = UpstreamEvent;
type Worker = WitnessWorker;
fn name(&self) -> &str {
"witness"
}
}
struct WitnessWorker;
#[async_trait::async_trait(?Send)]
impl Worker<WitnessStage> for WitnessWorker {
async fn bootstrap(_: &WitnessStage) -> Result<Self, WorkerError> {
Ok(Self)
}
async fn schedule(
&mut self,
stage: &mut WitnessStage,
) -> Result<WorkSchedule<UpstreamEvent>, WorkerError> {
error!("dequeing form witness");
let msg = stage.input.recv().await.or_panic()?;
Ok(WorkSchedule::Unit(msg.payload))
}
async fn execute(
&mut self,
_: &UpstreamEvent,
_: &mut WitnessStage,
) -> Result<(), WorkerError> {
info!("witnessing block event");
Ok(())
}
}
struct StaticCursor;
impl Cursor for StaticCursor {
fn intersection(&self) -> pallas_upstream::Intersection {
pallas_upstream::Intersection::Origin
}
}
#[test]
#[ignore]
fn test_mainnet_upstream() {
tracing::subscriber::set_global_default(
tracing_subscriber::FmtSubscriber::builder()
.with_max_level(tracing::Level::TRACE)
.finish(),
)
.unwrap();
let (send, receive) = gasket::messaging::tokio::channel(200);
let mut upstream = pallas_upstream::n2n::Stage::new(
"relays-new.cardano-mainnet.iohk.io:3001".into(),
764824073,
StaticCursor,
);
upstream.downstream_port().connect(send);
let mut witness = WitnessStage {
input: Default::default(),
};
witness.input.connect(receive);
let upstream = gasket::runtime::spawn_stage(upstream, Policy::default());
let witness = gasket::runtime::spawn_stage(witness, Policy::default());
let daemon = gasket::daemon::Daemon(vec![upstream, witness]);
daemon.block();
}

View file

@ -17,4 +17,3 @@ pallas-traverse = { version = "0.19.0-alpha.0", path = "../pallas-traverse/" }
pallas-addresses = { version = "0.19.0-alpha.0", path = "../pallas-addresses/" }
pallas-crypto = { version = "0.19.0-alpha.0", path = "../pallas-crypto/" }
pallas-codec = { version = "0.19.0-alpha.0", path = "../pallas-codec/" }
pallas-upstream = { version = "0.19.0-alpha.0", path = "../pallas-upstream/" }

View file

@ -30,6 +30,3 @@ pub use pallas_crypto as crypto;
#[doc(inline)]
pub use pallas_codec as codec;
#[doc(inline)]
pub use pallas_upstream as upstream;