refactor: Merge multiplexer & miniprotocols into single crate (#244)

This commit is contained in:
Santiago Carmuega 2023-04-11 00:51:38 +02:00 committed by GitHub
parent b8ff4e9418
commit cb0348b47a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
80 changed files with 1694 additions and 3002 deletions

View file

@ -3,8 +3,7 @@
members = [
"pallas-codec",
"pallas-addresses",
"pallas-multiplexer",
"pallas-miniprotocols",
"pallas-network",
"pallas-crypto",
"pallas-primitives",
"pallas-traverse",

View file

@ -10,3 +10,6 @@ publish = false
pallas = { path = "../../pallas" }
net2 = "0.2.37"
hex = "0.4.3"
tracing = "0.1.37"
tracing-subscriber = "0.3.16"
tokio = { version = "1.27.0", features = ["rt-multi-thread"] }

View file

@ -1,38 +1,27 @@
use pallas::network::{
miniprotocols::{
blockfetch,
handshake::{self, n2n::VersionTable},
Point, MAINNET_MAGIC, PROTOCOL_N2N_BLOCK_FETCH, PROTOCOL_N2N_HANDSHAKE,
},
multiplexer::{bearers::Bearer, StdPlexer},
facades::PeerClient,
miniprotocols::{Point, MAINNET_MAGIC},
};
fn main() {
env_logger::init();
#[tokio::main]
async fn main() {
tracing::subscriber::set_global_default(
tracing_subscriber::FmtSubscriber::builder()
.with_max_level(tracing::Level::TRACE)
.finish(),
)
.unwrap();
let bearer = Bearer::connect_tcp("relays-new.cardano-mainnet.iohk.io:3001").unwrap();
let mut plexer = StdPlexer::new(bearer);
let handshake = plexer.use_client_channel(PROTOCOL_N2N_HANDSHAKE);
let blockfetch = plexer.use_client_channel(PROTOCOL_N2N_BLOCK_FETCH);
plexer.muxer.spawn();
plexer.demuxer.spawn();
let versions = VersionTable::v4_and_above(MAINNET_MAGIC);
let mut hs_client = handshake::N2NClient::new(handshake);
let handshake = hs_client.handshake(versions).unwrap();
assert!(matches!(handshake, handshake::Confirmation::Accepted(..)));
let mut peer = PeerClient::connect("relays-new.cardano-mainnet.iohk.io:3001", MAINNET_MAGIC)
.await
.unwrap();
let point = Point::Specific(
49159253,
hex::decode("d034a2d0e4c3076f57368ed59319010c265718f0923057f8ff914a3b6bfd1314").unwrap(),
);
let mut bf_client = blockfetch::Client::new(blockfetch);
let block = bf_client.fetch_single(point).unwrap();
let block = peer.blockfetch().fetch_single(point).await.unwrap();
println!("downloaded block of size: {}", block.len());
println!("{}", hex::encode(&block));

View file

@ -11,3 +11,6 @@ pallas = { path = "../../pallas" }
net2 = "0.2.37"
hex = "0.4.3"
log = "0.4.16"
tracing = "0.1.37"
tracing-subscriber = "0.3.16"
tokio = { version = "1.27.0", features = ["rt-multi-thread"] }

View file

@ -1,56 +1,37 @@
use pallas::network::{
miniprotocols::{chainsync, handshake, localstate, Point, MAINNET_MAGIC},
multiplexer,
facades::NodeClient,
miniprotocols::{chainsync, localstate, Point, MAINNET_MAGIC},
};
use tracing::info;
#[derive(Debug)]
struct LoggingObserver;
#[allow(dead_code)]
fn do_handshake(channel: multiplexer::StdChannel) {
let mut client = handshake::N2CClient::new(channel);
let confirmation = client
.handshake(handshake::n2c::VersionTable::v1_and_above(MAINNET_MAGIC))
.unwrap();
match confirmation {
handshake::Confirmation::Accepted(v, _) => {
log::info!("hand-shake accepted, using version {}", v)
}
handshake::Confirmation::Rejected(x) => {
log::info!("hand-shake rejected with reason {:?}", x)
}
}
}
#[allow(dead_code)]
fn do_localstate_query(channel: multiplexer::StdChannel) {
let mut client = localstate::ClientV10::new(channel);
client.acquire(None).unwrap();
async fn do_localstate_query(client: &mut NodeClient) {
client.statequery().acquire(None).await.unwrap();
let result = client
.statequery()
.query(localstate::queries::RequestV10::GetSystemStart)
.await
.unwrap();
log::info!("system start result: {:?}", result);
info!("system start result: {:?}", result);
}
#[allow(dead_code)]
fn do_chainsync(channel: multiplexer::StdChannel) {
async fn do_chainsync(client: &mut NodeClient) {
let known_points = vec![Point::Specific(
43847831u64,
hex::decode("15b9eeee849dd6386d3770b0745e0450190f7560e5159b1b3ab13b14b2684a45").unwrap(),
)];
let mut client = chainsync::N2CClient::new(channel);
let (point, _) = client
.chainsync()
.find_intersect(known_points)
.await
.unwrap();
let (point, _) = client.find_intersect(known_points).unwrap();
log::info!("intersected point is {:?}", point);
info!("intersected point is {:?}", point);
for _ in 0..10 {
let next = client.request_next().unwrap();
let next = client.chainsync().request_next().await.unwrap();
match next {
chainsync::NextResponse::RollForward(h, _) => {
@ -62,44 +43,29 @@ fn do_chainsync(channel: multiplexer::StdChannel) {
}
}
fn main() {
env_logger::builder()
.filter_level(log::LevelFilter::Trace)
.init();
#[tokio::main]
async fn main() {
tracing::subscriber::set_global_default(
tracing_subscriber::FmtSubscriber::builder()
.with_max_level(tracing::Level::TRACE)
.finish(),
)
.unwrap();
#[cfg(not(target_family = "unix"))]
{
panic!("can't use n2c unix socket on non-unix systems");
}
// we connect to the unix socket of the local node. Make sure you have the right
// path for your environment
#[cfg(target_family = "unix")]
{
use pallas::network::{
miniprotocols::{
PROTOCOL_N2C_CHAIN_SYNC, PROTOCOL_N2C_HANDSHAKE, PROTOCOL_N2C_STATE_QUERY,
},
multiplexer::bearers::Bearer,
};
let bearer = Bearer::connect_unix("/tmp/node.socket").unwrap();
// setup the multiplexer by specifying the bearer and the IDs of the
// miniprotocols to use
let mut plexer = multiplexer::StdPlexer::new(bearer);
let handshake = plexer.use_client_channel(PROTOCOL_N2C_HANDSHAKE);
let statequery = plexer.use_client_channel(PROTOCOL_N2C_STATE_QUERY);
let chainsync = plexer.use_client_channel(PROTOCOL_N2C_CHAIN_SYNC);
plexer.muxer.spawn();
plexer.demuxer.spawn();
// execute the required handshake against the relay
do_handshake(handshake);
let mut client = NodeClient::connect("/tmp/node.socket", MAINNET_MAGIC)
.await
.unwrap();
// execute an arbitrary "Local State" query against the node
do_localstate_query(statequery);
do_localstate_query(&mut client).await;
// execute the chainsync flow from an arbitrary point in the chain
do_chainsync(chainsync);
do_chainsync(&mut client).await;
}
#[cfg(not(target_family = "unix"))]
fn main() {
panic!("can't use n2c unix socket on non-unix systems");
}

View file

@ -11,3 +11,6 @@ pallas = { path = "../../pallas" }
net2 = "0.2.37"
hex = "0.4.3"
log = "0.4.16"
tracing = "0.1.37"
tracing-subscriber = "0.3.16"
tokio = { version = "1.27.0", features = ["rt-multi-thread"] }

View file

@ -1,32 +1,10 @@
use pallas::network::{
miniprotocols::{
blockfetch, chainsync, handshake, Point, MAINNET_MAGIC, PROTOCOL_N2N_BLOCK_FETCH,
PROTOCOL_N2N_CHAIN_SYNC, PROTOCOL_N2N_HANDSHAKE,
},
multiplexer::{bearers::Bearer, StdChannel, StdPlexer},
facades::PeerClient,
miniprotocols::{chainsync, Point, MAINNET_MAGIC},
};
use tracing::info;
#[derive(Debug)]
struct LoggingObserver;
fn do_handshake(channel: StdChannel) {
let mut client = handshake::N2NClient::new(channel);
let confirmation = client
.handshake(handshake::n2n::VersionTable::v7_and_above(MAINNET_MAGIC))
.unwrap();
match confirmation {
handshake::Confirmation::Accepted(v, _) => {
log::info!("hand-shake accepted, using version {}", v)
}
handshake::Confirmation::Rejected(x) => {
log::info!("hand-shake rejected with reason {:?}", x)
}
}
}
fn do_blockfetch(channel: StdChannel) {
async fn do_blockfetch(peer: &mut PeerClient) {
let range = (
Point::Specific(
43847831,
@ -40,29 +18,25 @@ fn do_blockfetch(channel: StdChannel) {
),
);
let mut client = blockfetch::Client::new(channel);
let blocks = client.fetch_range(range).unwrap();
let blocks = peer.blockfetch().fetch_range(range).await.unwrap();
for block in blocks {
log::info!("received block of size: {}", block.len());
info!("received block of size: {}", block.len());
}
}
fn do_chainsync(channel: StdChannel) {
async fn do_chainsync(peer: &mut PeerClient) {
let known_points = vec![Point::Specific(
43847831u64,
hex::decode("15b9eeee849dd6386d3770b0745e0450190f7560e5159b1b3ab13b14b2684a45").unwrap(),
)];
let mut client = chainsync::N2NClient::new(channel);
let (point, _) = peer.chainsync().find_intersect(known_points).await.unwrap();
let (point, _) = client.find_intersect(known_points).unwrap();
log::info!("intersected point is {:?}", point);
info!("intersected point is {:?}", point);
for _ in 0..10 {
let next = client.request_next().unwrap();
let next = peer.chainsync().request_next().await.unwrap();
match next {
chainsync::NextResponse::RollForward(h, _) => {
@ -74,31 +48,24 @@ fn do_chainsync(channel: StdChannel) {
}
}
fn main() {
env_logger::builder()
.filter_level(log::LevelFilter::Info)
.init();
#[tokio::main]
async fn main() {
tracing::subscriber::set_global_default(
tracing_subscriber::FmtSubscriber::builder()
.with_max_level(tracing::Level::TRACE)
.finish(),
)
.unwrap();
// setup a TCP socket to act as data bearer between our agents and the remote
// relay.
let bearer = Bearer::connect_tcp("relays-new.cardano-mainnet.iohk.io:3001").unwrap();
// setup the multiplexer by specifying the bearer and the IDs of the
// miniprotocols to use
let mut plexer = StdPlexer::new(bearer);
let handshake = plexer.use_client_channel(PROTOCOL_N2N_HANDSHAKE);
let blockfetch = plexer.use_client_channel(PROTOCOL_N2N_BLOCK_FETCH);
let chainsync = plexer.use_client_channel(PROTOCOL_N2N_CHAIN_SYNC);
plexer.muxer.spawn();
plexer.demuxer.spawn();
// execute the required handshake against the relay
do_handshake(handshake);
let mut peer = PeerClient::connect("relays-new.cardano-mainnet.iohk.io:3001", MAINNET_MAGIC)
.await
.unwrap();
// fetch an arbitrary batch of block
do_blockfetch(blockfetch);
do_blockfetch(&mut peer).await;
// execute the chainsync flow from an arbitrary point in the chain
do_chainsync(chainsync);
do_chainsync(&mut peer).await;
}

View file

@ -1,2 +0,0 @@
/target
Cargo.lock

View file

@ -1,25 +0,0 @@
[package]
name = "pallas-miniprotocols"
description = "Implementation of the Ouroboros network mini-protocols state-machines"
version = "0.18.0"
edition = "2021"
repository = "https://github.com/txpipe/pallas"
homepage = "https://github.com/txpipe/pallas"
documentation = "https://docs.rs/pallas-machines"
license = "Apache-2.0"
readme = "README.md"
authors = [
"Santiago Carmuega <santiago@carmuega.me>",
"Pi Lanningham <pi.lanningham@gmail.com>"
]
[dependencies]
pallas-codec = { version = "0.18.0", path = "../pallas-codec/" }
pallas-multiplexer = { version = "0.18.0", path = "../pallas-multiplexer/" }
hex = "0.4.3"
itertools = "0.10.3"
thiserror = "1.0.31"
tracing = "0.1.37"
[dev-dependencies]
tokio = { version = "1.27.0", features = ["macros", "rt"] }

View file

@ -1,140 +0,0 @@
use pallas_codec::Fragment;
use pallas_multiplexer::agents::{Channel, ChannelBuffer, ChannelError};
use std::{cell::Cell, fmt::Debug};
use thiserror::Error;
use tracing::trace;
#[derive(Debug, Error)]
pub enum MachineError {
#[error("invalid message for state [{0}]: {1}")]
InvalidMsgForState(String, String),
#[error("channel error communicating with multiplexer: {0}")]
ChannelError(ChannelError),
#[error("downstream error while processing business logic {0}")]
DownstreamError(Box<dyn std::error::Error>),
}
impl MachineError {
pub fn channel(err: ChannelError) -> Self {
Self::ChannelError(err)
}
pub fn downstream(err: Box<dyn std::error::Error>) -> Self {
Self::DownstreamError(err)
}
pub fn invalid_msg<A: Agent>(state: &A::State, msg: &A::Message) -> Self {
Self::InvalidMsgForState(format!("{state:?}"), format!("{msg:?}"))
}
}
pub type Transition<A> = Result<A, MachineError>;
pub trait Agent: Sized {
type Message: std::fmt::Debug;
type State: std::fmt::Debug;
fn state(&self) -> &Self::State;
fn is_done(&self) -> bool;
fn has_agency(&self) -> bool;
fn build_next(&self) -> Self::Message;
fn apply_start(self) -> Transition<Self>;
fn apply_outbound(self, msg: Self::Message) -> Transition<Self>;
fn apply_inbound(self, msg: Self::Message) -> Transition<Self>;
}
pub struct Runner<A, C>
where
A: Agent,
C: Channel,
{
agent: Cell<Option<A>>,
buffer: ChannelBuffer<C>,
}
impl<A, C> Runner<A, C>
where
A: Agent,
A::Message: Fragment + std::fmt::Debug,
C: Channel,
{
pub fn new(agent: A, channel: C) -> Self {
Self {
agent: Cell::new(Some(agent)),
buffer: ChannelBuffer::new(channel),
}
}
pub fn start(&mut self) -> Result<(), MachineError> {
let prev = self.agent.take().unwrap();
let next = prev.apply_start()?;
self.agent.set(Some(next));
Ok(())
}
pub async fn run_step(&mut self) -> Result<bool, MachineError> {
let prev = self.agent.take().unwrap();
let next = run_agent_step(prev, &mut self.buffer).await?;
let is_done = next.is_done();
self.agent.set(Some(next));
Ok(is_done)
}
pub async fn fulfill(mut self) -> Result<(), MachineError> {
self.start()?;
while self.run_step().await? {}
Ok(())
}
}
pub async fn run_agent_step<A, C>(agent: A, channel: &mut ChannelBuffer<C>) -> Transition<A>
where
A: Agent,
A::Message: Fragment + std::fmt::Debug,
C: Channel,
{
match agent.has_agency() {
true => {
let msg = agent.build_next();
trace!(?msg, "processing outbound msg");
channel
.send_msg_chunks(&msg)
.await
.map_err(MachineError::channel)?;
agent.apply_outbound(msg)
}
false => {
let msg = channel
.recv_full_msg()
.await
.map_err(MachineError::channel)?;
trace!(?msg, "processing inbound msg");
agent.apply_inbound(msg)
}
}
}
pub async fn run_agent<A, C>(agent: A, buffer: &mut ChannelBuffer<C>) -> Transition<A>
where
A: Agent,
A::Message: Fragment + std::fmt::Debug,
C: Channel,
{
let mut agent = agent.apply_start()?;
while !agent.is_done() {
agent = run_agent_step(agent, buffer).await?;
}
Ok(agent)
}

View file

@ -1,155 +0,0 @@
use super::{MempoolSizeAndCapacity, Message, MsgRequest, MsgResponse};
use pallas_codec::minicbor::{decode, encode, Decode, Encode, Encoder};
impl Encode<()> for Message {
fn encode<W: encode::Write>(
&self,
e: &mut Encoder<W>,
ctx: &mut (),
) -> Result<(), encode::Error<W::Error>> {
match self {
Message::MsgDone => {
e.array(1)?.u16(0)?;
}
Message::MsgAcquire => {
e.array(1)?.u16(1)?;
}
Message::MsgAcquired(slot) => {
e.array(2)?.u16(2)?;
e.encode(slot)?;
}
Message::MsgQuery(query) => {
query.encode(e, ctx)?;
}
Message::MsgResponse(response) => {
response.encode(e, ctx)?;
}
}
Ok(())
}
}
impl<'b> Decode<'b, ()> for Message {
fn decode(
d: &mut pallas_codec::minicbor::Decoder<'b>,
_ctx: &mut (),
) -> Result<Self, decode::Error> {
d.array()?;
let label = d.u16()?;
match label {
0 => Ok(Message::MsgDone),
1 => Ok(Message::MsgAcquire),
2 => {
let slot = d.decode()?;
Ok(Message::MsgAcquired(slot))
}
3 => Ok(Message::MsgQuery(MsgRequest::MsgRelease)),
5 => Ok(Message::MsgQuery(MsgRequest::MsgNextTx)),
6 => {
d.array()?;
let tag: Result<u8, pallas_codec::minicbor::decode::Error> = d.u8();
let mut tx = None;
if tag.is_ok() {
d.tag()?;
let cbor = d.bytes()?;
tx = Some(hex::encode(cbor));
}
Ok(Message::MsgResponse(MsgResponse::MsgReplyNextTx(tx)))
}
7 => {
let txid = d.decode()?;
Ok(Message::MsgQuery(MsgRequest::MsgHasTx(txid)))
}
8 => {
let has = d.decode()?;
Ok(Message::MsgResponse(MsgResponse::MsgReplyHasTx(has)))
}
9 => Ok(Message::MsgQuery(MsgRequest::MsgGetSizes)),
10 => {
d.array()?;
let capacity_in_bytes = d.decode()?;
let size_in_bytes = d.decode()?;
let number_of_txs = d.decode()?;
Ok(Message::MsgResponse(MsgResponse::MsgReplyGetSizes(
MempoolSizeAndCapacity {
capacity_in_bytes,
size_in_bytes,
number_of_txs,
},
)))
}
_ => Err(decode::Error::message("can't decode Message")),
}
}
fn nil() -> Option<Self> {
None
}
}
impl Encode<()> for MsgRequest {
fn encode<W: encode::Write>(
&self,
e: &mut Encoder<W>,
_ctx: &mut (),
) -> Result<(), encode::Error<W::Error>> {
match self {
MsgRequest::MsgAwaitAcquire => {
e.array(1)?.u16(1)?;
}
MsgRequest::MsgGetSizes => {
e.array(1)?.u16(9)?;
}
MsgRequest::MsgHasTx(tx) => {
e.array(2)?.u16(7)?;
e.encode(tx)?;
}
MsgRequest::MsgNextTx => {
e.array(1)?.u16(5)?;
}
MsgRequest::MsgRelease => {
e.array(1)?.u16(3)?;
}
}
Ok(())
}
}
impl Encode<()> for MsgResponse {
fn encode<W: encode::Write>(
&self,
e: &mut Encoder<W>,
_ctx: &mut (),
) -> Result<(), encode::Error<W::Error>> {
match self {
MsgResponse::MsgReplyGetSizes(sz) => {
e.array(2)?.u16(10)?;
e.array(3)?;
e.encode(sz.capacity_in_bytes)?;
e.encode(sz.size_in_bytes)?;
e.encode(sz.number_of_txs)?;
}
MsgResponse::MsgReplyHasTx(tx) => {
e.array(2)?.u16(8)?;
e.encode(tx)?;
}
MsgResponse::MsgReplyNextTx(None) => {
e.array(1)?.u16(6)?;
}
MsgResponse::MsgReplyNextTx(Some(tx)) => {
e.array(2)?.u16(6)?;
e.encode(tx.to_string())?;
}
}
Ok(())
}
fn is_nil(&self) -> bool {
false
}
}

View file

@ -1,213 +0,0 @@
mod codec;
use crate::machines::{Agent, MachineError, Transition};
use pallas_codec::Fragment;
use std::fmt::Debug;
type Slot = u64;
type TxId = String;
type Tx = String;
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum StBusyKind {
NextTx,
HasTx,
GetSizes,
}
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum State {
StIdle,
StAcquiring,
StAcquired,
StBusy(StBusyKind),
StDone,
}
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct MempoolSizeAndCapacity {
pub capacity_in_bytes: u32,
pub size_in_bytes: u32,
pub number_of_txs: u32,
}
#[derive(Debug, Clone)]
pub enum Message {
MsgAcquire,
MsgAcquired(Slot),
MsgQuery(MsgRequest),
MsgResponse(MsgResponse),
MsgDone,
}
#[derive(Debug, Clone)]
pub enum MsgRequest {
MsgAwaitAcquire,
MsgNextTx,
MsgHasTx(TxId),
MsgGetSizes,
MsgRelease,
}
#[derive(Debug, Clone)]
pub enum MsgResponse {
MsgReplyNextTx(Option<Tx>),
MsgReplyHasTx(bool),
MsgReplyGetSizes(MempoolSizeAndCapacity),
}
#[derive(Debug, Clone)]
pub struct LocalTxMonitor {
pub state: State,
pub snapshot: Option<Slot>,
pub request: Option<MsgRequest>,
pub output: Option<MsgResponse>,
}
impl LocalTxMonitor
where
Message: Fragment,
{
pub fn initial(state: State) -> Self {
Self {
state,
snapshot: None,
request: None,
output: None,
}
}
fn on_acquired(self, slot: Slot) -> Transition<Self> {
Ok(Self {
state: State::StAcquired,
snapshot: Some(slot),
output: None,
..self
})
}
fn on_reply_next_tx(self, tx: Option<Tx>) -> Transition<Self> {
Ok(Self {
output: Some(MsgResponse::MsgReplyNextTx(tx)),
..self
})
}
fn on_reply_has_tx(self, arg: bool) -> Transition<Self> {
Ok(Self {
output: Some(MsgResponse::MsgReplyHasTx(arg)),
..self
})
}
fn on_reply_get_size(self, status: MempoolSizeAndCapacity) -> Transition<Self> {
Ok(Self {
output: Some(MsgResponse::MsgReplyGetSizes(status)),
..self
})
}
}
impl Agent for LocalTxMonitor
where
Message: Fragment,
{
type Message = Message;
type State = State;
fn state(&self) -> &Self::State {
&self.state
}
fn is_done(&self) -> bool {
self.state == State::StDone
}
fn has_agency(&self) -> bool {
match &self.state {
State::StIdle => true,
State::StAcquiring => false,
State::StAcquired => true,
State::StBusy(..) => false,
State::StDone => false,
}
}
fn build_next(&self) -> Self::Message {
match (&self.state, &self.request, &self.output) {
(State::StIdle, None, None) => Message::MsgAcquire,
(State::StAcquired, None, None) => Message::MsgAcquire,
(State::StAcquired, Some(MsgRequest::MsgAwaitAcquire), None) => Message::MsgAcquire,
(State::StAcquired, Some(MsgRequest::MsgNextTx), None) => {
Message::MsgQuery(MsgRequest::MsgNextTx)
}
(State::StAcquired, Some(MsgRequest::MsgHasTx(tx)), None) => {
Message::MsgQuery(MsgRequest::MsgHasTx(tx.clone()))
}
(State::StAcquired, Some(MsgRequest::MsgGetSizes), None) => {
Message::MsgQuery(MsgRequest::MsgGetSizes)
}
(State::StAcquired, None, Some(_)) => Message::MsgAcquire,
(State::StAcquired, Some(req), Some(_)) => Message::MsgQuery(req.to_owned()),
_ => panic!("I do not have agency, don't know what to do"),
}
}
fn apply_start(self) -> Transition<Self> {
Ok(self)
}
fn apply_outbound(self, msg: Self::Message) -> Transition<Self> {
match (self.state, msg) {
(State::StIdle, Message::MsgAcquire) => Ok(Self {
state: State::StAcquiring,
..self
}),
(State::StAcquired, Message::MsgQuery(MsgRequest::MsgNextTx)) => Ok(Self {
state: State::StBusy(StBusyKind::NextTx),
..self
}),
(State::StAcquired, Message::MsgQuery(MsgRequest::MsgHasTx(_))) => Ok(Self {
state: State::StBusy(StBusyKind::HasTx),
..self
}),
(State::StAcquired, Message::MsgQuery(MsgRequest::MsgGetSizes)) => Ok(Self {
state: State::StBusy(StBusyKind::GetSizes),
..self
}),
(State::StAcquired, Message::MsgAcquire) => Ok(Self {
state: State::StAcquiring,
..self
}),
(State::StAcquired, Message::MsgQuery(MsgRequest::MsgRelease)) => Ok(Self {
state: State::StIdle,
..self
}),
(State::StIdle, Message::MsgDone) => Ok(Self {
state: State::StDone,
..self
}),
_ => panic!("PANIC! Cannot match outbound"),
}
}
fn apply_inbound(self, msg: Self::Message) -> Transition<Self> {
match (&self.state, msg) {
(State::StAcquiring, Message::MsgAcquired(s)) => self.on_acquired(s),
(
State::StBusy(StBusyKind::NextTx),
Message::MsgResponse(MsgResponse::MsgReplyNextTx(tx)),
) => self.on_reply_next_tx(tx),
(
State::StBusy(StBusyKind::HasTx),
Message::MsgResponse(MsgResponse::MsgReplyHasTx(arg)),
) => self.on_reply_has_tx(arg),
(
State::StBusy(StBusyKind::GetSizes),
Message::MsgResponse(MsgResponse::MsgReplyGetSizes(msc)),
) => self.on_reply_get_size(msc),
(state, msg) => Err(MachineError::invalid_msg::<Self>(state, &msg)),
}
}
}

View file

@ -1,242 +0,0 @@
use pallas_miniprotocols::{
blockfetch,
chainsync::{self, NextResponse},
handshake::{self, Confirmation},
txsubmission::{self, EraTxId, Reply, TxIdAndSize},
Point, PROTOCOL_N2N_BLOCK_FETCH, PROTOCOL_N2N_CHAIN_SYNC, PROTOCOL_N2N_HANDSHAKE,
PROTOCOL_N2N_TX_SUBMISSION,
};
use pallas_multiplexer::{bearers::Bearer, StdChannel, StdPlexer};
struct N2NChannels {
chainsync: StdChannel,
blockfetch: StdChannel,
txsubmission: StdChannel,
}
async fn setup_n2n_client_connection() -> N2NChannels {
let bearer = Bearer::connect_tcp("preview-node.world.dev.cardano.org:30002").unwrap();
let mut plexer = StdPlexer::new(bearer);
let handshake = plexer.use_channel(PROTOCOL_N2N_HANDSHAKE);
let chainsync = plexer.use_channel(PROTOCOL_N2N_CHAIN_SYNC);
let blockfetch = plexer.use_channel(PROTOCOL_N2N_BLOCK_FETCH);
let txsubmission = plexer.use_channel(PROTOCOL_N2N_TX_SUBMISSION);
plexer.muxer.spawn();
plexer.demuxer.spawn();
let mut client = handshake::N2NClient::new(handshake);
let confirmation = client
.handshake(handshake::n2n::VersionTable::v7_and_above(2))
.await
.unwrap();
assert!(matches!(confirmation, Confirmation::Accepted(..)));
if let Confirmation::Accepted(v, _) = confirmation {
assert!(v >= 7);
}
N2NChannels {
chainsync,
blockfetch,
txsubmission,
}
}
#[tokio::test]
#[ignore]
pub async fn chainsync_history_happy_path() {
let N2NChannels { chainsync, .. } = setup_n2n_client_connection().await;
let known_point = Point::Specific(
1654413,
hex::decode("7de1f036df5a133ce68a82877d14354d0ba6de7625ab918e75f3e2ecb29771c2").unwrap(),
);
let mut client = chainsync::N2NClient::new(chainsync);
let (point, _) = client
.find_intersect(vec![known_point.clone()])
.await
.unwrap();
assert!(matches!(client.state(), chainsync::State::Idle));
match point {
Some(point) => assert_eq!(point, known_point),
None => panic!("expected point"),
}
let next = client.request_next().await.unwrap();
match next {
NextResponse::RollBackward(point, _) => assert_eq!(point, known_point),
_ => panic!("expected rollback"),
}
assert!(matches!(client.state(), chainsync::State::Idle));
for _ in 0..10 {
let next = client.request_next().await.unwrap();
match next {
NextResponse::RollForward(_, _) => (),
_ => panic!("expected roll-forward"),
}
assert!(matches!(client.state(), chainsync::State::Idle));
}
client.send_done().await.unwrap();
assert!(matches!(client.state(), chainsync::State::Done));
}
#[tokio::test]
#[ignore]
pub async fn chainsync_tip_happy_path() {
let N2NChannels { chainsync, .. } = setup_n2n_client_connection().await;
let mut client = chainsync::N2NClient::new(chainsync);
client.intersect_tip().await.unwrap();
assert!(matches!(client.state(), chainsync::State::Idle));
let next = client.request_next().await.unwrap();
assert!(matches!(next, NextResponse::RollBackward(..)));
let mut await_count = 0;
for _ in 0..4 {
let next = if client.has_agency() {
client.request_next().await.unwrap()
} else {
await_count += 1;
client.recv_while_must_reply().await.unwrap()
};
match next {
NextResponse::RollForward(_, _) => (),
NextResponse::Await => (),
_ => panic!("expected roll-forward or await"),
}
}
assert!(await_count > 0, "tip was never reached");
client.send_done().await.unwrap();
assert!(matches!(client.state(), chainsync::State::Done));
}
#[tokio::test]
#[ignore]
pub async fn blockfetch_happy_path() {
let N2NChannels { blockfetch, .. } = setup_n2n_client_connection().await;
let known_point = Point::Specific(
1654413,
hex::decode("7de1f036df5a133ce68a82877d14354d0ba6de7625ab918e75f3e2ecb29771c2").unwrap(),
);
let mut client = blockfetch::Client::new(blockfetch);
let range_ok = client
.request_range((known_point.clone(), known_point))
.await;
assert!(matches!(client.state(), blockfetch::State::Streaming));
assert!(matches!(range_ok, Ok(_)));
for _ in 0..1 {
let next = client.recv_while_streaming().await.unwrap();
match next {
Some(body) => assert_eq!(body.len(), 3251),
_ => panic!("expected block body"),
}
assert!(matches!(client.state(), blockfetch::State::Streaming));
}
let next = client.recv_while_streaming().await.unwrap();
assert!(matches!(next, None));
client.send_done().await.unwrap();
assert!(matches!(client.state(), blockfetch::State::Done));
}
#[tokio::test]
#[ignore]
pub async fn txsubmission_server_happy_path() {
// TODO(pi): Note that the below doesn't work; we need a node to connect *to us*
// during the integration test which seems awkward;
// Alternatively, we can just set up both a client and server connecting to
// themselves for testing!
let N2NChannels { txsubmission, .. } = setup_n2n_client_connection().await;
let mut server = txsubmission::Server::new(txsubmission);
assert!(matches!(server.wait_for_init().await, Ok(_)));
assert!(matches!(
server.acknowledge_and_request_tx_ids(false, 0, 3).await,
Ok(_)
));
let reply: Result<_, _> = server.receive_next_reply().await;
assert!(matches!(reply, Ok(Reply::TxIds(_))));
let Ok(Reply::TxIds(tx_ids)) = reply else { unreachable!() };
assert!(tx_ids.len() <= 3);
assert!(matches!(
server
.request_txs(
tx_ids
.into_iter()
.map(|txid: TxIdAndSize<EraTxId>| txid.0)
.collect()
)
.await,
Ok(_)
));
let reply = server.receive_next_reply().await;
assert!(matches!(reply, Ok(Reply::Txs(_))));
let Ok(Reply::Txs(first_txs)) = reply else { unreachable!() };
assert!(matches!(
server.acknowledge_and_request_tx_ids(false, 1, 3).await,
Ok(_)
));
let reply = server.receive_next_reply().await;
assert!(matches!(reply, Ok(Reply::Txs(_))));
let Ok(Reply::Txs(second_txs)) = reply else { unreachable!() };
// Make sure we receive the second and third tx again, indicating we sent the
// `acknowledge 1` bit correctly
assert_eq!(second_txs[0], first_txs[1]);
assert_eq!(second_txs[1], first_txs[2]);
assert!(matches!(
server.acknowledge_and_request_tx_ids(true, 3, 3).await,
Ok(_)
));
match server.receive_next_reply().await {
Ok(Reply::Done) => (), // Server aint havin none of our sh*t
Ok(Reply::TxIds(tx_ids)) => assert_eq!(tx_ids.len(), 3),
Ok(_) | Err(_) => unreachable!(),
}
}

View file

@ -1,2 +0,0 @@
/target
Cargo.lock

View file

@ -1,28 +0,0 @@
[package]
name = "pallas-multiplexer"
description = "Multithreaded Ouroboros multiplexer implementation using mpsc channels"
version = "0.18.0"
edition = "2021"
repository = "https://github.com/txpipe/pallas"
homepage = "https://github.com/txpipe/pallas"
documentation = "https://docs.rs/pallas-multiplexer"
license = "Apache-2.0"
readme = "README.md"
authors = ["Santiago Carmuega <santiago@carmuega.me>"]
[dependencies]
pallas-codec = { version = "0.18.0", path = "../pallas-codec/" }
log = "0.4.14"
byteorder = "1.4.3"
hex = "0.4.3"
rand = "0.8.4"
thiserror = "1.0.31"
tracing = "0.1.37"
[features]
std = []
sync = []
default = ["std", "sync"]
[dev-dependencies]
tokio = { version = "1.27.0", features = ["macros", "rt"] }

View file

@ -1,88 +0,0 @@
# Pallas Multiplexer
This is an implementation of the Ouroboros multiplexer logic as defined in the [The Shelley Networking Protocol](https://hydra.iohk.io/build/1070091/download/1/network.pdf#chapter.3) specs.
## Architectural Decisions
The following architectural decisions were made for this particular Rust implementation:
- each mini-protocol state machine should be able to work in its own thread
- a bounded queue should serve as buffer to decouple mini-protocol logic from multiplexer work
- the implementation should pipelining-friendly, even if we don't have a current use-case
- the multiplexer should be agnostic of the mini-protocols implementation details.
## Implementation Details
Given the above definitions, Rust's _mpsc channels_ seem like the correct artifact to orchestrate the communication between the different threads in the multiplexer process.
The following diagram provides an overview of the components involved:
![Multiplexer Diagram](docs/diagram.png)
## Usage
The following code provides a very rough example of how to setup a client that connects to a node and spawns two concurrent threads running independently, both communication over the same bearer using _Pallas_ multiplexer.
```rust
// Setup a new bearer. In this case, we use a unix socket to connect
// to a node running on the local machine.
let bearer = UnixStream::connect("/tmp/pallas").unwrap();
// Setup a new multiplexer using the created bearer and a specification
// of the mini-protocol IDs that we'll be using for our session. In this case, we
// pass id #0 (handshake) and #2 (chainsync).
let muxer = Multiplexer::setup(tcp, &[0, 2])
// Ask the multiplexer to provide us with the channel for the miniprotocol #0.
let mut handshake = muxer.use_client_channel(PROTOCOL_N2N_HANDSHAKE);
// Spawn a thread and pass the ownership of the channel.
thread::spawn(move || {
// Deconstruct the channel to get a handle for sending data into the muxer
// ingress and a handle to receive data from the demuxer egress.
let Channel(mux_tx, demux_rx) = handshake;
// Do something with the channel. In this case, we just keep sending
// dumb data every 50 millis.
loop {
let payload = vec![1; 65545];
tx.send(payload).unwrap();
thread::sleep(Duration::from_millis(50));
}
});
// Ask the multiplexer to provide us with the channel for the chainsync miniprotocol.
let mut chainsync = muxer.use_client_channel(PROTOCOL_N2N_CHAINSYNC);
// Spawn a different thread and pass the ownership of the 2nd channel.
thread::spawn(move || {
// Deconstruct the channel to get a handle for sending data into the muxer
// ingress and a handle to receive data from the demuxer egress.
let Channel(mux_tx, demux_rx) = chainsync;
// Do something with the channel. In this case, we just print in stdout
// whatever get received for this mini-protocol.
loop {
let payload = rx.recv().unwrap();
println!("id:{protocol}, length:{}", payload.len());
}
});
```
## Run Examples
For a working example of a two peers communicating (a sender and a listener), check the [examples folder](examples). To run the examples, open two different terminals and run a different peer in each one:
```sh
# on terminal 1, start the listener
RUST_LOG=info cargo run --example listener
```
```sh
# on terminal 2, start the sender
RUST_LOG=info cargo run --example sender
```
## Real World Usage
For a more complex, real-world example, check the [Oura](https://github.com/txpipe/oura) repo, it provides a full-blown client tool designed to live-stream block data from a local or remote node.

Binary file not shown.

Before

Width:  |  Height:  |  Size: 158 KiB

View file

@ -1,180 +0,0 @@
//! Interface to interact with the multiplexer as an agent
use crate::Payload;
use pallas_codec::{minicbor, Fragment};
use thiserror::Error;
use tracing::{debug, error, trace};
#[derive(Debug, Error)]
pub enum ChannelError {
#[error("channel is not connected, failed to send payload")]
NotConnected(Option<Payload>),
#[error("failure encoding message into CBOR")]
Encoding(String),
#[error("failure decoding message from CBOR")]
Decoding(String),
}
/// A raw link to the ingress / egress of the multiplexer
pub trait Channel {
async fn enqueue_chunk(&mut self, chunk: Payload) -> Result<(), ChannelError>;
async fn dequeue_chunk(&mut self) -> Result<Payload, ChannelError>;
}
/// Protocol value that defines max segment length
pub const MAX_SEGMENT_PAYLOAD_LENGTH: usize = 65535;
fn try_decode_message<M>(buffer: &mut Vec<u8>) -> Result<Option<M>, ChannelError>
where
M: Fragment,
{
let mut decoder = minicbor::Decoder::new(buffer);
let maybe_msg = decoder.decode();
match maybe_msg {
Ok(msg) => {
let pos = decoder.position();
buffer.drain(0..pos);
Ok(Some(msg))
}
Err(err) if err.is_end_of_input() => Ok(None),
Err(err) => {
error!(?err);
error!("{}", hex::encode(buffer));
Err(ChannelError::Decoding(err.to_string()))
}
}
}
/// A channel abstraction to hide the complexity of partial payloads
pub struct ChannelBuffer<C: Channel> {
channel: C,
temp: Vec<u8>,
}
impl<C: Channel> ChannelBuffer<C> {
pub fn new(channel: C) -> Self {
Self {
channel,
temp: Vec::new(),
}
}
/// Enqueues a msg as a sequence payload chunks
pub async fn send_msg_chunks<M>(&mut self, msg: &M) -> Result<(), ChannelError>
where
M: Fragment,
{
let mut payload = Vec::new();
minicbor::encode(msg, &mut payload)
.map_err(|err| ChannelError::Encoding(err.to_string()))?;
let chunks = payload.chunks(MAX_SEGMENT_PAYLOAD_LENGTH);
for chunk in chunks {
self.channel.enqueue_chunk(Vec::from(chunk)).await?;
}
Ok(())
}
/// Reads from the channel until a complete message is found
pub async fn recv_full_msg<M>(&mut self) -> Result<M, ChannelError>
where
M: Fragment,
{
if !self.temp.is_empty() {
if let Some(msg) = try_decode_message::<M>(&mut self.temp)? {
debug!("decoding done");
return Ok(msg);
}
}
loop {
let chunk = self.channel.dequeue_chunk().await?;
self.temp.extend(chunk);
if let Some(msg) = try_decode_message::<M>(&mut self.temp)? {
debug!("decoding done");
return Ok(msg);
}
trace!("not enough data");
}
}
pub fn unwrap(self) -> C {
self.channel
}
}
impl<C: Channel> From<C> for ChannelBuffer<C> {
fn from(channel: C) -> Self {
ChannelBuffer::new(channel)
}
}
#[cfg(test)]
mod tests {
use std::collections::VecDeque;
use super::*;
impl Channel for VecDeque<Payload> {
async fn enqueue_chunk(&mut self, chunk: Payload) -> Result<(), ChannelError> {
self.push_back(chunk);
Ok(())
}
async fn dequeue_chunk(&mut self) -> Result<Payload, ChannelError> {
let chunk = self.pop_front().ok_or(ChannelError::NotConnected(None))?;
Ok(chunk)
}
}
#[tokio::test]
async fn multiple_messages_in_same_payload() {
let mut input = Vec::new();
let in_part1 = (1u8, 2u8, 3u8);
let in_part2 = (6u8, 5u8, 4u8);
minicbor::encode(in_part1, &mut input).unwrap();
minicbor::encode(in_part2, &mut input).unwrap();
let mut channel = VecDeque::<Payload>::new();
channel.push_back(input);
let mut buf = ChannelBuffer::new(channel);
let out_part1 = buf.recv_full_msg::<(u8, u8, u8)>().await.unwrap();
let out_part2 = buf.recv_full_msg::<(u8, u8, u8)>().await.unwrap();
assert_eq!(in_part1, out_part1);
assert_eq!(in_part2, out_part2);
}
#[tokio::test]
async fn fragmented_message_in_multiple_payloads() {
let mut input = Vec::new();
let msg = (11u8, 12u8, 13u8, 14u8, 15u8, 16u8, 17u8);
minicbor::encode(msg, &mut input).unwrap();
let mut channel = VecDeque::<Payload>::new();
while !input.is_empty() {
let chunk = Vec::from(input.drain(0..2).as_slice());
channel.push_back(chunk);
}
let mut buf = ChannelBuffer::new(channel);
let out_msg = buf
.recv_full_msg::<(u8, u8, u8, u8, u8, u8, u8)>()
.await
.unwrap();
assert_eq!(msg, out_msg);
}
}

View file

@ -1,187 +0,0 @@
use byteorder::{ByteOrder, NetworkEndian, WriteBytesExt};
use std::io::{Read, Write};
use std::net::{SocketAddr, TcpListener, ToSocketAddrs};
use std::{net::TcpStream, time::Instant};
use tracing::{debug, event_enabled, trace};
use crate::Payload;
#[cfg(target_family = "unix")]
use std::os::unix::net::UnixStream;
use std::time::Duration;
pub struct Segment {
pub protocol: u16,
pub timestamp: u32,
pub payload: Payload,
}
impl Segment {
pub fn new(clock: Instant, protocol: u16, payload: Payload) -> Self {
Segment {
timestamp: clock.elapsed().as_micros() as u32,
protocol,
payload,
}
}
}
fn write_segment(writer: &mut impl Write, segment: Segment) -> Result<(), std::io::Error> {
let Segment {
timestamp,
protocol,
payload,
} = segment;
let mut msg = Vec::new();
msg.write_u32::<NetworkEndian>(timestamp)?;
msg.write_u16::<NetworkEndian>(protocol)?;
msg.write_u16::<NetworkEndian>(payload.len() as u16)?;
msg.write_all(&payload)?;
if event_enabled!(tracing::Level::TRACE) {
trace!(
protocol,
length = payload.len(),
message = hex::encode(&msg),
"writing segment"
);
}
writer.write_all(&msg)?;
writer.flush()
}
fn read_segment(reader: &mut impl Read) -> Result<Segment, std::io::Error> {
let mut header = [0u8; 8];
reader.read_exact(&mut header)?;
if event_enabled!(tracing::Level::TRACE) {
trace!(header = hex::encode(header), "segment header read");
}
let length = NetworkEndian::read_u16(&header[6..]) as usize;
let protocol = NetworkEndian::read_u16(&header[4..6]) as usize ^ 0x8000;
let timestamp = NetworkEndian::read_u32(&header[0..4]);
debug!(protocol, timestamp, length, "parsed inbound msg");
let mut payload = vec![0u8; length];
reader.read_exact(&mut payload)?;
if event_enabled!(tracing::Level::TRACE) {
trace!(payload = hex::encode(&payload), "segment payload read");
}
Ok(Segment {
protocol: protocol as u16,
timestamp,
payload,
})
}
// This snippet will be useful if we want to switch TCP streams into
// non-blocking mode, but that's not likely (if we want async, we'll probably go
// with Tokio instead of a handcrafted approach).
/*
fn read_segment_with_timeout(reader: &mut impl Read) -> Result<Option<Segment>, std::io::Error> {
match read_segment(reader) {
Ok(s) => Ok(Some(s)),
Err(err) => match err.kind() {
std::io::ErrorKind::WouldBlock => Ok(None),
std::io::ErrorKind::TimedOut => Ok(None),
std::io::ErrorKind::Interrupted => Ok(None),
_ => Err(err),
},
}
}
*/
#[derive(Debug)]
pub enum Bearer {
Tcp(TcpStream),
#[cfg(target_family = "unix")]
Unix(UnixStream),
}
impl Bearer {
pub fn connect_tcp<A: ToSocketAddrs>(addr: A) -> Result<Self, std::io::Error> {
let bearer = TcpStream::connect(addr)?;
bearer.set_nodelay(true)?;
Ok(Bearer::Tcp(bearer))
}
pub fn connect_tcp_timeout(
addr: &SocketAddr,
timeout: Duration,
) -> Result<Self, std::io::Error> {
let bearer = TcpStream::connect_timeout(addr, timeout)?;
bearer.set_nodelay(true)?;
Ok(Bearer::Tcp(bearer))
}
pub fn accept_tcp(server: TcpListener) -> Result<(Self, SocketAddr), std::io::Error> {
let (bearer, remote_addr) = server.accept().unwrap();
bearer.set_nodelay(true)?;
Ok((Bearer::Tcp(bearer), remote_addr))
}
#[cfg(target_family = "unix")]
pub fn connect_unix<P: AsRef<std::path::Path>>(path: P) -> Result<Self, std::io::Error> {
let bearer = UnixStream::connect(path)?;
Ok(Bearer::Unix(bearer))
}
pub fn read_segment(&mut self) -> Result<Option<Segment>, std::io::Error> {
match self {
Bearer::Tcp(s) => {
// std tcp streams won't be supporting timeout / async. We don't handle
// specific timeout-related errors, these will remain unhandled and bubble up
// to the consumer lib. The Option wrapper is here just for compatiblity with
// other future bearers that might support timeouts
read_segment(s).map(Some)
}
#[cfg(target_family = "unix")]
Bearer::Unix(s) => read_segment(s).map(Some),
}
}
pub fn write_segment(&mut self, segment: Segment) -> Result<(), std::io::Error> {
match self {
Bearer::Tcp(s) => write_segment(s, segment),
#[cfg(target_family = "unix")]
Bearer::Unix(s) => write_segment(s, segment),
}
}
}
impl From<TcpStream> for Bearer {
fn from(stream: TcpStream) -> Self {
Bearer::Tcp(stream)
}
}
#[cfg(target_family = "unix")]
impl From<UnixStream> for Bearer {
fn from(stream: UnixStream) -> Self {
Bearer::Unix(stream)
}
}
impl Clone for Bearer {
fn clone(&self) -> Self {
match self {
Bearer::Tcp(s) => Bearer::Tcp(s.try_clone().expect("error cloning tcp stream")),
#[cfg(target_family = "unix")]
Bearer::Unix(s) => Bearer::Unix(s.try_clone().expect("error cloning unix stream")),
}
}
}

View file

@ -1,67 +0,0 @@
use std::collections::HashMap;
use crate::{bearers::Bearer, Payload};
pub struct EgressError(pub Payload);
pub trait Egress {
fn send(&mut self, payload: Payload) -> Result<(), EgressError>;
}
pub enum DemuxError {
BearerError(std::io::Error),
EgressDisconnected(u16, Payload),
EgressUnknown(u16, Payload),
}
pub enum TickOutcome {
Busy,
Idle,
}
/// A demuxer that reads from a bearer into the corresponding egress
pub struct Demuxer<E> {
bearer: Bearer,
egress: HashMap<u16, E>,
}
impl<E> Demuxer<E>
where
E: Egress,
{
pub fn new(bearer: Bearer) -> Self {
Demuxer {
bearer,
egress: Default::default(),
}
}
pub fn register(&mut self, id: u16, tx: E) {
self.egress.insert(id, tx);
}
pub fn unregister(&mut self, id: u16) -> Option<E> {
self.egress.remove(&id)
}
fn dispatch(&mut self, protocol: u16, payload: Payload) -> Result<(), DemuxError> {
match self.egress.get_mut(&protocol) {
Some(tx) => match tx.send(payload) {
Err(EgressError(p)) => Err(DemuxError::EgressDisconnected(protocol, p)),
Ok(_) => Ok(()),
},
None => Err(DemuxError::EgressUnknown(protocol, payload)),
}
}
pub fn tick(&mut self) -> Result<TickOutcome, DemuxError> {
match self.bearer.read_segment() {
Err(err) => Err(DemuxError::BearerError(err)),
Ok(None) => Ok(TickOutcome::Idle),
Ok(Some(segment)) => match self.dispatch(segment.protocol, segment.payload) {
Err(err) => Err(err),
Ok(()) => Ok(TickOutcome::Busy),
},
}
}
}

View file

@ -1,19 +0,0 @@
#![feature(async_fn_in_trait)]
pub mod agents;
pub mod bearers;
pub mod demux;
pub mod mux;
#[cfg(feature = "std")]
mod std;
#[cfg(feature = "sync")]
pub mod sync;
#[cfg(feature = "std")]
pub use crate::std::*;
pub type Payload = Vec<u8>;
pub type Message = (u16, Payload);

View file

@ -1,60 +0,0 @@
use std::time::{Duration, Instant};
use crate::{
bearers::{Bearer, Segment},
Message,
};
pub enum IngressError {
Disconnected,
Empty,
}
/// Source of payloads for a particular protocol
///
/// To be implemented by any mechanism that allows to submit a payloads from a
/// particular protocol that need to be muxed by the multiplexer.
pub trait Ingress {
fn recv_timeout(&mut self, duration: Duration) -> Result<Message, IngressError>;
}
pub enum TickOutcome {
BearerError(std::io::Error),
IngressDisconnected,
Idle,
Busy,
}
pub struct Muxer<I> {
bearer: Bearer,
ingress: I,
clock: Instant,
}
impl<I> Muxer<I>
where
I: Ingress,
{
pub fn new(bearer: Bearer, ingress: I) -> Self {
Self {
bearer,
ingress,
clock: Instant::now(),
}
}
pub fn tick(&mut self) -> TickOutcome {
match self.ingress.recv_timeout(Duration::from_millis(1)) {
Ok((id, payload)) => {
let segment = Segment::new(self.clock, id, payload);
match self.bearer.write_segment(segment) {
Err(err) => TickOutcome::BearerError(err),
_ => TickOutcome::Busy,
}
}
Err(IngressError::Empty) => TickOutcome::Idle,
Err(IngressError::Disconnected) => TickOutcome::IngressDisconnected,
}
}
}

View file

@ -1,183 +0,0 @@
use crate::{
agents::{self, ChannelBuffer},
bearers::Bearer,
demux, mux, Message, Payload,
};
use std::{
sync::{
atomic::{AtomicBool, Ordering},
mpsc::{channel, Receiver, RecvTimeoutError, SendError, Sender},
Arc,
},
thread::{spawn, JoinHandle},
time::Duration,
};
pub type StdIngress = Receiver<Message>;
impl mux::Ingress for StdIngress {
fn recv_timeout(&mut self, duration: Duration) -> Result<Message, mux::IngressError> {
match Receiver::recv_timeout(self, duration) {
Ok(x) => Ok(x),
Err(RecvTimeoutError::Disconnected) => Err(mux::IngressError::Disconnected),
Err(RecvTimeoutError::Timeout) => Err(mux::IngressError::Empty),
}
}
}
pub type StdEgress = Sender<Payload>;
impl demux::Egress for StdEgress {
fn send(&mut self, payload: Payload) -> Result<(), demux::EgressError> {
match Sender::send(self, payload) {
Ok(_) => Ok(()),
Err(SendError(p)) => Err(demux::EgressError(p)),
}
}
}
pub struct StdPlexer {
pub muxer: mux::Muxer<StdIngress>,
pub demuxer: demux::Demuxer<StdEgress>,
pub mux_tx: Sender<Message>,
}
const PROTOCOL_SERVER_BIT: u16 = 0x8000;
impl StdPlexer {
pub fn new(bearer: Bearer) -> Self {
let (mux_tx, mux_rx) = channel::<Message>();
Self {
muxer: mux::Muxer::new(bearer.clone(), mux_rx),
demuxer: demux::Demuxer::new(bearer),
mux_tx,
}
}
pub fn use_channel(&mut self, protocol: u16) -> StdChannel {
let (demux_tx, demux_rx) = channel::<Payload>();
self.demuxer.register(protocol, demux_tx);
let mux_tx = self.mux_tx.clone();
(protocol, mux_tx, demux_rx)
}
/// Use the client-side channel for a given protocol
/// Explicitly unsets the most significant bit, forcing use of the client
/// side channel
pub fn use_client_channel(&mut self, protocol: u16) -> StdChannel {
self.use_channel(protocol & !PROTOCOL_SERVER_BIT)
}
/// Use the server-side channel for a given protocol
/// Explicitly sets the most significant bit, forcing use of the server side
/// channel
pub fn use_server_channel(&mut self, protocol: u16) -> StdChannel {
self.use_channel(protocol | PROTOCOL_SERVER_BIT)
}
}
impl mux::Muxer<StdIngress> {
pub fn block(&mut self, cancel: Cancel) -> Result<(), std::io::Error> {
loop {
match self.tick() {
mux::TickOutcome::BearerError(err) => return Err(err),
mux::TickOutcome::Idle => match cancel.is_set() {
true => break Ok(()),
false => (),
},
mux::TickOutcome::Busy => (),
mux::TickOutcome::IngressDisconnected => break Ok(()),
}
}
}
pub fn spawn(mut self) -> Loop {
let cancel = Cancel::default();
let cancel2 = cancel.clone();
let thread = spawn(move || self.block(cancel2));
Loop { cancel, thread }
}
}
impl demux::Demuxer<StdEgress> {
pub fn block(&mut self, cancel: Cancel) -> Result<(), std::io::Error> {
loop {
match self.tick() {
Ok(demux::TickOutcome::Busy) => (),
Ok(demux::TickOutcome::Idle) => match cancel.is_set() {
true => break Ok(()),
false => (),
},
Err(demux::DemuxError::BearerError(err)) => return Err(err),
Err(demux::DemuxError::EgressDisconnected(id, _)) => {
log::warn!("disconnected protocol {}", id)
}
Err(demux::DemuxError::EgressUnknown(id, _)) => {
log::warn!("unknown protocol {}", id)
}
}
}
}
pub fn spawn(mut self) -> Loop {
let cancel = Cancel::default();
let cancel2 = cancel.clone();
let thread = spawn(move || self.block(cancel2));
Loop { cancel, thread }
}
}
pub type StdChannel = (u16, Sender<Message>, Receiver<Payload>);
pub type StdChannelBuffer = ChannelBuffer<StdChannel>;
impl agents::Channel for StdChannel {
async fn enqueue_chunk(&mut self, payload: Payload) -> Result<(), agents::ChannelError> {
match self.1.send((self.0, payload)) {
Ok(_) => Ok(()),
Err(SendError((_, payload))) => Err(agents::ChannelError::NotConnected(Some(payload))),
}
}
async fn dequeue_chunk(&mut self) -> Result<Payload, agents::ChannelError> {
match self.2.recv() {
Ok(payload) => Ok(payload),
Err(_) => Err(agents::ChannelError::NotConnected(None)),
}
}
}
#[derive(Clone, Debug, Default)]
pub struct Cancel(Arc<AtomicBool>);
impl Cancel {
pub fn set(&self) {
self.0.store(true, Ordering::SeqCst);
}
pub fn is_set(&self) -> bool {
self.0.load(Ordering::SeqCst)
}
}
#[derive(Debug)]
pub struct Loop {
cancel: Cancel,
thread: JoinHandle<Result<(), std::io::Error>>,
}
impl Loop {
pub fn cancel(&self) {
self.cancel.set();
}
pub fn join(self) -> Result<(), std::io::Error> {
self.thread.join().unwrap()
}
}

View file

@ -1,55 +0,0 @@
use crate::{
agents::{self, ChannelBuffer},
bearers::{Bearer, Segment},
Payload,
};
use std::time::Instant;
pub struct SyncPlexer {
bearer: Bearer,
protocol: u16,
clock: Instant,
}
impl SyncPlexer {
pub fn new(bearer: Bearer, protocol: u16) -> Self {
Self {
bearer,
protocol,
clock: Instant::now(),
}
}
pub fn unwrap(self) -> Bearer {
self.bearer
}
}
pub type SyncChannel = ChannelBuffer<SyncPlexer>;
impl agents::Channel for SyncPlexer {
async fn enqueue_chunk(&mut self, payload: Payload) -> Result<(), agents::ChannelError> {
let segment = Segment::new(self.clock, self.protocol, payload);
self.bearer
.write_segment(segment)
.map_err(|_| agents::ChannelError::NotConnected(None))
}
async fn dequeue_chunk(&mut self) -> Result<Payload, agents::ChannelError> {
match self.bearer.read_segment() {
Ok(segment) => match segment {
Some(x) => {
assert_eq!(
x.protocol, self.protocol,
"sync plexer received payload for wrong protocol"
);
Ok(x.payload)
}
None => Err(agents::ChannelError::NotConnected(None)),
},
Err(_) => Err(agents::ChannelError::NotConnected(None)),
}
}
}

View file

@ -1,59 +0,0 @@
use std::{
net::{Ipv4Addr, SocketAddrV4, TcpListener},
thread::{self, JoinHandle},
};
use log::info;
use pallas_multiplexer::{agents::Channel, bearers::Bearer, StdPlexer};
use rand::{distributions::Uniform, Rng};
fn setup_passive_muxer<const P: u16>() -> JoinHandle<StdPlexer> {
thread::spawn(|| {
let server = TcpListener::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, P)).unwrap();
info!("listening for connections on port {}", P);
let (bearer, _) = Bearer::accept_tcp(server).unwrap();
StdPlexer::new(bearer)
})
}
fn setup_active_muxer<const P: u16>() -> JoinHandle<StdPlexer> {
thread::spawn(|| {
let bearer = Bearer::connect_tcp(SocketAddrV4::new(Ipv4Addr::LOCALHOST, P)).unwrap();
StdPlexer::new(bearer)
})
}
fn random_payload(size: usize) -> Vec<u8> {
let range = Uniform::from(0..255);
rand::thread_rng().sample_iter(&range).take(size).collect()
}
#[tokio::test]
async fn one_way_small_sequence_of_payloads() {
let passive = setup_passive_muxer::<50301>();
// HACK: a small sleep seems to be required for Github actions runner to
// formally expose the port
thread::sleep(std::time::Duration::from_secs(1));
let active = setup_active_muxer::<50301>();
let mut active_plexer = active.join().unwrap();
let mut passive_plexer = passive.join().unwrap();
let mut sender_channel = active_plexer.use_client_channel(0x0003u16);
let mut receiver_channel = passive_plexer.use_server_channel(0x0003u16);
active_plexer.muxer.spawn();
passive_plexer.demuxer.spawn();
for _ in 0..100 {
let payload = random_payload(50);
sender_channel.enqueue_chunk(payload.clone()).await.unwrap();
let received_payload = receiver_channel.dequeue_chunk().await.unwrap();
assert_eq!(payload, received_payload);
}
}

29
pallas-network/Cargo.toml Normal file
View file

@ -0,0 +1,29 @@
[package]
name = "pallas-network"
description = "Ouroboros networking stack using async IO"
version = "0.18.0"
edition = "2021"
repository = "https://github.com/txpipe/pallas"
homepage = "https://github.com/txpipe/pallas"
documentation = "https://docs.rs/pallas-upstream"
license = "Apache-2.0"
readme = "README.md"
authors = [
"Santiago Carmuega <santiago@carmuega.me>",
"Pi Lanningham <pi.lanningham@gmail.com>",
]
[dependencies]
byteorder = "1.4.3"
hex = "0.4.3"
itertools = "0.10.5"
pallas-codec = { version = "0.18.0", path = "../pallas-codec" }
pallas-crypto = { version = "0.18.0", path = "../pallas-crypto" }
thiserror = "1.0.31"
tokio = { version = "1", features = ["net", "io-util", "time", "sync"] }
tracing = "0.1.37"
[dev-dependencies]
tracing-subscriber = "0.3.16"
tokio = { version = "1", features = ["full"] }
rand = "0.8.5"

3
pallas-network/README.md Normal file
View file

@ -0,0 +1,3 @@
# Pallas Network
An implementation of the Ouroboros networking stack. It provides a generic multiplexer and state-machines for the different mini-protocols. It uses async and tokio under the hood.

View file

@ -0,0 +1,201 @@
use std::net::SocketAddr;
use std::path::Path;
use byteorder::{ByteOrder, NetworkEndian};
use thiserror::Error;
use tokio::io::AsyncWriteExt;
use tokio::net::{TcpListener, TcpStream, ToSocketAddrs, UnixStream};
use tokio::time::Instant;
use tracing::trace;
const HEADER_LEN: usize = 8;
pub type Timestamp = u32;
pub type Payload = Vec<u8>;
pub type Protocol = u16;
#[derive(Debug)]
pub struct Header {
pub protocol: Protocol,
pub timestamp: Timestamp,
pub payload_len: u16,
}
impl From<&[u8]> for Header {
fn from(value: &[u8]) -> Self {
let timestamp = NetworkEndian::read_u32(&value[0..4]);
let protocol = NetworkEndian::read_u16(&value[4..6]);
let payload_len = NetworkEndian::read_u16(&value[6..8]);
Self {
timestamp,
protocol,
payload_len,
}
}
}
impl From<Header> for [u8; 8] {
fn from(value: Header) -> Self {
let mut out = [0u8; 8];
NetworkEndian::write_u32(&mut out[0..4], value.timestamp);
NetworkEndian::write_u16(&mut out[4..6], value.protocol);
NetworkEndian::write_u16(&mut out[6..8], value.payload_len);
out
}
}
pub struct Segment {
pub header: Header,
pub payload: Payload,
}
pub enum Bearer {
Tcp(TcpStream),
Unix(UnixStream),
}
const BUFFER_LEN: usize = 1024 * 10;
impl Bearer {
pub async fn connect_tcp(addr: impl ToSocketAddrs) -> Result<Self, tokio::io::Error> {
let stream = TcpStream::connect(addr).await?;
Ok(Self::Tcp(stream))
}
pub async fn accept_tcp(listener: TcpListener) -> tokio::io::Result<(Self, SocketAddr)> {
let (stream, addr) = listener.accept().await?;
Ok((Self::Tcp(stream), addr))
}
pub async fn connect_unix(path: impl AsRef<Path>) -> Result<Self, tokio::io::Error> {
let stream = UnixStream::connect(path).await?;
Ok(Self::Unix(stream))
}
pub async fn readable(&self) -> tokio::io::Result<()> {
match self {
Bearer::Tcp(x) => x.readable().await,
Bearer::Unix(x) => x.readable().await,
}
}
fn try_read(&mut self, buf: &mut [u8]) -> tokio::io::Result<usize> {
match self {
Bearer::Tcp(x) => x.try_read(buf),
Bearer::Unix(x) => x.try_read(buf),
}
}
async fn write_all(&mut self, buf: &[u8]) -> tokio::io::Result<()> {
match self {
Bearer::Tcp(x) => x.write_all(buf).await,
Bearer::Unix(x) => x.write_all(buf).await,
}
}
async fn flush(&mut self) -> tokio::io::Result<()> {
match self {
Bearer::Tcp(x) => x.flush().await,
Bearer::Unix(x) => x.flush().await,
}
}
}
#[derive(Debug, Error)]
pub enum Error {
#[error("no data available in bearer to complete segment")]
NoData,
#[error("unexpected I/O error")]
Io(#[source] tokio::io::Error),
}
pub struct SegmentBuffer(Bearer, Vec<u8>);
impl SegmentBuffer {
pub fn new(bearer: Bearer) -> Self {
Self(bearer, Vec::with_capacity(BUFFER_LEN))
}
/// Cancel-safe loop that reads from bearer until certain len
async fn cancellable_read(&mut self, required: usize) -> Result<(), Error> {
loop {
self.0.readable().await.map_err(Error::Io)?;
trace!("bearer is readable");
let remaining = required - self.1.len();
let mut buf = vec![0u8; remaining];
match self.0.try_read(&mut buf) {
Ok(0) => break Err(Error::NoData),
Ok(n) => {
trace!(n, "found data on bearer");
self.1.extend_from_slice(&buf[0..n]);
if self.1.len() >= required {
break Ok(());
}
}
Err(ref e) if e.kind() == tokio::io::ErrorKind::WouldBlock => {
trace!("reading from bearer would block");
continue;
}
Err(e) => {
return Err(Error::Io(e));
}
}
}
}
/// Peek the available data in search for a frame header
async fn peek_header(&mut self) -> Result<Header, Error> {
trace!("waiting for header buf");
self.cancellable_read(HEADER_LEN).await?;
trace!("found enough data for header");
let header = &self.1[..HEADER_LEN];
Ok(Header::from(header))
}
// Cancel-safe read of a full segment from the bearer
pub async fn read_segment(&mut self) -> Result<(Protocol, Payload), Error> {
let header = self.peek_header().await?;
trace!("waiting for full segment buf");
let segment_size = HEADER_LEN + header.payload_len as usize;
self.cancellable_read(segment_size).await?;
trace!("draining segment buffer");
let segment = self.1.drain(..segment_size);
let payload = segment.skip(HEADER_LEN).collect();
Ok((header.protocol, payload))
}
pub async fn write_segment(
&mut self,
protocol: u16,
clock: &Instant,
payload: &[u8],
) -> Result<(), std::io::Error> {
let header = Header {
protocol,
timestamp: clock.elapsed().as_micros() as u32,
payload_len: payload.len() as u16,
};
let buf: [u8; 8] = header.into();
self.0.write_all(&buf).await?;
self.0.write_all(payload).await?;
self.0.flush().await?;
Ok(())
}
}

View file

@ -0,0 +1,139 @@
use std::path::Path;
use thiserror::Error;
use tokio::task::JoinHandle;
use tracing::{debug, error};
use crate::{
bearer,
miniprotocols::{
blockfetch, chainsync, handshake, localstate, PROTOCOL_N2C_CHAIN_SYNC,
PROTOCOL_N2C_HANDSHAKE, PROTOCOL_N2C_STATE_QUERY,
},
plexer,
};
#[derive(Debug, Error)]
pub enum Error {
#[error("error connecting bearer")]
ConnectFailure(#[source] tokio::io::Error),
#[error("handshake protocol error")]
HandshakeProtocol(handshake::Error),
#[error("handshake version not accepted")]
IncompatibleVersion,
}
pub struct PeerClient {
plexer_handle: JoinHandle<tokio::io::Result<()>>,
pub handshake: handshake::Confirmation<handshake::n2n::VersionData>,
chainsync: chainsync::N2NClient,
blockfetch: blockfetch::Client,
}
impl PeerClient {
pub async fn connect(address: &str, magic: u64) -> Result<Self, Error> {
debug!("connecting");
let bearer = bearer::Bearer::connect_tcp(address)
.await
.map_err(Error::ConnectFailure)?;
let mut plexer = plexer::Plexer::new(bearer);
let channel0 = plexer.subscribe_client(0);
let channel2 = plexer.subscribe_client(2);
let channel3 = plexer.subscribe_client(3);
let plexer_handle = tokio::spawn(async move { plexer.run().await });
let versions = handshake::n2n::VersionTable::v7_and_above(magic);
let mut client = handshake::Client::new(channel0);
let handshake = client
.handshake(versions)
.await
.map_err(Error::HandshakeProtocol)?;
if let handshake::Confirmation::Rejected(reason) = handshake {
error!(?reason, "handshake refused");
return Err(Error::IncompatibleVersion);
}
Ok(Self {
plexer_handle,
handshake,
chainsync: chainsync::Client::new(channel2),
blockfetch: blockfetch::Client::new(channel3),
})
}
pub fn chainsync(&mut self) -> &mut chainsync::N2NClient {
&mut self.chainsync
}
pub fn blockfetch(&mut self) -> &mut blockfetch::Client {
&mut self.blockfetch
}
pub fn abort(&mut self) {
self.plexer_handle.abort();
}
}
pub struct NodeClient {
plexer_handle: JoinHandle<tokio::io::Result<()>>,
pub handshake: handshake::Confirmation<handshake::n2c::VersionData>,
chainsync: chainsync::N2CClient,
statequery: localstate::ClientV10,
}
impl NodeClient {
pub async fn connect(path: impl AsRef<Path>, magic: u64) -> Result<Self, Error> {
debug!("connecting");
let bearer = bearer::Bearer::connect_unix(path)
.await
.map_err(Error::ConnectFailure)?;
let mut plexer = plexer::Plexer::new(bearer);
let hs_channel = plexer.subscribe_client(PROTOCOL_N2C_HANDSHAKE);
let cs_channel = plexer.subscribe_client(PROTOCOL_N2C_CHAIN_SYNC);
let sq_channel = plexer.subscribe_client(PROTOCOL_N2C_STATE_QUERY);
let plexer_handle = tokio::spawn(async move { plexer.run().await });
let versions = handshake::n2c::VersionTable::v10_and_above(magic);
let mut client = handshake::Client::new(hs_channel);
let handshake = client
.handshake(versions)
.await
.map_err(Error::HandshakeProtocol)?;
if let handshake::Confirmation::Rejected(reason) = handshake {
error!(?reason, "handshake refused");
return Err(Error::IncompatibleVersion);
}
Ok(Self {
plexer_handle,
handshake,
chainsync: chainsync::Client::new(cs_channel),
statequery: localstate::Client::new(sq_channel),
})
}
pub fn chainsync(&mut self) -> &mut chainsync::N2CClient {
&mut self.chainsync
}
pub fn statequery(&mut self) -> &mut localstate::ClientV10 {
&mut self.statequery
}
pub fn abort(&mut self) {
self.plexer_handle.abort();
}
}

View file

@ -0,0 +1,4 @@
pub mod bearer;
pub mod facades;
pub mod miniprotocols;
pub mod plexer;

View file

@ -1,9 +1,8 @@
use pallas_codec::Fragment;
use pallas_multiplexer::agents::{Channel, ChannelBuffer, ChannelError};
use thiserror::Error;
use tracing::{debug, info, warn};
use crate::common::Point;
use crate::miniprotocols::common::Point;
use crate::plexer;
use super::{Message, State};
@ -24,8 +23,8 @@ pub enum Error {
#[error("requested range doesn't contain any blocks")]
NoBlocks,
#[error("error while sending or receiving data through the channel")]
ChannelError(ChannelError),
#[error("error while sending or receiving data through the multiplexer")]
Plexer(plexer::Error),
}
pub type Body = Vec<u8>;
@ -34,18 +33,11 @@ pub type Range = (Point, Point);
pub type HasBlocks = Option<()>;
pub struct Client<H>(State, ChannelBuffer<H>)
where
H: Channel,
Message: Fragment;
pub struct Client(State, plexer::ChannelBuffer);
impl<H> Client<H>
where
H: Channel,
Message: Fragment,
{
pub fn new(channel: H) -> Self {
Self(State::Idle, ChannelBuffer::new(channel))
impl Client {
pub fn new(channel: plexer::AgentChannel) -> Self {
Self(State::Idle, plexer::ChannelBuffer::new(channel))
}
pub fn state(&self) -> &State {
@ -102,17 +94,14 @@ where
pub async fn send_message(&mut self, msg: &Message) -> Result<(), Error> {
self.assert_agency_is_ours()?;
self.assert_outbound_state(msg)?;
self.1
.send_msg_chunks(msg)
.await
.map_err(Error::ChannelError)?;
self.1.send_msg_chunks(msg).await.map_err(Error::Plexer)?;
Ok(())
}
pub async fn recv_message(&mut self) -> Result<Message, Error> {
self.assert_agency_is_theirs()?;
let msg = self.1.recv_full_msg().await.map_err(Error::ChannelError)?;
let msg = self.1.recv_full_msg().await.map_err(Error::Plexer)?;
self.assert_inbound_state(&msg)?;
Ok(msg)
@ -149,6 +138,8 @@ where
}
pub async fn recv_while_streaming(&mut self) -> Result<Option<Body>, Error> {
debug!("waiting for stream");
match self.recv_message().await? {
Message::Block { body } => Ok(Some(body)),
Message::BatchDone => {

View file

@ -1,4 +1,4 @@
use crate::Point;
use crate::miniprotocols::Point;
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum State {

View file

@ -1,6 +1,6 @@
use std::collections::{vec_deque::Iter, VecDeque};
use crate::Point;
use crate::miniprotocols::Point;
/// A memory buffer to handle chain rollbacks
///
@ -98,7 +98,8 @@ impl RollbackBuffer {
#[cfg(test)]
mod tests {
use crate::{chainsync::RollbackEffect, Point};
use super::RollbackEffect;
use crate::miniprotocols::Point;
use super::RollbackBuffer;

View file

@ -1,10 +1,10 @@
use pallas_codec::Fragment;
use pallas_multiplexer::agents::{Channel, ChannelBuffer, ChannelError};
use std::marker::PhantomData;
use thiserror::Error;
use tracing::debug;
use crate::common::Point;
use crate::miniprotocols::Point;
use crate::plexer;
use super::{BlockContent, HeaderContent, Message, State, Tip};
@ -26,7 +26,7 @@ pub enum Error {
IntersectionNotFound,
#[error("error while sending or receiving data through the channel")]
ChannelError(ChannelError),
Plexer(plexer::Error),
}
pub type IntersectResponse = (Option<Point>, Tip);
@ -38,18 +38,20 @@ pub enum NextResponse<CONTENT> {
Await,
}
pub struct Client<H, O>(State, ChannelBuffer<H>, PhantomData<O>)
pub struct Client<O>(State, plexer::ChannelBuffer, PhantomData<O>)
where
H: Channel,
Message<O>: Fragment;
impl<H, O> Client<H, O>
impl<O> Client<O>
where
H: Channel,
Message<O>: Fragment,
{
pub fn new(channel: H) -> Self {
Self(State::Idle, ChannelBuffer::new(channel), PhantomData {})
pub fn new(channel: plexer::AgentChannel) -> Self {
Self(
State::Idle,
plexer::ChannelBuffer::new(channel),
PhantomData {},
)
}
pub fn state(&self) -> &State {
@ -112,10 +114,7 @@ where
self.assert_agency_is_ours()?;
self.assert_outbound_state(msg)?;
self.1
.send_msg_chunks(msg)
.await
.map_err(Error::ChannelError)?;
self.1.send_msg_chunks(msg).await.map_err(Error::Plexer)?;
Ok(())
}
@ -123,7 +122,7 @@ where
pub async fn recv_message(&mut self) -> Result<Message<O>, Error> {
self.assert_agency_is_theirs()?;
let msg = self.1.recv_full_msg().await.map_err(Error::ChannelError)?;
let msg = self.1.recv_full_msg().await.map_err(Error::Plexer)?;
self.assert_inbound_state(&msg)?;
@ -135,10 +134,14 @@ where
self.send_message(&msg).await?;
self.0 = State::Intersect;
debug!("send find intersect");
Ok(())
}
pub async fn recv_intersect_response(&mut self) -> Result<IntersectResponse, Error> {
debug!("waiting for intersect response");
match self.recv_message().await? {
Message::IntersectFound(point, tip) => {
self.0 = State::Idle;
@ -232,6 +235,6 @@ where
}
}
pub type N2NClient<H> = Client<H, HeaderContent>;
pub type N2NClient = Client<HeaderContent>;
pub type N2CClient<H> = Client<H, BlockContent>;
pub type N2CClient = Client<BlockContent>;

View file

@ -1,6 +1,6 @@
use std::{fmt::Debug, ops::Deref};
use crate::common::Point;
use crate::miniprotocols::Point;
#[derive(Debug, Clone)]
pub struct Tip(pub Point, pub u64);

View file

@ -17,7 +17,7 @@ pub const PRE_PRODUCTION_MAGIC: u64 = 1;
/// Bitflag for client-side version of a known protocol
/// # Example
/// ```
/// use pallas_miniprotocols::*;
/// use pallas_network::miniprotocols::*;
/// let channel = PROTOCOL_CLIENT | PROTOCOL_N2N_HANDSHAKE;
/// ```
pub const PROTOCOL_CLIENT: u16 = 0x0;
@ -25,7 +25,7 @@ pub const PROTOCOL_CLIENT: u16 = 0x0;
/// Bitflag for server-side version of a known protocol
/// # Example
/// ```
/// use pallas_miniprotocols::*;
/// use pallas_network::miniprotocols::*;
/// let channel = PROTOCOL_SERVER | PROTOCOL_N2N_CHAIN_SYNC;
/// ```
pub const PROTOCOL_SERVER: u16 = 0x8000;

View file

@ -1,9 +1,9 @@
use pallas_codec::Fragment;
use pallas_multiplexer::agents::{Channel, ChannelBuffer};
use std::marker::PhantomData;
use tracing::debug;
use super::{Error, Message, RefuseReason, State, VersionNumber, VersionTable};
use crate::plexer;
#[derive(Debug)]
pub enum Confirmation<D> {
@ -11,18 +11,19 @@ pub enum Confirmation<D> {
Rejected(RefuseReason),
}
pub struct Client<H, D>(State, ChannelBuffer<H>, PhantomData<D>)
where
H: Channel;
pub struct Client<D>(State, plexer::ChannelBuffer, PhantomData<D>);
impl<H, D> Client<H, D>
impl<D> Client<D>
where
H: Channel,
D: std::fmt::Debug + Clone,
Message<D>: Fragment,
{
pub fn new(channel: H) -> Self {
Self(State::Propose, ChannelBuffer::new(channel), PhantomData {})
pub fn new(channel: plexer::AgentChannel) -> Self {
Self(
State::Propose,
plexer::ChannelBuffer::new(channel),
PhantomData {},
)
}
pub fn state(&self) -> &State {
@ -75,17 +76,14 @@ where
pub async fn send_message(&mut self, msg: &Message<D>) -> Result<(), Error> {
self.assert_agency_is_ours()?;
self.assert_outbound_state(msg)?;
self.1
.send_msg_chunks(msg)
.await
.map_err(Error::ChannelError)?;
self.1.send_msg_chunks(msg).await.map_err(Error::Plexer)?;
Ok(())
}
pub async fn recv_message(&mut self) -> Result<Message<D>, Error> {
self.assert_agency_is_theirs()?;
let msg = self.1.recv_full_msg().await.map_err(Error::ChannelError)?;
let msg = self.1.recv_full_msg().await.map_err(Error::Plexer)?;
self.assert_inbound_state(&msg)?;
Ok(msg)
@ -124,11 +122,11 @@ where
self.recv_while_confirm().await
}
pub fn unwrap(self) -> H {
pub fn unwrap(self) -> plexer::AgentChannel {
self.1.unwrap()
}
}
pub type N2NClient<H> = Client<H, super::n2n::VersionData>;
pub type N2NClient = Client<super::n2n::VersionData>;
pub type N2CClient<H> = Client<H, super::n2c::VersionData>;
pub type N2CClient = Client<super::n2c::VersionData>;

View file

@ -1,9 +1,10 @@
use itertools::Itertools;
use pallas_codec::minicbor::{decode, encode, Decode, Decoder, Encode, Encoder};
use pallas_multiplexer::agents::ChannelError;
use std::{collections::HashMap, fmt::Debug};
use thiserror::*;
use crate::plexer;
#[derive(Error, Debug)]
pub enum Error {
#[error("attempted to receive message while agency is ours")]
@ -19,7 +20,7 @@ pub enum Error {
InvalidOutbound,
#[error("error while sending or receiving data through the channel")]
ChannelError(ChannelError),
Plexer(plexer::Error),
}
#[derive(Debug, Clone)]

View file

@ -1,22 +1,23 @@
use std::marker::PhantomData;
use pallas_codec::Fragment;
use pallas_multiplexer::agents::{Channel, ChannelBuffer};
use super::{Error, Message, RefuseReason, State, VersionNumber, VersionTable};
use crate::plexer;
pub struct Server<H, D>(State, ChannelBuffer<H>, PhantomData<D>)
where
H: Channel;
pub struct Server<D>(State, plexer::ChannelBuffer, PhantomData<D>);
impl<H, D> Server<H, D>
impl<D> Server<D>
where
H: Channel,
D: std::fmt::Debug + Clone,
Message<D>: Fragment,
{
pub fn new(channel: H) -> Self {
Self(State::Propose, ChannelBuffer::new(channel), PhantomData {})
pub fn new(channel: plexer::AgentChannel) -> Self {
Self(
State::Propose,
plexer::ChannelBuffer::new(channel),
PhantomData {},
)
}
pub fn state(&self) -> &State {
@ -65,17 +66,14 @@ where
pub async fn send_message(&mut self, msg: &Message<D>) -> Result<(), Error> {
self.assert_agency_is_ours()?;
self.assert_outbound_state(msg)?;
self.1
.send_msg_chunks(msg)
.await
.map_err(Error::ChannelError)?;
self.1.send_msg_chunks(msg).await.map_err(Error::Plexer)?;
Ok(())
}
pub async fn recv_message(&mut self) -> Result<Message<D>, Error> {
self.assert_agency_is_theirs()?;
let msg = self.1.recv_full_msg().await.map_err(Error::ChannelError)?;
let msg = self.1.recv_full_msg().await.map_err(Error::Plexer)?;
self.assert_inbound_state(&msg)?;
Ok(msg)
@ -111,11 +109,11 @@ where
Ok(())
}
pub fn unwrap(self) -> H {
pub fn unwrap(self) -> plexer::AgentChannel {
self.1.unwrap()
}
}
pub type N2NServer<H> = Server<H, super::n2n::VersionData>;
pub type N2NServer = Server<super::n2n::VersionData>;
pub type N2CServer<H> = Server<H, super::n2c::VersionData>;
pub type N2CServer = Server<super::n2c::VersionData>;

View file

@ -2,13 +2,12 @@ use std::fmt::Debug;
use pallas_codec::Fragment;
use crate::common::Point;
use pallas_multiplexer::agents::{Channel, ChannelBuffer, ChannelError};
use std::marker::PhantomData;
use thiserror::*;
use super::{AcquireFailure, Message, Query, State};
use crate::miniprotocols::Point;
use crate::plexer;
#[derive(Error, Debug)]
pub enum Error {
@ -25,7 +24,7 @@ pub enum Error {
#[error("failure acquiring point, too old")]
AcquirePointTooOld,
#[error("error while sending or receiving data through the channel")]
ChannelError(ChannelError),
Plexer(plexer::Error),
}
impl From<AcquireFailure> for Error {
@ -37,20 +36,22 @@ impl From<AcquireFailure> for Error {
}
}
pub struct Client<H, Q>(State, ChannelBuffer<H>, PhantomData<Q>)
pub struct Client<Q>(State, plexer::ChannelBuffer, PhantomData<Q>)
where
H: Channel,
Q: Query,
Message<Q>: Fragment;
impl<H, Q> Client<H, Q>
impl<Q> Client<Q>
where
H: Channel,
Q: Query,
Message<Q>: Fragment,
{
pub fn new(channel: H) -> Self {
Self(State::Idle, ChannelBuffer::new(channel), PhantomData {})
pub fn new(channel: plexer::AgentChannel) -> Self {
Self(
State::Idle,
plexer::ChannelBuffer::new(channel),
PhantomData {},
)
}
pub fn state(&self) -> &State {
@ -108,17 +109,14 @@ where
pub async fn send_message(&mut self, msg: &Message<Q>) -> Result<(), Error> {
self.assert_agency_is_ours()?;
self.assert_outbound_state(msg)?;
self.1
.send_msg_chunks(msg)
.await
.map_err(Error::ChannelError)?;
self.1.send_msg_chunks(msg).await.map_err(Error::Plexer)?;
Ok(())
}
pub async fn recv_message(&mut self) -> Result<Message<Q>, Error> {
self.assert_agency_is_theirs()?;
let msg = self.1.recv_full_msg().await.map_err(Error::ChannelError)?;
let msg = self.1.recv_full_msg().await.map_err(Error::Plexer)?;
self.assert_inbound_state(&msg)?;
Ok(msg)
@ -175,4 +173,4 @@ where
}
}
pub type ClientV10<H> = Client<H, super::queries::QueryV10>;
pub type ClientV10 = Client<super::queries::QueryV10>;

View file

@ -1,6 +1,6 @@
use std::fmt::Debug;
use crate::common::Point;
use crate::miniprotocols::Point;
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum State {

View file

@ -1,5 +1,4 @@
mod common;
mod machines;
pub mod blockfetch;
pub mod chainsync;
@ -9,4 +8,3 @@ pub mod txmonitor;
pub mod txsubmission;
pub use common::*;
pub use machines::*;

View file

@ -0,0 +1,205 @@
use std::fmt::Debug;
use thiserror::*;
use super::protocol::*;
use crate::plexer;
#[derive(Error, Debug)]
pub enum Error {
#[error("attempted to receive message while agency is ours")]
AgencyIsOurs,
#[error("attempted to send message while agency is theirs")]
AgencyIsTheirs,
#[error("inbound message is not valid for current state")]
InvalidInbound,
#[error("outbound message is not valid for current state")]
InvalidOutbound,
#[error("error while sending or receiving data through the channel")]
Plexer(plexer::Error),
}
pub struct Client(State, plexer::ChannelBuffer);
impl Client {
pub fn new(channel: plexer::AgentChannel) -> Self {
Self(State::Idle, plexer::ChannelBuffer::new(channel))
}
pub fn state(&self) -> &State {
&self.0
}
pub fn is_done(&self) -> bool {
self.0 == State::Done
}
fn has_agency(&self) -> bool {
match &self.0 {
State::Idle => true,
State::Acquiring => false,
State::Acquired => true,
State::Busy => false,
State::Done => false,
}
}
fn assert_agency_is_ours(&self) -> Result<(), Error> {
if !self.has_agency() {
Err(Error::AgencyIsTheirs)
} else {
Ok(())
}
}
fn assert_agency_is_theirs(&self) -> Result<(), Error> {
if self.has_agency() {
Err(Error::AgencyIsOurs)
} else {
Ok(())
}
}
fn assert_outbound_state(&self, msg: &Message) -> Result<(), Error> {
match (&self.0, msg) {
(State::Idle, Message::Acquire) => Ok(()),
(State::Idle, Message::Done) => Ok(()),
(State::Acquired, Message::Acquire) => Ok(()),
(State::Acquired, Message::RequestHasTx(..)) => Ok(()),
(State::Acquired, Message::RequestNextTx) => Ok(()),
(State::Acquired, Message::RequestSizeAndCapacity) => Ok(()),
_ => Err(Error::InvalidOutbound),
}
}
fn assert_inbound_state(&self, msg: &Message) -> Result<(), Error> {
match (&self.0, msg) {
(State::Acquiring, Message::Acquired(..)) => Ok(()),
(State::Busy, Message::ResponseHasTx(..)) => Ok(()),
(State::Busy, Message::ResponseNextTx(..)) => Ok(()),
(State::Busy, Message::ResponseSizeAndCapacity(..)) => Ok(()),
_ => Err(Error::InvalidInbound),
}
}
pub async fn send_message(&mut self, msg: &Message) -> Result<(), Error> {
self.assert_agency_is_ours()?;
self.assert_outbound_state(msg)?;
self.1.send_msg_chunks(msg).await.map_err(Error::Plexer)?;
Ok(())
}
pub async fn recv_message(&mut self) -> Result<Message, Error> {
self.assert_agency_is_theirs()?;
let msg = self.1.recv_full_msg().await.map_err(Error::Plexer)?;
self.assert_inbound_state(&msg)?;
Ok(msg)
}
async fn send_acquire(&mut self) -> Result<(), Error> {
let msg = Message::Acquire;
self.send_message(&msg).await?;
self.0 = State::Acquiring;
Ok(())
}
async fn recv_while_acquiring(&mut self) -> Result<Slot, Error> {
match self.recv_message().await? {
Message::Acquired(slot) => {
self.0 = State::Acquired;
Ok(slot)
}
_ => Err(Error::InvalidInbound),
}
}
pub async fn acquire(&mut self) -> Result<Slot, Error> {
self.send_acquire().await?;
self.recv_while_acquiring().await
}
async fn send_request_has_tx(&mut self, id: TxId) -> Result<(), Error> {
let msg = Message::RequestHasTx(id);
self.send_message(&msg).await?;
self.0 = State::Busy;
Ok(())
}
async fn recv_while_requesting_has_tx(&mut self) -> Result<bool, Error> {
match self.recv_message().await? {
Message::ResponseHasTx(x) => {
self.0 = State::Acquired;
Ok(x)
}
_ => Err(Error::InvalidInbound),
}
}
pub async fn query_has_tx(&mut self, id: TxId) -> Result<bool, Error> {
self.send_request_has_tx(id).await?;
self.recv_while_requesting_has_tx().await
}
async fn send_request_next_tx(&mut self) -> Result<(), Error> {
let msg = Message::RequestNextTx;
self.send_message(&msg).await?;
self.0 = State::Busy;
Ok(())
}
async fn recv_while_requesting_next_tx(&mut self) -> Result<Option<Tx>, Error> {
match self.recv_message().await? {
Message::ResponseNextTx(x) => {
self.0 = State::Acquired;
Ok(x)
}
_ => Err(Error::InvalidInbound),
}
}
pub async fn query_next_tx(&mut self) -> Result<Option<Tx>, Error> {
self.send_request_next_tx().await?;
self.recv_while_requesting_next_tx().await
}
async fn send_request_size_and_capacity(&mut self) -> Result<(), Error> {
let msg = Message::RequestSizeAndCapacity;
self.send_message(&msg).await?;
self.0 = State::Busy;
Ok(())
}
async fn recv_while_requesting_size_and_capacity(
&mut self,
) -> Result<MempoolSizeAndCapacity, Error> {
match self.recv_message().await? {
Message::ResponseSizeAndCapacity(x) => {
self.0 = State::Acquired;
Ok(x)
}
_ => Err(Error::InvalidInbound),
}
}
pub async fn query_size_and_capacity(&mut self) -> Result<MempoolSizeAndCapacity, Error> {
self.send_request_size_and_capacity().await?;
self.recv_while_requesting_size_and_capacity().await
}
pub async fn release(&mut self) -> Result<(), Error> {
let msg = Message::Release;
self.send_message(&msg).await?;
self.0 = State::Idle;
Ok(())
}
}

View file

@ -0,0 +1,114 @@
use super::protocol::*;
use pallas_codec::minicbor::{decode, encode, Decode, Encode, Encoder};
impl Encode<()> for Message {
fn encode<W: encode::Write>(
&self,
e: &mut Encoder<W>,
_ctx: &mut (),
) -> Result<(), encode::Error<W::Error>> {
match self {
Message::Done => {
e.array(1)?.u16(0)?;
}
Message::Acquire => {
e.array(1)?.u16(1)?;
}
Message::Acquired(slot) => {
e.array(2)?.u16(2)?;
e.encode(slot)?;
}
Message::Release => {
e.array(1)?.u16(3)?;
}
// TODO: confirm if this is valid, I'm just assuming that label 4 is AwaitAcquire, can't
// find the specs
Message::AwaitAcquire => {
e.array(1)?.u16(4)?;
}
Message::RequestNextTx => {
e.array(1)?.u16(5)?;
}
Message::ResponseNextTx(None) => {
e.array(1)?.u16(6)?;
}
Message::ResponseNextTx(Some(tx)) => {
e.array(2)?.u16(6)?;
e.encode(tx)?;
}
Message::RequestHasTx(tx) => {
e.array(2)?.u16(7)?;
e.encode(tx)?;
}
Message::ResponseHasTx(tx) => {
e.array(2)?.u16(8)?;
e.encode(tx)?;
}
Message::RequestSizeAndCapacity => {
e.array(1)?.u16(9)?;
}
Message::ResponseSizeAndCapacity(sz) => {
e.array(2)?.u16(10)?;
e.array(3)?;
e.encode(sz.capacity_in_bytes)?;
e.encode(sz.size_in_bytes)?;
e.encode(sz.number_of_txs)?;
}
}
Ok(())
}
}
impl<'b> Decode<'b, ()> for Message {
fn decode(
d: &mut pallas_codec::minicbor::Decoder<'b>,
_ctx: &mut (),
) -> Result<Self, decode::Error> {
d.array()?;
let label = d.u16()?;
match label {
0 => Ok(Message::Done),
1 => Ok(Message::Acquire),
2 => {
let slot = d.decode()?;
Ok(Message::Acquired(slot))
}
3 => Ok(Message::Release),
// TODO: confirm if this is valid, I'm just assuming that label 4 is AwaitAcquire, can't
// find the specs
4 => Ok(Message::AwaitAcquire),
5 => Ok(Message::RequestNextTx),
6 => match d.array()? {
Some(_) => {
let cbor: pallas_codec::utils::CborWrap<Tx> = d.decode()?;
Ok(Message::ResponseNextTx(Some(cbor.unwrap())))
}
None => Ok(Message::ResponseNextTx(None)),
},
7 => {
let id = d.decode()?;
Ok(Message::RequestHasTx(id))
}
8 => {
let has = d.decode()?;
Ok(Message::ResponseHasTx(has))
}
9 => Ok(Message::RequestSizeAndCapacity),
10 => {
d.array()?;
let capacity_in_bytes = d.decode()?;
let size_in_bytes = d.decode()?;
let number_of_txs = d.decode()?;
Ok(Message::ResponseSizeAndCapacity(MempoolSizeAndCapacity {
capacity_in_bytes,
size_in_bytes,
number_of_txs,
}))
}
_ => Err(decode::Error::message("can't decode Message")),
}
}
}

View file

@ -0,0 +1,7 @@
mod client;
mod codec;
mod protocol;
pub use client::*;
pub use codec::*;
pub use protocol::*;

View file

@ -0,0 +1,34 @@
pub type Slot = u64;
pub type TxId = String;
pub type Tx = Vec<u8>;
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum State {
Idle,
Acquiring,
Acquired,
Busy,
Done,
}
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct MempoolSizeAndCapacity {
pub capacity_in_bytes: u32,
pub size_in_bytes: u32,
pub number_of_txs: u32,
}
#[derive(Debug, Clone)]
pub enum Message {
Acquire,
AwaitAcquire,
Acquired(Slot),
RequestHasTx(TxId),
RequestNextTx,
RequestSizeAndCapacity,
ResponseHasTx(bool),
ResponseNextTx(Option<Tx>),
ResponseSizeAndCapacity(MempoolSizeAndCapacity),
Release,
Done,
}

View file

@ -1,7 +1,7 @@
use std::marker::PhantomData;
use crate::plexer;
use pallas_codec::Fragment;
use pallas_multiplexer::agents::{Channel, ChannelBuffer};
use super::{
protocol::{Error, Message, State, TxIdAndSize},
@ -16,28 +16,26 @@ pub enum Request<TxId> {
/// A generic Ouroboros client for submitting a generic notion of "transactions"
/// to another server
pub struct GenericClient<H, TxId, TxBody>(
pub struct GenericClient<TxId, TxBody>(
State,
ChannelBuffer<H>,
plexer::ChannelBuffer,
PhantomData<TxId>,
PhantomData<TxBody>,
)
where
H: Channel,
Message<TxId, TxBody>: Fragment;
/// A cardano specific instantiation of the ouroboros protocol
pub type Client<H> = GenericClient<H, EraTxId, EraTxBody>;
pub type Client = GenericClient<EraTxId, EraTxBody>;
impl<H, TxId, TxBody> GenericClient<H, TxId, TxBody>
impl<TxId, TxBody> GenericClient<TxId, TxBody>
where
H: Channel,
Message<TxId, TxBody>: Fragment,
{
pub fn new(channel: H) -> Self {
pub fn new(channel: plexer::AgentChannel) -> Self {
Self(
State::Init,
ChannelBuffer::new(channel),
plexer::ChannelBuffer::new(channel),
PhantomData {},
PhantomData {},
)
@ -95,17 +93,14 @@ where
pub async fn send_message(&mut self, msg: &Message<TxId, TxBody>) -> Result<(), Error> {
self.assert_agency_is_ours()?;
self.assert_outbound_state(msg)?;
self.1
.send_msg_chunks(msg)
.await
.map_err(Error::ChannelError)?;
self.1.send_msg_chunks(msg).await.map_err(Error::Plexer)?;
Ok(())
}
pub async fn recv_message(&mut self) -> Result<Message<TxId, TxBody>, Error> {
self.assert_agency_is_theirs()?;
let msg = self.1.recv_full_msg().await.map_err(Error::ChannelError)?;
let msg = self.1.recv_full_msg().await.map_err(Error::Plexer)?;
self.assert_inbound_state(&msg)?;
Ok(msg)

View file

@ -1,6 +1,7 @@
use pallas_multiplexer::agents::ChannelError;
use thiserror::Error;
use crate::plexer;
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum State {
Init,
@ -46,7 +47,7 @@ pub enum Error {
AlreadyInitialized,
#[error("error while sending or receiving data through the channel")]
ChannelError(ChannelError),
Plexer(plexer::Error),
}
#[derive(Debug)]

View file

@ -1,12 +1,12 @@
use std::marker::PhantomData;
use pallas_codec::Fragment;
use pallas_multiplexer::agents::{Channel, ChannelBuffer};
use super::{
protocol::{Blocking, Error, Message, State, TxCount, TxIdAndSize},
EraTxBody, EraTxId,
};
use crate::plexer;
pub enum Reply<TxId, TxBody> {
TxIds(Vec<TxIdAndSize<TxId>>),
@ -16,28 +16,26 @@ pub enum Reply<TxId, TxBody> {
/// A generic implementation of an ouroboros server protocol ready to request
/// and receive transactions from a client
pub struct GenericServer<H, TxId, TxBody>(
pub struct GenericServer<TxId, TxBody>(
State,
ChannelBuffer<H>,
plexer::ChannelBuffer,
PhantomData<TxId>,
PhantomData<TxBody>,
)
where
H: Channel,
Message<TxId, TxBody>: Fragment;
/// A Cardano specific server for the ouroboros TxSubmission protocol
pub type Server<H> = GenericServer<H, EraTxId, EraTxBody>;
pub type Server = GenericServer<EraTxId, EraTxBody>;
impl<H, TxId, TxBody> GenericServer<H, TxId, TxBody>
impl<TxId, TxBody> GenericServer<TxId, TxBody>
where
H: Channel,
Message<TxId, TxBody>: Fragment,
{
pub fn new(channel: H) -> Self {
pub fn new(channel: plexer::AgentChannel) -> Self {
Self(
State::Init,
ChannelBuffer::new(channel),
plexer::ChannelBuffer::new(channel),
PhantomData {},
PhantomData {},
)
@ -95,17 +93,14 @@ where
pub async fn send_message(&mut self, msg: &Message<TxId, TxBody>) -> Result<(), Error> {
self.assert_agency_is_ours()?;
self.assert_outbound_state(msg)?;
self.1
.send_msg_chunks(msg)
.await
.map_err(Error::ChannelError)?;
self.1.send_msg_chunks(msg).await.map_err(Error::Plexer)?;
Ok(())
}
pub async fn recv_message(&mut self) -> Result<Message<TxId, TxBody>, Error> {
self.assert_agency_is_theirs()?;
let msg = self.1.recv_full_msg().await.map_err(Error::ChannelError)?;
let msg = self.1.recv_full_msg().await.map_err(Error::Plexer)?;
self.assert_inbound_state(&msg)?;
Ok(msg)

View file

@ -0,0 +1,317 @@
use pallas_codec::{minicbor, Fragment};
use thiserror::Error;
use tokio::sync::mpsc::error::SendError;
use tokio::{select, time::Instant};
use tracing::{debug, error, trace};
use crate::bearer::{Bearer, Payload, Protocol, SegmentBuffer};
#[derive(Error, Debug)]
pub enum Error {
#[error("failure to encode channel message")]
Decoding(String),
#[error("failure to decode channel message")]
Encoding(String),
#[error("agent failed to enqueue chunk for protocol {0}")]
AgentEnqueue(Protocol, Payload),
#[error("agent failed to dequeue chunk")]
AgentDequeue,
#[error("plexer failed to dumux chunk for protocol {0}")]
PlexerDemux(Protocol, Payload),
#[error("plexer failed to mux chunk")]
PlexerMux,
#[error("bearer IO error")]
Bearer(tokio::io::Error),
}
pub struct AgentChannel {
enqueue_protocol: crate::bearer::Protocol,
dequeue_protocol: crate::bearer::Protocol,
to_plexer: tokio::sync::mpsc::Sender<(Protocol, Payload)>,
from_plexer: tokio::sync::broadcast::Receiver<(Protocol, Payload)>,
}
impl AgentChannel {
fn for_client(protocol: crate::bearer::Protocol, ingress: &Ingress, egress: &Egress) -> Self {
Self {
enqueue_protocol: protocol,
dequeue_protocol: protocol ^ 0x8000,
to_plexer: ingress.0.clone(),
from_plexer: egress.0.subscribe(),
}
}
fn for_server(protocol: crate::bearer::Protocol, ingress: &Ingress, egress: &Egress) -> Self {
Self {
enqueue_protocol: protocol ^ 0x8000,
dequeue_protocol: protocol,
to_plexer: ingress.0.clone(),
from_plexer: egress.0.subscribe(),
}
}
pub async fn enqueue_chunk(&mut self, chunk: Payload) -> Result<(), Error> {
self.to_plexer
.send((self.enqueue_protocol, chunk))
.await
.map_err(|SendError((protocol, payload))| Error::AgentEnqueue(protocol, payload))
}
pub async fn dequeue_chunk(&mut self) -> Result<Payload, Error> {
loop {
let (protocol, payload) = self
.from_plexer
.recv()
.await
.map_err(|_| Error::AgentDequeue)?;
if protocol == self.dequeue_protocol {
trace!(protocol, "message for our protocol");
break Ok(payload);
}
}
}
}
type Ingress = (
tokio::sync::mpsc::Sender<(Protocol, Payload)>,
tokio::sync::mpsc::Receiver<(Protocol, Payload)>,
);
type Egress = (
tokio::sync::broadcast::Sender<(Protocol, Payload)>,
tokio::sync::broadcast::Receiver<(Protocol, Payload)>,
);
pub struct Plexer {
clock: Instant,
bearer: SegmentBuffer,
ingress: Ingress,
egress: Egress,
}
impl Plexer {
pub fn new(bearer: Bearer) -> Self {
Self {
clock: Instant::now(),
bearer: SegmentBuffer::new(bearer),
ingress: tokio::sync::mpsc::channel(100), // TODO: define buffer
egress: tokio::sync::broadcast::channel(100),
}
}
async fn mux(&mut self, msg: (Protocol, Payload)) -> tokio::io::Result<()> {
self.bearer
.write_segment(msg.0, &self.clock, &msg.1)
.await?;
if tracing::event_enabled!(tracing::Level::TRACE) {
trace!(
protocol = msg.0,
data = hex::encode(&msg.1),
"write to bearer"
);
}
Ok(())
}
async fn demux(&mut self, protocol: Protocol, payload: Payload) -> tokio::io::Result<()> {
if tracing::event_enabled!(tracing::Level::TRACE) {
trace!(protocol, data = hex::encode(&payload), "read from bearer");
}
self.egress.0.send((protocol, payload)).unwrap();
Ok(())
}
pub fn subscribe_client(&mut self, protocol: Protocol) -> AgentChannel {
AgentChannel::for_client(protocol, &self.ingress, &self.egress)
}
pub fn subscribe_server(&mut self, protocol: Protocol) -> AgentChannel {
AgentChannel::for_server(protocol, &self.ingress, &self.egress)
}
pub async fn run(&mut self) -> tokio::io::Result<()> {
loop {
trace!("selecting");
select! {
Ok(x) = self.bearer.read_segment() => {
trace!("demux selected");
self.demux(x.0, x.1).await?
},
Some(x) = self.ingress.1.recv() => {
trace!("mux selected");
self.mux(x).await?
},
_ = tokio::time::sleep(tokio::time::Duration::from_secs(5)) => {
trace!("idle plexer");
}
else => {
error!("something else happened");
}
}
}
}
}
/// Protocol value that defines max segment length
pub const MAX_SEGMENT_PAYLOAD_LENGTH: usize = 65535;
fn try_decode_message<M>(buffer: &mut Vec<u8>) -> Result<Option<M>, Error>
where
M: Fragment,
{
let mut decoder = minicbor::Decoder::new(buffer);
let maybe_msg = decoder.decode();
match maybe_msg {
Ok(msg) => {
let pos = decoder.position();
buffer.drain(0..pos);
Ok(Some(msg))
}
Err(err) if err.is_end_of_input() => Ok(None),
Err(err) => {
error!(?err);
trace!("{}", hex::encode(buffer));
Err(Error::Decoding(err.to_string()))
}
}
}
/// A channel abstraction to hide the complexity of partial payloads
pub struct ChannelBuffer {
channel: AgentChannel,
temp: Vec<u8>,
}
impl ChannelBuffer {
pub fn new(channel: AgentChannel) -> Self {
Self {
channel,
temp: Vec::new(),
}
}
/// Enqueues a msg as a sequence payload chunks
pub async fn send_msg_chunks<M>(&mut self, msg: &M) -> Result<(), Error>
where
M: Fragment,
{
let mut payload = Vec::new();
minicbor::encode(msg, &mut payload).map_err(|err| Error::Encoding(err.to_string()))?;
let chunks = payload.chunks(MAX_SEGMENT_PAYLOAD_LENGTH);
for chunk in chunks {
self.channel.enqueue_chunk(Vec::from(chunk)).await?;
}
Ok(())
}
/// Reads from the channel until a complete message is found
pub async fn recv_full_msg<M>(&mut self) -> Result<M, Error>
where
M: Fragment,
{
trace!(len = self.temp.len(), "waiting for full message");
if !self.temp.is_empty() {
trace!("buffer has data from previous payload");
if let Some(msg) = try_decode_message::<M>(&mut self.temp)? {
debug!("decoding done");
return Ok(msg);
}
}
loop {
let chunk = self.channel.dequeue_chunk().await?;
self.temp.extend(chunk);
if let Some(msg) = try_decode_message::<M>(&mut self.temp)? {
debug!("decoding done");
return Ok(msg);
}
trace!("not enough data");
}
}
pub fn unwrap(self) -> AgentChannel {
self.channel
}
}
impl From<AgentChannel> for ChannelBuffer {
fn from(channel: AgentChannel) -> Self {
ChannelBuffer::new(channel)
}
}
#[cfg(test)]
mod tests {
use super::*;
use pallas_codec::minicbor;
#[tokio::test]
async fn multiple_messages_in_same_payload() {
let mut input = Vec::new();
let in_part1 = (1u8, 2u8, 3u8);
let in_part2 = (6u8, 5u8, 4u8);
minicbor::encode(in_part1, &mut input).unwrap();
minicbor::encode(in_part2, &mut input).unwrap();
let ingress = tokio::sync::mpsc::channel(100);
let egress = tokio::sync::broadcast::channel(100);
let channel = AgentChannel::for_client(0, &ingress, &egress);
egress.0.send((0 ^ 0x8000, input)).unwrap();
let mut buf = ChannelBuffer::new(channel);
let out_part1 = buf.recv_full_msg::<(u8, u8, u8)>().await.unwrap();
let out_part2 = buf.recv_full_msg::<(u8, u8, u8)>().await.unwrap();
assert_eq!(in_part1, out_part1);
assert_eq!(in_part2, out_part2);
}
#[tokio::test]
async fn fragmented_message_in_multiple_payloads() {
let mut input = Vec::new();
let msg = (11u8, 12u8, 13u8, 14u8, 15u8, 16u8, 17u8);
minicbor::encode(msg, &mut input).unwrap();
let ingress = tokio::sync::mpsc::channel(100);
let egress = tokio::sync::broadcast::channel(100);
let channel = AgentChannel::for_client(0, &ingress, &egress);
while !input.is_empty() {
let chunk = Vec::from(input.drain(0..2).as_slice());
egress.0.send((0 ^ 0x8000, chunk)).unwrap();
}
let mut buf = ChannelBuffer::new(channel);
let out_msg = buf
.recv_full_msg::<(u8, u8, u8, u8, u8, u8, u8)>()
.await
.unwrap();
assert_eq!(msg, out_msg);
}
}

View file

@ -0,0 +1,62 @@
use std::net::{Ipv4Addr, SocketAddrV4};
use pallas_network::{bearer::Bearer, plexer::Plexer};
use rand::{distributions::Uniform, Rng};
use tokio::net::TcpListener;
async fn setup_passive_muxer<const P: u16>() -> Plexer {
let server = TcpListener::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, P))
.await
.unwrap();
println!("listening for connections on port {}", P);
let (bearer, _) = Bearer::accept_tcp(server).await.unwrap();
Plexer::new(bearer)
}
async fn setup_active_muxer<const P: u16>() -> Plexer {
let bearer = Bearer::connect_tcp(SocketAddrV4::new(Ipv4Addr::LOCALHOST, P))
.await
.unwrap();
println!("active plexer connected");
Plexer::new(bearer)
}
fn random_payload(size: usize) -> Vec<u8> {
let range = Uniform::from(0..255);
rand::thread_rng().sample_iter(&range).take(size).collect()
}
#[tokio::test]
async fn one_way_small_sequence_of_payloads() {
let passive = tokio::spawn(setup_passive_muxer::<50301>());
// HACK: a small sleep seems to be required for Github actions runner to
// formally expose the port
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
let mut active = setup_active_muxer::<50301>().await;
let mut passive = passive.await.unwrap();
let mut sender_channel = active.subscribe_client(3);
let mut receiver_channel = passive.subscribe_server(3);
let passive_run = tokio::spawn(async move { passive.run().await });
let active_run = tokio::spawn(async move { active.run().await });
for _ in 0..100 {
let payload = random_payload(50);
println!("sending chunk");
sender_channel.enqueue_chunk(payload.clone()).await.unwrap();
let received_payload = receiver_channel.dequeue_chunk().await.unwrap();
assert_eq!(payload, received_payload);
}
passive_run.abort();
active_run.abort();
}

View file

@ -0,0 +1,146 @@
use pallas_network::facades::PeerClient;
use pallas_network::miniprotocols::{
blockfetch,
chainsync::{self, NextResponse},
Point,
};
#[tokio::test]
#[ignore]
pub async fn chainsync_history_happy_path() {
let mut peer = PeerClient::connect("preview-node.world.dev.cardano.org:30002", 2)
.await
.unwrap();
let client = peer.chainsync();
let known_point = Point::Specific(
1654413,
hex::decode("7de1f036df5a133ce68a82877d14354d0ba6de7625ab918e75f3e2ecb29771c2").unwrap(),
);
let (point, _) = client
.find_intersect(vec![known_point.clone()])
.await
.unwrap();
println!("{:?}", point);
assert!(matches!(client.state(), chainsync::State::Idle));
match point {
Some(point) => assert_eq!(point, known_point),
None => panic!("expected point"),
}
let next = client.request_next().await.unwrap();
match next {
NextResponse::RollBackward(point, _) => assert_eq!(point, known_point),
_ => panic!("expected rollback"),
}
assert!(matches!(client.state(), chainsync::State::Idle));
for _ in 0..10 {
let next = client.request_next().await.unwrap();
match next {
NextResponse::RollForward(_, _) => (),
_ => panic!("expected roll-forward"),
}
assert!(matches!(client.state(), chainsync::State::Idle));
}
client.send_done().await.unwrap();
assert!(matches!(client.state(), chainsync::State::Done));
}
#[tokio::test]
#[ignore]
pub async fn chainsync_tip_happy_path() {
let mut peer = PeerClient::connect("preview-node.world.dev.cardano.org:30002", 2)
.await
.unwrap();
let client = peer.chainsync();
client.intersect_tip().await.unwrap();
assert!(matches!(client.state(), chainsync::State::Idle));
let next = client.request_next().await.unwrap();
assert!(matches!(next, NextResponse::RollBackward(..)));
let mut await_count = 0;
for _ in 0..4 {
let next = if client.has_agency() {
client.request_next().await.unwrap()
} else {
await_count += 1;
client.recv_while_must_reply().await.unwrap()
};
match next {
NextResponse::RollForward(_, _) => (),
NextResponse::Await => (),
_ => panic!("expected roll-forward or await"),
}
}
assert!(await_count > 0, "tip was never reached");
client.send_done().await.unwrap();
assert!(matches!(client.state(), chainsync::State::Done));
}
#[tokio::test]
#[ignore]
pub async fn blockfetch_happy_path() {
let mut peer = PeerClient::connect("preview-node.world.dev.cardano.org:30002", 2)
.await
.unwrap();
let client = peer.blockfetch();
let known_point = Point::Specific(
1654413,
hex::decode("7de1f036df5a133ce68a82877d14354d0ba6de7625ab918e75f3e2ecb29771c2").unwrap(),
);
let range_ok = client
.request_range((known_point.clone(), known_point))
.await;
assert!(matches!(client.state(), blockfetch::State::Streaming));
println!("streaming...");
assert!(matches!(range_ok, Ok(_)));
for _ in 0..1 {
let next = client.recv_while_streaming().await.unwrap();
match next {
Some(body) => assert_eq!(body.len(), 3251),
_ => panic!("expected block body"),
}
assert!(matches!(client.state(), blockfetch::State::Streaming));
}
let next = client.recv_while_streaming().await.unwrap();
assert!(matches!(next, None));
client.send_done().await.unwrap();
assert!(matches!(client.state(), blockfetch::State::Done));
}
// TODO: redo txsubmission client test

View file

@ -11,18 +11,16 @@ readme = "README.md"
authors = ["Santiago Carmuega <santiago@carmuega.me>"]
[dependencies]
async-trait = "0.1.68"
byteorder = "1.4.3"
gasket = { git = "https://github.com/construkts/gasket-rs" }
# gasket = { path = "../../../construkts/gasket-rs" }
hex = "0.4.3"
mio = { version = "0.8.6", features = ["net", "os-poll"] }
# gasket = { version = "0.1.0", path = "../../../construkts/gasket-rs" }
pallas-codec = { version = "0.18.0", path = "../pallas-codec" }
pallas-crypto = { version = "0.18.0", path = "../pallas-crypto" }
pallas-miniprotocols = { version = "0.18.0", path = "../pallas-miniprotocols" }
pallas-multiplexer = { version = "0.18.0", path = "../pallas-multiplexer" }
pallas-network = { version = "0.18.0", path = "../pallas-network" }
pallas-traverse = { version = "0.18.0", path = "../pallas-traverse" }
rayon = "1.7.0"
serde = { version = "1.0.154", features = ["derive"] }
thiserror = "1.0.31"
tokio = { version = "1", features = ["net", "macros", "io-util"] }

View file

@ -1,126 +0,0 @@
pub use crate::framework::{BlockFetchEvent, Cursor, DownstreamPort, Intersection};
pub mod n2n {
use crate::{blockfetch, chainsync, framework::*, plexer};
use gasket::{
messaging::{SendAdapter, SendPort},
runtime::Tether,
};
pub struct Runtime {
pub plexer_tether: Tether,
pub chainsync_tether: Tether,
pub blockfetch_tether: Tether,
}
pub struct Bootstrapper<A, C>
where
A: SendAdapter<BlockFetchEvent>,
C: Cursor,
{
cursor: C,
peer_address: String,
network_magic: u64,
output: super::DownstreamPort<A>,
}
impl<A, C> Bootstrapper<A, C>
where
A: SendAdapter<BlockFetchEvent> + 'static,
C: Cursor + 'static,
{
pub fn new(cursor: C, peer_address: String, network_magic: u64) -> Self {
Bootstrapper {
cursor,
peer_address,
network_magic,
output: Default::default(),
}
}
pub fn connect_output(&mut self, adapter: A) {
self.output.connect(adapter);
}
pub fn spawn(self) -> Result<Runtime, Error> {
/*
TODO: this is how we envision the setup of complex pipelines leveraging Rust macros:
pipeline!(
plexer = plexer::Worker::new(xx),
chainsync = chainsync::Worker::new(yy),
blockfetch = blockfetch::Worker::new(yy),
reducer = reducer::Worker::new(yy),
plexer.demux2 => chainsync.demux2,
plexer.demux3 => blockfetch.demux3,
chainsync.mux2 + blockfetch.mux3 => plexer.mux,
chainsync.downstream => blockfetch.upstream,
blockfetch.downstream => reducer.upstream,
);
The above snippet would replace the rest of the code in this function, which is just a more verbose, manual way of saying the same thing.
*/
let mut mux_input = MuxInputPort::default();
let mut demux2_out = DemuxOutputPort::default();
let mut demux2_in = DemuxInputPort::default();
gasket::messaging::tokio::connect_ports(&mut demux2_out, &mut demux2_in, 1000);
let mut demux3_out = DemuxOutputPort::default();
let mut demux3_in = DemuxInputPort::default();
gasket::messaging::tokio::connect_ports(&mut demux3_out, &mut demux3_in, 1000);
let mut mux2_out = MuxOutputPort::default();
let mut mux3_out = MuxOutputPort::default();
gasket::messaging::tokio::funnel_ports(
vec![&mut mux2_out, &mut mux3_out],
&mut mux_input,
1000,
);
let mut chainsync_downstream = chainsync::DownstreamPort::default();
let mut blockfetch_upstream = blockfetch::UpstreamPort::default();
gasket::messaging::tokio::connect_ports(
&mut chainsync_downstream,
&mut blockfetch_upstream,
100,
);
let plexer_tether = gasket::runtime::spawn_stage(
plexer::Worker::new(
self.peer_address,
self.network_magic,
mux_input,
Some(demux2_out),
Some(demux3_out),
),
gasket::runtime::Policy::default(),
Some("plexer"),
);
let channel2 = ProtocolChannel(2, mux2_out, demux2_in);
let chainsync_tether = gasket::runtime::spawn_stage(
chainsync::Worker::new(self.cursor, channel2, chainsync_downstream),
gasket::runtime::Policy::default(),
Some("chainsync"),
);
let channel3 = ProtocolChannel(3, mux3_out, demux3_in);
let blockfetch_tether = gasket::runtime::spawn_stage(
blockfetch::Worker::new(channel3, blockfetch_upstream, self.output),
gasket::runtime::Policy::default(),
Some("blockfetch"),
);
Ok(Runtime {
plexer_tether,
chainsync_tether,
blockfetch_tether,
})
}
}
}

View file

@ -1,106 +0,0 @@
use gasket::messaging::SendAdapter;
use gasket::runtime::WorkSchedule;
use tracing::{error, info, instrument};
use pallas_crypto::hash::Hash;
use pallas_miniprotocols::blockfetch;
use pallas_miniprotocols::Point;
use crate::framework::*;
pub type UpstreamPort = gasket::messaging::tokio::InputPort<ChainSyncEvent>;
pub type OuroborosClient = blockfetch::Client<ProtocolChannel>;
pub struct Worker<T>
where
T: Send + Sync,
{
client: OuroborosClient,
upstream: UpstreamPort,
downstream: DownstreamPort<T>,
block_count: gasket::metrics::Counter,
}
impl<T> Worker<T>
where
T: Send + Sync,
{
pub fn new(
plexer: ProtocolChannel,
upstream: UpstreamPort,
downstream: DownstreamPort<T>,
) -> Self {
let client = OuroborosClient::new(plexer);
Self {
client,
upstream,
downstream,
block_count: Default::default(),
}
}
#[instrument(skip(self), fields(slot, %hash))]
async fn fetch_block(
&mut self,
slot: u64,
hash: &Hash<32>,
) -> Result<Vec<u8>, gasket::error::Error> {
info!("fetching block");
match self
.client
.fetch_single(Point::Specific(slot, hash.to_vec()))
.await
{
Ok(x) => {
info!("block fetch succeeded");
Ok(x)
}
Err(blockfetch::Error::ChannelError(x)) => {
error!("plexer channel error: {}", x);
Err(gasket::error::Error::RetryableError)
}
Err(x) => {
error!("unrecoverable block fetch error: {}", x);
Err(gasket::error::Error::WorkPanic)
}
}
}
}
impl<A> gasket::runtime::Worker for Worker<A>
where
A: SendAdapter<BlockFetchEvent>,
{
fn metrics(&self) -> gasket::metrics::Registry {
gasket::metrics::Builder::new()
.with_counter("fetched_blocks", &self.block_count)
.build()
}
type WorkUnit = ChainSyncEvent;
async fn schedule(&mut self) -> gasket::runtime::ScheduleResult<Self::WorkUnit> {
let msg = self.upstream.recv().await?;
info!("scheduling block betch");
Ok(WorkSchedule::Unit(msg.payload))
}
async fn execute(&mut self, unit: &Self::WorkUnit) -> Result<(), gasket::error::Error> {
let output = match unit {
ChainSyncEvent::RollForward(s, h) => {
let body = self.fetch_block(*s, h).await?;
self.block_count.inc(1);
BlockFetchEvent::RollForward(*s, h.clone(), body)
}
ChainSyncEvent::Rollback(x) => BlockFetchEvent::Rollback(x.clone()),
};
self.downstream.send(output.into()).await?;
Ok(())
}
}

View file

@ -1,176 +0,0 @@
use gasket::error::AsWorkError;
use tracing::{debug, info};
use pallas_miniprotocols::chainsync::{HeaderContent, NextResponse, Tip};
use pallas_miniprotocols::{chainsync, Point};
use pallas_traverse::MultiEraHeader;
use crate::framework::*;
fn to_traverse(header: &chainsync::HeaderContent) -> Result<MultiEraHeader<'_>, Error> {
let out = match header.byron_prefix {
Some((subtag, _)) => MultiEraHeader::decode(header.variant, Some(subtag), &header.cbor),
None => MultiEraHeader::decode(header.variant, None, &header.cbor),
};
out.map_err(Error::parse)
}
pub type DownstreamPort = gasket::messaging::tokio::OutputPort<ChainSyncEvent>;
pub type OuroborosClient = chainsync::N2NClient<ProtocolChannel>;
pub struct Worker<C>
where
C: Cursor,
{
chain_cursor: C,
client: OuroborosClient,
downstream: DownstreamPort,
block_count: gasket::metrics::Counter,
chain_tip: gasket::metrics::Gauge,
}
impl<C> Worker<C>
where
C: Cursor,
{
pub fn new(chain_cursor: C, plexer: ProtocolChannel, downstream: DownstreamPort) -> Self {
let client = OuroborosClient::new(plexer);
Self {
chain_cursor,
client,
downstream,
block_count: Default::default(),
chain_tip: Default::default(),
}
}
fn notify_tip(&self, tip: Tip) {
self.chain_tip.set(tip.0.slot_or_default() as i64);
}
async fn intersect(&mut self) -> Result<(), gasket::error::Error> {
let value = self.chain_cursor.intersection();
let intersect = match value {
Intersection::Origin => {
info!("intersecting origin");
self.client.intersect_origin().await.or_restart()?.into()
}
Intersection::Tip => {
info!("intersecting tip");
self.client.intersect_tip().await.or_restart()?.into()
}
Intersection::Breadcrumbs(points) => {
info!("intersecting breadcrumbs");
let (point, tip) = self
.client
.find_intersect(Vec::from(points))
.await
.or_restart()?;
self.notify_tip(tip);
point
}
};
info!(?intersect, "intersected");
Ok(())
}
async fn process_next(
&mut self,
next: NextResponse<HeaderContent>,
) -> Result<(), gasket::error::Error> {
match next {
chainsync::NextResponse::RollForward(header, tip) => {
let header = to_traverse(&header).or_panic()?;
debug!(slot = header.slot(), hash = %header.hash(), "chain sync roll forward");
self.downstream
.send(ChainSyncEvent::RollForward(header.slot(), header.hash()).into())
.await?;
self.notify_tip(tip);
Ok(())
}
chainsync::NextResponse::RollBackward(point, tip) => {
match &point {
Point::Origin => debug!("rollback to origin"),
Point::Specific(slot, _) => debug!(slot, "rollback"),
};
self.downstream
.send(ChainSyncEvent::Rollback(point).into())
.await?;
self.notify_tip(tip);
Ok(())
}
chainsync::NextResponse::Await => {
info!("chain-sync reached the tip of the chain");
Ok(())
}
}
}
async fn request_next(&mut self) -> Result<(), gasket::error::Error> {
info!("requesting next block");
let next = self.client.request_next().await.or_restart()?;
self.process_next(next).await
}
async fn await_next(&mut self) -> Result<(), gasket::error::Error> {
info!("awaiting next block (blocking)");
let next = self.client.recv_while_must_reply().await.or_restart()?;
self.process_next(next).await
}
}
pub enum WorkUnit {
Intersect,
RequestNext,
AwaitNext,
}
impl<C> gasket::runtime::Worker for Worker<C>
where
C: Cursor + Sync + Send,
{
type WorkUnit = WorkUnit;
fn metrics(&self) -> gasket::metrics::Registry {
gasket::metrics::Builder::new()
.with_counter("received_blocks", &self.block_count)
.with_gauge("chain_tip", &self.chain_tip)
.build()
}
async fn bootstrap(&mut self) -> gasket::runtime::ScheduleResult<Self::WorkUnit> {
Ok(gasket::runtime::WorkSchedule::Unit(WorkUnit::Intersect))
}
async fn schedule(&mut self) -> gasket::runtime::ScheduleResult<Self::WorkUnit> {
match self.client.has_agency() {
true => Ok(gasket::runtime::WorkSchedule::Unit(WorkUnit::RequestNext)),
false => Ok(gasket::runtime::WorkSchedule::Unit(WorkUnit::AwaitNext)),
}
}
async fn execute(&mut self, unit: &Self::WorkUnit) -> Result<(), gasket::error::Error> {
match unit {
WorkUnit::Intersect => self.intersect().await?,
WorkUnit::RequestNext => self.request_next().await?,
WorkUnit::AwaitNext => self.await_next().await?,
};
Ok(())
}
}

View file

@ -1,8 +1,5 @@
use pallas_crypto::hash::Hash;
use pallas_miniprotocols::Point;
use pallas_multiplexer as multiplexer;
use thiserror::Error;
use tracing::{error, trace};
use pallas_network::miniprotocols::Point;
pub type BlockSlot = u64;
pub type BlockHash = Hash<32>;
@ -20,112 +17,10 @@ pub trait Cursor: Send + Sync {
}
#[derive(Debug, Clone)]
pub enum ChainSyncEvent {
RollForward(BlockSlot, BlockHash),
Rollback(Point),
}
#[derive(Debug, Clone)]
pub enum BlockFetchEvent {
pub enum UpstreamEvent {
RollForward(BlockSlot, BlockHash, RawBlock),
Rollback(Point),
}
// ports used by plexer
pub type MuxOutputPort = gasket::messaging::tokio::OutputPort<(u16, multiplexer::Payload)>;
pub type DemuxInputPort = gasket::messaging::tokio::InputPort<multiplexer::Payload>;
// ports used by mini-protocols
pub type MuxInputPort = gasket::messaging::tokio::InputPort<(u16, multiplexer::Payload)>;
pub type DemuxOutputPort = gasket::messaging::tokio::OutputPort<multiplexer::Payload>;
// final output port
pub type DownstreamPort<A> = gasket::messaging::OutputPort<A, BlockFetchEvent>;
pub struct ProtocolChannel(pub u16, pub MuxOutputPort, pub DemuxInputPort);
impl multiplexer::agents::Channel for ProtocolChannel {
async fn enqueue_chunk(
&mut self,
payload: multiplexer::Payload,
) -> Result<(), multiplexer::agents::ChannelError> {
trace!(
protocol = self.0,
payload = hex::encode(&payload),
"enqueing"
);
let res = self
.1
.send(gasket::messaging::Message::from((self.0, payload)))
.await;
match res {
Ok(_) => Ok(()),
Err(error) => {
error!(?error, "enqueue chunk failed");
Err(multiplexer::agents::ChannelError::NotConnected(None))
}
}
}
async fn dequeue_chunk(
&mut self,
) -> Result<multiplexer::Payload, multiplexer::agents::ChannelError> {
let res = self.2.recv().await;
match res {
Ok(msg) => Ok(msg.payload),
Err(error) => {
error!(?error, "dequeue chunk failed");
Err(multiplexer::agents::ChannelError::NotConnected(None))
}
}
}
}
#[derive(Error, Debug)]
pub enum Error {
#[error("{0}")]
Client(String),
#[error("{0}")]
Parse(String),
#[error("{0}")]
Server(String),
#[error("{0}")]
Message(String),
#[error("{0}")]
Custom(String),
}
impl Error {
pub fn client(error: impl ToString) -> Error {
Error::Client(error.to_string())
}
pub fn parse(error: impl ToString) -> Error {
Error::Parse(error.to_string())
}
pub fn server(error: impl ToString) -> Error {
Error::Server(error.to_string())
}
pub fn message(error: impl ToString) -> Error {
Error::Message(error.to_string())
}
pub fn custom(error: impl Into<Box<dyn std::error::Error>>) -> Error {
Error::Custom(format!("{}", error.into()))
}
}
impl From<Box<dyn std::error::Error>> for Error {
fn from(err: Box<dyn std::error::Error>) -> Self {
Error::custom(err)
}
}
pub type DownstreamPort<A> = gasket::messaging::OutputPort<A, UpstreamEvent>;

View file

@ -1,10 +1,8 @@
#![feature(async_fn_in_trait)]
pub(crate) mod blockfetch;
pub(crate) mod chainsync;
pub(crate) mod framework;
pub(crate) mod plexer;
pub(crate) mod worker;
mod api;
pub use crate::framework::{Cursor, DownstreamPort, Intersection, UpstreamEvent};
pub use api::*;
pub mod n2n {
pub use crate::worker::Worker;
}

View file

@ -1,436 +0,0 @@
use std::future::ready;
use byteorder::{ByteOrder, NetworkEndian};
use gasket::error::AsWorkError;
use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf, ReadHalf, WriteHalf};
use tokio::net::{TcpStream, ToSocketAddrs};
use tokio::select;
use tokio::time::Instant;
use tracing::{debug, error, info, trace, warn};
use pallas_miniprotocols::handshake;
use crate::framework::*;
const HEADER_LEN: usize = 8;
pub type Timestamp = u32;
pub type Payload = Vec<u8>;
pub type Protocol = u16;
/// A `Header` struct represents an Ouroboros segment header.
///
/// # Examples
///
/// Converting a `Header` to bytes:
///
/// ```
/// use byteorder::{BigEndian, ByteOrder};
/// use pallas_upstream::plexer::Header;
///
/// let header = Header {
/// protocol: 0x01,
/// timestamp: 1619804871,
/// payload_len: 42,
/// };
///
/// let header_bytes: [u8; 8] = header.into();
/// assert_eq!(header_bytes, [97, 75, 168, 15, 128, 1, 0, 42]);
/// ```
///
/// Converting bytes to a `Header`:
///
/// ```
/// use byteorder::{BigEndian, ByteOrder};
/// use pallas_upstream::plexer::Header;
///
/// let bytes = [97, 75, 168, 15, 128, 1, 0, 42];
/// let header: Header = (&bytes[..]).into();
///
/// assert_eq!(header.protocol, 0x01);
/// assert_eq!(header.timestamp, 1619804871);
/// assert_eq!(header.payload_len, 42);
/// ```
#[derive(Debug)]
pub struct Header {
pub protocol: Protocol,
pub timestamp: Timestamp,
pub payload_len: u16,
}
impl From<&[u8]> for Header {
fn from(value: &[u8]) -> Self {
let timestamp = NetworkEndian::read_u32(&value[0..4]);
let protocol = NetworkEndian::read_u16(&value[4..6]) ^ 0x8000;
let payload_len = NetworkEndian::read_u16(&value[6..8]);
Self {
timestamp,
protocol,
payload_len,
}
}
}
impl From<Header> for [u8; 8] {
fn from(value: Header) -> Self {
let mut out = [0u8; 8];
NetworkEndian::write_u32(&mut out[0..4], value.timestamp);
NetworkEndian::write_u16(&mut out[4..6], value.protocol);
NetworkEndian::write_u16(&mut out[6..8], value.payload_len);
out
}
}
pub struct Segment {
pub header: Header,
pub payload: Payload,
}
use tokio::io::{AsyncReadExt, AsyncWriteExt};
struct AsyncBearer(OwnedReadHalf, OwnedWriteHalf, Instant);
impl AsyncBearer {
async fn connect_tcp(addr: impl ToSocketAddrs) -> Result<Self, std::io::Error> {
let stream = TcpStream::connect(addr).await?;
let (read, write) = stream.into_split();
Ok(Self(read, write, Instant::now()))
}
}
impl AsyncBearer {
async fn readable(&self) -> tokio::io::Result<()> {
self.0.readable().await
}
/// Peek the available data in search for a frame header
async fn peek_header(&mut self) -> tokio::io::Result<Option<Header>> {
let mut buf = [0u8; HEADER_LEN];
let len = self.0.peek(&mut buf).await?;
if len < HEADER_LEN {
return Ok(None);
}
Ok(Some(Header::from(buf.as_slice())))
}
async fn has_payload(&mut self, payload_len: usize) -> tokio::io::Result<bool> {
let segment_size = HEADER_LEN + payload_len;
let mut buf = vec![0u8; segment_size];
let available = self.0.peek(&mut buf).await?;
return Ok(available >= segment_size);
}
/// Peeks the bearer to see if a full segment is available to be read
async fn has_segment(&mut self) -> std::io::Result<bool> {
let header = match self.peek_header().await? {
Some(x) => x,
None => return Ok(false),
};
self.has_payload(header.payload_len as usize).await
}
/// Reads a full segment from the bearer while consuming the bytes
///
/// This function is NOT "cancel safe", meaning that it shouldn't be used
/// inside the context of a select!. Only call this function once you're
/// sure that you can await until all the required bytes are available.
async fn read_segment(&mut self) -> tokio::io::Result<(Protocol, Payload)> {
let mut buf = [0u8; HEADER_LEN];
self.0.read_exact(&mut buf).await?;
let header = Header::from(buf.as_slice());
// TODO: assert any business invariants regarding timestamp from the other party
let mut payload = vec![0u8; header.payload_len as usize];
self.0.read_exact(&mut payload).await?;
Ok((header.protocol, payload))
}
async fn write_segment(&mut self, protocol: u16, payload: &[u8]) -> Result<(), std::io::Error> {
let header = Header {
protocol,
timestamp: self.2.elapsed().as_micros() as u32,
payload_len: payload.len() as u16,
};
let buf: [u8; 8] = header.into();
self.1.write_all(&buf).await?;
self.1.write_all(&payload).await?;
Ok(())
}
}
pub struct AsyncAgentChannel(
Protocol,
tokio::sync::mpsc::Sender<(Protocol, Payload)>,
tokio::sync::broadcast::Receiver<(Protocol, Payload)>,
);
impl pallas_multiplexer::agents::Channel for AsyncAgentChannel {
async fn enqueue_chunk(
&mut self,
chunk: pallas_multiplexer::Payload,
) -> Result<(), pallas_multiplexer::agents::ChannelError> {
let res = self.1.send((self.0, chunk)).await;
res.map_err(|err| pallas_multiplexer::agents::ChannelError::NotConnected(Some(err.0 .1)))
}
async fn dequeue_chunk(
&mut self,
) -> Result<pallas_multiplexer::Payload, pallas_multiplexer::agents::ChannelError> {
loop {
let (protocol, payload) = self
.2
.recv()
.await
.map_err(|err| pallas_multiplexer::agents::ChannelError::NotConnected(None))?;
if protocol == self.0 {
break Ok(payload);
}
}
}
}
pub type AsyncIngress = (
tokio::sync::mpsc::Sender<(Protocol, Payload)>,
tokio::sync::mpsc::Receiver<(Protocol, Payload)>,
);
pub type AsyncEgress = (
tokio::sync::broadcast::Sender<(Protocol, Payload)>,
tokio::sync::broadcast::Receiver<(Protocol, Payload)>,
);
struct AsyncPlexer {
bearer: AsyncBearer,
ingress: AsyncIngress,
egress: AsyncEgress,
}
impl AsyncPlexer {
pub fn new(bearer: AsyncBearer) -> Self {
Self {
bearer,
ingress: tokio::sync::mpsc::channel(100), // TODO: define buffer
egress: tokio::sync::broadcast::channel(100),
}
}
async fn mux(&mut self, msg: (Protocol, Payload)) -> tokio::io::Result<()> {
self.bearer.write_segment(msg.0, &msg.1).await?;
Ok(())
}
async fn demux(&mut self) -> tokio::io::Result<()> {
let (protocol, payload) = self.bearer.read_segment().await?;
self.egress.0.send((protocol, payload)).unwrap();
Ok(())
}
pub fn subscribe(&mut self, protocol: Protocol) -> AsyncAgentChannel {
let agent_tx = self.ingress.0.clone();
let agent_rx = self.egress.0.subscribe();
AsyncAgentChannel(protocol, agent_tx, agent_rx)
}
pub async fn run(&mut self) -> tokio::io::Result<()> {
loop {
select! {
Ok(_) = self.bearer.readable() => {
if let Ok(true) = self.bearer.has_segment().await {
trace!("demux selected");
self.demux().await?
}
},
Some(x) = self.ingress.1.recv() => {
trace!("mux selected");
self.mux(x).await?
},
}
}
}
}
impl From<AsyncBearer> for AsyncPlexer {
fn from(value: AsyncBearer) -> Self {
Self::new(value)
}
}
impl From<AsyncPlexer> for AsyncBearer {
fn from(value: AsyncPlexer) -> Self {
value.bearer
}
}
async fn handshake(
plexer: &mut AsyncPlexer,
network_magic: u64,
) -> Result<(), gasket::error::Error> {
info!("executing handshake");
let channel0 = plexer.subscribe(0);
let versions = handshake::n2n::VersionTable::v7_and_above(network_magic);
let mut client = handshake::Client::new(channel0);
//let p = tokio::spawn(plexer.run());
//let output = client.handshake(versions).or_restart()?;
let output = select! {
x = client.handshake(versions) => x.or_restart()?,
x = plexer.run() => {
match x.or_restart() {
Err(x) => return Err(x),
_ => unreachable!(),
};
},
};
debug!("handshake output: {:?}", output);
//p.abort();
match output {
handshake::Confirmation::Accepted(version, _) => {
info!(version, "connected to upstream peer");
Ok(())
}
_ => {
error!("couldn't agree on handshake version");
Err(gasket::error::Error::WorkPanic)
}
}
}
pub struct Worker {
peer_address: String,
network_magic: u64,
bearer: Option<AsyncBearer>,
mux_input: MuxInputPort,
channel2_out: Option<DemuxOutputPort>,
channel3_out: Option<DemuxOutputPort>,
ops_count: gasket::metrics::Counter,
}
impl Worker {
pub fn new(
peer_address: String,
network_magic: u64,
mux_input: MuxInputPort,
channel2_out: Option<DemuxOutputPort>,
channel3_out: Option<DemuxOutputPort>,
) -> Self {
Self {
peer_address,
network_magic,
channel2_out,
channel3_out,
mux_input,
bearer: None,
ops_count: Default::default(),
}
}
}
pub enum WorkUnit {
Connect,
Mux((u16, Vec<u8>)),
Demux,
}
impl gasket::runtime::Worker for Worker {
type WorkUnit = WorkUnit;
fn metrics(&self) -> gasket::metrics::Registry {
// TODO: define networking metrics (bytes in / out, etc)
gasket::metrics::Builder::new()
.with_counter("ops_count", &self.ops_count)
.build()
}
async fn bootstrap(&mut self) -> gasket::runtime::ScheduleResult<Self::WorkUnit> {
Ok(gasket::runtime::WorkSchedule::Unit(WorkUnit::Connect))
}
async fn schedule(&mut self) -> gasket::runtime::ScheduleResult<Self::WorkUnit> {
let bearer = self.bearer.as_mut().unwrap();
trace!("selecting");
select! {
Ok(msg) = self.mux_input.recv() => { Ok(gasket::runtime::WorkSchedule::Unit(WorkUnit::Mux(msg.payload))) }
Ok(true) = bearer.has_segment() => Ok(gasket::runtime::WorkSchedule::Unit(WorkUnit::Demux)),
_ = tokio::time::sleep(tokio::time::Duration::from_secs(5)) => Ok(gasket::runtime::WorkSchedule::Idle),
}
}
async fn execute(&mut self, unit: &Self::WorkUnit) -> Result<(), gasket::error::Error> {
match unit {
WorkUnit::Connect => {
debug!("connecting");
let bearer = AsyncBearer::connect_tcp(&self.peer_address)
.await
.or_retry()?;
let mut plexer = bearer.into();
handshake(&mut plexer, self.network_magic).await?;
self.bearer = Some(plexer.into());
}
WorkUnit::Mux(x) => {
trace!("muxing");
self.bearer
.as_mut()
.unwrap()
.write_segment(x.0, &x.1)
.await
.or_restart()?;
}
WorkUnit::Demux => {
trace!("demuxing");
let (protocol, payload) = self
.bearer
.as_mut()
.unwrap()
.read_segment()
.await
.or_restart()?;
match protocol {
2 => {
if let Some(channel) = &mut self.channel2_out {
channel.send(payload.into()).await?;
trace!("sent protocol 2 msg");
}
}
3 => {
if let Some(channel) = &mut self.channel3_out {
channel.send(payload.into()).await?;
trace!("sent protocol 3 msg");
}
}
x => warn!("trying to demux unexpected protocol {x}"),
}
}
};
Ok(())
}
}

View file

@ -0,0 +1,197 @@
use gasket::error::AsWorkError;
use tracing::{debug, info};
use pallas_network::facades::PeerClient;
use pallas_network::miniprotocols::chainsync::{self, HeaderContent, NextResponse, Tip};
use pallas_network::miniprotocols::Point;
use pallas_traverse::MultiEraHeader;
use crate::framework::*;
fn to_traverse(header: &HeaderContent) -> Result<MultiEraHeader<'_>, gasket::error::Error> {
let out = match header.byron_prefix {
Some((subtag, _)) => MultiEraHeader::decode(header.variant, Some(subtag), &header.cbor),
None => MultiEraHeader::decode(header.variant, None, &header.cbor),
};
out.or_panic()
}
pub type DownstreamPort = gasket::messaging::tokio::OutputPort<UpstreamEvent>;
pub struct Worker<C>
where
C: Cursor,
{
peer_address: String,
network_magic: u64,
chain_cursor: C,
peer_session: Option<PeerClient>,
downstream: DownstreamPort,
block_count: gasket::metrics::Counter,
chain_tip: gasket::metrics::Gauge,
}
impl<C> Worker<C>
where
C: Cursor,
{
pub fn new(
peer_address: String,
network_magic: u64,
chain_cursor: C,
downstream: DownstreamPort,
) -> Self {
Self {
peer_address,
network_magic,
chain_cursor,
downstream,
peer_session: None,
block_count: Default::default(),
chain_tip: Default::default(),
}
}
fn notify_tip(&self, tip: &Tip) {
self.chain_tip.set(tip.0.slot_or_default() as i64);
}
async fn intersect(&mut self) -> Result<(), gasket::error::Error> {
let value = self.chain_cursor.intersection();
let chainsync = self.peer_session.as_mut().unwrap().chainsync();
let intersect = match value {
Intersection::Origin => {
info!("intersecting origin");
chainsync.intersect_origin().await.or_restart()?.into()
}
Intersection::Tip => {
info!("intersecting tip");
chainsync.intersect_tip().await.or_restart()?.into()
}
Intersection::Breadcrumbs(points) => {
info!("intersecting breadcrumbs");
let (point, tip) = chainsync.find_intersect(points).await.or_restart()?;
self.notify_tip(&tip);
point
}
};
info!(?intersect, "intersected");
Ok(())
}
async fn process_next(
&mut self,
next: &NextResponse<HeaderContent>,
) -> Result<(), gasket::error::Error> {
match next {
NextResponse::RollForward(header, tip) => {
let header = to_traverse(header).or_panic()?;
let slot = header.slot();
let hash = header.hash();
debug!(slot, %hash, "chain sync roll forward");
let block = self
.peer_session
.as_mut()
.unwrap()
.blockfetch()
.fetch_single(pallas_network::miniprotocols::Point::Specific(
slot,
hash.to_vec(),
))
.await
.or_retry()?;
self.downstream
.send(UpstreamEvent::RollForward(slot, hash, block).into())
.await?;
self.notify_tip(tip);
Ok(())
}
chainsync::NextResponse::RollBackward(point, tip) => {
match &point {
Point::Origin => debug!("rollback to origin"),
Point::Specific(slot, _) => debug!(slot, "rollback"),
};
self.downstream
.send(UpstreamEvent::Rollback(point.clone()).into())
.await?;
self.notify_tip(tip);
Ok(())
}
chainsync::NextResponse::Await => {
info!("chain-sync reached the tip of the chain");
Ok(())
}
}
}
}
#[async_trait::async_trait]
impl<C> gasket::runtime::Worker for Worker<C>
where
C: Cursor + Sync + Send,
{
type WorkUnit = NextResponse<HeaderContent>;
fn metrics(&self) -> gasket::metrics::Registry {
gasket::metrics::Builder::new()
.with_counter("received_blocks", &self.block_count)
.with_gauge("chain_tip", &self.chain_tip)
.build()
}
async fn bootstrap(&mut self) -> Result<(), gasket::error::Error> {
debug!("connecting");
let peer = PeerClient::connect(&self.peer_address, self.network_magic)
.await
.or_restart()?;
self.peer_session = Some(peer);
self.intersect().await?;
Ok(())
}
async fn teardown(&mut self) -> Result<(), gasket::error::Error> {
self.peer_session.as_mut().unwrap().abort();
Ok(())
}
async fn schedule(&mut self) -> gasket::runtime::ScheduleResult<Self::WorkUnit> {
let client = self.peer_session.as_mut().unwrap().chainsync();
let next = match client.has_agency() {
true => {
info!("requesting next block");
client.request_next().await.or_restart()?
}
false => {
info!("awaiting next block (blocking)");
client.recv_while_must_reply().await.or_restart()?
}
};
Ok(gasket::runtime::WorkSchedule::Unit(next))
}
async fn execute(&mut self, unit: &Self::WorkUnit) -> Result<(), gasket::error::Error> {
self.process_next(unit).await
}
}

View file

@ -1,24 +1,21 @@
#![feature(async_fn_in_trait)]
use std::time::Duration;
use gasket::{
messaging::{
tokio::{InputPort, OutputPort},
RecvPort, SendPort,
},
runtime::{ScheduleResult, WorkSchedule, Worker},
runtime::{WorkSchedule, Worker},
};
use pallas_miniprotocols::Point;
use pallas_upstream::{BlockFetchEvent, Cursor};
use tracing::{error, info};
use pallas_upstream::{Cursor, UpstreamEvent};
use tracing::error;
struct Witness {
input: InputPort<pallas_upstream::BlockFetchEvent>,
input: InputPort<UpstreamEvent>,
}
#[async_trait::async_trait]
impl Worker for Witness {
type WorkUnit = BlockFetchEvent;
type WorkUnit = UpstreamEvent;
fn metrics(&self) -> gasket::metrics::Registry {
gasket::metrics::Registry::new()
@ -30,7 +27,7 @@ impl Worker for Witness {
Ok(WorkSchedule::Unit(msg.payload))
}
async fn execute(&mut self, unit: &Self::WorkUnit) -> Result<(), gasket::error::Error> {
async fn execute(&mut self, _: &Self::WorkUnit) -> Result<(), gasket::error::Error> {
error!("witnessing block event");
Ok(())
@ -46,6 +43,7 @@ impl Cursor for StaticCursor {
}
#[test]
#[ignore]
fn test_mainnet_upstream() {
tracing::subscriber::set_global_default(
tracing_subscriber::FmtSubscriber::builder()
@ -54,34 +52,28 @@ fn test_mainnet_upstream() {
)
.unwrap();
let mut b = pallas_upstream::n2n::Bootstrapper::new(
StaticCursor,
"relays-new.cardano-mainnet.iohk.io:3001".into(),
764824073,
);
let (send, receive) = gasket::messaging::tokio::channel(200);
// let mut f = Faker {
// output: Default::default(),
// };
let mut output_port = OutputPort::default();
output_port.connect(send);
//f.output.connect(send);
let upstream = pallas_upstream::n2n::Worker::new(
"relays-new.cardano-mainnet.iohk.io:3001".into(),
764824073,
StaticCursor,
output_port,
);
b.connect_output(send);
let b = b.spawn().unwrap();
let mut w = Witness {
let mut witness = Witness {
input: Default::default(),
};
w.input.connect(receive);
witness.input.connect(receive);
//let f = gasket::runtime::spawn_stage(f, Default::default(), Some("faker"));
let w = gasket::runtime::spawn_stage(w, Default::default(), Some("witness"));
let upstream = gasket::runtime::spawn_stage(upstream, Default::default(), Some("upstream"));
let witness = gasket::runtime::spawn_stage(witness, Default::default(), Some("witness"));
let d = gasket::daemon::Daemon(vec![w]);
let daemon = gasket::daemon::Daemon(vec![upstream, witness]);
d.block();
daemon.block();
}

View file

@ -11,8 +11,7 @@ readme = "../README.md"
authors = ["Santiago Carmuega <santiago@carmuega.me>"]
[dependencies]
pallas-multiplexer = { version = "0.18.0", path = "../pallas-multiplexer/" }
pallas-miniprotocols = { version = "0.18.0", path = "../pallas-miniprotocols/" }
pallas-network = { version = "0.18.0", path = "../pallas-network/" }
pallas-primitives = { version = "0.18.0", path = "../pallas-primitives/" }
pallas-traverse = { version = "0.18.0", path = "../pallas-traverse/" }
pallas-addresses = { version = "0.18.0", path = "../pallas-addresses/" }

View file

@ -1,10 +0,0 @@
//! Ledger primitives and cbor codecs for different Cardano eras
#[doc(inline)]
pub use pallas_primitives as primitives;
#[doc(inline)]
pub use pallas_traverse as traverse;
#[doc(inline)]
pub use pallas_addresses as addresses;

View file

@ -9,9 +9,21 @@
#![warn(missing_docs)]
#![warn(missing_doc_code_examples)]
pub mod network;
#[doc(inline)]
pub use pallas_network as network;
pub mod ledger;
pub mod ledger {
//! Ledger primitives and cbor codecs for different Cardano eras
#[doc(inline)]
pub use pallas_primitives as primitives;
#[doc(inline)]
pub use pallas_traverse as traverse;
#[doc(inline)]
pub use pallas_addresses as addresses;
}
#[doc(inline)]
pub use pallas_crypto as crypto;

View file

@ -1,10 +0,0 @@
//! Network components of the Ouroboros protocol
#[doc(inline)]
pub use pallas_multiplexer as multiplexer;
#[doc(inline)]
pub use pallas_miniprotocols as miniprotocols;
#[doc(inline)]
pub use pallas_upstream as upstream;