From 07169cc45ac2f0db6ce0b51685f5900bd8beb26c Mon Sep 17 00:00:00 2001 From: Santiago Carmuega Date: Fri, 8 Apr 2022 18:49:31 -0300 Subject: [PATCH] feat(miniprotocols): Allow step-by-step agents (#85) --- pallas-miniprotocols/src/machines.rs | 95 ++++++++++++++++++++++------ 1 file changed, 75 insertions(+), 20 deletions(-) diff --git a/pallas-miniprotocols/src/machines.rs b/pallas-miniprotocols/src/machines.rs index 9fb1dab..11d4a3f 100644 --- a/pallas-miniprotocols/src/machines.rs +++ b/pallas-miniprotocols/src/machines.rs @@ -1,6 +1,7 @@ pub use crate::payloads::*; use pallas_codec::{minicbor, Fragment}; use pallas_multiplexer::{Channel, Payload}; +use std::cell::Cell; use std::fmt::{Debug, Display}; use std::sync::mpsc::Sender; @@ -86,36 +87,90 @@ pub trait Agent: Sized { fn apply_inbound(self, msg: Self::Message) -> Transition; } -pub fn run_agent(agent: T, channel: &mut Channel) -> Result> +pub struct Runner where - T: Agent + Debug, + A: Agent, +{ + agent: Cell>, + buffer: Vec, +} + +impl<'a, A> Runner +where + A: Agent, + A::Message: Fragment + Debug, +{ + pub fn new(agent: A) -> Self { + Self { + agent: Cell::new(Some(agent)), + buffer: Vec::new(), + } + } + + pub fn start(&mut self) -> Result<(), Error> { + let prev = self.agent.take().unwrap(); + let next = prev.apply_start()?; + self.agent.set(Some(next)); + Ok(()) + } + + pub fn run_step(&mut self, channel: &mut Channel) -> Result { + let prev = self.agent.take().unwrap(); + let next = run_agent_step(prev, channel, &mut self.buffer)?; + let is_done = next.is_done(); + + self.agent.set(Some(next)); + + Ok(is_done) + } + + pub fn fulfill(mut self, channel: &mut Channel) -> Result<(), Error> { + self.start()?; + + while self.run_step(channel)? {} + + Ok(()) + } +} + +pub fn run_agent_step(agent: T, channel: &mut Channel, buffer: &mut Vec) -> Transition +where + T: Agent, T::Message: Fragment + Debug, { let Channel(tx, rx) = channel; + match agent.has_agency() { + true => { + let msg = agent.build_next(); + log::trace!("processing outbound msg: {:?}", msg); + + let mut payload = Vec::new(); + minicbor::encode(&msg, &mut payload)?; + tx.send(payload)?; + + agent.apply_outbound(msg) + } + false => { + let msg = read_until_full_msg::(buffer, rx).unwrap(); + log::trace!("procesing inbound msg: {:?}", msg); + + agent.apply_inbound(msg) + } + } +} + +pub fn run_agent(agent: T, channel: &mut Channel) -> Result> +where + T: Agent, + T::Message: Fragment + Debug, +{ let mut buffer = Vec::new(); let mut agent = agent.apply_start()?; while !agent.is_done() { - log::debug!("evaluating agent {:?}", agent); - - match agent.has_agency() { - true => { - let msg = agent.build_next(); - - let mut payload = Vec::new(); - minicbor::encode(&msg, &mut payload)?; - tx.send(payload)?; - - agent = agent.apply_outbound(msg)?; - } - false => { - let msg = read_until_full_msg::(&mut buffer, rx).unwrap(); - log::trace!("procesing inbound msg: {:?}", msg); - agent = agent.apply_inbound(msg)?; - } - } + agent = run_agent_step(agent, channel, &mut buffer)?; } Ok(agent)