feat(rolldb): allow crawl from intersect options (#404)

This commit is contained in:
Santiago Carmuega 2024-02-13 21:34:10 -03:00 committed by GitHub
parent 5a44f38e7a
commit ab1fd882d6
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 58 additions and 20 deletions

View file

@ -82,6 +82,26 @@ impl Log {
pub fn is_origin(&self) -> bool {
matches!(self, Log::Origin)
}
/// Checks if entry is a forward event (apply or mark)
pub fn is_forward(&self) -> bool {
self.is_mark() || self.is_apply()
}
/// Checks if entry is a forward event that matches the specified point
pub fn equals_point(&self, point: &(BlockSlot, BlockHash)) -> bool {
if !self.is_forward() {
return false;
}
self.slot().is_some_and(|x| x == point.0) && self.hash().is_some_and(|x| x.eq(&point.1))
}
/// Checks if entry is a forward event that matches any of the specified
/// points
pub fn equals_any_point(&self, points: &[(BlockSlot, BlockHash)]) -> bool {
points.iter().any(|x| self.equals_point(x))
}
}
// slot => block hash
@ -303,25 +323,13 @@ impl Store {
}
}
pub fn find_wal_seq(
&self,
block: Option<(BlockSlot, BlockHash)>,
) -> Result<Option<Seq>, Error> {
if block.is_none() {
pub fn find_wal_seq(&self, intersect: &[(BlockSlot, BlockHash)]) -> Result<Option<Seq>, Error> {
if intersect.is_empty() {
return Ok(None);
}
let (slot, hash) = block.unwrap();
// TODO: Not sure this is 100% accurate:
// i.e Apply(X), Apply(cursor), Undo(cursor), Mark(x)
// We want to start at Apply(cursor) or Mark(cursor), but even then,
// what if we have more than one Apply(cursor), how do we know
// which is correct?
let found = WalKV::scan_until(&self.db, rocksdb::IteratorMode::End, |v| {
(v.is_mark() || v.is_apply())
&& v.slot().is_some_and(|s| s == slot)
&& v.hash().is_some_and(|h| h.eq(&hash))
v.equals_any_point(intersect)
})?;
match found {
@ -330,6 +338,33 @@ impl Store {
}
}
pub fn crawl_from_intersect(
&self,
options: &[(BlockSlot, BlockHash)],
) -> Result<WalIterator, Error> {
let seq = self.find_wal_seq(options)?;
// TODO: we need to create a RocksDB snapshot (with `db.snapshot()`) to use as
// the source for sequence scan and the iterator to ensure that sequence
// hasn't been pruned between operations. For the time being we consider
// this is a very narrow edge-case.
if let Some(seq) = seq {
let seq = Box::<[u8]>::from(DBInt(seq));
let from = rocksdb::IteratorMode::From(&seq, rocksdb::Direction::Forward);
let mut iter = WalKV::iter_entries(&self.db, from);
// skip current
iter.next();
Ok(WalIterator(iter))
} else {
let from = rocksdb::IteratorMode::Start;
let iter = WalKV::iter_entries(&self.db, from);
Ok(WalIterator(iter))
}
}
/// Prune the WAL of entries with slot values over `k_param` from the tip
pub fn prune_wal(&self) -> Result<(), Error> {
let tip = self.find_tip()?.map(|(slot, _)| slot).unwrap_or_default();

View file

@ -1,15 +1,18 @@
use futures_core::Stream;
use super::{Log, Seq, Store};
use super::{BlockHash, BlockSlot, Log, Store};
pub struct RollStream;
impl RollStream {
pub fn start_after(store: Store, seq: Option<Seq>) -> impl Stream<Item = Log> {
pub fn intersect(
store: Store,
intersect: Vec<(BlockSlot, BlockHash)>,
) -> impl Stream<Item = Log> {
async_stream::stream! {
let mut last_seq = seq;
let mut last_seq = None;
let iter = store.crawl_after(last_seq);
let iter = store.crawl_from_intersect(&intersect).unwrap();
for (seq, val) in iter.flatten() {
yield val;
@ -59,7 +62,7 @@ mod tests {
}
});
let s = super::RollStream::start_after(db.clone(), None);
let s = super::RollStream::intersect(db.clone(), vec![]);
pin_mut!(s);