chore: Move miniprotocol examples to custom crate (#97)

This commit is contained in:
Santiago Carmuega 2022-04-28 09:50:33 -03:00 committed by GitHub
parent 2d81664500
commit c998d9adbb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 278 additions and 225 deletions

View file

@ -9,4 +9,6 @@ members = [
"pallas",
"examples/block-download",
"examples/block-decode",
"examples/n2n-miniprotocols",
"examples/n2c-miniprotocols",
]

4
examples/n2c-miniprotocols/.gitignore vendored Normal file
View file

@ -0,0 +1,4 @@
/target
scratchpad
.DS_Store

View file

@ -0,0 +1,16 @@
[package]
name = "n2c-miniprotocols"
version = "0.1.0"
edition = "2021"
publish = false
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
pallas = { path = "../../pallas" }
net2 = "0.2.37"
env_logger = "0.9.0"
hex = "0.4.3"
log = "0.4.16"

View file

@ -0,0 +1,108 @@
use pallas::network::{
miniprotocols::{chainsync, handshake, localstate, run_agent, Point, MAINNET_MAGIC},
multiplexer::Multiplexer,
};
use std::os::unix::net::UnixStream;
#[derive(Debug)]
struct LoggingObserver;
impl chainsync::Observer<chainsync::HeaderContent> for LoggingObserver {
fn on_roll_forward(
&mut self,
_content: chainsync::HeaderContent,
tip: &chainsync::Tip,
) -> Result<chainsync::Continuation, Box<dyn std::error::Error>> {
log::debug!("asked to roll forward, tip at {:?}", tip);
Ok(chainsync::Continuation::Proceed)
}
fn on_intersect_found(
&mut self,
point: &Point,
tip: &chainsync::Tip,
) -> Result<chainsync::Continuation, Box<dyn std::error::Error>> {
log::debug!("intersect was found {:?} (tip: {:?})", point, tip);
Ok(chainsync::Continuation::Proceed)
}
fn on_rollback(
&mut self,
point: &Point,
) -> Result<chainsync::Continuation, Box<dyn std::error::Error>> {
log::debug!("asked to roll back {:?}", point);
Ok(chainsync::Continuation::Proceed)
}
fn on_tip_reached(&mut self) -> Result<chainsync::Continuation, Box<dyn std::error::Error>> {
log::debug!("tip was reached");
Ok(chainsync::Continuation::Proceed)
}
}
fn do_handshake(muxer: &mut Multiplexer) {
let mut channel = muxer.use_channel(0);
let versions = handshake::n2c::VersionTable::v1_and_above(MAINNET_MAGIC);
let _last = run_agent(handshake::Initiator::initial(versions), &mut channel).unwrap();
}
fn do_localstate_query(muxer: &mut Multiplexer) {
let mut channel = muxer.use_channel(7);
let agent = run_agent(
localstate::OneShotClient::<localstate::queries::QueryV10>::initial(
None,
localstate::queries::RequestV10::GetChainPoint,
),
&mut channel,
);
log::info!("state query result: {:?}", agent);
}
fn do_chainsync(muxer: &mut Multiplexer) {
let mut channel = muxer.use_channel(5);
let known_points = vec![Point::Specific(
43847831u64,
hex::decode("15b9eeee849dd6386d3770b0745e0450190f7560e5159b1b3ab13b14b2684a45").unwrap(),
)];
let agent = run_agent(
chainsync::Consumer::<chainsync::HeaderContent, _>::initial(
Some(known_points),
LoggingObserver {},
),
&mut channel,
);
println!("{:?}", agent);
}
fn main() {
env_logger::builder()
.filter_level(log::LevelFilter::Trace)
.init();
// we connect to the unix socket of the local node. Make sure you have the right
// path for your environment
let bearer = UnixStream::connect("/tmp/node.socket").unwrap();
// setup the multiplexer by specifying the bearer and the IDs of the
// miniprotocols to use
let mut muxer = Multiplexer::setup(bearer, &[0, 4, 5]).unwrap();
// execute the required handshake against the relay
do_handshake(&mut muxer);
// execute an arbitrary "Local State" query against the node
do_localstate_query(&mut muxer);
// execute the chainsync flow from an arbitrary point in the chain
do_chainsync(&mut muxer);
}

4
examples/n2n-miniprotocols/.gitignore vendored Normal file
View file

@ -0,0 +1,4 @@
/target
scratchpad
.DS_Store

View file

@ -0,0 +1,15 @@
[package]
name = "n2n-miniprotocols"
version = "0.1.0"
edition = "2021"
publish = false
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
pallas = { path = "../../pallas" }
net2 = "0.2.37"
env_logger = "0.9.0"
hex = "0.4.3"
log = "0.4.16"

View file

@ -0,0 +1,129 @@
use net2::TcpStreamExt;
use pallas::network::{
miniprotocols::{blockfetch, chainsync, handshake, run_agent, Point, MAINNET_MAGIC},
multiplexer::Multiplexer,
};
use std::net::TcpStream;
#[derive(Debug)]
struct LoggingObserver;
impl blockfetch::Observer for LoggingObserver {
fn on_block_received(&mut self, body: Vec<u8>) -> Result<(), Box<dyn std::error::Error>> {
log::trace!("block received: {}", hex::encode(&body));
Ok(())
}
}
impl chainsync::Observer<chainsync::HeaderContent> for LoggingObserver {
fn on_roll_forward(
&mut self,
_content: chainsync::HeaderContent,
tip: &chainsync::Tip,
) -> Result<chainsync::Continuation, Box<dyn std::error::Error>> {
log::debug!("asked to roll forward, tip at {:?}", tip);
Ok(chainsync::Continuation::Proceed)
}
fn on_intersect_found(
&mut self,
point: &Point,
tip: &chainsync::Tip,
) -> Result<chainsync::Continuation, Box<dyn std::error::Error>> {
log::debug!("intersect was found {:?} (tip: {:?})", point, tip);
Ok(chainsync::Continuation::Proceed)
}
fn on_rollback(
&mut self,
point: &Point,
) -> Result<chainsync::Continuation, Box<dyn std::error::Error>> {
log::debug!("asked to roll back {:?}", point);
Ok(chainsync::Continuation::Proceed)
}
fn on_tip_reached(&mut self) -> Result<chainsync::Continuation, Box<dyn std::error::Error>> {
log::debug!("tip was reached");
Ok(chainsync::Continuation::Proceed)
}
}
fn do_handshake(muxer: &mut Multiplexer) {
let mut channel = muxer.use_channel(0);
let versions = handshake::n2n::VersionTable::v4_and_above(MAINNET_MAGIC);
let _last = run_agent(handshake::Initiator::initial(versions), &mut channel).unwrap();
}
fn do_blockfetch(muxer: &mut Multiplexer) {
let mut channel = muxer.use_channel(3);
let range = (
Point::Specific(
43847831,
hex::decode("15b9eeee849dd6386d3770b0745e0450190f7560e5159b1b3ab13b14b2684a45")
.unwrap(),
),
Point::Specific(
43847844,
hex::decode("ff8d558a3d5a0e058beb3d94d26a567f75cd7d09ff5485aa0d0ebc38b61378d4")
.unwrap(),
),
);
let agent = run_agent(
blockfetch::BatchClient::initial(range, LoggingObserver {}),
&mut channel,
);
println!("{:?}", agent);
}
fn do_chainsync(muxer: &mut Multiplexer) {
let mut channel = muxer.use_channel(2);
let known_points = vec![Point::Specific(
43847831u64,
hex::decode("15b9eeee849dd6386d3770b0745e0450190f7560e5159b1b3ab13b14b2684a45").unwrap(),
)];
let agent = run_agent(
chainsync::Consumer::<chainsync::HeaderContent, _>::initial(
Some(known_points),
LoggingObserver {},
),
&mut channel,
);
println!("{:?}", agent);
}
fn main() {
env_logger::builder()
.filter_level(log::LevelFilter::Trace)
.init();
// setup a TCP socket to act as data bearer between our agents and the remote
// relay.
let bearer = TcpStream::connect("relays-new.cardano-mainnet.iohk.io:3001").unwrap();
bearer.set_nodelay(true).unwrap();
bearer.set_keepalive_ms(Some(30_000u32)).unwrap();
// setup the multiplexer by specifying the bearer and the IDs of the
// miniprotocols to use
let mut muxer = Multiplexer::setup(bearer, &[0, 2, 3, 4]).unwrap();
// execute the required handshake against the relay
do_handshake(&mut muxer);
// fetch an arbitrary batch of block
do_blockfetch(&mut muxer);
// execute the chainsync flow from an arbitrary point in the chain
do_chainsync(&mut muxer);
}

View file

@ -19,7 +19,3 @@ log = "0.4.14"
hex = "0.4.3"
itertools = "0.10.3"
net2 = "0.2.37"
[dev-dependencies]
env_logger = "0.9.0"
pallas-primitives = { version = "0.9.0-alpha.0", path = "../pallas-primitives/" }

View file

@ -1,42 +0,0 @@
use net2::TcpStreamExt;
use std::net::TcpStream;
use pallas_miniprotocols::blockfetch::{BatchClient, NoopObserver};
use pallas_miniprotocols::handshake::{n2n::VersionTable, Initiator};
use pallas_miniprotocols::{run_agent, Point, MAINNET_MAGIC};
use pallas_multiplexer::Multiplexer;
fn main() {
env_logger::init();
//let bearer = TcpStream::connect("localhost:6000").unwrap();
let bearer = TcpStream::connect("relays-new.cardano-mainnet.iohk.io:3001").unwrap();
bearer.set_nodelay(true).unwrap();
bearer.set_keepalive_ms(Some(30_000u32)).unwrap();
let mut muxer = Multiplexer::setup(bearer, &vec![0, 3]).unwrap();
let mut hs_channel = muxer.use_channel(0);
let versions = VersionTable::v4_and_above(MAINNET_MAGIC);
let last = run_agent(Initiator::initial(versions), &mut hs_channel).unwrap();
println!("{:?}", last);
let range = (
Point::Specific(
43847831u64,
hex::decode("15b9eeee849dd6386d3770b0745e0450190f7560e5159b1b3ab13b14b2684a45")
.unwrap(),
),
Point::Specific(
43847831u64,
hex::decode("15b9eeee849dd6386d3770b0745e0450190f7560e5159b1b3ab13b14b2684a45")
.unwrap(),
),
);
let mut bf_channel = muxer.use_channel(3);
let bf = BatchClient::initial(range, NoopObserver {});
let bf_last = run_agent(bf, &mut bf_channel);
println!("{:?}", bf_last);
}

View file

@ -1,31 +0,0 @@
use pallas_miniprotocols::chainsync::{BlockContent, Consumer, NoopObserver};
use pallas_miniprotocols::handshake::{n2c::VersionTable, Initiator};
use pallas_miniprotocols::{run_agent, Point, MAINNET_MAGIC};
use pallas_multiplexer::Multiplexer;
use std::os::unix::net::UnixStream;
fn main() {
env_logger::init();
// we connect to the unix socket of the local node. Make sure you have the right
// path for your environment
let bearer = UnixStream::connect("/tmp/node.socket").unwrap();
let mut muxer = Multiplexer::setup(bearer, &vec![0, 4, 5]).unwrap();
let mut hs_channel = muxer.use_channel(0);
let versions = VersionTable::v1_and_above(MAINNET_MAGIC);
let last = run_agent(Initiator::initial(versions), &mut hs_channel).unwrap();
println!("last hanshake state: {:?}", last);
// some random known-point in the chain to use as starting point for the sync
let known_points = vec![Point::Specific(
45147459,
hex::decode("bee16ef28ac02abb50c340a7deff085a77f3a7b84c66250b3318dcb125c19a10").unwrap(),
)];
let mut cs_channel = muxer.use_channel(5);
let cs = Consumer::<BlockContent, _>::initial(Some(known_points), NoopObserver {});
let cs = run_agent(cs, &mut cs_channel).unwrap();
println!("{:?}", cs);
}

View file

@ -1,40 +0,0 @@
use net2::TcpStreamExt;
use pallas_primitives::alonzo::Header;
use pallas_miniprotocols::Point;
use std::net::TcpStream;
use pallas_miniprotocols::chainsync::{Consumer, HeaderContent, NoopObserver};
use pallas_miniprotocols::handshake::{n2n::VersionTable, Initiator};
use pallas_miniprotocols::{run_agent, MAINNET_MAGIC};
use pallas_multiplexer::Multiplexer;
#[derive(Debug)]
pub struct Content(u32, Header);
fn main() {
env_logger::init();
let bearer = TcpStream::connect("relays-new.cardano-mainnet.iohk.io:3001").unwrap();
bearer.set_nodelay(true).unwrap();
bearer.set_keepalive_ms(Some(30_000u32)).unwrap();
let mut muxer = Multiplexer::setup(bearer, &vec![0, 2]).unwrap();
let mut hs_channel = muxer.use_channel(0);
let versions = VersionTable::v4_and_above(MAINNET_MAGIC);
let last = run_agent(Initiator::initial(versions), &mut hs_channel).unwrap();
println!("{:?}", last);
let known_points = vec![Point::Specific(
43847831u64,
hex::decode("15b9eeee849dd6386d3770b0745e0450190f7560e5159b1b3ab13b14b2684a45").unwrap(),
)];
let mut cs_channel = muxer.use_channel(2);
let cs = Consumer::<HeaderContent, _>::initial(Some(known_points), NoopObserver {});
let cs = run_agent(cs, &mut cs_channel).unwrap();
println!("{:?}", cs);
}

View file

@ -1,24 +0,0 @@
use net2::TcpStreamExt;
use std::net::TcpStream;
use pallas_miniprotocols::handshake::{n2c::VersionTable, Initiator};
use pallas_miniprotocols::{run_agent, MAINNET_MAGIC};
use pallas_multiplexer::Multiplexer;
fn main() {
env_logger::init();
//let bearer = TcpStream::connect("localhost:6000").unwrap();
let bearer = TcpStream::connect("relays-new.cardano-mainnet.iohk.io:3001").unwrap();
bearer.set_nodelay(true).unwrap();
bearer.set_keepalive_ms(Some(30_000u32)).unwrap();
let mut muxer = Multiplexer::setup(bearer, &vec![0]).unwrap();
let mut hs_channel = muxer.use_channel(0);
let versions = VersionTable::v1_and_above(MAINNET_MAGIC);
let last = run_agent(Initiator::initial(versions), &mut hs_channel).unwrap();
println!("{:?}", last);
}

View file

@ -1,24 +0,0 @@
use net2::TcpStreamExt;
use std::net::TcpStream;
use pallas_miniprotocols::handshake::{n2n::VersionTable, Initiator};
use pallas_miniprotocols::{run_agent, MAINNET_MAGIC};
use pallas_multiplexer::Multiplexer;
fn main() {
env_logger::init();
//let bearer = TcpStream::connect("localhost:6000").unwrap();
let bearer = TcpStream::connect("relays-new.cardano-mainnet.iohk.io:3001").unwrap();
bearer.set_nodelay(true).unwrap();
bearer.set_keepalive_ms(Some(30_000u32)).unwrap();
let mut muxer = Multiplexer::setup(bearer, &vec![0]).unwrap();
let mut channel = muxer.use_channel(0);
let versions = VersionTable::v4_and_above(MAINNET_MAGIC);
let last = run_agent(Initiator::initial(versions), &mut channel).unwrap();
println!("{:?}", last);
}

View file

@ -1,30 +0,0 @@
use pallas_miniprotocols::handshake::{n2c::VersionTable, Initiator};
use pallas_miniprotocols::localstate::{
queries::{QueryV10, RequestV10},
OneShotClient,
};
use pallas_miniprotocols::run_agent;
use pallas_miniprotocols::MAINNET_MAGIC;
use pallas_multiplexer::Multiplexer;
use std::os::unix::net::UnixStream;
fn main() {
env_logger::init();
// we connect to the unix socket of the local node. Make sure you have the right
// path for your environment
let bearer = UnixStream::connect("/tmp/node.socket").unwrap();
let mut muxer = Multiplexer::setup(bearer, &vec![0, 7]).unwrap();
let mut hs_channel = muxer.use_channel(0);
let versions = VersionTable::only_v10(MAINNET_MAGIC);
let last = run_agent(Initiator::initial(versions), &mut hs_channel).unwrap();
println!("last hanshake state: {:?}", last);
let mut ls_channel = muxer.use_channel(7);
let cs = OneShotClient::<QueryV10>::initial(None, RequestV10::GetChainPoint);
let cs = run_agent(cs, &mut ls_channel).unwrap();
println!("{:?}", cs);
}

View file

@ -1,30 +0,0 @@
use net2::TcpStreamExt;
use std::net::TcpStream;
use pallas_miniprotocols::handshake::{n2c::VersionTable, Initiator};
use pallas_miniprotocols::txsubmission::NaiveProvider;
use pallas_miniprotocols::{run_agent, MAINNET_MAGIC};
use pallas_multiplexer::Multiplexer;
fn main() {
env_logger::init();
//let bearer = TcpStream::connect("localhost:6000").unwrap();
let bearer = TcpStream::connect("relays-new.cardano-mainnet.iohk.io:3001").unwrap();
bearer.set_nodelay(true).unwrap();
bearer.set_keepalive_ms(Some(30_000u32)).unwrap();
let mut muxer = Multiplexer::setup(bearer, &vec![0, 4]).unwrap();
let mut hs_channel = muxer.use_channel(0);
let versions = VersionTable::v1_and_above(MAINNET_MAGIC);
let last = run_agent(Initiator::initial(versions), &mut hs_channel).unwrap();
println!("{:?}", last);
let mut ts_channel = muxer.use_channel(4);
let ts = NaiveProvider::initial(vec![]);
let ts = run_agent(ts, &mut ts_channel).unwrap();
println!("{:?}", ts);
}