diff --git a/pallas-multiplexer/examples/listener.rs b/pallas-multiplexer/examples/listener.rs index c985236..c144bf8 100644 --- a/pallas-multiplexer/examples/listener.rs +++ b/pallas-multiplexer/examples/listener.rs @@ -1,4 +1,4 @@ -use std::{net::TcpListener, thread, time::Duration}; +use std::{net::TcpListener, os::unix::net::UnixListener, thread, time::Duration}; use pallas_multiplexer::Multiplexer; @@ -7,7 +7,8 @@ const PROTOCOLS: [u16; 2] = [0x8002u16, 0x8003u16]; fn main() { env_logger::init(); - let server = TcpListener::bind("0.0.0.0:3001").unwrap(); + //let server = TcpListener::bind("0.0.0.0:3001").unwrap(); + let server = UnixListener::bind("/tmp/pallas").unwrap(); let (bearer, _) = server.accept().unwrap(); let mut muxer = Multiplexer::try_setup(bearer, &PROTOCOLS).unwrap(); diff --git a/pallas-multiplexer/examples/sender.rs b/pallas-multiplexer/examples/sender.rs index d41516c..c44c4f4 100644 --- a/pallas-multiplexer/examples/sender.rs +++ b/pallas-multiplexer/examples/sender.rs @@ -1,4 +1,4 @@ -use std::{net::TcpStream, thread, time::Duration}; +use std::{net::TcpStream, os::unix::net::UnixStream, thread, time::Duration}; use pallas_multiplexer::Multiplexer; @@ -7,7 +7,8 @@ const PROTOCOLS: [u16; 2] = [0x0002u16, 0x0003u16]; fn main() { env_logger::init(); - let bearer = TcpStream::connect("127.0.0.1:3001").unwrap(); + //let bearer = TcpStream::connect("127.0.0.1:3001").unwrap(); + let bearer = UnixStream::connect("/tmp/pallas").unwrap(); let mut muxer = Multiplexer::try_setup(bearer, &PROTOCOLS).unwrap(); for protocol in PROTOCOLS { diff --git a/pallas-multiplexer/src/bearers.rs b/pallas-multiplexer/src/bearers.rs new file mode 100644 index 0000000..423b8ed --- /dev/null +++ b/pallas-multiplexer/src/bearers.rs @@ -0,0 +1,98 @@ +use byteorder::{ByteOrder, NetworkEndian, WriteBytesExt}; +use log::{debug, log_enabled, trace}; +use std::io::{Read, Write}; +use std::os::unix::net::UnixStream; +use std::{net::TcpStream, time::Instant}; + +use crate::{Bearer, Payload}; + +fn write_segment( + writer: &mut impl Write, + clock: Instant, + protocol_id: u16, + payload: &[u8], +) -> Result<(), std::io::Error> { + let mut msg = Vec::new(); + msg.write_u32::(clock.elapsed().as_micros() as u32)?; + msg.write_u16::(protocol_id)?; + msg.write_u16::(payload.len() as u16)?; + + if log_enabled!(log::Level::Trace) { + trace!( + "sending segment, header {:?}, payload length: {}", + hex::encode(&msg), + payload.len() + ); + } + + msg.write(&payload[..]).unwrap(); + + writer.write(&msg)?; + writer.flush() +} + +fn read_segment(reader: &mut impl Read) -> Result<(u16, u32, Payload), std::io::Error> { + let mut header = [0u8; 8]; + + reader.read_exact(&mut header)?; + + if log_enabled!(log::Level::Trace) { + trace!("read segment header: {:?}", hex::encode(&header)); + } + + let length = NetworkEndian::read_u16(&header[6..]) as usize; + let id = NetworkEndian::read_u16(&mut header[4..6]) as usize ^ 0x8000; + let ts = NetworkEndian::read_u32(&mut header[0..4]); + + debug!( + "parsed inbound msg, protocol id: {}, ts: {}, payload length: {}", + id, ts, length + ); + + let mut payload = vec![0u8; length]; + reader.read_exact(&mut payload)?; + + if log_enabled!(log::Level::Trace) { + trace!("read segment payload: {:?}", hex::encode(&payload)); + } + + Ok((id as u16, ts, payload)) +} + +impl Bearer for TcpStream { + fn clone(&self) -> Self { + self.try_clone().unwrap() + } + + fn read_segment(&mut self) -> Result<(u16, u32, Payload), std::io::Error> { + read_segment(self) + } + + fn write_segment( + &mut self, + clock: Instant, + protocol_id: u16, + partial_payload: &[u8], + ) -> Result<(), std::io::Error> { + write_segment(self, clock, protocol_id, partial_payload) + } +} + +impl Bearer for UnixStream { + fn clone(&self) -> Self { + self.try_clone().unwrap() + } + + fn read_segment(&mut self) -> Result<(u16, u32, Payload), std::io::Error> { + read_segment(self) + } + + fn write_segment( + &mut self, + clock: Instant, + protocol_id: u16, + partial_payload: &[u8], + ) -> Result<(), std::io::Error> { + write_segment(self, clock, protocol_id, partial_payload) + } +} diff --git a/pallas-multiplexer/src/lib.rs b/pallas-multiplexer/src/lib.rs index 3097d2f..bb1f2e7 100644 --- a/pallas-multiplexer/src/lib.rs +++ b/pallas-multiplexer/src/lib.rs @@ -1,15 +1,15 @@ +mod bearers; + use std::{ - borrow::Borrow, collections::HashMap, io::{Read, Write}, - net::TcpStream, sync::mpsc::{self, Receiver, Sender, TryRecvError}, thread::{self, JoinHandle}, time::{Duration, Instant}, }; -use byteorder::{ByteOrder, NetworkEndian, WriteBytesExt}; -use log::{debug, error, log_enabled, trace, warn}; + +use log::{debug, error, warn}; pub trait Bearer: Read + Write + Send + Sync + Sized { fn read_segment(&mut self) -> Result<(u16, u32, Payload), std::io::Error>; @@ -24,66 +24,6 @@ pub trait Bearer: Read + Write + Send + Sync + Sized { fn clone(&self) -> Self; } -impl Bearer for TcpStream { - fn write_segment( - &mut self, - clock: Instant, - protocol_id: u16, - payload: &[u8], - ) -> Result<(), std::io::Error> { - let mut msg = Vec::new(); - msg.write_u32::(clock.elapsed().as_micros() as u32)?; - msg.write_u16::(protocol_id)?; - msg.write_u16::(payload.len() as u16)?; - - if log_enabled!(log::Level::Trace) { - trace!( - "sending segment, header {:?}, payload length: {}", - hex::encode(&msg), - payload.len() - ); - } - - msg.write(&payload[..]).unwrap(); - - self.write(&msg)?; - - self.flush() - } - - fn read_segment(&mut self) -> Result<(u16, u32, Payload), std::io::Error> { - let mut header = [0u8; 8]; - - self.read_exact(&mut header)?; - - if log_enabled!(log::Level::Trace) { - trace!("read segment header: {:?}", hex::encode(&header)); - } - - let length = NetworkEndian::read_u16(&header[6..]) as usize; - let id = NetworkEndian::read_u16(&mut header[4..6]) as usize ^ 0x8000; - let ts = NetworkEndian::read_u32(&mut header[0..4]); - - debug!( - "parsed inbound msg, protocol id: {}, ts: {}, payload length: {}", - id, ts, length - ); - - let mut payload = vec![0u8; length]; - self.read_exact(&mut payload)?; - - if log_enabled!(log::Level::Trace) { - trace!("read segment payload: {:?}", hex::encode(&payload)); - } - - Ok((id as u16, ts, payload)) - } - - fn clone(&self) -> Self { - self.try_clone().unwrap() - } -} - const MAX_SEGMENT_PAYLOAD_LENGTH: usize = 65535; pub type Payload = Vec;