feat: Add client/server use_channel variants (#228)

The goal here is to make the use of channels when interfacing with nodes less of a black box; the 0x8000 bit flip is a very surprising behavior if you're not already familiar with it.
This commit is contained in:
Pi Lanningham 2023-02-09 15:38:45 -05:00 committed by GitHub
parent 4915d14cd5
commit a81fc101fe
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 46 additions and 14 deletions

View file

@ -13,8 +13,8 @@ fn main() {
let bearer = Bearer::connect_tcp("relays-new.cardano-testnet.iohkdev.io:3001").unwrap();
let mut plexer = StdPlexer::new(bearer);
let handshake = plexer.use_channel(PROTOCOL_N2N_HANDSHAKE);
let blockfetch = plexer.use_channel(PROTOCOL_N2N_BLOCK_FETCH);
let handshake = plexer.use_client_channel(PROTOCOL_N2N_HANDSHAKE);
let blockfetch = plexer.use_client_channel(PROTOCOL_N2N_BLOCK_FETCH);
plexer.muxer.spawn();
plexer.demuxer.spawn();

View file

@ -78,9 +78,9 @@ fn main() {
// setup the multiplexer by specifying the bearer and the IDs of the
// miniprotocols to use
let mut plexer = multiplexer::StdPlexer::new(bearer);
let handshake = plexer.use_channel(PROTOCOL_N2C_HANDSHAKE);
let statequery = plexer.use_channel(PROTOCOL_N2C_STATE_QUERY);
let chainsync = plexer.use_channel(PROTOCOL_N2C_CHAIN_SYNC);
let handshake = plexer.use_client_channel(PROTOCOL_N2C_HANDSHAKE);
let statequery = plexer.use_client_channel(PROTOCOL_N2C_STATE_QUERY);
let chainsync = plexer.use_client_channel(PROTOCOL_N2C_CHAIN_SYNC);
plexer.muxer.spawn();
plexer.demuxer.spawn();

View file

@ -86,9 +86,9 @@ fn main() {
// setup the multiplexer by specifying the bearer and the IDs of the
// miniprotocols to use
let mut plexer = StdPlexer::new(bearer);
let handshake = plexer.use_channel(PROTOCOL_N2N_HANDSHAKE);
let blockfetch = plexer.use_channel(PROTOCOL_N2N_BLOCK_FETCH);
let chainsync = plexer.use_channel(PROTOCOL_N2N_CHAIN_SYNC);
let handshake = plexer.use_client_channel(PROTOCOL_N2N_HANDSHAKE);
let blockfetch = plexer.use_client_channel(PROTOCOL_N2N_BLOCK_FETCH);
let chainsync = plexer.use_client_channel(PROTOCOL_N2N_CHAIN_SYNC);
plexer.muxer.spawn();
plexer.demuxer.spawn();

View file

@ -70,8 +70,8 @@ bearer.set_keepalive_ms(Some(30_000u32)).unwrap();
// create a new multiplexer, specifying which mini-protocol IDs we want to sue
let mut muxer = Multiplexer::setup(bearer, &[0]).unwrap();
// get a handle for the handhsake mini-protocol handle
let mut channel = muxer.use_channel(pallas_miniprotocols::PROTOCOL_N2N_HANDSHAKE);
// get a handle for the (client-side) handhsake mini-protocol handle
let mut channel = muxer.use_client_channel(pallas_miniprotocols::PROTOCOL_N2N_HANDSHAKE);
// create a handshake client agent with an initial state
let agent = handshake::Client::initial(VersionTable::v4_and_above(MAINNET_MAGIC));

View file

@ -14,6 +14,22 @@ pub const PREVIEW_MAGIC: u64 = 2;
/// Well-known magic for pre-production
pub const PRE_PRODUCTION_MAGIC: u64 = 1;
/// Bitflag for client-side version of a known protocol
/// # Example
/// ```
/// use pallas_miniprotocols::*;
/// let channel = PROTOCOL_CLIENT | PROTOCOL_N2N_HANDSHAKE;
/// ```
pub const PROTOCOL_CLIENT: u16 = 0x0;
/// Bitflag for server-side version of a known protocol
/// # Example
/// ```
/// use pallas_miniprotocols::*;
/// let channel = PROTOCOL_SERVER | PROTOCOL_N2N_CHAIN_SYNC;
/// ```
pub const PROTOCOL_SERVER: u16 = 0x8000;
/// Protocol channel number for node-to-node handshakes
pub const PROTOCOL_N2N_HANDSHAKE: u16 = 0;

View file

@ -34,7 +34,7 @@ let bearer = UnixStream::connect("/tmp/pallas").unwrap();
let muxer = Multiplexer::setup(tcp, &[0, 2])
// Ask the multiplexer to provide us with the channel for the miniprotocol #0.
let mut handshake = muxer.use_channel(PROTOCOL_N2N_HANDSHAKE);
let mut handshake = muxer.use_client_channel(PROTOCOL_N2N_HANDSHAKE);
// Spawn a thread and pass the ownership of the channel.
thread::spawn(move || {
@ -52,7 +52,7 @@ thread::spawn(move || {
});
// Ask the multiplexer to provide us with the channel for the chainsync miniprotocol.
let mut chainsync = muxer.use_channel(PROTOCOL_N2N_CHAINSYNC);
let mut chainsync = muxer.use_client_channel(PROTOCOL_N2N_CHAINSYNC);
// Spawn a different thread and pass the ownership of the 2nd channel.
thread::spawn(move || {

View file

@ -43,6 +43,8 @@ pub struct StdPlexer {
pub mux_tx: Sender<Message>,
}
const PROTOCOL_SERVER_BIT: u16 = 0x8000;
impl StdPlexer {
pub fn new(bearer: Bearer) -> Self {
let (mux_tx, mux_rx) = channel::<Message>();
@ -62,6 +64,20 @@ impl StdPlexer {
(protocol, mux_tx, demux_rx)
}
/// Use the client-side channel for a given protocol
/// Explicitly unsets the most significant bit, forcing use of the client
/// side channel
pub fn use_client_channel(&mut self, protocol: u16) -> StdChannel {
self.use_channel(protocol & !PROTOCOL_SERVER_BIT)
}
/// Use the server-side channel for a given protocol
/// Explicitly sets the most significant bit, forcing use of the server side
/// channel
pub fn use_server_channel(&mut self, protocol: u16) -> StdChannel {
self.use_channel(protocol | PROTOCOL_SERVER_BIT)
}
}
impl mux::Muxer<StdIngress> {

View file

@ -44,8 +44,8 @@ fn one_way_small_sequence_of_payloads() {
let mut active_plexer = active.join().unwrap();
let mut passive_plexer = passive.join().unwrap();
let mut sender_channel = active_plexer.use_channel(0x0003u16);
let mut receiver_channel = passive_plexer.use_channel(0x8003u16);
let mut sender_channel = active_plexer.use_client_channel(0x0003u16);
let mut receiver_channel = passive_plexer.use_server_channel(0x0003u16);
active_plexer.muxer.spawn();
passive_plexer.demuxer.spawn();