Implement multiplexer and mini-protocols PoC

This commit is contained in:
Santiago Carmuega 2021-11-20 11:33:45 -03:00
parent 57788f9e63
commit 65144ce14b
28 changed files with 1398 additions and 0 deletions

2
.gitignore vendored Normal file
View file

@ -0,0 +1,2 @@
/target
Cargo.lock

9
Cargo.toml Normal file
View file

@ -0,0 +1,9 @@
[workspace]
members = [
"pallas-multiplexer",
"pallas-machines",
"pallas-handshake",
"pallas-blockfetch",
"pallas",
]

15
README.md Normal file
View file

@ -0,0 +1,15 @@
# Pallas
Rust-native building blocks for the Cardano blockchain ecosystem.
## Introduction
Pallas is an expanding collection of modules that re-implements common
Cardano logic in native Rust. This crate doesn't provide any particular
application, it is meant to be used as a base layer to facilitate the
development of higher-level use-cases, such as explorers, wallets, etc (who
knows, maybe even a full node in the far away future).
## Etymology
> Pallas: (Greek mythology) goddess of wisdom and useful arts and prudent warfare;

2
pallas-blockfetch/.gitignore vendored Normal file
View file

@ -0,0 +1,2 @@
/target
Cargo.lock

View file

@ -0,0 +1,24 @@
[package]
name = "pallas-blockfetch"
version = "0.1.0"
edition = "2021"
repository = "https://github.com/txpipe/pallas"
license = "Apache-2.0"
authors = [
"Santiago Carmuega <santiago@carmuega.me>"
]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
pallas-multiplexer = { path = "../pallas-multiplexer/" }
pallas-machines = { path = "../pallas-machines/" }
minicbor = { version="0.11.4", features=["half"] }
minicbor-io = "0.6.0"
log = "0.4.14"
[dev-dependencies]
net2 = "0.2.37"
env_logger = "0.9.0"
pallas-handshake = { path = "../pallas-handshake/" }
hex = "0.4.3"

View file

@ -0,0 +1,47 @@
use net2::TcpStreamExt;
use std::net::TcpStream;
use pallas_blockfetch::{BlockFetchClient, Point};
use pallas_handshake::n2n::{Client, VersionTable};
use pallas_handshake::MAINNET_MAGIC;
use pallas_machines::run_agent;
use pallas_multiplexer::Multiplexer;
fn main() {
env_logger::init();
//let bearer = TcpStream::connect("localhost:6000").unwrap();
let bearer =
TcpStream::connect("relays-new.cardano-mainnet.iohk.io:3001").unwrap();
bearer.set_nodelay(true).unwrap();
bearer.set_keepalive_ms(Some(30_000u32)).unwrap();
let mut handles = Multiplexer::new(bearer, &vec![0, 3]).unwrap();
let (_, rx, tx) = handles.remove(0);
let versions = VersionTable::v4_and_above(MAINNET_MAGIC);
let last = run_agent(Client::initial(versions), rx, &tx).unwrap();
println!("{:?}", last);
let range = (
Point(
43847831u64,
hex::decode("15b9eeee849dd6386d3770b0745e0450190f7560e5159b1b3ab13b14b2684a45")
.unwrap(),
),
Point(
43847831u64,
hex::decode("15b9eeee849dd6386d3770b0745e0450190f7560e5159b1b3ab13b14b2684a45")
.unwrap(),
),
);
let (_, bf_rx, bf_tx) = handles.remove(0);
let bf = BlockFetchClient::initial(range);
let bf_last = run_agent(bf, bf_rx, &bf_tx);
println!("{:?}", bf_last);
}

View file

@ -0,0 +1,190 @@
use log::info;
use pallas_machines::{
Agent, DecodePayload, EncodePayload, MachineError, MachineOutput,
PayloadDecoder, PayloadEncoder, Transition,
};
#[derive(Clone, Debug)]
pub struct Point(pub u64, pub Vec<u8>);
impl EncodePayload for Point {
fn encode_payload(
&self,
e: &mut PayloadEncoder,
) -> Result<(), Box<dyn std::error::Error>> {
e.array(2)?.u64(self.0)?.bytes(&self.1)?;
Ok(())
}
}
impl DecodePayload for Point {
fn decode_payload(
d: &mut PayloadDecoder,
) -> Result<Self, Box<dyn std::error::Error>> {
d.array()?;
let slot = d.u64()?;
let hash = d.bytes()?;
Ok(Point(slot, Vec::from(hash)))
}
}
#[derive(Debug, PartialEq, Clone)]
pub enum State {
Idle,
Busy,
Streaming,
Done,
}
#[derive(Debug)]
pub enum Message {
RequestRange { range: (Point, Point) },
ClientDone,
StartBatch,
NoBlocks,
Block { body: Vec<u8> },
BatchDone,
}
impl EncodePayload for Message {
fn encode_payload(
&self,
e: &mut PayloadEncoder,
) -> Result<(), Box<dyn std::error::Error>> {
match self {
Message::RequestRange { range } => {
e.array(3)?.u16(0)?;
range.0.encode_payload(e)?;
range.1.encode_payload(e)?;
Ok(())
}
Message::ClientDone => {
e.array(1)?.u16(1)?;
Ok(())
}
Message::StartBatch => {
e.array(1)?.u16(2)?;
Ok(())
}
Message::NoBlocks => {
e.array(1)?.u16(3)?;
Ok(())
}
Message::Block { body } => {
e.array(2)?.u16(4)?;
e.bytes(&body)?;
Ok(())
}
Message::BatchDone => {
e.array(1)?.u16(5)?;
Ok(())
}
}
}
}
impl DecodePayload for Message {
fn decode_payload(
d: &mut PayloadDecoder,
) -> Result<Self, Box<dyn std::error::Error>> {
d.array()?;
let label = d.u16()?;
match label {
0 => {
let point1 = Point::decode_payload(d)?;
let point2 = Point::decode_payload(d)?;
Ok(Message::RequestRange {
range: (point1, point2),
})
}
1 => Ok(Message::ClientDone),
2 => Ok(Message::StartBatch),
3 => Ok(Message::NoBlocks),
4 => {
d.tag()?;
let body = d.bytes()?;
Ok(Message::Block {
body: Vec::from(body),
})
}
5 => Ok(Message::BatchDone),
x => Err(Box::new(MachineError::BadLabel(x))),
}
}
}
#[derive(Debug)]
pub struct BlockFetchClient {
pub state: State,
pub range: (Point, Point),
}
impl BlockFetchClient {
pub fn initial(range: (Point, Point)) -> Self {
Self {
state: State::Idle,
range,
}
}
fn send_request_range(self, tx: &impl MachineOutput) -> Transition<Self> {
let msg = Message::RequestRange {
range: self.range.clone(),
};
tx.send_msg(&msg)?;
Ok(Self {
state: State::Busy,
..self
})
}
}
impl Agent for BlockFetchClient {
type Message = Message;
fn is_done(&self) -> bool {
self.state == State::Done
}
fn has_agency(&self) -> bool {
match self.state {
State::Idle => true,
State::Busy => false,
State::Streaming => false,
State::Done => false,
}
}
fn send_next(self, tx: &impl MachineOutput) -> Transition<Self> {
match self.state {
State::Idle => self.send_request_range(tx),
_ => panic!("I don't have agency, don't know what to do"),
}
}
fn receive_next(self, msg: Self::Message) -> Transition<Self> {
match (&self.state, msg) {
(State::Busy, Message::StartBatch) => Ok(Self {
state: State::Streaming,
..self
}),
(State::Busy, Message::NoBlocks) => Ok(Self {
state: State::Done,
..self
}),
(State::Streaming, Message::Block { body }) => {
info!("received block body of size {}", body.len());
Ok(self)
}
(State::Streaming, Message::BatchDone) => Ok(Self {
state: State::Done,
..self
}),
_ => panic!("I have agency, I don't expect messages"),
}
}
}

2
pallas-handshake/.gitignore vendored Normal file
View file

@ -0,0 +1,2 @@
/target
Cargo.lock

View file

@ -0,0 +1,23 @@
[package]
name = "pallas-handshake"
version = "0.1.0"
edition = "2021"
repository = "https://github.com/txpipe/pallas"
license = "Apache-2.0"
authors = [
"Santiago Carmuega <santiago@carmuega.me>"
]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
pallas-multiplexer = { path = "../pallas-multiplexer/" }
pallas-machines = { path = "../pallas-machines/" }
minicbor = { version="0.11.4", features=["half"] }
minicbor-io = "0.6.0"
itertools = "0.10.1"
log = "0.4.14"
[dev-dependencies]
net2 = "0.2.37"
env_logger = "0.9.0"

View file

@ -0,0 +1,26 @@
use net2::TcpStreamExt;
use std::net::TcpStream;
use pallas_handshake::n2c::{Client, VersionTable};
use pallas_handshake::MAINNET_MAGIC;
use pallas_machines::run_agent;
use pallas_multiplexer::Multiplexer;
fn main() {
env_logger::init();
//let bearer = TcpStream::connect("localhost:6000").unwrap();
let bearer =
TcpStream::connect("relays-new.cardano-mainnet.iohk.io:3001").unwrap();
bearer.set_nodelay(true).unwrap();
bearer.set_keepalive_ms(Some(30_000u32)).unwrap();
let mut handles = Multiplexer::new(bearer, &vec![0]).unwrap();
let (_, rx, tx) = handles.remove(0);
let versions = VersionTable::v1_and_above(MAINNET_MAGIC);
let last = run_agent(Client::initial(versions), rx, &tx).unwrap();
println!("{:?}", last);
}

View file

@ -0,0 +1,26 @@
use net2::TcpStreamExt;
use std::net::TcpStream;
use pallas_handshake::n2n::{Client, VersionTable};
use pallas_handshake::MAINNET_MAGIC;
use pallas_machines::run_agent;
use pallas_multiplexer::Multiplexer;
fn main() {
env_logger::init();
//let bearer = TcpStream::connect("localhost:6000").unwrap();
let bearer =
TcpStream::connect("relays-new.cardano-mainnet.iohk.io:3001").unwrap();
bearer.set_nodelay(true).unwrap();
bearer.set_keepalive_ms(Some(30_000u32)).unwrap();
let mut handles = Multiplexer::new(bearer, &vec![0]).unwrap();
let (_, rx, tx) = handles.remove(0);
let versions = VersionTable::v4_and_above(MAINNET_MAGIC);
let last = run_agent(Client::initial(versions), rx, &tx).unwrap();
println!("{:?}", last);
}

View file

@ -0,0 +1,80 @@
use itertools::Itertools;
use pallas_machines::{DecodePayload, EncodePayload, PayloadEncoder};
use std::{collections::HashMap, fmt::Debug};
pub const TESTNET_MAGIC: u64 = 1097911063;
pub const MAINNET_MAGIC: u64 = 764824073;
#[derive(Debug, Clone)]
pub struct VersionTable<T>
where
T: Debug + Clone + EncodePayload + DecodePayload,
{
pub values: HashMap<u64, T>,
}
impl<T> EncodePayload for VersionTable<T>
where
T: Debug + Clone + EncodePayload + DecodePayload,
{
fn encode_payload(
&self,
e: &mut PayloadEncoder,
) -> Result<(), Box<dyn std::error::Error>> {
e.map(self.values.len() as u64)?;
for key in self.values.keys().sorted() {
e.u64(*key)?;
self.values[key].encode_payload(e)?;
}
Ok(())
}
}
pub type NetworkMagic = u64;
pub type VersionNumber = u64;
#[derive(Debug)]
pub enum RefuseReason {
VersionMismatch(Vec<VersionNumber>),
HandshakeDecodeError(VersionNumber, String),
Refused(VersionNumber, String),
}
impl EncodePayload for RefuseReason {
fn encode_payload(
&self,
e: &mut PayloadEncoder,
) -> Result<(), Box<dyn std::error::Error>> {
match self {
RefuseReason::VersionMismatch(versions) => {
e.array(2)?;
e.u16(0)?;
e.array(versions.len() as u64)?;
for v in versions.iter() {
e.u64(*v)?;
}
Ok(())
}
RefuseReason::HandshakeDecodeError(version, msg) => {
e.array(3)?;
e.u16(1)?;
e.u64(*version)?;
e.str(msg)?;
Ok(())
}
RefuseReason::Refused(version, msg) => {
e.array(3)?;
e.u16(1)?;
e.u64(*version)?;
e.str(msg)?;
Ok(())
}
}
}
}

View file

@ -0,0 +1,6 @@
mod common;
pub mod n2c;
pub mod n2n;
pub use common::{MAINNET_MAGIC, TESTNET_MAGIC};

203
pallas-handshake/src/n2c.rs Normal file
View file

@ -0,0 +1,203 @@
use core::panic;
use std::collections::HashMap;
use pallas_machines::{
Agent, DecodePayload, EncodePayload, MachineError, MachineOutput,
PayloadDecoder, PayloadEncoder,
};
use crate::common::{NetworkMagic, RefuseReason, VersionNumber};
pub type VersionTable = crate::common::VersionTable<VersionData>;
const PROTOCOL_V1: u64 = 1;
const PROTOCOL_V2: u64 = 32770;
const PROTOCOL_V3: u64 = 32771;
const PROTOCOL_V4: u64 = 32772;
const PROTOCOL_V5: u64 = 32773;
const PROTOCOL_V6: u64 = 32774;
const PROTOCOL_V7: u64 = 32775;
const PROTOCOL_V8: u64 = 32776;
const PROTOCOL_V9: u64 = 32777;
// const PROTOCOL_V10: u64 = 32778;
impl VersionTable {
pub fn v1_and_above(network_magic: u64) -> VersionTable {
let values = vec![
(PROTOCOL_V1, VersionData(network_magic)),
(PROTOCOL_V2, VersionData(network_magic)),
(PROTOCOL_V3, VersionData(network_magic)),
(PROTOCOL_V4, VersionData(network_magic)),
(PROTOCOL_V5, VersionData(network_magic)),
(PROTOCOL_V6, VersionData(network_magic)),
(PROTOCOL_V7, VersionData(network_magic)),
(PROTOCOL_V8, VersionData(network_magic)),
(PROTOCOL_V9, VersionData(network_magic)),
]
.into_iter()
.collect::<HashMap<u64, VersionData>>();
VersionTable { values }
}
}
#[derive(Debug, Clone)]
pub struct VersionData (NetworkMagic,);
impl EncodePayload for VersionData {
fn encode_payload(
&self,
e: &mut PayloadEncoder,
) -> Result<(), Box<dyn std::error::Error>> {
e.u64(self.0)?;
Ok(())
}
}
impl DecodePayload for VersionData {
fn decode_payload(
d: &mut PayloadDecoder,
) -> Result<Self, Box<dyn std::error::Error>> {
let network_magic = d.u64()?;
Ok(Self(network_magic))
}
}
#[derive(Debug)]
pub enum Message {
Propose(VersionTable),
Accept(VersionNumber, VersionData),
Refuse(RefuseReason),
}
impl EncodePayload for Message {
fn encode_payload(
&self,
e: &mut PayloadEncoder,
) -> Result<(), Box<dyn std::error::Error>> {
match self {
Message::Propose(version_table) => {
e.array(2)?.u16(0)?;
version_table.encode_payload(e)?;
}
Message::Accept(version_number, version_data) => {
e.array(3)?.u16(1)?;
e.u64(*version_number)?;
version_data.encode_payload(e)?;
}
Message::Refuse(reason) => {
e.array(2)?.u16(2)?;
reason.encode_payload(e)?;
}
};
Ok(())
}
}
impl DecodePayload for Message {
fn decode_payload(
d: &mut PayloadDecoder,
) -> Result<Self, Box<dyn std::error::Error>> {
d.array()?;
let msg = match d.u16()? {
0 => todo!(),
1 => {
let version_number = d.u64()?;
let version_data = VersionData::decode_payload(d)?;
Message::Accept(version_number, version_data)
}
2 => todo!(),
x => return Err(Box::new(MachineError::BadLabel(x))),
};
Ok(msg)
}
}
#[derive(Debug, PartialEq, Eq)]
pub enum State {
Propose,
Confirm,
Done,
}
#[derive(Debug)]
pub enum Output {
Pending,
Accepted(VersionNumber, VersionData),
Refused(RefuseReason),
}
#[derive(Debug)]
pub struct Client {
state: State,
output: Output,
version_table: VersionTable,
}
impl Client {
pub fn initial(version_table: VersionTable) -> Self {
Client {
state: State::Propose,
output: Output::Pending,
version_table,
}
}
}
impl Agent for Client {
type Message = Message;
fn is_done(&self) -> bool {
self.state == State::Done
}
fn has_agency(&self) -> bool {
match self.state {
State::Propose => true,
State::Confirm => false,
State::Done => false,
}
}
fn send_next(
self,
tx: &impl MachineOutput,
) -> Result<Self, Box<dyn std::error::Error>> {
match self.state {
State::Propose => {
tx.send_msg(&Message::Propose(self.version_table.clone()))?;
Ok(Self {
state: State::Confirm,
..self
})
}
_ => panic!("I don't have agency, nothing to send"),
}
}
fn receive_next(
self,
msg: Self::Message,
) -> Result<Self, Box<dyn std::error::Error>> {
match (self.state, msg) {
(State::Confirm, Message::Accept(version, data)) => Ok(Self {
state: State::Done,
output: Output::Accepted(version, data),
..self
}),
(State::Confirm, Message::Refuse(reason)) => Ok(Self {
state: State::Done,
output: Output::Refused(reason),
..self
}),
_ => panic!("Current state does't expect to receive a message"),
}
}
}

214
pallas-handshake/src/n2n.rs Normal file
View file

@ -0,0 +1,214 @@
use core::panic;
use std::collections::HashMap;
use pallas_machines::{
Agent, DecodePayload, EncodePayload, MachineError, MachineOutput,
PayloadDecoder, PayloadEncoder,
};
use crate::common::{RefuseReason, VersionNumber};
pub type VersionTable = crate::common::VersionTable<VersionData>;
const PROTOCOL_V4: u64 = 4;
const PROTOCOL_V5: u64 = 5;
const PROTOCOL_V6: u64 = 6;
const PROTOCOL_V7: u64 = 7;
impl VersionTable {
pub fn v4_and_above(network_magic: u64) -> VersionTable {
let values = vec![
(PROTOCOL_V4, VersionData::new(network_magic, false)),
(PROTOCOL_V5, VersionData::new(network_magic, false)),
(PROTOCOL_V6, VersionData::new(network_magic, false)),
(PROTOCOL_V7, VersionData::new(network_magic, false)),
]
.into_iter()
.collect::<HashMap<u64, VersionData>>();
VersionTable { values }
}
}
#[derive(Debug, Clone)]
pub struct VersionData {
network_magic: u64,
initiator_and_responder_diffusion_mode: bool,
}
impl VersionData {
pub fn new(
network_magic: u64,
initiator_and_responder_diffusion_mode: bool,
) -> Self {
VersionData {
network_magic,
initiator_and_responder_diffusion_mode,
}
}
}
impl EncodePayload for VersionData {
fn encode_payload(
&self,
e: &mut PayloadEncoder,
) -> Result<(), Box<dyn std::error::Error>> {
e.array(2)?
.u64(self.network_magic)?
.bool(self.initiator_and_responder_diffusion_mode)?;
Ok(())
}
}
impl DecodePayload for VersionData {
fn decode_payload(
d: &mut PayloadDecoder,
) -> Result<Self, Box<dyn std::error::Error>> {
d.array()?;
let network_magic = d.u64()?;
let initiator_and_responder_diffusion_mode = d.bool()?;
Ok(Self {
network_magic,
initiator_and_responder_diffusion_mode,
})
}
}
#[derive(Debug)]
pub enum Message {
Propose(VersionTable),
Accept(VersionNumber, VersionData),
Refuse(RefuseReason),
}
impl EncodePayload for Message {
fn encode_payload(
&self,
e: &mut PayloadEncoder,
) -> Result<(), Box<dyn std::error::Error>> {
match self {
Message::Propose(version_table) => {
e.array(2)?.u16(0)?;
version_table.encode_payload(e)?;
}
Message::Accept(version_number, version_data) => {
e.array(3)?.u16(1)?;
e.u64(*version_number)?;
version_data.encode_payload(e)?;
}
Message::Refuse(reason) => {
e.array(2)?.u16(2)?;
reason.encode_payload(e)?;
}
};
Ok(())
}
}
impl DecodePayload for Message {
fn decode_payload(
d: &mut PayloadDecoder,
) -> Result<Self, Box<dyn std::error::Error>> {
d.array()?;
let msg = match d.u16()? {
0 => todo!(),
1 => {
let version_number = d.u64()?;
let version_data = VersionData::decode_payload(d)?;
Message::Accept(version_number, version_data)
}
2 => todo!(),
x => return Err(Box::new(MachineError::BadLabel(x))),
};
Ok(msg)
}
}
#[derive(Debug, PartialEq, Eq)]
pub enum State {
Propose,
Confirm,
Done,
}
#[derive(Debug)]
pub enum Output {
Pending,
Accepted(VersionNumber, VersionData),
Refused(RefuseReason),
}
#[derive(Debug)]
pub struct Client {
state: State,
output: Output,
version_table: VersionTable,
}
impl Client {
pub fn initial(version_table: VersionTable) -> Self {
Client {
state: State::Propose,
output: Output::Pending,
version_table,
}
}
}
impl Agent for Client {
type Message = Message;
fn is_done(&self) -> bool {
self.state == State::Done
}
fn has_agency(&self) -> bool {
match self.state {
State::Propose => true,
State::Confirm => false,
State::Done => false,
}
}
fn send_next(
self,
tx: &impl MachineOutput,
) -> Result<Self, Box<dyn std::error::Error>> {
match self.state {
State::Propose => {
tx.send_msg(&Message::Propose(self.version_table.clone()))?;
Ok(Self {
state: State::Confirm,
..self
})
}
_ => panic!("I don't have agency, nothing to send"),
}
}
fn receive_next(
self,
msg: Self::Message,
) -> Result<Self, Box<dyn std::error::Error>> {
match (self.state, msg) {
(State::Confirm, Message::Accept(version, data)) => Ok(Self {
state: State::Done,
output: Output::Accepted(version, data),
..self
}),
(State::Confirm, Message::Refuse(reason)) => Ok(Self {
state: State::Done,
output: Output::Refused(reason),
..self
}),
_ => panic!("Current state does't expect to receive a message"),
}
}
}

2
pallas-machines/.gitignore vendored Normal file
View file

@ -0,0 +1,2 @@
/target
Cargo.lock

View file

@ -0,0 +1,16 @@
[package]
name = "pallas-machines"
version = "0.1.0"
edition = "2021"
repository = "https://github.com/txpipe/pallas"
license = "Apache-2.0"
authors = [
"Santiago Carmuega <santiago@carmuega.me>"
]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
pallas-multiplexer = { path = "../pallas-multiplexer/" }
minicbor = { version="0.11.4", features=["half"] }
log = "0.4.14"

174
pallas-machines/src/lib.rs Normal file
View file

@ -0,0 +1,174 @@
use log::{debug, trace, warn};
use minicbor::{Decoder, Encoder};
use pallas_multiplexer::Payload;
use std::borrow::Borrow;
use std::fmt::{Debug, Display};
use std::sync::mpsc::{Receiver, Sender};
#[derive(Debug)]
pub enum MachineError {
BadLabel(u16),
}
impl Display for MachineError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
MachineError::BadLabel(label) => {
write!(f, "unknown message label [{}]", label)
}
}
}
}
impl std::error::Error for MachineError {}
pub type PayloadEncoder<'a> = Encoder<&'a mut Vec<u8>>;
pub type PayloadDecoder<'a> = Decoder<'a>;
pub trait EncodePayload {
fn encode_payload(
&self,
e: &mut PayloadEncoder,
) -> Result<(), Box<dyn std::error::Error>>;
}
pub fn to_payload(
data: &dyn EncodePayload,
) -> Result<Payload, Box<dyn std::error::Error>> {
let mut payload = Vec::new();
let mut encoder = minicbor::encode::Encoder::new(&mut payload);
data.encode_payload(&mut encoder)?;
Ok(payload)
}
pub struct Message<D>
where
D: EncodePayload,
{
pub label: u32,
pub data: D,
}
impl<D> EncodePayload for Message<D>
where
D: EncodePayload,
{
fn encode_payload(
&self,
e: &mut PayloadEncoder,
) -> Result<(), Box<dyn std::error::Error>> {
// TODO: map concrete error to W::Error somehow?
// or just implement custom error struct
let data = to_payload(&self.data).unwrap();
e.array(2)?.u32(self.label)?.bytes(&data)?;
Ok(())
}
}
pub trait MachineOutput {
fn send_msg(
&self,
data: &impl EncodePayload,
) -> Result<(), Box<dyn std::error::Error>>;
}
impl MachineOutput for Sender<Payload> {
fn send_msg(
&self,
data: &impl EncodePayload,
) -> Result<(), Box<dyn std::error::Error>> {
let payload = to_payload(data.borrow())?;
self.send(payload)?;
Ok(())
}
}
pub trait DecodePayload: Sized {
fn decode_payload(
d: &mut PayloadDecoder,
) -> Result<Self, Box<dyn std::error::Error>>;
}
pub struct PayloadDeconstructor {
rx: Receiver<Payload>,
remaining: Vec<u8>,
}
impl PayloadDeconstructor {
pub fn consume_next_message<T: DecodePayload>(
&mut self,
) -> Result<T, Box<dyn std::error::Error>> {
if self.remaining.len() == 0 {
debug!("no remaining payload, fetching next segment");
let payload = self.rx.recv()?;
self.remaining.extend(payload);
}
let mut decoder = minicbor::Decoder::new(&self.remaining);
match T::decode_payload(&mut decoder) {
Ok(t) => {
let new_pos = decoder.position();
self.remaining.drain(0..new_pos);
debug!("consumed {} from payload buffer", new_pos);
Ok(t)
}
Err(err) => {
//TODO: we need to filter this only for correct errors
warn!("{:?}", err);
debug!("payload incomplete, fetching next segment");
let payload = self.rx.recv()?;
self.remaining.extend(payload);
self.consume_next_message::<T>()
}
}
}
}
pub type Transition<T> = Result<T, Box<dyn std::error::Error>>;
pub trait Agent: Sized {
type Message: DecodePayload + Debug;
fn is_done(&self) -> bool;
fn has_agency(&self) -> bool;
fn send_next(self, tx: &impl MachineOutput) -> Transition<Self>;
fn receive_next(self, msg: Self::Message) -> Transition<Self>;
}
pub fn run_agent<T: Agent + Debug>(
agent: T,
rx: Receiver<Payload>,
output: &impl MachineOutput,
) -> Result<T, Box<dyn std::error::Error>> {
let mut input = PayloadDeconstructor {
rx,
remaining: Vec::new(),
};
let mut agent = agent;
while !agent.is_done() {
debug!("evaluating agent {:?}", agent);
match agent.has_agency() {
true => {
agent = agent.send_next(output)?;
}
false => {
let msg = input.consume_next_message::<T::Message>()?;
trace!("procesing inbound msg: {:?}", msg);
agent = agent.receive_next(msg)?;
}
}
}
Ok(agent)
}

2
pallas-multiplexer/.gitignore vendored Normal file
View file

@ -0,0 +1,2 @@
/target
Cargo.lock

View file

@ -0,0 +1,19 @@
[package]
name = "pallas-multiplexer"
version = "0.1.0"
edition = "2021"
repository = "https://github.com/txpipe/pallas"
license = "Apache-2.0"
authors = [
"Santiago Carmuega <santiago@carmuega.me>"
]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
log = "0.4.14"
byteorder = "1.4.3"
hex = "0.4.3"
[dev-dependencies]
env_logger = "0.9.0"

View file

@ -0,0 +1,27 @@
use std::{net::TcpListener, thread, time::Duration};
use pallas_multiplexer::Multiplexer;
fn main() {
env_logger::init();
let server = TcpListener::bind("0.0.0.0:3001").unwrap();
let (bearer, _) = server.accept().unwrap();
let handles =
Multiplexer::new(bearer, &vec![0x8002u16, 0x8003u16][..]).unwrap();
for handle in handles {
thread::spawn(move || {
let (id, rx, tx) = handle;
loop {
let payload = rx.recv().unwrap();
println!("id:{}, length:{}", id, payload.len());
}
});
}
loop {
thread::sleep(Duration::from_secs(6000));
}
}

View file

@ -0,0 +1,29 @@
use std::{net::TcpStream, thread, time::Duration};
use pallas_multiplexer::Multiplexer;
fn main() {
env_logger::init();
let bearer = TcpStream::connect("127.0.0.1:3001").unwrap();
let handles =
Multiplexer::new(bearer, &vec![0x0002u16, 0x0003u16][..]).unwrap();
for (idx, handle) in handles.into_iter().enumerate() {
thread::spawn(move || {
let (id, rx, tx) = handle;
loop {
let payload = vec![1; 65545];
tx.send(payload).unwrap();
thread::sleep(Duration::from_millis(
50u64 + (idx as u64 * 10u64),
));
}
});
}
loop {
thread::sleep(Duration::from_secs(6000));
}
}

View file

@ -0,0 +1,215 @@
use std::{
collections::HashMap,
io::{Read, Write},
net::TcpStream,
sync::mpsc::{self, Receiver, Sender, TryRecvError},
thread,
time::{Duration, Instant},
};
use byteorder::{ByteOrder, NetworkEndian, WriteBytesExt};
use log::{debug, error, log_enabled, trace, warn};
pub trait Bearer: Read + Write + Send + Sync + Sized {
fn read_segment(&mut self) -> Result<(u16, u32, Payload), std::io::Error>;
fn write_segment(
&mut self,
clock: Instant,
protocol_id: u16,
partial_payload: &[u8],
) -> Result<(), std::io::Error>;
fn clone(&self) -> Self;
}
impl Bearer for TcpStream {
fn write_segment(
&mut self,
clock: Instant,
protocol_id: u16,
payload: &[u8],
) -> Result<(), std::io::Error> {
let mut msg = Vec::new();
msg.write_u32::<NetworkEndian>(clock.elapsed().as_micros() as u32)?;
msg.write_u16::<NetworkEndian>(protocol_id)?;
msg.write_u16::<NetworkEndian>(payload.len() as u16)?;
if log_enabled!(log::Level::Trace) {
trace!(
"sending segment, header {:?}, payload length: {}",
hex::encode(&msg),
payload.len()
);
}
msg.write(&payload[..]).unwrap();
self.write(&msg)?;
self.flush()
}
fn read_segment(&mut self) -> Result<(u16, u32, Payload), std::io::Error> {
let mut header = [0u8; 8];
self.read_exact(&mut header)?;
let length = NetworkEndian::read_u16(&header[6..]) as usize;
let mut payload = vec![0u8; length];
self.read_exact(&mut payload)?;
let id = NetworkEndian::read_u16(&mut header[4..6]) as usize ^ 0x8000;
let ts = NetworkEndian::read_u32(&mut header[0..4]);
if log_enabled!(log::Level::Trace) {
trace!(
"received segment, header: {:?}, payload length: {}",
hex::encode(&header),
payload.len()
);
}
Ok((id as u16, ts, payload))
}
fn clone(&self) -> Self {
self.try_clone().unwrap()
}
}
const MAX_SEGMENT_PAYLOAD_LENGTH: usize = 65535;
pub type Payload = Vec<u8>;
#[derive(Debug)]
pub struct Error {}
fn tx_round<TBearer>(
bearer: &mut TBearer,
ingress: &MuxIngress,
clock: Instant,
) -> Result<u16, std::io::Error>
where
TBearer: Bearer,
{
let mut writes = 0u16;
for (id, rx) in ingress.iter() {
match rx.try_recv() {
Ok(payload) => {
let chunks = payload.chunks(MAX_SEGMENT_PAYLOAD_LENGTH);
for chunk in chunks {
bearer.write_segment(clock, *id, chunk)?;
writes += 1;
}
}
Err(TryRecvError::Disconnected) => {
//TODO: remove handle from list
warn!("protocol handle disconnected");
}
Err(TryRecvError::Empty) => (),
};
}
Ok(writes)
}
fn tx_loop<TBearer>(bearer: &mut TBearer, ingress: MuxIngress)
where
TBearer: Bearer,
{
loop {
let clock = Instant::now();
match tx_round(bearer, &ingress, clock) {
Err(err) => {
error!("{:?}", err);
panic!();
}
Ok(0) => thread::sleep(Duration::from_millis(10)),
Ok(_) => (),
};
}
}
fn rx_loop<TBearer>(bearer: &mut TBearer, egress: DemuxerEgress)
where
TBearer: Bearer,
{
let mut tx_map: HashMap<_, _> = egress.into_iter().collect();
loop {
match bearer.read_segment() {
Err(err) => {
error!("{:?}", err);
panic!();
}
Ok(segment) => {
let (id, _ts, payload) = segment;
match tx_map.get(&id) {
Some(tx) => match tx.send(payload) {
Err(err) => {
error!("error sending egress tx to protocol, removing protocol from egress output. {:?}", err);
tx_map.remove(&id);
}
Ok(_) => {
debug!("successful tx to egress protocol");
}
},
None => warn!(
"received segment for protocol id not being demuxed {}",
id
),
}
}
}
}
}
type ChannelProtocolHandle = (u16, Receiver<Payload>, Sender<Payload>);
type ChannelIngressHandle = (u16, Receiver<Payload>);
type ChannelEgressHandle = (u16, Sender<Payload>);
type MuxIngress = Vec<ChannelIngressHandle>;
type DemuxerEgress = Vec<ChannelEgressHandle>;
pub struct Multiplexer {}
impl Multiplexer {
pub fn new<TBearer>(
bearer: TBearer,
protocols: &[u16],
) -> Result<Vec<ChannelProtocolHandle>, Error>
where
TBearer: Bearer + 'static,
{
let handles = protocols
.iter()
.map(|id| {
let (demux_tx, demux_rx) = mpsc::channel::<Payload>();
let (mux_tx, mux_rx) = mpsc::channel::<Payload>();
let protocol_handle: ChannelProtocolHandle =
(*id, demux_rx, mux_tx);
let ingress_handle: ChannelIngressHandle = (*id, mux_rx);
let egress_handle: ChannelEgressHandle = (*id, demux_tx);
(protocol_handle, (ingress_handle, egress_handle))
})
.collect::<Vec<_>>();
let (protocol_handles, multiplex_handles): (Vec<_>, Vec<_>) =
handles.into_iter().unzip();
let (ingress, egress): (Vec<_>, Vec<_>) =
multiplex_handles.into_iter().unzip();
let mut tx_bearer = bearer.clone();
thread::spawn(move || tx_loop(&mut tx_bearer, ingress));
let mut rx_bearer = bearer.clone();
thread::spawn(move || rx_loop(&mut rx_bearer, egress));
Ok(protocol_handles)
}
}

2
pallas/.gitignore vendored Normal file
View file

@ -0,0 +1,2 @@
/target
Cargo.lock

17
pallas/Cargo.toml Normal file
View file

@ -0,0 +1,17 @@
[package]
name = "pallas"
version = "0.1.0"
edition = "2021"
repository = "https://github.com/txpipe/pallas"
license = "Apache-2.0"
authors = [
"Santiago Carmuega <santiago@carmuega.me>"
]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
pallas-multiplexer = { path = "../pallas-multiplexer/" }
pallas-machines = { path = "../pallas-machines/" }
pallas-handshake = { path = "../pallas-handshake/" }
pallas-blockfetch = { path = "../pallas-blockfetch/" }

12
pallas/src/lib.rs Normal file
View file

@ -0,0 +1,12 @@
//! Rust-native building blocks for the Cardano blockchain ecosystem
//!
//! Pallas is an expanding collection of modules that re-implements common
//! Cardano logic in native Rust. This crate doesn't provide any particular
//! application, it is meant to be used as a base layer to facilitate the
//! development of higher-level use-cases, such as explorers, wallets, etc (who
//! knows, maybe even a full node in the far away future).
#![warn(missing_docs)]
#![warn(missing_doc_code_examples)]
pub mod ouroboros;

12
pallas/src/ouroboros.rs Normal file
View file

@ -0,0 +1,12 @@
#[doc(inline)]
pub use pallas_multiplexer as multiplexer;
#[doc(inline)]
pub use pallas_machines as machines;
#[doc(inline)]
pub use pallas_handshake as handshake;
#[doc(inline)]
pub use pallas_blockfetch as blockfetch;

2
rustfmt.toml Normal file
View file

@ -0,0 +1,2 @@
edition = "2021"
wrap_comments = true