diff --git a/pallas-rolldb/src/wal/store.rs b/pallas-rolldb/src/wal/store.rs index 7f55c42..1275a74 100644 --- a/pallas-rolldb/src/wal/store.rs +++ b/pallas-rolldb/src/wal/store.rs @@ -82,6 +82,26 @@ impl Log { pub fn is_origin(&self) -> bool { matches!(self, Log::Origin) } + + /// Checks if entry is a forward event (apply or mark) + pub fn is_forward(&self) -> bool { + self.is_mark() || self.is_apply() + } + + /// Checks if entry is a forward event that matches the specified point + pub fn equals_point(&self, point: &(BlockSlot, BlockHash)) -> bool { + if !self.is_forward() { + return false; + } + + self.slot().is_some_and(|x| x == point.0) && self.hash().is_some_and(|x| x.eq(&point.1)) + } + + /// Checks if entry is a forward event that matches any of the specified + /// points + pub fn equals_any_point(&self, points: &[(BlockSlot, BlockHash)]) -> bool { + points.iter().any(|x| self.equals_point(x)) + } } // slot => block hash @@ -303,25 +323,13 @@ impl Store { } } - pub fn find_wal_seq( - &self, - block: Option<(BlockSlot, BlockHash)>, - ) -> Result, Error> { - if block.is_none() { + pub fn find_wal_seq(&self, intersect: &[(BlockSlot, BlockHash)]) -> Result, Error> { + if intersect.is_empty() { return Ok(None); } - let (slot, hash) = block.unwrap(); - - // TODO: Not sure this is 100% accurate: - // i.e Apply(X), Apply(cursor), Undo(cursor), Mark(x) - // We want to start at Apply(cursor) or Mark(cursor), but even then, - // what if we have more than one Apply(cursor), how do we know - // which is correct? let found = WalKV::scan_until(&self.db, rocksdb::IteratorMode::End, |v| { - (v.is_mark() || v.is_apply()) - && v.slot().is_some_and(|s| s == slot) - && v.hash().is_some_and(|h| h.eq(&hash)) + v.equals_any_point(intersect) })?; match found { @@ -330,6 +338,33 @@ impl Store { } } + pub fn crawl_from_intersect( + &self, + options: &[(BlockSlot, BlockHash)], + ) -> Result { + let seq = self.find_wal_seq(options)?; + + // TODO: we need to create a RocksDB snapshot (with `db.snapshot()`) to use as + // the source for sequence scan and the iterator to ensure that sequence + // hasn't been pruned between operations. For the time being we consider + // this is a very narrow edge-case. + + if let Some(seq) = seq { + let seq = Box::<[u8]>::from(DBInt(seq)); + let from = rocksdb::IteratorMode::From(&seq, rocksdb::Direction::Forward); + let mut iter = WalKV::iter_entries(&self.db, from); + + // skip current + iter.next(); + + Ok(WalIterator(iter)) + } else { + let from = rocksdb::IteratorMode::Start; + let iter = WalKV::iter_entries(&self.db, from); + Ok(WalIterator(iter)) + } + } + /// Prune the WAL of entries with slot values over `k_param` from the tip pub fn prune_wal(&self) -> Result<(), Error> { let tip = self.find_tip()?.map(|(slot, _)| slot).unwrap_or_default(); diff --git a/pallas-rolldb/src/wal/stream.rs b/pallas-rolldb/src/wal/stream.rs index 23ef069..69c867d 100644 --- a/pallas-rolldb/src/wal/stream.rs +++ b/pallas-rolldb/src/wal/stream.rs @@ -1,15 +1,18 @@ use futures_core::Stream; -use super::{Log, Seq, Store}; +use super::{BlockHash, BlockSlot, Log, Store}; pub struct RollStream; impl RollStream { - pub fn start_after(store: Store, seq: Option) -> impl Stream { + pub fn intersect( + store: Store, + intersect: Vec<(BlockSlot, BlockHash)>, + ) -> impl Stream { async_stream::stream! { - let mut last_seq = seq; + let mut last_seq = None; - let iter = store.crawl_after(last_seq); + let iter = store.crawl_from_intersect(&intersect).unwrap(); for (seq, val) in iter.flatten() { yield val; @@ -59,7 +62,7 @@ mod tests { } }); - let s = super::RollStream::start_after(db.clone(), None); + let s = super::RollStream::intersect(db.clone(), vec![]); pin_mut!(s);