fix: Handle bearer I/O errors (#247)

This commit is contained in:
Santiago Carmuega 2023-04-11 15:08:44 +02:00 committed by GitHub
parent 1dc87174bd
commit 46dd2a685f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 20 additions and 11 deletions

View file

@ -25,7 +25,7 @@ pub enum Error {
}
pub struct PeerClient {
plexer_handle: JoinHandle<tokio::io::Result<()>>,
plexer_handle: JoinHandle<Result<(), crate::multiplexer::Error>>,
pub handshake: handshake::Confirmation<handshake::n2n::VersionData>,
chainsync: chainsync::N2NClient,
blockfetch: blockfetch::Client,
@ -81,7 +81,7 @@ impl PeerClient {
}
pub struct NodeClient {
plexer_handle: JoinHandle<tokio::io::Result<()>>,
plexer_handle: JoinHandle<Result<(), crate::multiplexer::Error>>,
pub handshake: handshake::Confirmation<handshake::n2c::VersionData>,
chainsync: chainsync::N2CClient,
statequery: localstate::ClientV10,

View file

@ -151,7 +151,10 @@ impl SegmentBuffer {
let mut buf = vec![0u8; remaining];
match self.0.try_read(&mut buf) {
Ok(0) => break Err(Error::EmptyBearer),
Ok(0) => {
error!("empty bearer");
break Err(Error::EmptyBearer);
}
Ok(n) => {
trace!(n, "found data on bearer");
self.1.extend_from_slice(&buf[0..n]);
@ -164,8 +167,9 @@ impl SegmentBuffer {
trace!("reading from bearer would block");
continue;
}
Err(e) => {
return Err(Error::BearerIo(e));
Err(err) => {
error!(?err, "beaerer IO error");
break Err(Error::BearerIo(err));
}
}
}
@ -296,10 +300,11 @@ impl Plexer {
}
}
async fn mux(&mut self, msg: (Protocol, Payload)) -> tokio::io::Result<()> {
async fn mux(&mut self, msg: (Protocol, Payload)) -> Result<(), Error> {
self.bearer
.write_segment(msg.0, &self.clock, &msg.1)
.await?;
.await
.map_err(|_| Error::PlexerMux)?;
if tracing::event_enabled!(tracing::Level::TRACE) {
trace!(
@ -312,12 +317,15 @@ impl Plexer {
Ok(())
}
async fn demux(&mut self, protocol: Protocol, payload: Payload) -> tokio::io::Result<()> {
async fn demux(&mut self, protocol: Protocol, payload: Payload) -> Result<(), Error> {
if tracing::event_enabled!(tracing::Level::TRACE) {
trace!(protocol, data = hex::encode(&payload), "read from bearer");
}
self.egress.0.send((protocol, payload)).unwrap();
self.egress
.0
.send((protocol, payload))
.map_err(|err| Error::PlexerDemux(err.0 .0, err.0 .1))?;
Ok(())
}
@ -330,11 +338,12 @@ impl Plexer {
AgentChannel::for_server(protocol, &self.ingress, &self.egress)
}
pub async fn run(&mut self) -> tokio::io::Result<()> {
pub async fn run(&mut self) -> Result<(), Error> {
loop {
trace!("selecting");
select! {
Ok(x) = self.bearer.read_segment() => {
res = self.bearer.read_segment() => {
let x = res?;
trace!("demux selected");
self.demux(x.0, x.1).await?
},