diff --git a/Cargo.toml b/Cargo.toml index 03b7c37..c54da96 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ members = [ "pallas-crypto", "pallas-configs", "pallas-primitives", + "pallas-rolldb", "pallas-traverse", "pallas-utxorpc", "pallas", diff --git a/pallas-rolldb/Cargo.toml b/pallas-rolldb/Cargo.toml new file mode 100644 index 0000000..cb552a2 --- /dev/null +++ b/pallas-rolldb/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "pallas-rolldb" +description = "An opinionated Cardano storage engine built on top of RocksDB" +version = "0.19.1" +edition = "2021" +repository = "https://github.com/txpipe/pallas" +homepage = "https://github.com/txpipe/pallas" +documentation = "https://docs.rs/pallas-rolldb" +license = "Apache-2.0" +readme = "README.md" +authors = ["Santiago Carmuega "] + +[dependencies] +rocksdb = { version = "0.21.0", default-features = false, features = ["multi-threaded-cf"] } +bincode = "1.3.3" +serde = "1.0.188" +thiserror = "1.0.49" +pallas-crypto = { version = "=0.19.1", path = "../pallas-crypto" } +tracing = "0.1.37" +tokio = { version = "1.32.0", features = ["sync", "rt", "time", "macros"] } +async-stream = "0.3.5" +futures-core = "0.3.28" +futures-util = "0.3.28" + +[dev-dependencies] +tempfile = "3.3.0" diff --git a/pallas-rolldb/README.md b/pallas-rolldb/README.md new file mode 100644 index 0000000..d79c5b2 --- /dev/null +++ b/pallas-rolldb/README.md @@ -0,0 +1,4 @@ +# Pallas RollDB + +An opinionated Cardano storage engine built on top of RocksDB. 
+ diff --git a/pallas-rolldb/src/chain/mod.rs b/pallas-rolldb/src/chain/mod.rs new file mode 100644 index 0000000..e997203 --- /dev/null +++ b/pallas-rolldb/src/chain/mod.rs @@ -0,0 +1,12 @@ +use pallas_crypto::hash::Hash; + +mod store; + +#[cfg(test)] +mod tests; + +pub type BlockSlot = u64; +pub type BlockHash = Hash<32>; +pub type BlockBody = Vec; + +pub use store::*; diff --git a/pallas-rolldb/src/chain/store.rs b/pallas-rolldb/src/chain/store.rs new file mode 100644 index 0000000..3988334 --- /dev/null +++ b/pallas-rolldb/src/chain/store.rs @@ -0,0 +1,271 @@ +use pallas_crypto::hash::Hash; +use std::{path::Path, sync::Arc}; +use tracing::warn; + +use rocksdb::{Options, WriteBatch, DB}; + +use super::{BlockBody, BlockHash, BlockSlot}; + +use crate::kvtable::*; + +#[derive(Clone)] +pub struct Store { + db: Arc, + pub tip_change: Arc, +} + +pub struct BlockByHashKV; + +// hash -> block cbor +impl KVTable for BlockByHashKV { + const CF_NAME: &'static str = "BlockByHashKV"; +} + +// slot => block hash +pub struct HashBySlotKV; + +impl KVTable for HashBySlotKV { + const CF_NAME: &'static str = "HashBySlotKV"; +} + +pub struct ChainIterator<'a>(pub EntryIterator<'a, DBInt, DBHash>); + +impl Iterator for ChainIterator<'_> { + type Item = Result<(u64, Hash<32>), Error>; + + fn next(&mut self) -> Option { + self.0.next().map(|v| v.map(|(seq, val)| (seq.0, val.0))) + } +} + +impl Store { + pub fn open(path: impl AsRef) -> Result { + let mut opts = Options::default(); + opts.create_if_missing(true); + opts.create_missing_column_families(true); + + let db = DB::open_cf(&opts, path, [BlockByHashKV::CF_NAME, HashBySlotKV::CF_NAME]) + .map_err(|_| Error::IO)?; + + let out = Self { + db: Arc::new(db), + tip_change: Arc::new(tokio::sync::Notify::new()), + }; + + Ok(out) + } + + pub fn get_block(&self, hash: Hash<32>) -> Result, Error> { + let dbval = BlockByHashKV::get_by_key(&self.db, DBHash(hash))?; + Ok(dbval.map(|x| x.0)) + } + + pub fn roll_forward( + &mut self, + slot: 
BlockSlot, + hash: BlockHash, + body: BlockBody, + ) -> Result<(), Error> { + let mut batch = WriteBatch::default(); + + // keep track of the new block body + BlockByHashKV::stage_upsert(&self.db, DBHash(hash), DBBytes(body), &mut batch); + + // add new block to HashBySlotKV + HashBySlotKV::stage_upsert(&self.db, DBInt(slot), DBHash(hash), &mut batch); + + self.db.write(batch).map_err(|_| Error::IO)?; + self.tip_change.notify_waiters(); + + Ok(()) + } + + pub fn roll_back(&mut self, until: BlockSlot) -> Result<(), Error> { + let mut batch = WriteBatch::default(); + + // remove rollback-ed blocks from HashBySlotKV + let to_remove = HashBySlotKV::iter_keys_from(&self.db, DBInt(until)).skip(1); + + for key in to_remove { + HashBySlotKV::stage_delete(&self.db, key?, &mut batch); + } + + self.db.write(batch).map_err(|_| Error::IO)?; + self.tip_change.notify_waiters(); + + Ok(()) + } + + pub fn roll_back_origin(&mut self) -> Result<(), Error> { + HashBySlotKV::reset(&self.db)?; + BlockByHashKV::reset(&self.db)?; + + self.tip_change.notify_waiters(); + + Ok(()) + } + + pub fn find_tip(&self) -> Result, Error> { + let mut iter = HashBySlotKV::iter_entries(&self.db, rocksdb::IteratorMode::End); + + if let Some(last) = iter.next() { + let (slot, hash) = last?; + Ok(Some((slot.0, hash.0))) + } else { + Ok(None) + } + } + + pub fn intersect_options( + &self, + max_items: usize, + ) -> Result, Error> { + let mut iter = HashBySlotKV::iter_entries(&self.db, rocksdb::IteratorMode::End) + .filter_map(|res| res.ok()) + .map(|(k, v)| (k.0, v.0)); + + let mut out = Vec::with_capacity(max_items); + + while let Some((slot, hash)) = iter.next() { + out.push((slot, hash)); + + if out.len() >= max_items { + break; + } + + // skip exponentially + let skip = 2usize.pow(out.len() as u32) - 1; + for _ in 0..skip { + iter.next(); + } + } + + Ok(out) + } + + pub fn crawl_after(&self, slot: Option) -> ChainIterator { + if let Some(slot) = slot { + let slot = Box::<[u8]>::from(DBInt(slot)); + let 
from = rocksdb::IteratorMode::From(&slot, rocksdb::Direction::Forward); + let mut iter = HashBySlotKV::iter_entries(&self.db, from); + + // skip current + iter.next(); + + ChainIterator(iter) + } else { + let from = rocksdb::IteratorMode::Start; + let iter = HashBySlotKV::iter_entries(&self.db, from); + ChainIterator(iter) + } + } + + pub fn crawl(&self) -> ChainIterator { + self.crawl_after(None) + } + + pub fn read_chain_page( + &self, + from: BlockSlot, + len: usize, + ) -> impl Iterator> + '_ { + HashBySlotKV::iter_entries_from(&self.db, DBInt(from)) + .map(|res| res.map(|(x, y)| (x.0, y.0))) + .take(len) + } + + /// Iterator over chain between two points (inclusive) + /// + /// To use Origin as start point set `from` to None. + /// + /// Returns None if either point in range don't exist or `to` point is + /// earlier in chain than `from`. + pub fn read_chain_range( + &self, + from: Option<(BlockSlot, BlockHash)>, + to: (BlockSlot, BlockHash), + ) -> Result> + '_>, Error> + { + // TODO: We want to use a snapshot here to avoid race condition where + // point is checked to be in the HashBySlotKV but it is rolled-back before we + // create the iterator. Problem is `HashBySlotKV` etc must take `DB`, not + // `Snapshot`, so maybe we need a new way of creating something like + // a "KVTableSnapshot" in addition to the current "KVTable" type, which + // has methods on snapshots, but here I was having issues as there is + // no `cf` method on Snapshot but it is used is KVTable. + + // let snapshot = self.db.snapshot(); + + // check p2 not before p1 + let p1_slot = if let Some((slot, _)) = from { + if to.0 < slot { + warn!("chain range end slot before start slot"); + return Ok(None); + } else { + slot + } + } else { + 0 // Use 0 as slot for Origin + }; + + // check p1 exists in HashBySlotKV if provided + if let Some((slot, hash)) = from { + match HashBySlotKV::get_by_key(&self.db, DBInt(slot))? 
{ + Some(DBHash(found_hash)) => { + if hash != found_hash { + warn!("chain range start hash mismatch"); + return Ok(None); + } + } + None => { + warn!("chain range start slot not found"); + return Ok(None); + } + } + } + + // check p2 exists in HashBySlotKV + match HashBySlotKV::get_by_key(&self.db, DBInt(to.0))? { + Some(DBHash(found_hash)) => { + if to.1 != found_hash { + warn!("chain range end hash mismatch"); + return Ok(None); + } + } + None => { + warn!("chain range end slot not found"); + return Ok(None); + } + }; + + // return iterator between p1 and p2 inclusive + Ok(Some( + HashBySlotKV::iter_entries_from(&self.db, DBInt(p1_slot)) + .map(|res| res.map(|(x, y)| (x.0, y.0))) + .take_while(move |x| { + if let Ok((slot, _)) = x { + // iter returns None once point is after `to` slot + *slot <= to.0 + } else { + false + } + }), + )) + } + + /// Check if a point (pair of slot and block hash) exists in the + /// HashBySlotKV + pub fn chain_contains(&self, slot: BlockSlot, hash: &BlockHash) -> Result { + if let Some(DBHash(found)) = HashBySlotKV::get_by_key(&self.db, DBInt(slot))? 
{ + if found == *hash { + return Ok(true); + } + } + + Ok(false) + } + + pub fn destroy(path: impl AsRef) -> Result<(), Error> { + DB::destroy(&Options::default(), path).map_err(|_| Error::IO) + } +} diff --git a/pallas-rolldb/src/chain/tests.rs b/pallas-rolldb/src/chain/tests.rs new file mode 100644 index 0000000..28d4cb8 --- /dev/null +++ b/pallas-rolldb/src/chain/tests.rs @@ -0,0 +1,104 @@ +use super::{BlockBody, BlockHash, BlockSlot, Store}; + +fn with_tmp_db(op: fn(db: Store) -> T) { + let path = tempfile::tempdir().unwrap().into_path(); + let db = Store::open(path.clone()).unwrap(); + + op(db); + + Store::destroy(path).unwrap(); +} + +fn dummy_block(slot: u64) -> (BlockSlot, BlockHash, BlockBody) { + let hash = pallas_crypto::hash::Hasher::<256>::hash(slot.to_be_bytes().as_slice()); + (slot, hash, slot.to_be_bytes().to_vec()) +} + +#[test] +fn test_roll_forward_blackbox() { + with_tmp_db(|mut db| { + let (slot, hash, body) = dummy_block(11); + db.roll_forward(slot, hash, body.clone()).unwrap(); + + // ensure block body is persisted + let persisted = db.get_block(hash).unwrap().unwrap(); + assert_eq!(persisted, body); + + // ensure tip matches + let (tip_slot, tip_hash) = db.find_tip().unwrap().unwrap(); + assert_eq!(tip_slot, slot); + assert_eq!(tip_hash, hash); + + // ensure chain has item + let (chain_slot, chain_hash) = db.crawl().next().unwrap().unwrap(); + assert_eq!(chain_slot, slot); + assert_eq!(chain_hash, hash); + }); +} + +#[test] +fn test_roll_back_blackbox() { + with_tmp_db(|mut db| { + for i in 0..=5 { + let (slot, hash, body) = dummy_block(i * 10); + db.roll_forward(slot, hash, body).unwrap(); + } + + db.roll_back(20).unwrap(); + + // ensure tip show rollback point + let (tip_slot, _) = db.find_tip().unwrap().unwrap(); + assert_eq!(tip_slot, 20); + + // ensure chain has items not rolled back + let mut chain = db.crawl(); + + for i in 0..=2 { + let (slot, _) = chain.next().unwrap().unwrap(); + assert_eq!(slot, i * 10); + } + + // ensure chain 
stops here + assert!(chain.next().is_none()); + }); +} + +//TODO: test rollback beyond K +//TODO: test rollback with unknown slot + +#[test] +fn test_chain_page() { + with_tmp_db(|mut db| { + for i in 0..100 { + let (slot, hash, body) = dummy_block(i * 10); + db.roll_forward(slot, hash, body).unwrap(); + } + + let mut chain = db.read_chain_page(200, 15); + + for i in 0..15 { + let (slot, _) = chain.next().unwrap().unwrap(); + assert_eq!(200 + (i * 10), slot) + } + + assert!(chain.next().is_none()); + }); +} + +#[test] +fn test_intersect_options() { + with_tmp_db(|mut db| { + for i in 0..200 { + let (slot, hash, body) = dummy_block(i * 10); + db.roll_forward(slot, hash, body).unwrap(); + } + + let intersect = db.intersect_options(10).unwrap(); + + let expected = vec![1990, 1970, 1930, 1850, 1690, 1370, 730]; + + for (out, exp) in intersect.iter().zip(expected) { + assert_eq!(out.0, exp); + } + }); +} diff --git a/pallas-rolldb/src/kvtable.rs b/pallas-rolldb/src/kvtable.rs new file mode 100644 index 0000000..36c8d64 --- /dev/null +++ b/pallas-rolldb/src/kvtable.rs @@ -0,0 +1,416 @@ +use pallas_crypto::hash::Hash; +use serde::{de::DeserializeOwned, Serialize}; +use std::marker::PhantomData; +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum Error { + #[error("IO error")] + IO, + + #[error("serde error")] + Serde, + + #[error("not found")] + NotFound, +} + +pub struct DBHash(pub Hash<32>); + +impl From> for DBHash { + fn from(value: Box<[u8]>) -> Self { + let inner: [u8; 32] = value[0..32].try_into().unwrap(); + let inner = Hash::<32>::from(inner); + Self(inner) + } +} + +impl From for Box<[u8]> { + fn from(value: DBHash) -> Self { + let b = value.0.to_vec(); + b.into() + } +} + +impl From> for DBHash { + fn from(value: Hash<32>) -> Self { + DBHash(value) + } +} + +impl From for Hash<32> { + fn from(value: DBHash) -> Self { + value.0 + } +} + +pub struct DBInt(pub u64); + +impl From for Box<[u8]> { + fn from(value: DBInt) -> Self { + let b = 
/// A value of type `T` tagged with a `u64` prefix.
///
/// The byte form is the prefix as 8 big-endian bytes (the same layout
/// `DBInt` uses) followed by the bytes of the inner value.
pub struct WithDBIntPrefix<T>(pub u64, pub T);

impl<T> From<WithDBIntPrefix<T>> for Box<[u8]>
where
    Box<[u8]>: From<T>,
{
    /// Encodes the prefix followed by the inner value's bytes.
    fn from(value: WithDBIntPrefix<T>) -> Self {
        let WithDBIntPrefix(prefix, inner) = value;
        let suffix = Box::<[u8]>::from(inner);

        let mut out = Vec::with_capacity(8 + suffix.len());
        out.extend_from_slice(&prefix.to_be_bytes());
        out.extend_from_slice(&suffix);

        out.into_boxed_slice()
    }
}

impl<T> From<Box<[u8]>> for WithDBIntPrefix<T>
where
    T: From<Box<[u8]>>,
{
    /// Decodes the 8-byte big-endian prefix and hands the remaining bytes to
    /// `T`.
    ///
    /// # Panics
    ///
    /// Panics if `value` holds fewer than 8 bytes, as `DBInt` decoding does.
    fn from(value: Box<[u8]>) -> Self {
        let mut prefix = [0u8; 8];
        prefix.copy_from_slice(&value[..8]);

        let rest: Box<[u8]> = Box::from(&value[8..]);

        WithDBIntPrefix(u64::from_be_bytes(prefix), T::from(rest))
    }
}
struct ValueIterator<'a, V>(RocksIterator<'a>, PhantomData); + +impl<'a, V> ValueIterator<'a, V> { + pub fn new(inner: RocksIterator<'a>) -> Self { + Self(inner, Default::default()) + } +} + +impl<'a, V> Iterator for ValueIterator<'a, V> +where + V: From>, +{ + type Item = Result; + + fn next(&mut self) -> Option> { + match self.0.next() { + Some(Ok((_, value))) => Some(Ok(V::from(value))), + Some(Err(err)) => { + tracing::error!(?err); + Some(Err(Error::IO)) + } + None => None, + } + } +} + +pub struct KeyIterator<'a, K>(RocksIterator<'a>, PhantomData); + +impl<'a, K> KeyIterator<'a, K> { + pub fn new(inner: RocksIterator<'a>) -> Self { + Self(inner, Default::default()) + } +} + +impl<'a, K> Iterator for KeyIterator<'a, K> +where + K: From>, +{ + type Item = Result; + + fn next(&mut self) -> Option> { + match self.0.next() { + Some(Ok((key, _))) => Some(Ok(K::from(key))), + Some(Err(err)) => { + tracing::error!(?err); + Some(Err(Error::IO)) + } + None => None, + } + } +} + +pub struct EntryIterator<'a, K, V>(RocksIterator<'a>, PhantomData<(K, V)>); + +impl<'a, K, V> EntryIterator<'a, K, V> { + pub fn new(inner: RocksIterator<'a>) -> Self { + Self(inner, Default::default()) + } +} + +impl<'a, K, V> Iterator for EntryIterator<'a, K, V> +where + K: From>, + V: From>, +{ + type Item = Result<(K, V), Error>; + + fn next(&mut self) -> Option> { + match self.0.next() { + Some(Ok((key, value))) => { + let key_out = K::from(key); + let value_out = V::from(value); + + Some(Ok((key_out, value_out))) + } + Some(Err(err)) => { + tracing::error!(?err); + Some(Err(Error::IO)) + } + None => None, + } + } +} + +pub trait KVTable +where + Box<[u8]>: From, + Box<[u8]>: From, + K: From>, + V: From>, +{ + const CF_NAME: &'static str; + + fn cf(db: &rocksdb::DB) -> rocksdb::ColumnFamilyRef { + db.cf_handle(Self::CF_NAME).unwrap() + } + + fn reset(db: &rocksdb::DB) -> Result<(), Error> { + db.drop_cf(Self::CF_NAME).map_err(|_| Error::IO)?; + + db.create_cf(Self::CF_NAME, 
&rocksdb::Options::default()) + .map_err(|_| Error::IO)?; + + Ok(()) + } + + fn get_by_key(db: &rocksdb::DB, k: K) -> Result, Error> { + let cf = Self::cf(db); + let raw_key = Box::<[u8]>::from(k); + let raw_value = db + .get_cf(&cf, raw_key) + .map_err(|_| Error::IO)? + .map(|x| Box::from(x.as_slice())); + + match raw_value { + Some(x) => { + let out = ::from(x); + Ok(Some(out)) + } + None => Ok(None), + } + } + + fn stage_upsert(db: &rocksdb::DB, k: K, v: V, batch: &mut rocksdb::WriteBatch) { + let cf = Self::cf(db); + + let k_raw = Box::<[u8]>::from(k); + let v_raw = Box::<[u8]>::from(v); + + batch.put_cf(&cf, k_raw, v_raw); + } + + fn is_empty(db: &rocksdb::DB) -> bool { + // HACK: can't find an easy way to size the num of keys, so we'll start an + // iterator and see if we have at least one value. If someone know a better way + // to accomplish this, please refactor. + let mut iter = Self::iter_keys(db, rocksdb::IteratorMode::Start); + iter.next().is_none() + } + + fn iter_keys<'a>(db: &'a rocksdb::DB, mode: rocksdb::IteratorMode) -> KeyIterator<'a, K> { + let cf = Self::cf(db); + let inner = db.iterator_cf(&cf, mode); + KeyIterator::new(inner) + } + + fn iter_keys_start(db: &rocksdb::DB) -> KeyIterator<'_, K> { + Self::iter_keys(db, rocksdb::IteratorMode::Start) + } + + fn iter_keys_from(db: &rocksdb::DB, from: K) -> KeyIterator<'_, K> { + let from_raw = Box::<[u8]>::from(from); + let mode = rocksdb::IteratorMode::From(&from_raw, rocksdb::Direction::Forward); + + Self::iter_keys(db, mode) + } + + fn iter_values<'a>(db: &'a rocksdb::DB, mode: rocksdb::IteratorMode) -> ValueIterator<'a, V> { + let cf = Self::cf(db); + let inner = db.iterator_cf(&cf, mode); + ValueIterator::new(inner) + } + + fn iter_values_start(db: &rocksdb::DB) -> ValueIterator<'_, V> { + Self::iter_values(db, rocksdb::IteratorMode::Start) + } + + fn iter_values_from(db: &rocksdb::DB, from: K) -> ValueIterator<'_, V> { + let from_raw = Box::<[u8]>::from(from); + let mode = 
rocksdb::IteratorMode::From(&from_raw, rocksdb::Direction::Forward); + + Self::iter_values(db, mode) + } + + fn iter_entries<'a>( + db: &'a rocksdb::DB, + mode: rocksdb::IteratorMode, + ) -> EntryIterator<'a, K, V> { + let cf = Self::cf(db); + let inner = db.iterator_cf(&cf, mode); + EntryIterator::new(inner) + } + + fn iter_entries_start(db: &rocksdb::DB) -> EntryIterator<'_, K, V> { + Self::iter_entries(db, rocksdb::IteratorMode::Start) + } + + fn iter_entries_from(db: &rocksdb::DB, from: K) -> EntryIterator<'_, K, V> { + let from_raw = Box::<[u8]>::from(from); + let mode = rocksdb::IteratorMode::From(&from_raw, rocksdb::Direction::Forward); + + Self::iter_entries(db, mode) + } + + fn last_key(db: &rocksdb::DB) -> Result, Error> { + let mut iter = Self::iter_keys(db, rocksdb::IteratorMode::End); + + match iter.next() { + None => Ok(None), + Some(x) => Ok(Some(x?)), + } + } + + fn last_value(db: &rocksdb::DB) -> Result, Error> { + let mut iter = Self::iter_values(db, rocksdb::IteratorMode::End); + + match iter.next() { + None => Ok(None), + Some(x) => Ok(Some(x?)), + } + } + + fn last_entry(db: &rocksdb::DB) -> Result, Error> { + let mut iter = Self::iter_entries(db, rocksdb::IteratorMode::End); + + match iter.next() { + None => Ok(None), + Some(x) => Ok(Some(x?)), + } + } + + fn scan_until( + db: &rocksdb::DB, + mode: rocksdb::IteratorMode, + predicate: F, + ) -> Result, Error> + where + F: Fn(&V) -> bool, + { + for (k, v) in Self::iter_entries(db, mode).flatten() { + if predicate(&v) { + return Ok(Some(k)); + } + } + + Ok(None) + } + + fn stage_delete(db: &rocksdb::DB, key: K, batch: &mut rocksdb::WriteBatch) { + let cf = Self::cf(db); + let k_raw = Box::<[u8]>::from(key); + batch.delete_cf(&cf, k_raw); + } +} diff --git a/pallas-rolldb/src/lib.rs b/pallas-rolldb/src/lib.rs new file mode 100644 index 0000000..4659378 --- /dev/null +++ b/pallas-rolldb/src/lib.rs @@ -0,0 +1,3 @@ +pub mod chain; +mod kvtable; +pub mod wal; diff --git a/pallas-rolldb/src/wal/mod.rs 
b/pallas-rolldb/src/wal/mod.rs new file mode 100644 index 0000000..124cd2a --- /dev/null +++ b/pallas-rolldb/src/wal/mod.rs @@ -0,0 +1,15 @@ +use pallas_crypto::hash::Hash; + +mod store; +mod stream; + +#[cfg(test)] +mod tests; + +pub type Seq = u64; +pub type BlockSlot = u64; +pub type BlockHash = Hash<32>; +pub type BlockBody = Vec; + +pub use store::*; +pub use stream::*; diff --git a/pallas-rolldb/src/wal/store.rs b/pallas-rolldb/src/wal/store.rs new file mode 100644 index 0000000..3903224 --- /dev/null +++ b/pallas-rolldb/src/wal/store.rs @@ -0,0 +1,358 @@ +use rocksdb::Options; +use rocksdb::{IteratorMode, WriteBatch, DB}; +use serde::{Deserialize, Serialize}; +use std::{path::Path, sync::Arc}; + +use crate::kvtable::*; + +use super::{BlockBody, BlockHash, BlockSlot, Seq}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum Log { + Apply(BlockSlot, BlockHash, BlockBody), + Undo(BlockSlot, BlockHash, BlockBody), + Mark(BlockSlot, BlockHash, BlockBody), + Origin, +} + +impl Log { + pub fn into_apply( + slot: impl Into, + hash: impl Into, + block: impl Into, + ) -> Self { + Self::Apply(slot.into(), hash.into(), block.into()) + } + + pub fn slot(&self) -> Option { + match self { + Log::Apply(s, _, _) => Some(*s), + Log::Undo(s, _, _) => Some(*s), + Log::Mark(s, _, _) => Some(*s), + Log::Origin => None, + } + } + + pub fn hash(&self) -> Option<&BlockHash> { + match self { + Log::Apply(_, h, _) => Some(h), + Log::Undo(_, h, _) => Some(h), + Log::Mark(_, h, _) => Some(h), + Log::Origin => None, + } + } + + pub fn body(&self) -> Option<&BlockBody> { + match self { + Log::Apply(_, _, b) => Some(b), + Log::Undo(_, _, b) => Some(b), + Log::Mark(_, _, b) => Some(b), + Log::Origin => None, + } + } + + pub fn into_undo(self) -> Option { + match self { + Self::Apply(s, h, b) => Some(Self::Undo(s, h, b)), + _ => None, + } + } + + pub fn into_mark(self) -> Option { + match self { + Log::Apply(s, h, b) => Some(Log::Mark(s, h, b)), + Log::Mark(s, h, b) => 
Some(Log::Mark(s, h, b)), + Log::Origin => Some(Log::Origin), + Log::Undo(..) => None, + } + } + + pub fn is_apply(&self) -> bool { + matches!(self, Log::Apply(..)) + } + + pub fn is_mark(&self) -> bool { + matches!(self, Log::Mark(..)) + } + + pub fn is_undo(&self) -> bool { + matches!(self, Log::Undo(..)) + } + + pub fn is_origin(&self) -> bool { + matches!(self, Log::Origin) + } +} + +// slot => block hash +pub struct WalKV; + +impl KVTable> for WalKV { + const CF_NAME: &'static str = "WalKV"; +} + +pub struct WalIterator<'a>(pub EntryIterator<'a, DBInt, DBSerde>); + +impl Iterator for WalIterator<'_> { + type Item = Result<(u64, Log), Error>; + + fn next(&mut self) -> Option { + self.0.next().map(|v| v.map(|(seq, val)| (seq.0, val.0))) + } +} + +impl WalKV { + pub fn initialize(db: &DB) -> Result { + if Self::is_empty(db) { + Self::write_seed(db)?; + Ok(0) + } else { + let last = Self::last_key(db)?.map(|x| x.0); + Ok(last.unwrap()) + } + } + + fn write_seed(db: &DB) -> Result<(), Error> { + let mut batch = WriteBatch::default(); + let k = DBInt(0); + let v = DBSerde(Log::Origin); + Self::stage_upsert(db, k, v, &mut batch); + + db.write(batch).map_err(|_| Error::IO) + } +} + +pub struct RollBatch<'a>(&'a DB, WriteBatch, Seq); + +impl<'a> RollBatch<'a> { + fn new(db: &'a DB, last_seq: Seq) -> Self { + Self(db, Default::default(), last_seq) + } + + fn stage_append(&mut self, log: Log) { + let new_seq = self.2 + 1; + WalKV::stage_upsert(&self.0, DBInt(new_seq), DBSerde(log), &mut self.1); + self.2 = new_seq; + } + + fn apply(self) -> Result { + self.0.write(self.1).map_err(|_| Error::IO)?; + Ok(self.2) + } +} + +#[derive(Clone)] +pub struct Store { + db: Arc, + pub tip_change: Arc, + wal_seq: u64, + k_param: u64, +} + +impl Store { + pub fn open(path: impl AsRef, k_param: u64) -> Result { + let mut opts = Options::default(); + opts.create_if_missing(true); + opts.create_missing_column_families(true); + + let db = DB::open_cf(&opts, path, 
[WalKV::CF_NAME]).map_err(|_| Error::IO)?; + + let wal_seq = WalKV::initialize(&db)?; + + let out = Self { + db: Arc::new(db), + tip_change: Arc::new(tokio::sync::Notify::new()), + wal_seq, + k_param, + }; + + Ok(out) + } + + pub fn roll_forward( + &mut self, + slot: BlockSlot, + hash: BlockHash, + body: BlockBody, + ) -> Result<(), Error> { + let mut batch = RollBatch::new(&mut self.db, self.wal_seq); + + batch.stage_append(Log::Apply(slot, hash, body)); + + self.wal_seq = batch.apply()?; + self.tip_change.notify_waiters(); + + Ok(()) + } + + pub fn roll_back(&mut self, until: BlockSlot) -> Result<(), Error> { + let mut batch = RollBatch::new(&self.db, self.wal_seq); + + let iter = WalKV::iter_values(&self.db, IteratorMode::End); + + for step in iter { + let value = step.map_err(|_| Error::IO)?.0; + + if value.slot().unwrap_or(0) <= until { + batch.stage_append(value.into_mark().unwrap()); + break; + } + + match value.into_undo() { + Some(undo) => { + batch.stage_append(undo); + } + None => continue, + }; + } + + self.wal_seq = batch.apply()?; + self.tip_change.notify_waiters(); + + Ok(()) + } + + pub fn roll_back_origin(&mut self) -> Result<(), Error> { + let mut batch = RollBatch::new(&self.db, self.wal_seq); + + let iter = WalKV::iter_values(&self.db, IteratorMode::End); + + for step in iter { + let value = step.map_err(|_| Error::IO)?.0; + + if value.is_origin() { + break; + } + + match value.into_undo() { + Some(undo) => { + batch.stage_append(undo); + } + None => continue, + }; + } + + self.wal_seq = batch.apply()?; + self.tip_change.notify_waiters(); + + Ok(()) + } + + pub fn find_tip(&self) -> Result, Error> { + let iter = WalKV::iter_values(&self.db, IteratorMode::End); + + for value in iter { + let value = value?; + + if value.is_apply() || value.is_mark() { + let slot = value.slot().unwrap(); + let hash = *value.hash().unwrap(); + return Ok(Some((slot, hash))); + } + } + + Ok(None) + } + + pub fn intersect_options( + &self, + max_items: usize, + ) -> 
Result, Error> { + let mut iter = WalKV::iter_values(&self.db, rocksdb::IteratorMode::End) + .filter_map(|res| res.ok()) + .filter(|v| !v.is_undo()); + + let mut out = Vec::with_capacity(max_items); + + // crawl the wal exponentially + while let Some(val) = iter.next() { + if !val.is_apply() && !val.is_mark() { + continue; + } + + out.push((val.slot().unwrap(), *val.hash().unwrap())); + + if out.len() >= max_items { + break; + } + + // skip exponentially + let skip = 2usize.pow(out.len() as u32) - 1; + for _ in 0..skip { + iter.next(); + } + } + + Ok(out) + } + + pub fn crawl_after(&self, seq: Option) -> WalIterator { + if let Some(seq) = seq { + let seq = Box::<[u8]>::from(DBInt(seq)); + let from = rocksdb::IteratorMode::From(&seq, rocksdb::Direction::Forward); + let mut iter = WalKV::iter_entries(&self.db, from); + + // skip current + iter.next(); + + WalIterator(iter) + } else { + let from = rocksdb::IteratorMode::Start; + let iter = WalKV::iter_entries(&self.db, from); + WalIterator(iter) + } + } + + pub fn find_wal_seq(&self, block: Option<(BlockSlot, BlockHash)>) -> Result { + if block.is_none() { + return Ok(0); + } + + let (slot, hash) = block.unwrap(); + + // TODO: Not sure this is 100% accurate: + // i.e Apply(X), Apply(cursor), Undo(cursor), Mark(x) + // We want to start at Apply(cursor) or Mark(cursor), but even then, + // what if we have more than one Apply(cursor), how do we know + // which is correct? 
+ let found = WalKV::scan_until(&self.db, rocksdb::IteratorMode::End, |v| { + (v.is_mark() || v.is_apply()) + && v.slot().is_some_and(|s| s == slot) + && v.hash().is_some_and(|h| h.eq(&hash)) + })?; + + match found { + Some(DBInt(seq)) => Ok(seq), + None => Err(Error::NotFound), + } + } + + /// Prune the WAL of entries with slot values over `k_param` from the tip + pub fn prune_wal(&self) -> Result<(), Error> { + let tip = self.find_tip()?.map(|(slot, _)| slot).unwrap_or_default(); + + // iterate through all values in Wal from start + let mut iter = WalKV::iter_entries(&self.db, rocksdb::IteratorMode::Start); + + let mut batch = WriteBatch::default(); + + while let Some(Ok((wal_key, value))) = iter.next() { + // get the number of slots that have passed since the wal point + let slot_delta = tip - value.slot().unwrap_or(0); + + if slot_delta <= self.k_param { + break; + } else { + WalKV::stage_delete(&self.db, wal_key, &mut batch); + } + } + + self.db.write(batch).map_err(|_| Error::IO)?; + + Ok(()) + } + + pub fn destroy(path: impl AsRef) -> Result<(), Error> { + DB::destroy(&Options::default(), path).map_err(|_| Error::IO) + } +} diff --git a/pallas-rolldb/src/wal/stream.rs b/pallas-rolldb/src/wal/stream.rs new file mode 100644 index 0000000..23ef069 --- /dev/null +++ b/pallas-rolldb/src/wal/stream.rs @@ -0,0 +1,80 @@ +use futures_core::Stream; + +use super::{Log, Seq, Store}; + +pub struct RollStream; + +impl RollStream { + pub fn start_after(store: Store, seq: Option) -> impl Stream { + async_stream::stream! 
{ + let mut last_seq = seq; + + let iter = store.crawl_after(last_seq); + + for (seq, val) in iter.flatten() { + yield val; + last_seq = Some(seq); + } + + loop { + store.tip_change.notified().await; + let iter = store.crawl_after(last_seq); + + for (seq, val) in iter.flatten() { + yield val; + last_seq = Some(seq); + } + } + } + } +} + +#[cfg(test)] +mod tests { + use futures_util::{pin_mut, StreamExt}; + + use crate::wal::{BlockBody, BlockHash, BlockSlot, Store}; + + fn dummy_block(slot: u64) -> (BlockSlot, BlockHash, BlockBody) { + let hash = pallas_crypto::hash::Hasher::<256>::hash(slot.to_be_bytes().as_slice()); + (slot, hash, slot.to_be_bytes().to_vec()) + } + + #[tokio::test] + async fn test_stream_waiting() { + let path = tempfile::tempdir().unwrap().into_path(); + let mut db = Store::open(path.clone(), 30).unwrap(); + + for i in 0..=100 { + let (slot, hash, body) = dummy_block(i * 10); + db.roll_forward(slot, hash, body).unwrap(); + } + + let mut db2 = db.clone(); + let background = tokio::spawn(async move { + for i in 101..=200 { + let (slot, hash, body) = dummy_block(i * 10); + db2.roll_forward(slot, hash, body).unwrap(); + tokio::time::sleep(std::time::Duration::from_millis(5)).await; + } + }); + + let s = super::RollStream::start_after(db.clone(), None); + + pin_mut!(s); + + let evt = s.next().await; + let evt = evt.unwrap(); + assert!(evt.is_origin()); + + for i in 0..=200 { + let evt = s.next().await; + let evt = evt.unwrap(); + assert!(evt.is_apply()); + assert_eq!(evt.slot().unwrap(), i * 10); + } + + background.abort(); + let _ = Store::destroy(path); //.unwrap(); + } +} diff --git a/pallas-rolldb/src/wal/tests.rs b/pallas-rolldb/src/wal/tests.rs new file mode 100644 index 0000000..df64d8c --- /dev/null +++ b/pallas-rolldb/src/wal/tests.rs @@ -0,0 +1,179 @@ +use super::{BlockBody, BlockHash, BlockSlot, Store}; + +fn with_tmp_db(k_param: u64, op: fn(store: Store) -> T) { + let path = tempfile::tempdir().unwrap().into_path(); + let store = 
Store::open(path.clone(), k_param).unwrap(); + + op(store); + + Store::destroy(path).unwrap(); +} + +fn dummy_block(slot: u64) -> (BlockSlot, BlockHash, BlockBody) { + let hash = pallas_crypto::hash::Hasher::<256>::hash(slot.to_be_bytes().as_slice()); + (slot, hash, slot.to_be_bytes().to_vec()) +} + +#[test] +fn test_origin_event() { + with_tmp_db(30, |db| { + let mut iter = db.crawl_after(None); + + let origin = iter.next(); + assert!(origin.is_some()); + + let origin = origin.unwrap(); + assert!(origin.is_ok()); + + let (seq, value) = origin.unwrap(); + assert_eq!(seq, 0); + assert!(value.is_origin()); + }); +} + +#[test] +fn test_basic_append() { + with_tmp_db(30, |mut db| { + let (slot, hash, body) = dummy_block(11); + db.roll_forward(slot, hash, body.clone()).unwrap(); + + // ensure tip matches + let (tip_slot, tip_hash) = db.find_tip().unwrap().unwrap(); + assert_eq!(tip_slot, slot); + assert_eq!(tip_hash, hash); + + // ensure chain has item + let mut iter = db.crawl_after(None); + + // skip origin + iter.next(); + + let (seq, log) = iter.next().unwrap().unwrap(); + assert_eq!(seq, 1); + assert_eq!(log.slot().unwrap(), slot); + assert_eq!(log.hash().unwrap(), &hash); + assert_eq!(log.body().unwrap(), &body); + }); +} + +#[test] +fn test_rollback_undos() { + with_tmp_db(30, |mut db| { + for i in 0..=5 { + let (slot, hash, body) = dummy_block(i * 10); + db.roll_forward(slot, hash, body).unwrap(); + } + + db.roll_back(20).unwrap(); + + // ensure tip show rollback point + let (tip_slot, _) = db.find_tip().unwrap().unwrap(); + assert_eq!(tip_slot, 20); + + // ensure chain has items not rolled back + let mut wal = db.crawl_after(None); + + let (seq, log) = wal.next().unwrap().unwrap(); + assert_eq!(seq, 0); + assert!(log.is_origin()); + + for i in 0..=5 { + let (_, log) = wal.next().unwrap().unwrap(); + assert!(log.is_apply()); + assert_eq!(log.slot().unwrap(), i * 10); + } + + for i in (3..=5).rev() { + let (_, log) = wal.next().unwrap().unwrap(); + 
assert!(log.is_undo()); + assert_eq!(log.slot().unwrap(), i * 10); + } + + let (_, log) = wal.next().unwrap().unwrap(); + assert!(log.is_mark()); + assert_eq!(log.slot().unwrap(), 20); + + // ensure chain stops here + assert!(wal.next().is_none()); + }); +} + +//TODO: test rollback beyond K +//TODO: test rollback with unknown slot + +#[test] +fn test_prune_linear() { + with_tmp_db(30, |mut db| { + for i in 0..100 { + let (slot, hash, body) = dummy_block(i * 10); + db.roll_forward(slot, hash, body).unwrap(); + } + + db.prune_wal().unwrap(); + + let mut wal = db.crawl_after(None); + + for i in 96..100 { + let (_, val) = wal.next().unwrap().unwrap(); + assert_eq!(val.slot().unwrap(), i * 10); + } + + assert!(wal.next().is_none()); + }); +} + +#[test] +fn test_prune_with_rollback() { + with_tmp_db(30, |mut db| { + for i in 0..100 { + let (slot, hash, body) = dummy_block(i * 10); + db.roll_forward(slot, hash, body).unwrap(); + } + + db.roll_back(800).unwrap(); + + // tip is 800 (Mark) + + db.prune_wal().unwrap(); + + let mut wal = db.crawl_after(None); + + for i in 77..100 { + let (_, val) = wal.next().unwrap().unwrap(); + assert!(val.is_apply()); + assert_eq!(val.slot().unwrap(), i * 10); + } + + for i in (81..100).rev() { + let (_, val) = wal.next().unwrap().unwrap(); + assert!(val.is_undo()); + assert_eq!(val.slot().unwrap(), i * 10); + } + + let (_, val) = wal.next().unwrap().unwrap(); + assert!(val.is_mark()); + assert_eq!(val.slot().unwrap(), 800); + + assert!(wal.next().is_none()); + }); +} + +#[test] +fn test_intersect_options() { + with_tmp_db(1000, |mut db| { + for i in 0..200 { + let (slot, hash, body) = dummy_block(i * 10); + db.roll_forward(slot, hash, body).unwrap(); + } + + db.prune_wal().unwrap(); + + let intersect = db.intersect_options(10).unwrap(); + + let expected = vec![1990, 1970, 1930, 1850, 1690, 1370, 980]; + + for (out, exp) in intersect.iter().zip(expected) { + assert_eq!(out.0, exp); + } + }); +} diff --git a/pallas/Cargo.toml 
b/pallas/Cargo.toml index 30d4e3f..38fc2b2 100644 --- a/pallas/Cargo.toml +++ b/pallas/Cargo.toml @@ -20,3 +20,7 @@ pallas-crypto = { version = "=0.19.1", path = "../pallas-crypto/" } pallas-codec = { version = "=0.19.1", path = "../pallas-codec/" } pallas-utxorpc = { version = "=0.19.1", path = "../pallas-utxorpc/" } pallas-configs = { version = "=0.19.1", path = "../pallas-configs/" } +pallas-rolldb = { version = "=0.19.1", path = "../pallas-rolldb/" } + +[features] +unstable = [] diff --git a/pallas/src/lib.rs b/pallas/src/lib.rs index 878399f..29b6cd2 100644 --- a/pallas/src/lib.rs +++ b/pallas/src/lib.rs @@ -41,4 +41,12 @@ pub mod interop { pub use pallas_utxorpc as utxorpc; } +pub mod storage { + //! Storage engines for chain-related persistence + + #[cfg(feature = "unstable")] + #[doc(inline)] + pub use pallas_rolldb as rolldb; +} + pub use pallas_applying as applying;