diff --git a/pallas-multiplexer/src/lib.rs b/pallas-multiplexer/src/lib.rs index 1cb176e..0a77822 100644 --- a/pallas-multiplexer/src/lib.rs +++ b/pallas-multiplexer/src/lib.rs @@ -27,51 +27,62 @@ const MAX_SEGMENT_PAYLOAD_LENGTH: usize = 65535; pub type Payload = Vec; -fn tx_round( +enum TxStepError { + BearerError(std::io::Error), + IngressDisconnected, + IngressEmpty, +} + +fn tx_step( bearer: &mut TBearer, - ingress: &MuxIngress, + ingress_id: u16, + ingress_rx: &mut Receiver, clock: Instant, -) -> Result +) -> Result<(), TxStepError> where TBearer: Bearer, { - let mut writes = 0u16; + match ingress_rx.try_recv() { + Ok(payload) => { + let chunks = payload.chunks(MAX_SEGMENT_PAYLOAD_LENGTH); - for (id, rx) in ingress.iter() { - match rx.try_recv() { - Ok(payload) => { - let chunks = payload.chunks(MAX_SEGMENT_PAYLOAD_LENGTH); + for chunk in chunks { + bearer + .write_segment(clock, ingress_id, chunk) + .map_err(TxStepError::BearerError)?; + } - for chunk in chunks { - bearer.write_segment(clock, *id, chunk)?; - writes += 1; - } - } - Err(TryRecvError::Disconnected) => { - //TODO: remove handle from list - warn!("protocol handle {} disconnected", id); - } - Err(TryRecvError::Empty) => (), - }; + Ok(()) + } + Err(TryRecvError::Disconnected) => Err(TxStepError::IngressDisconnected), + Err(TryRecvError::Empty) => Err(TxStepError::IngressEmpty), } - - Ok(writes) } fn tx_loop(bearer: &mut TBearer, ingress: MuxIngress) where TBearer: Bearer, { + let mut rx_map: HashMap<_, _> = ingress.into_iter().collect(); + loop { let clock = Instant::now(); - match tx_round(bearer, &ingress, clock) { - Err(err) => { + + rx_map.retain(|id, rx| match tx_step(bearer, *id, rx, clock) { + Err(TxStepError::BearerError(err)) => { error!("{:?}", err); panic!(); } - Ok(0) => thread::sleep(Duration::from_millis(10)), - Ok(_) => (), - }; + Err(TxStepError::IngressDisconnected) => { + warn!("protocol handle {} disconnected", id); + false + } + Err(TxStepError::IngressEmpty) => { + thread::sleep(Duration::from_millis(10)); + true + } + Ok(_) => true, + }); } } @@ -111,7 +122,7 @@ pub struct Channel(pub Sender, pub Receiver); type ChannelProtocolHandle = (u16, Channel); type ChannelIngressHandle = (u16, Receiver); type ChannelEgressHandle = (u16, Sender); -type MuxIngress<'a> = &'a [ChannelIngressHandle]; +type MuxIngress = Vec; type DemuxerEgress = Vec; pub struct Multiplexer { @@ -146,7 +157,7 @@ impl Multiplexer { let (ingress, egress): (Vec<_>, Vec<_>) = multiplex_handles.into_iter().unzip(); let mut tx_bearer = bearer.clone(); - let tx_thread = thread::spawn(move || tx_loop(&mut tx_bearer, ingress.as_slice())); + let tx_thread = thread::spawn(move || tx_loop(&mut tx_bearer, ingress)); let mut rx_bearer = bearer.clone(); let rx_thread = thread::spawn(move || rx_loop(&mut rx_bearer, egress));