fix(multiplexer): Remove disconnected protocols from muxer loop (#16)

This commit is contained in:
Santiago Carmuega 2022-01-03 20:49:42 -03:00 committed by GitHub
parent c566769c51
commit 19b9511d5c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -27,51 +27,62 @@ const MAX_SEGMENT_PAYLOAD_LENGTH: usize = 65535;
pub type Payload = Vec<u8>;
fn tx_round<TBearer>(
enum TxStepError {
BearerError(std::io::Error),
IngressDisconnected,
IngressEmpty,
}
fn tx_step<TBearer>(
bearer: &mut TBearer,
ingress: &MuxIngress,
ingress_id: u16,
ingress_rx: &mut Receiver<Payload>,
clock: Instant,
) -> Result<u16, std::io::Error>
) -> Result<(), TxStepError>
where
TBearer: Bearer,
{
let mut writes = 0u16;
match ingress_rx.try_recv() {
Ok(payload) => {
let chunks = payload.chunks(MAX_SEGMENT_PAYLOAD_LENGTH);
for (id, rx) in ingress.iter() {
match rx.try_recv() {
Ok(payload) => {
let chunks = payload.chunks(MAX_SEGMENT_PAYLOAD_LENGTH);
for chunk in chunks {
bearer
.write_segment(clock, ingress_id, chunk)
.map_err(TxStepError::BearerError)?;
}
for chunk in chunks {
bearer.write_segment(clock, *id, chunk)?;
writes += 1;
}
}
Err(TryRecvError::Disconnected) => {
//TODO: remove handle from list
warn!("protocol handle {} disconnected", id);
}
Err(TryRecvError::Empty) => (),
};
Ok(())
}
Err(TryRecvError::Disconnected) => Err(TxStepError::IngressDisconnected),
Err(TryRecvError::Empty) => Err(TxStepError::IngressEmpty),
}
Ok(writes)
}
fn tx_loop<TBearer>(bearer: &mut TBearer, ingress: MuxIngress)
where
TBearer: Bearer,
{
let mut rx_map: HashMap<_, _> = ingress.into_iter().collect();
loop {
let clock = Instant::now();
match tx_round(bearer, &ingress, clock) {
Err(err) => {
rx_map.retain(|id, rx| match tx_step(bearer, *id, rx, clock) {
Err(TxStepError::BearerError(err)) => {
error!("{:?}", err);
panic!();
}
Ok(0) => thread::sleep(Duration::from_millis(10)),
Ok(_) => (),
};
Err(TxStepError::IngressDisconnected) => {
warn!("protocol handle {} disconnected", id);
false
}
Err(TxStepError::IngressEmpty) => {
thread::sleep(Duration::from_millis(10));
true
}
Ok(_) => true,
});
}
}
@ -111,7 +122,7 @@ pub struct Channel(pub Sender<Payload>, pub Receiver<Payload>);
type ChannelProtocolHandle = (u16, Channel);
type ChannelIngressHandle = (u16, Receiver<Payload>);
type ChannelEgressHandle = (u16, Sender<Payload>);
type MuxIngress<'a> = &'a [ChannelIngressHandle];
type MuxIngress = Vec<ChannelIngressHandle>;
type DemuxerEgress = Vec<ChannelEgressHandle>;
pub struct Multiplexer {
@ -146,7 +157,7 @@ impl Multiplexer {
let (ingress, egress): (Vec<_>, Vec<_>) = multiplex_handles.into_iter().unzip();
let mut tx_bearer = bearer.clone();
let tx_thread = thread::spawn(move || tx_loop(&mut tx_bearer, ingress.as_slice()));
let tx_thread = thread::spawn(move || tx_loop(&mut tx_bearer, ingress));
let mut rx_bearer = bearer.clone();
let rx_thread = thread::spawn(move || rx_loop(&mut rx_bearer, egress));