diff --git a/pallas-miniprotocols/src/chainsync/buffer.rs b/pallas-miniprotocols/src/chainsync/buffer.rs new file mode 100644 index 0000000..05fe62b --- /dev/null +++ b/pallas-miniprotocols/src/chainsync/buffer.rs @@ -0,0 +1,193 @@ +use std::collections::{vec_deque::Iter, VecDeque}; + +use crate::Point; + +/// A memory buffer to handle chain rollbacks +/// +/// This structure is intended to facilitate the process of managing rollbacks +/// in a chain sync process. The goal is to keep points in memory until they +/// reach a certain depth (# of confirmations). If a rollback happens, the +/// buffer will try to find the intersection, clear the orphaned points and keep +/// the remaining still in memory. Further forward rolls will accumulate from +/// the intersection. +/// +/// It works by keeping a `VecDeque` data structure of points, where +/// roll-forward operations accumulate at the end of the deque and retrieving +/// confirmed points means to pop from the front of the deque. +/// +/// Notice that it works by keeping track of points, not blocks. It is meant to +/// be used as a lightweight index where blocks can then be retrieved from a +/// more suitable memory structure / persistent storage. +#[derive(Debug)] +pub struct RollbackBuffer { + points: VecDeque, +} + +impl Default for RollbackBuffer { + fn default() -> Self { + Self::new() + } +} + +pub enum RollbackEffect { + Handled, + OutOfScope, +} + +impl RollbackBuffer { + pub fn new() -> Self { + Self { + points: VecDeque::new(), + } + } + + /// Adds a new point to the back of the buffer + pub fn roll_forward(&mut self, point: Point) { + self.points.push_back(point); + } + + /// Retrieves all points above or equal a certain depth + pub fn pop_with_depth(&mut self, min_depth: usize) -> Vec { + match self.points.len().checked_sub(min_depth) { + Some(ready) => self.points.drain(0..ready).collect(), + None => vec![], + } + } + + /// Find the position of a point within the buffer + pub fn position(&self, point: &Point) -> Option { + self.points.iter().position(|p| p.eq(point)) + } + + /// Iterates over the contents of the buffer + pub fn peek(&self) -> Iter { + self.points.iter() + } + + /// Returns the size of the buffer (number of points) + pub fn size(&self) -> usize { + self.points.len() + } + + /// Returns the newest point in the buffer + pub fn latest(&self) -> Option<&Point> { + self.points.back() + } + + /// Returns the oldest point in the buffer + pub fn oldest(&self) -> Option<&Point> { + self.points.front() + } + + /// Unwind the buffer up to a certain point, clearing orphaned items + /// + /// If the buffer contains the rollback point, we can safely discard from + /// the back and return Ok. If the rollback point is outside the scope of + /// the buffer, we clear the whole buffer and notify a failure + /// in the rollback process. + pub fn roll_back(&mut self, point: &Point) -> RollbackEffect { + if let Some(x) = self.position(point) { + self.points.truncate(x + 1); + RollbackEffect::Handled + } else { + self.points.clear(); + RollbackEffect::OutOfScope + } + } +} + +#[cfg(test)] +mod tests { + use crate::{chainsync::RollbackEffect, Point}; + + use super::RollbackBuffer; + + fn dummy_point(i: u64) -> Point { + Point::new(i, i.to_le_bytes().to_vec()) + } + + fn build_filled_buffer(n: usize) -> RollbackBuffer { + let mut buffer = RollbackBuffer::new(); + + for i in 0..n { + let point = dummy_point(i as u64); + buffer.roll_forward(point); + } + + buffer + } + + #[test] + fn roll_forward_accumulates_points() { + let buffer = build_filled_buffer(3); + + assert!(matches!(buffer.position(&dummy_point(0)), Some(0))); + assert!(matches!(buffer.position(&dummy_point(1)), Some(1))); + assert!(matches!(buffer.position(&dummy_point(2)), Some(2))); + + assert_eq!(buffer.oldest().unwrap(), &dummy_point(0)); + assert_eq!(buffer.latest().unwrap(), &dummy_point(2)); + } + + #[test] + fn pop_from_valid_depth_works() { + let mut buffer = build_filled_buffer(5); + + let ready = buffer.pop_with_depth(2); + + assert_eq!(dummy_point(0), ready[0]); + assert_eq!(dummy_point(1), ready[1]); + assert_eq!(dummy_point(2), ready[2]); + + assert_eq!(ready.len(), 3); + + assert_eq!(buffer.oldest().unwrap(), &dummy_point(3)); + assert_eq!(buffer.latest().unwrap(), &dummy_point(4)); + } + + #[test] + fn pop_from_excessive_depth_returns_empty() { + let mut buffer = build_filled_buffer(6); + + let ready = buffer.pop_with_depth(10); + + assert_eq!(ready.len(), 0); + + assert_eq!(buffer.oldest().unwrap(), &dummy_point(0)); + assert_eq!(buffer.latest().unwrap(), &dummy_point(5)); + } + + #[test] + fn roll_back_within_scope_works() { + let mut buffer = build_filled_buffer(6); + + let result = buffer.roll_back(&dummy_point(2)); + + assert!(matches!(result, RollbackEffect::Handled)); + + assert_eq!(buffer.size(), 3); + assert_eq!(buffer.oldest().unwrap(), &dummy_point(0)); + assert_eq!(buffer.latest().unwrap(), &dummy_point(2)); + + let remaining = buffer.pop_with_depth(0); + + assert_eq!(dummy_point(0), remaining[0]); + assert_eq!(dummy_point(1), remaining[1]); + assert_eq!(dummy_point(2), remaining[2]); + + assert_eq!(remaining.len(), 3); + } + + #[test] + fn roll_back_outside_scope_works() { + let mut buffer = build_filled_buffer(6); + + let result = buffer.roll_back(&dummy_point(100)); + + assert!(matches!(result, RollbackEffect::OutOfScope)); + + assert_eq!(buffer.size(), 0); + assert_eq!(buffer.oldest(), None); + assert_eq!(buffer.latest(), None); + } +} diff --git a/pallas-miniprotocols/src/chainsync/mod.rs b/pallas-miniprotocols/src/chainsync/mod.rs index 81be990..6bdafc5 100644 --- a/pallas-miniprotocols/src/chainsync/mod.rs +++ b/pallas-miniprotocols/src/chainsync/mod.rs @@ -1,7 +1,9 @@ +mod buffer; mod clients; mod codec; mod protocol; +pub use buffer::*; pub use clients::*; pub use codec::*; pub use protocol::*; diff --git a/pallas-miniprotocols/src/chainsync/protocol.rs b/pallas-miniprotocols/src/chainsync/protocol.rs index 2f54f71..afe2f11 100644 --- a/pallas-miniprotocols/src/chainsync/protocol.rs +++ b/pallas-miniprotocols/src/chainsync/protocol.rs @@ -2,7 +2,7 @@ use std::{fmt::Debug, ops::Deref}; use crate::common::Point; -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct Tip(pub Point, pub u64); #[derive(Debug, PartialEq, Clone)] diff --git a/pallas-miniprotocols/src/common.rs b/pallas-miniprotocols/src/common.rs index fd8742b..9ddcaea 100644 --- a/pallas-miniprotocols/src/common.rs +++ b/pallas-miniprotocols/src/common.rs @@ -7,7 +7,7 @@ pub const TESTNET_MAGIC: u64 = 1097911063; pub const MAINNET_MAGIC: u64 = 764824073; /// A point within a chain -#[derive(Clone)] +#[derive(Clone, Eq, PartialEq)] pub struct Point(pub u64, pub Vec); impl Debug for Point { @@ -18,3 +18,9 @@ impl Debug for Point { .finish() } } + +impl Point { + pub fn new(slot: u64, hash: Vec) -> Self { + Point(slot, hash) + } +}