feat(miniprotocols): Allow step-by-step agents (#85)
This commit is contained in:
parent
b6452aa82d
commit
07169cc45a
1 changed files with 75 additions and 20 deletions
|
|
@ -1,6 +1,7 @@
|
|||
pub use crate::payloads::*;
|
||||
use pallas_codec::{minicbor, Fragment};
|
||||
use pallas_multiplexer::{Channel, Payload};
|
||||
use std::cell::Cell;
|
||||
use std::fmt::{Debug, Display};
|
||||
use std::sync::mpsc::Sender;
|
||||
|
||||
|
|
@ -86,36 +87,90 @@ pub trait Agent: Sized {
|
|||
fn apply_inbound(self, msg: Self::Message) -> Transition<Self>;
|
||||
}
|
||||
|
||||
pub fn run_agent<T>(agent: T, channel: &mut Channel) -> Result<T, Box<dyn std::error::Error>>
|
||||
pub struct Runner<A>
|
||||
where
|
||||
T: Agent + Debug,
|
||||
A: Agent,
|
||||
{
|
||||
agent: Cell<Option<A>>,
|
||||
buffer: Vec<u8>,
|
||||
}
|
||||
|
||||
impl<'a, A> Runner<A>
|
||||
where
|
||||
A: Agent,
|
||||
A::Message: Fragment + Debug,
|
||||
{
|
||||
pub fn new(agent: A) -> Self {
|
||||
Self {
|
||||
agent: Cell::new(Some(agent)),
|
||||
buffer: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn start(&mut self) -> Result<(), Error> {
|
||||
let prev = self.agent.take().unwrap();
|
||||
let next = prev.apply_start()?;
|
||||
self.agent.set(Some(next));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn run_step(&mut self, channel: &mut Channel) -> Result<bool, Error> {
|
||||
let prev = self.agent.take().unwrap();
|
||||
let next = run_agent_step(prev, channel, &mut self.buffer)?;
|
||||
let is_done = next.is_done();
|
||||
|
||||
self.agent.set(Some(next));
|
||||
|
||||
Ok(is_done)
|
||||
}
|
||||
|
||||
pub fn fulfill(mut self, channel: &mut Channel) -> Result<(), Error> {
|
||||
self.start()?;
|
||||
|
||||
while self.run_step(channel)? {}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub fn run_agent_step<T>(agent: T, channel: &mut Channel, buffer: &mut Vec<u8>) -> Transition<T>
|
||||
where
|
||||
T: Agent,
|
||||
T::Message: Fragment + Debug,
|
||||
{
|
||||
let Channel(tx, rx) = channel;
|
||||
|
||||
match agent.has_agency() {
|
||||
true => {
|
||||
let msg = agent.build_next();
|
||||
log::trace!("processing outbound msg: {:?}", msg);
|
||||
|
||||
let mut payload = Vec::new();
|
||||
minicbor::encode(&msg, &mut payload)?;
|
||||
tx.send(payload)?;
|
||||
|
||||
agent.apply_outbound(msg)
|
||||
}
|
||||
false => {
|
||||
let msg = read_until_full_msg::<T::Message>(buffer, rx).unwrap();
|
||||
log::trace!("procesing inbound msg: {:?}", msg);
|
||||
|
||||
agent.apply_inbound(msg)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn run_agent<T>(agent: T, channel: &mut Channel) -> Result<T, Box<dyn std::error::Error>>
|
||||
where
|
||||
T: Agent,
|
||||
T::Message: Fragment + Debug,
|
||||
{
|
||||
let mut buffer = Vec::new();
|
||||
|
||||
let mut agent = agent.apply_start()?;
|
||||
|
||||
while !agent.is_done() {
|
||||
log::debug!("evaluating agent {:?}", agent);
|
||||
|
||||
match agent.has_agency() {
|
||||
true => {
|
||||
let msg = agent.build_next();
|
||||
|
||||
let mut payload = Vec::new();
|
||||
minicbor::encode(&msg, &mut payload)?;
|
||||
tx.send(payload)?;
|
||||
|
||||
agent = agent.apply_outbound(msg)?;
|
||||
}
|
||||
false => {
|
||||
let msg = read_until_full_msg::<T::Message>(&mut buffer, rx).unwrap();
|
||||
log::trace!("procesing inbound msg: {:?}", msg);
|
||||
agent = agent.apply_inbound(msg)?;
|
||||
}
|
||||
}
|
||||
agent = run_agent_step(agent, channel, &mut buffer)?;
|
||||
}
|
||||
|
||||
Ok(agent)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue