chore: remove rolldb from repo (#537)

This commit is contained in:
Santiago Carmuega 2024-11-03 11:14:39 -03:00 committed by GitHub
parent 00e1611592
commit 3bdae69b0f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 1 additions and 1567 deletions

View file

@ -8,7 +8,6 @@ members = [
"pallas-crypto",
"pallas-configs",
"pallas-primitives",
"pallas-rolldb",
"pallas-traverse",
"pallas-txbuilder",
"pallas-utxorpc",

View file

@ -1,26 +0,0 @@
[package]
name = "pallas-rolldb"
description = "An opinionated Cardano storage engine built on top of RocksDB"
version = "0.30.2"
edition = "2021"
repository = "https://github.com/txpipe/pallas"
homepage = "https://github.com/txpipe/pallas"
documentation = "https://docs.rs/pallas-rolldb"
license = "Apache-2.0"
readme = "README.md"
authors = ["Santiago Carmuega <santiago@carmuega.me>"]
[dependencies]
rocksdb = { version = "0.22.0", default-features = false, features = ["multi-threaded-cf"] }
bincode = "1.3.3"
serde = "1.0.188"
thiserror = "1.0.49"
pallas-crypto = { version = "=0.30.2", path = "../pallas-crypto" }
tracing = "0.1.37"
tokio = { version = "1.32.0", features = ["sync", "rt", "time", "macros"] }
async-stream = "0.3.5"
futures-core = "0.3.28"
futures-util = "0.3.28"
[dev-dependencies]
tempfile = "3.3.0"

View file

@ -1,4 +0,0 @@
# Pallas RollDB
An opinionated Cardano storage engine built on top of RocksDB.

View file

@ -1,12 +0,0 @@
use pallas_crypto::hash::Hash;
mod store;
#[cfg(test)]
mod tests;
pub type BlockSlot = u64;
pub type BlockHash = Hash<32>;
pub type BlockBody = Vec<u8>;
pub use store::*;

View file

@ -1,275 +0,0 @@
use pallas_crypto::hash::Hash;
use std::{path::Path, sync::Arc};
use tracing::warn;
use rocksdb::{Options, WriteBatch, DB};
use super::{BlockBody, BlockHash, BlockSlot};
use crate::kvtable::*;
#[derive(Clone)]
pub struct Store {
db: Arc<DB>,
pub tip_change: Arc<tokio::sync::Notify>,
}
pub struct BlockByHashKV;
// hash -> block cbor
impl KVTable<DBHash, DBBytes> for BlockByHashKV {
const CF_NAME: &'static str = "BlockByHashKV";
}
// slot => block hash
pub struct HashBySlotKV;
impl KVTable<DBInt, DBHash> for HashBySlotKV {
const CF_NAME: &'static str = "HashBySlotKV";
}
pub struct ChainIterator<'a>(pub EntryIterator<'a, DBInt, DBHash>);
impl Iterator for ChainIterator<'_> {
type Item = Result<(u64, Hash<32>), Error>;
fn next(&mut self) -> Option<Self::Item> {
self.0.next().map(|v| v.map(|(seq, val)| (seq.0, val.0)))
}
}
impl Store {
pub fn open(path: impl AsRef<Path>) -> Result<Self, Error> {
let mut opts = Options::default();
opts.create_if_missing(true);
opts.create_missing_column_families(true);
let db = DB::open_cf(&opts, path, [BlockByHashKV::CF_NAME, HashBySlotKV::CF_NAME])
.map_err(|_| Error::IO)?;
let out = Self {
db: Arc::new(db),
tip_change: Arc::new(tokio::sync::Notify::new()),
};
Ok(out)
}
pub fn get_block(&self, hash: Hash<32>) -> Result<Option<BlockBody>, Error> {
let dbval = BlockByHashKV::get_by_key(&self.db, DBHash(hash))?;
Ok(dbval.map(|x| x.0))
}
pub fn roll_forward(
&mut self,
slot: BlockSlot,
hash: BlockHash,
body: BlockBody,
) -> Result<(), Error> {
let mut batch = WriteBatch::default();
// keep track of the new block body
BlockByHashKV::stage_upsert(&self.db, DBHash(hash), DBBytes(body), &mut batch);
// add new block to HashBySlotKV
HashBySlotKV::stage_upsert(&self.db, DBInt(slot), DBHash(hash), &mut batch);
self.db.write(batch).map_err(|_| Error::IO)?;
self.tip_change.notify_waiters();
Ok(())
}
pub fn roll_back(&mut self, until: BlockSlot) -> Result<(), Error> {
let mut batch = WriteBatch::default();
// remove rollback-ed blocks from HashBySlotKV
let to_remove = HashBySlotKV::iter_keys_from(&self.db, DBInt(until)).skip(1);
for key in to_remove {
HashBySlotKV::stage_delete(&self.db, key?, &mut batch);
}
self.db.write(batch).map_err(|_| Error::IO)?;
self.tip_change.notify_waiters();
Ok(())
}
pub fn roll_back_origin(&mut self) -> Result<(), Error> {
HashBySlotKV::reset(&self.db)?;
BlockByHashKV::reset(&self.db)?;
self.tip_change.notify_waiters();
Ok(())
}
pub fn find_tip(&self) -> Result<Option<(BlockSlot, BlockHash)>, Error> {
let mut iter = HashBySlotKV::iter_entries(&self.db, rocksdb::IteratorMode::End);
if let Some(last) = iter.next() {
let (slot, hash) = last?;
Ok(Some((slot.0, hash.0)))
} else {
Ok(None)
}
}
pub fn intersect_options(
&self,
max_items: usize,
) -> Result<Vec<(BlockSlot, BlockHash)>, Error> {
let mut iter = HashBySlotKV::iter_entries(&self.db, rocksdb::IteratorMode::End)
.filter_map(|res| res.ok())
.map(|(k, v)| (k.0, v.0));
let mut out = Vec::with_capacity(max_items);
while let Some((slot, hash)) = iter.next() {
out.push((slot, hash));
if out.len() >= max_items {
break;
}
// skip exponentially
let skip = 2usize.pow(out.len() as u32) - 1;
for _ in 0..skip {
iter.next();
}
}
Ok(out)
}
pub fn crawl_after(&self, slot: Option<u64>) -> ChainIterator {
if let Some(slot) = slot {
let slot = Box::<[u8]>::from(DBInt(slot));
let from = rocksdb::IteratorMode::From(&slot, rocksdb::Direction::Forward);
let mut iter = HashBySlotKV::iter_entries(&self.db, from);
// skip current
iter.next();
ChainIterator(iter)
} else {
let from = rocksdb::IteratorMode::Start;
let iter = HashBySlotKV::iter_entries(&self.db, from);
ChainIterator(iter)
}
}
pub fn crawl(&self) -> ChainIterator {
self.crawl_after(None)
}
pub fn read_chain_page(
&self,
from: BlockSlot,
len: usize,
) -> impl Iterator<Item = Result<(BlockSlot, BlockHash), Error>> + '_ {
HashBySlotKV::iter_entries_from(&self.db, DBInt(from))
.map(|res| res.map(|(x, y)| (x.0, y.0)))
.take(len)
}
/// Iterator over chain between two points (inclusive)
///
/// To use Origin as start point set `from` to None.
///
/// Returns None if either point in range don't exist or `to` point is
/// earlier in chain than `from`.
pub fn read_chain_range(
&self,
from: Option<(BlockSlot, BlockHash)>,
to: (BlockSlot, BlockHash),
) -> Result<Option<impl Iterator<Item = Result<(BlockSlot, BlockHash), Error>> + '_>, Error>
{
// TODO: We want to use a snapshot here to avoid race condition where
// point is checked to be in the HashBySlotKV but it is rolled-back before we
// create the iterator. Problem is `HashBySlotKV` etc must take `DB`, not
// `Snapshot<DB>`, so maybe we need a new way of creating something like
// a "KVTableSnapshot" in addition to the current "KVTable" type, which
// has methods on snapshots, but here I was having issues as there is
// no `cf` method on Snapshot but it is used is KVTable.
// let snapshot = self.db.snapshot();
// check p2 not before p1
let p1_slot = if let Some((slot, _)) = from {
if to.0 < slot {
warn!("chain range end slot before start slot");
return Ok(None);
} else {
slot
}
} else {
0 // Use 0 as slot for Origin
};
// check p1 exists in HashBySlotKV if provided
if let Some((slot, hash)) = from {
match HashBySlotKV::get_by_key(&self.db, DBInt(slot))? {
Some(DBHash(found_hash)) => {
if hash != found_hash {
warn!("chain range start hash mismatch");
return Ok(None);
}
}
None => {
warn!("chain range start slot not found");
return Ok(None);
}
}
}
// check p2 exists in HashBySlotKV
match HashBySlotKV::get_by_key(&self.db, DBInt(to.0))? {
Some(DBHash(found_hash)) => {
if to.1 != found_hash {
warn!("chain range end hash mismatch");
return Ok(None);
}
}
None => {
warn!("chain range end slot not found");
return Ok(None);
}
};
// return iterator between p1 and p2 inclusive
Ok(Some(
HashBySlotKV::iter_entries_from(&self.db, DBInt(p1_slot))
.map(|res| res.map(|(x, y)| (x.0, y.0)))
.take_while(move |x| {
if let Ok((slot, _)) = x {
// iter returns None once point is after `to` slot
*slot <= to.0
} else {
false
}
}),
))
}
/// Check if a point (pair of slot and block hash) exists in the
/// HashBySlotKV
pub fn chain_contains(&self, slot: BlockSlot, hash: &BlockHash) -> Result<bool, Error> {
if let Some(DBHash(found)) = HashBySlotKV::get_by_key(&self.db, DBInt(slot))? {
if found == *hash {
return Ok(true);
}
}
Ok(false)
}
pub fn is_empty(&self) -> bool {
HashBySlotKV::is_empty(&self.db) && BlockByHashKV::is_empty(&self.db)
}
pub fn destroy(path: impl AsRef<Path>) -> Result<(), Error> {
DB::destroy(&Options::default(), path).map_err(|_| Error::IO)
}
}

View file

@ -1,104 +0,0 @@
use super::{BlockBody, BlockHash, BlockSlot, Store};
fn with_tmp_db<T>(op: fn(db: Store) -> T) {
let path = tempfile::tempdir().unwrap().into_path();
let db = Store::open(path.clone()).unwrap();
op(db);
Store::destroy(path).unwrap();
}
fn dummy_block(slot: u64) -> (BlockSlot, BlockHash, BlockBody) {
let hash = pallas_crypto::hash::Hasher::<256>::hash(slot.to_be_bytes().as_slice());
(slot, hash, slot.to_be_bytes().to_vec())
}
#[test]
fn test_roll_forward_blackbox() {
with_tmp_db(|mut db| {
let (slot, hash, body) = dummy_block(11);
db.roll_forward(slot, hash, body.clone()).unwrap();
// ensure block body is persisted
let persisted = db.get_block(hash).unwrap().unwrap();
assert_eq!(persisted, body);
// ensure tip matches
let (tip_slot, tip_hash) = db.find_tip().unwrap().unwrap();
assert_eq!(tip_slot, slot);
assert_eq!(tip_hash, hash);
// ensure chain has item
let (chain_slot, chain_hash) = db.crawl().next().unwrap().unwrap();
assert_eq!(chain_slot, slot);
assert_eq!(chain_hash, hash);
});
}
#[test]
fn test_roll_back_blackbox() {
with_tmp_db(|mut db| {
for i in 0..=5 {
let (slot, hash, body) = dummy_block(i * 10);
db.roll_forward(slot, hash, body).unwrap();
}
db.roll_back(20).unwrap();
// ensure tip show rollback point
let (tip_slot, _) = db.find_tip().unwrap().unwrap();
assert_eq!(tip_slot, 20);
// ensure chain has items not rolled back
let mut chain = db.crawl();
for i in 0..=2 {
let (slot, _) = chain.next().unwrap().unwrap();
assert_eq!(slot, i * 10);
}
// ensure chain stops here
assert!(chain.next().is_none());
});
}
//TODO: test rollback beyond K
//TODO: test rollback with unknown slot
#[test]
fn test_chain_page() {
with_tmp_db(|mut db| {
for i in 0..100 {
let (slot, hash, body) = dummy_block(i * 10);
db.roll_forward(slot, hash, body).unwrap();
}
let mut chain = db.read_chain_page(200, 15);
for i in 0..15 {
let (slot, _) = chain.next().unwrap().unwrap();
assert_eq!(200 + (i * 10), slot)
}
assert!(chain.next().is_none());
});
}
#[test]
fn test_intersect_options() {
with_tmp_db(|mut db| {
for i in 0..200 {
let (slot, hash, body) = dummy_block(i * 10);
db.roll_forward(slot, hash, body).unwrap();
}
let intersect = db.intersect_options(10).unwrap();
let expected = vec![1990, 1970, 1930, 1850, 1690, 1370, 730];
for (out, exp) in intersect.iter().zip(expected) {
assert_eq!(out.0, exp);
}
});
}

View file

@ -1,422 +0,0 @@
use pallas_crypto::hash::Hash;
use serde::{de::DeserializeOwned, Serialize};
use std::marker::PhantomData;
use thiserror::Error;
#[derive(Error, Debug)]
pub enum Error {
#[error("IO error")]
IO,
#[error("serde error")]
Serde,
#[error("not found")]
NotFound,
}
pub struct DBHash(pub Hash<32>);
impl From<Box<[u8]>> for DBHash {
fn from(value: Box<[u8]>) -> Self {
let inner: [u8; 32] = value[0..32].try_into().unwrap();
let inner = Hash::<32>::from(inner);
Self(inner)
}
}
impl From<DBHash> for Box<[u8]> {
fn from(value: DBHash) -> Self {
let b = value.0.to_vec();
b.into()
}
}
impl From<Hash<32>> for DBHash {
fn from(value: Hash<32>) -> Self {
DBHash(value)
}
}
impl From<DBHash> for Hash<32> {
fn from(value: DBHash) -> Self {
value.0
}
}
pub struct DBInt(pub u64);
impl From<DBInt> for Box<[u8]> {
fn from(value: DBInt) -> Self {
let b = value.0.to_be_bytes();
Box::new(b)
}
}
impl From<Box<[u8]>> for DBInt {
fn from(value: Box<[u8]>) -> Self {
let inner: [u8; 8] = value[0..8].try_into().unwrap();
let inner = u64::from_be_bytes(inner);
Self(inner)
}
}
impl From<u64> for DBInt {
fn from(value: u64) -> Self {
DBInt(value)
}
}
impl From<DBInt> for u64 {
fn from(value: DBInt) -> Self {
value.0
}
}
pub struct DBBytes(pub Vec<u8>);
impl From<DBBytes> for Box<[u8]> {
fn from(value: DBBytes) -> Self {
value.0.into()
}
}
impl From<Box<[u8]>> for DBBytes {
fn from(value: Box<[u8]>) -> Self {
Self(value.into())
}
}
impl<V> From<DBSerde<V>> for DBBytes
where
V: Serialize,
{
fn from(value: DBSerde<V>) -> Self {
let inner = bincode::serialize(&value.0).unwrap();
DBBytes(inner)
}
}
#[derive(Debug)]
pub struct DBSerde<V>(pub V);
impl<V> std::ops::Deref for DBSerde<V> {
type Target = V;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<V> From<DBSerde<V>> for Box<[u8]>
where
V: Serialize,
{
fn from(v: DBSerde<V>) -> Self {
bincode::serialize(&v.0)
.map(|x| x.into_boxed_slice())
.unwrap()
}
}
impl<V> From<Box<[u8]>> for DBSerde<V>
where
V: DeserializeOwned,
{
fn from(value: Box<[u8]>) -> Self {
let inner = bincode::deserialize(&value).unwrap();
DBSerde(inner)
}
}
impl<V> From<DBBytes> for DBSerde<V>
where
V: DeserializeOwned,
{
fn from(value: DBBytes) -> Self {
let inner = bincode::deserialize(&value.0).unwrap();
DBSerde(inner)
}
}
impl<V> Clone for DBSerde<V>
where
V: Clone,
{
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
pub struct WithDBIntPrefix<T>(pub u64, pub T);
impl<T> From<WithDBIntPrefix<T>> for Box<[u8]>
where
Box<[u8]>: From<T>,
{
fn from(value: WithDBIntPrefix<T>) -> Self {
let prefix: Box<[u8]> = DBInt(value.0).into();
let after: Box<[u8]> = value.1.into();
[prefix, after].concat().into()
}
}
impl<T> From<Box<[u8]>> for WithDBIntPrefix<T> {
fn from(_value: Box<[u8]>) -> Self {
todo!()
}
}
type RocksIterator<'a> = rocksdb::DBIteratorWithThreadMode<'a, rocksdb::DB>;
pub struct ValueIterator<'a, V>(RocksIterator<'a>, PhantomData<V>);
impl<'a, V> ValueIterator<'a, V> {
pub fn new(inner: RocksIterator<'a>) -> Self {
Self(inner, Default::default())
}
}
impl<'a, V> Iterator for ValueIterator<'a, V>
where
V: From<Box<[u8]>>,
{
type Item = Result<V, Error>;
fn next(&mut self) -> Option<Result<V, Error>> {
match self.0.next() {
Some(Ok((_, value))) => Some(Ok(V::from(value))),
Some(Err(err)) => {
tracing::error!(?err);
Some(Err(Error::IO))
}
None => None,
}
}
}
pub struct KeyIterator<'a, K>(RocksIterator<'a>, PhantomData<K>);
impl<'a, K> KeyIterator<'a, K> {
pub fn new(inner: RocksIterator<'a>) -> Self {
Self(inner, Default::default())
}
}
impl<'a, K> Iterator for KeyIterator<'a, K>
where
K: From<Box<[u8]>>,
{
type Item = Result<K, Error>;
fn next(&mut self) -> Option<Result<K, Error>> {
match self.0.next() {
Some(Ok((key, _))) => Some(Ok(K::from(key))),
Some(Err(err)) => {
tracing::error!(?err);
Some(Err(Error::IO))
}
None => None,
}
}
}
pub struct EntryIterator<'a, K, V>(RocksIterator<'a>, PhantomData<(K, V)>);
impl<'a, K, V> EntryIterator<'a, K, V> {
pub fn new(inner: RocksIterator<'a>) -> Self {
Self(inner, Default::default())
}
}
impl<'a, K, V> Iterator for EntryIterator<'a, K, V>
where
K: From<Box<[u8]>>,
V: From<Box<[u8]>>,
{
type Item = Result<(K, V), Error>;
fn next(&mut self) -> Option<Result<(K, V), Error>> {
match self.0.next() {
Some(Ok((key, value))) => {
let key_out = K::from(key);
let value_out = V::from(value);
Some(Ok((key_out, value_out)))
}
Some(Err(err)) => {
tracing::error!(?err);
Some(Err(Error::IO))
}
None => None,
}
}
}
pub trait KVTable<K, V>
where
Box<[u8]>: From<K>,
Box<[u8]>: From<V>,
K: From<Box<[u8]>>,
V: From<Box<[u8]>>,
{
const CF_NAME: &'static str;
fn cf(db: &rocksdb::DB) -> rocksdb::ColumnFamilyRef {
db.cf_handle(Self::CF_NAME).unwrap()
}
fn reset(db: &rocksdb::DB) -> Result<(), Error> {
db.drop_cf(Self::CF_NAME).map_err(|_| Error::IO)?;
db.create_cf(Self::CF_NAME, &rocksdb::Options::default())
.map_err(|_| Error::IO)?;
Ok(())
}
fn get_by_key(db: &rocksdb::DB, k: K) -> Result<Option<V>, Error> {
let cf = Self::cf(db);
let raw_key = Box::<[u8]>::from(k);
let raw_value = db
.get_cf(&cf, raw_key)
.map_err(|_| Error::IO)?
.map(|x| Box::from(x.as_slice()));
match raw_value {
Some(x) => {
let out = <V>::from(x);
Ok(Some(out))
}
None => Ok(None),
}
}
fn stage_upsert(db: &rocksdb::DB, k: K, v: V, batch: &mut rocksdb::WriteBatch) {
let cf = Self::cf(db);
let k_raw = Box::<[u8]>::from(k);
let v_raw = Box::<[u8]>::from(v);
batch.put_cf(&cf, k_raw, v_raw);
}
fn is_empty(db: &rocksdb::DB) -> bool {
// HACK: can't find an easy way to size the num of keys, so we'll start an
// iterator and see if we have at least one value. If someone know a better way
// to accomplish this, please refactor.
let mut iter = Self::iter_keys(db, rocksdb::IteratorMode::Start);
iter.next().is_none()
}
fn iter_keys<'a>(db: &'a rocksdb::DB, mode: rocksdb::IteratorMode) -> KeyIterator<'a, K> {
let cf = Self::cf(db);
let inner = db.iterator_cf(&cf, mode);
KeyIterator::new(inner)
}
#[allow(dead_code)]
fn iter_keys_start(db: &rocksdb::DB) -> KeyIterator<'_, K> {
Self::iter_keys(db, rocksdb::IteratorMode::Start)
}
fn iter_keys_from(db: &rocksdb::DB, from: K) -> KeyIterator<'_, K> {
let from_raw = Box::<[u8]>::from(from);
let mode = rocksdb::IteratorMode::From(&from_raw, rocksdb::Direction::Forward);
Self::iter_keys(db, mode)
}
fn iter_values<'a>(db: &'a rocksdb::DB, mode: rocksdb::IteratorMode) -> ValueIterator<'a, V> {
let cf = Self::cf(db);
let inner = db.iterator_cf(&cf, mode);
ValueIterator::new(inner)
}
#[allow(dead_code)]
fn iter_values_start(db: &rocksdb::DB) -> ValueIterator<'_, V> {
Self::iter_values(db, rocksdb::IteratorMode::Start)
}
#[allow(dead_code)]
fn iter_values_from(db: &rocksdb::DB, from: K) -> ValueIterator<'_, V> {
let from_raw = Box::<[u8]>::from(from);
let mode = rocksdb::IteratorMode::From(&from_raw, rocksdb::Direction::Forward);
Self::iter_values(db, mode)
}
fn iter_entries<'a>(
db: &'a rocksdb::DB,
mode: rocksdb::IteratorMode,
) -> EntryIterator<'a, K, V> {
let cf = Self::cf(db);
let inner = db.iterator_cf(&cf, mode);
EntryIterator::new(inner)
}
#[allow(dead_code)]
fn iter_entries_start(db: &rocksdb::DB) -> EntryIterator<'_, K, V> {
Self::iter_entries(db, rocksdb::IteratorMode::Start)
}
fn iter_entries_from(db: &rocksdb::DB, from: K) -> EntryIterator<'_, K, V> {
let from_raw = Box::<[u8]>::from(from);
let mode = rocksdb::IteratorMode::From(&from_raw, rocksdb::Direction::Forward);
Self::iter_entries(db, mode)
}
fn last_key(db: &rocksdb::DB) -> Result<Option<K>, Error> {
let mut iter = Self::iter_keys(db, rocksdb::IteratorMode::End);
match iter.next() {
None => Ok(None),
Some(x) => Ok(Some(x?)),
}
}
#[allow(dead_code)]
fn last_value(db: &rocksdb::DB) -> Result<Option<V>, Error> {
let mut iter = Self::iter_values(db, rocksdb::IteratorMode::End);
match iter.next() {
None => Ok(None),
Some(x) => Ok(Some(x?)),
}
}
#[allow(dead_code)]
fn last_entry(db: &rocksdb::DB) -> Result<Option<(K, V)>, Error> {
let mut iter = Self::iter_entries(db, rocksdb::IteratorMode::End);
match iter.next() {
None => Ok(None),
Some(x) => Ok(Some(x?)),
}
}
fn scan_until<F>(
db: &rocksdb::DB,
mode: rocksdb::IteratorMode,
predicate: F,
) -> Result<Option<K>, Error>
where
F: Fn(&V) -> bool,
{
for (k, v) in Self::iter_entries(db, mode).flatten() {
if predicate(&v) {
return Ok(Some(k));
}
}
Ok(None)
}
fn stage_delete(db: &rocksdb::DB, key: K, batch: &mut rocksdb::WriteBatch) {
let cf = Self::cf(db);
let k_raw = Box::<[u8]>::from(key);
batch.delete_cf(&cf, k_raw);
}
}

View file

@ -1,3 +0,0 @@
pub mod chain;
mod kvtable;
pub mod wal;

View file

@ -1,15 +0,0 @@
use pallas_crypto::hash::Hash;
mod store;
mod stream;
#[cfg(test)]
mod tests;
pub type Seq = u64;
pub type BlockSlot = u64;
pub type BlockHash = Hash<32>;
pub type BlockBody = Vec<u8>;
pub use store::*;
pub use stream::*;

View file

@ -1,406 +0,0 @@
use rocksdb::Options;
use rocksdb::{IteratorMode, WriteBatch, DB};
use serde::{Deserialize, Serialize};
use std::{path::Path, sync::Arc};
use crate::kvtable::*;
use super::{BlockBody, BlockHash, BlockSlot, Seq};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Log {
Apply(BlockSlot, BlockHash, BlockBody),
Undo(BlockSlot, BlockHash, BlockBody),
Mark(BlockSlot, BlockHash, BlockBody),
Origin,
}
impl Log {
pub fn into_apply(
slot: impl Into<BlockSlot>,
hash: impl Into<BlockHash>,
block: impl Into<BlockBody>,
) -> Self {
Self::Apply(slot.into(), hash.into(), block.into())
}
pub fn slot(&self) -> Option<BlockSlot> {
match self {
Log::Apply(s, _, _) => Some(*s),
Log::Undo(s, _, _) => Some(*s),
Log::Mark(s, _, _) => Some(*s),
Log::Origin => None,
}
}
pub fn hash(&self) -> Option<&BlockHash> {
match self {
Log::Apply(_, h, _) => Some(h),
Log::Undo(_, h, _) => Some(h),
Log::Mark(_, h, _) => Some(h),
Log::Origin => None,
}
}
pub fn body(&self) -> Option<&BlockBody> {
match self {
Log::Apply(_, _, b) => Some(b),
Log::Undo(_, _, b) => Some(b),
Log::Mark(_, _, b) => Some(b),
Log::Origin => None,
}
}
pub fn into_undo(self) -> Option<Self> {
match self {
Self::Apply(s, h, b) => Some(Self::Undo(s, h, b)),
_ => None,
}
}
pub fn into_mark(self) -> Option<Self> {
match self {
Log::Apply(s, h, b) => Some(Log::Mark(s, h, b)),
Log::Mark(s, h, b) => Some(Log::Mark(s, h, b)),
Log::Origin => Some(Log::Origin),
Log::Undo(..) => None,
}
}
pub fn is_apply(&self) -> bool {
matches!(self, Log::Apply(..))
}
pub fn is_mark(&self) -> bool {
matches!(self, Log::Mark(..))
}
pub fn is_undo(&self) -> bool {
matches!(self, Log::Undo(..))
}
pub fn is_origin(&self) -> bool {
matches!(self, Log::Origin)
}
/// Checks if entry is a forward event (apply or mark)
pub fn is_forward(&self) -> bool {
self.is_mark() || self.is_apply()
}
/// Checks if entry is a forward event that matches the specified point
pub fn equals_point(&self, point: &(BlockSlot, BlockHash)) -> bool {
if !self.is_forward() {
return false;
}
self.slot().is_some_and(|x| x == point.0) && self.hash().is_some_and(|x| x.eq(&point.1))
}
/// Checks if entry is a forward event that matches any of the specified
/// points
pub fn equals_any_point(&self, points: &[(BlockSlot, BlockHash)]) -> bool {
points.iter().any(|x| self.equals_point(x))
}
}
// slot => block hash
pub struct WalKV;
impl KVTable<DBInt, DBSerde<Log>> for WalKV {
const CF_NAME: &'static str = "WalKV";
}
pub struct WalIterator<'a>(pub EntryIterator<'a, DBInt, DBSerde<Log>>);
impl Iterator for WalIterator<'_> {
type Item = Result<(u64, Log), Error>;
fn next(&mut self) -> Option<Self::Item> {
self.0.next().map(|v| v.map(|(seq, val)| (seq.0, val.0)))
}
}
impl WalKV {
pub fn initialize(db: &DB) -> Result<Seq, Error> {
if Self::is_empty(db) {
Self::write_seed(db)?;
Ok(0)
} else {
let last = Self::last_key(db)?.map(|x| x.0);
Ok(last.unwrap())
}
}
fn write_seed(db: &DB) -> Result<(), Error> {
let mut batch = WriteBatch::default();
let k = DBInt(0);
let v = DBSerde(Log::Origin);
Self::stage_upsert(db, k, v, &mut batch);
db.write(batch).map_err(|_| Error::IO)
}
}
pub struct RollBatch<'a>(&'a DB, WriteBatch, Seq);
impl<'a> RollBatch<'a> {
fn new(db: &'a DB, last_seq: Seq) -> Self {
Self(db, Default::default(), last_seq)
}
fn stage_append(&mut self, log: Log) {
let new_seq = self.2 + 1;
WalKV::stage_upsert(self.0, DBInt(new_seq), DBSerde(log), &mut self.1);
self.2 = new_seq;
}
fn apply(self) -> Result<Seq, Error> {
self.0.write(self.1).map_err(|_| Error::IO)?;
Ok(self.2)
}
}
#[derive(Clone)]
pub struct Store {
db: Arc<DB>,
pub tip_change: Arc<tokio::sync::Notify>,
wal_seq: u64,
k_param: u64,
immutable_overlap: u64,
}
impl Store {
pub fn open(
path: impl AsRef<Path>,
k_param: u64,
immutable_overlap: Option<u64>,
) -> Result<Self, Error> {
let mut opts = Options::default();
opts.create_if_missing(true);
opts.create_missing_column_families(true);
let db = DB::open_cf(&opts, path, [WalKV::CF_NAME]).map_err(|_| Error::IO)?;
let wal_seq = WalKV::initialize(&db)?;
let out = Self {
db: Arc::new(db),
tip_change: Arc::new(tokio::sync::Notify::new()),
wal_seq,
k_param,
immutable_overlap: immutable_overlap.unwrap_or(0),
};
Ok(out)
}
pub fn roll_forward(
&mut self,
slot: BlockSlot,
hash: BlockHash,
body: BlockBody,
) -> Result<(), Error> {
let mut batch = RollBatch::new(&self.db, self.wal_seq);
batch.stage_append(Log::Apply(slot, hash, body));
self.wal_seq = batch.apply()?;
self.tip_change.notify_waiters();
Ok(())
}
pub fn roll_back(&mut self, until: BlockSlot) -> Result<(), Error> {
let mut batch = RollBatch::new(&self.db, self.wal_seq);
let iter = WalKV::iter_values(&self.db, IteratorMode::End);
for step in iter {
let value = step.map_err(|_| Error::IO)?.0;
if value.slot().unwrap_or(0) <= until {
batch.stage_append(value.into_mark().unwrap());
break;
}
match value.into_undo() {
Some(undo) => {
batch.stage_append(undo);
}
None => continue,
};
}
self.wal_seq = batch.apply()?;
self.tip_change.notify_waiters();
Ok(())
}
pub fn roll_back_origin(&mut self) -> Result<(), Error> {
let mut batch = RollBatch::new(&self.db, self.wal_seq);
let iter = WalKV::iter_values(&self.db, IteratorMode::End);
for step in iter {
let value = step.map_err(|_| Error::IO)?.0;
if value.is_origin() {
break;
}
match value.into_undo() {
Some(undo) => {
batch.stage_append(undo);
}
None => continue,
};
}
self.wal_seq = batch.apply()?;
self.tip_change.notify_waiters();
Ok(())
}
pub fn find_tip(&self) -> Result<Option<(BlockSlot, BlockHash)>, Error> {
let iter = WalKV::iter_values(&self.db, IteratorMode::End);
for value in iter {
let value = value?;
if value.is_apply() || value.is_mark() {
let slot = value.slot().unwrap();
let hash = *value.hash().unwrap();
return Ok(Some((slot, hash)));
}
}
Ok(None)
}
pub fn intersect_options(
&self,
max_items: usize,
) -> Result<Vec<(BlockSlot, BlockHash)>, Error> {
let mut iter = WalKV::iter_values(&self.db, rocksdb::IteratorMode::End)
.filter_map(|res| res.ok())
.filter(|v| !v.is_undo());
let mut out = Vec::with_capacity(max_items);
// crawl the wal exponentially
while let Some(val) = iter.next() {
if !val.is_apply() && !val.is_mark() {
continue;
}
out.push((val.slot().unwrap(), *val.hash().unwrap()));
if out.len() >= max_items {
break;
}
// skip exponentially
let skip = 2usize.pow(out.len() as u32) - 1;
for _ in 0..skip {
iter.next();
}
}
Ok(out)
}
pub fn crawl_after(&self, seq: Option<u64>) -> WalIterator {
if let Some(seq) = seq {
let seq = Box::<[u8]>::from(DBInt(seq));
let from = rocksdb::IteratorMode::From(&seq, rocksdb::Direction::Forward);
let mut iter = WalKV::iter_entries(&self.db, from);
// skip current
iter.next();
WalIterator(iter)
} else {
let from = rocksdb::IteratorMode::Start;
let iter = WalKV::iter_entries(&self.db, from);
WalIterator(iter)
}
}
pub fn find_wal_seq(&self, intersect: &[(BlockSlot, BlockHash)]) -> Result<Option<Seq>, Error> {
if intersect.is_empty() {
return Ok(None);
}
let found = WalKV::scan_until(&self.db, rocksdb::IteratorMode::End, |v| {
v.equals_any_point(intersect)
})?;
match found {
Some(DBInt(seq)) => Ok(Some(seq)),
None => Err(Error::NotFound),
}
}
pub fn crawl_from_intersect(
&self,
options: &[(BlockSlot, BlockHash)],
) -> Result<WalIterator, Error> {
let seq = self.find_wal_seq(options)?;
// TODO: we need to create a RocksDB snapshot (with `db.snapshot()`) to use as
// the source for sequence scan and the iterator to ensure that sequence
// hasn't been pruned between operations. For the time being we consider
// this is a very narrow edge-case.
if let Some(seq) = seq {
let seq = Box::<[u8]>::from(DBInt(seq));
let from = rocksdb::IteratorMode::From(&seq, rocksdb::Direction::Forward);
let mut iter = WalKV::iter_entries(&self.db, from);
// skip current
iter.next();
Ok(WalIterator(iter))
} else {
let from = rocksdb::IteratorMode::Start;
let iter = WalKV::iter_entries(&self.db, from);
Ok(WalIterator(iter))
}
}
/// Prune the WAL of entries with slot values over `k_param` from the tip
pub fn prune_wal(&self) -> Result<(), Error> {
let tip = self.find_tip()?.map(|(slot, _)| slot).unwrap_or_default();
// iterate through all values in Wal from start
let mut iter = WalKV::iter_entries(&self.db, rocksdb::IteratorMode::Start);
let mut batch = WriteBatch::default();
while let Some(Ok((wal_key, value))) = iter.next() {
// get the number of slots that have passed since the wal point
let slot_delta = tip - value.slot().unwrap_or(0);
if slot_delta <= self.k_param + self.immutable_overlap {
break;
} else {
WalKV::stage_delete(&self.db, wal_key, &mut batch);
}
}
self.db.write(batch).map_err(|_| Error::IO)?;
Ok(())
}
pub fn is_empty(&self) -> bool {
WalKV::is_empty(&self.db)
}
pub fn destroy(path: impl AsRef<Path>) -> Result<(), Error> {
DB::destroy(&Options::default(), path).map_err(|_| Error::IO)
}
}

View file

@ -1,83 +0,0 @@
use futures_core::Stream;
use super::{BlockHash, BlockSlot, Log, Store};
pub struct RollStream;
impl RollStream {
pub fn intersect(
store: Store,
intersect: Vec<(BlockSlot, BlockHash)>,
) -> impl Stream<Item = Log> {
async_stream::stream! {
let mut last_seq = None;
let iter = store.crawl_from_intersect(&intersect).unwrap();
for (seq, val) in iter.flatten() {
yield val;
last_seq = Some(seq);
}
loop {
store.tip_change.notified().await;
let iter = store.crawl_after(last_seq);
for (seq, val) in iter.flatten() {
yield val;
last_seq = Some(seq);
}
}
}
}
}
#[cfg(test)]
mod tests {
use futures_util::{pin_mut, StreamExt};
use crate::wal::{BlockBody, BlockHash, BlockSlot, Store};
fn dummy_block(slot: u64) -> (BlockSlot, BlockHash, BlockBody) {
let hash = pallas_crypto::hash::Hasher::<256>::hash(slot.to_be_bytes().as_slice());
(slot, hash, slot.to_be_bytes().to_vec())
}
#[tokio::test]
async fn test_stream_waiting() {
let path = tempfile::tempdir().unwrap().into_path();
let mut db = Store::open(path.clone(), 30, None).unwrap();
for i in 0..=100 {
let (slot, hash, body) = dummy_block(i * 10);
db.roll_forward(slot, hash, body).unwrap();
}
let mut db2 = db.clone();
let background = tokio::spawn(async move {
for i in 101..=200 {
let (slot, hash, body) = dummy_block(i * 10);
db2.roll_forward(slot, hash, body).unwrap();
tokio::time::sleep(std::time::Duration::from_millis(5)).await;
}
});
let s = super::RollStream::intersect(db.clone(), vec![]);
pin_mut!(s);
let evt = s.next().await;
let evt = evt.unwrap();
assert!(evt.is_origin());
for i in 0..=200 {
let evt = s.next().await;
let evt = evt.unwrap();
assert!(evt.is_apply());
assert_eq!(evt.slot().unwrap(), i * 10);
}
background.abort();
let _ = Store::destroy(path); //.unwrap();
}
}

View file

@ -1,213 +0,0 @@
use super::{BlockBody, BlockHash, BlockSlot, Store};
fn with_tmp_db<T>(k_param: u64, op: fn(store: Store) -> T) {
let path = tempfile::tempdir().unwrap().into_path();
let store = Store::open(path.clone(), k_param, None).unwrap();
op(store);
Store::destroy(path).unwrap();
}
fn with_tmp_db_overlap<T>(k_param: u64, overlap: u64, op: fn(store: Store) -> T) {
let path = tempfile::tempdir().unwrap().into_path();
let store = Store::open(path.clone(), k_param, Some(overlap)).unwrap();
op(store);
Store::destroy(path).unwrap();
}
fn dummy_block(slot: u64) -> (BlockSlot, BlockHash, BlockBody) {
let hash = pallas_crypto::hash::Hasher::<256>::hash(slot.to_be_bytes().as_slice());
(slot, hash, slot.to_be_bytes().to_vec())
}
#[test]
fn test_origin_event() {
with_tmp_db(30, |db| {
let mut iter = db.crawl_after(None);
let origin = iter.next();
assert!(origin.is_some());
let origin = origin.unwrap();
assert!(origin.is_ok());
let (seq, value) = origin.unwrap();
assert_eq!(seq, 0);
assert!(value.is_origin());
});
}
#[test]
fn test_basic_append() {
with_tmp_db(30, |mut db| {
let (slot, hash, body) = dummy_block(11);
db.roll_forward(slot, hash, body.clone()).unwrap();
// ensure tip matches
let (tip_slot, tip_hash) = db.find_tip().unwrap().unwrap();
assert_eq!(tip_slot, slot);
assert_eq!(tip_hash, hash);
// ensure chain has item
let mut iter = db.crawl_after(None);
// skip origin
iter.next();
let (seq, log) = iter.next().unwrap().unwrap();
assert_eq!(seq, 1);
assert_eq!(log.slot().unwrap(), slot);
assert_eq!(log.hash().unwrap(), &hash);
assert_eq!(log.body().unwrap(), &body);
});
}
#[test]
fn test_rollback_undos() {
with_tmp_db(30, |mut db| {
for i in 0..=5 {
let (slot, hash, body) = dummy_block(i * 10);
db.roll_forward(slot, hash, body).unwrap();
}
db.roll_back(20).unwrap();
// ensure tip show rollback point
let (tip_slot, _) = db.find_tip().unwrap().unwrap();
assert_eq!(tip_slot, 20);
// ensure chain has items not rolled back
let mut wal = db.crawl_after(None);
let (seq, log) = wal.next().unwrap().unwrap();
assert_eq!(seq, 0);
assert!(log.is_origin());
for i in 0..=5 {
let (_, log) = wal.next().unwrap().unwrap();
assert!(log.is_apply());
assert_eq!(log.slot().unwrap(), i * 10);
}
for i in (3..=5).rev() {
let (_, log) = wal.next().unwrap().unwrap();
assert!(log.is_undo());
assert_eq!(log.slot().unwrap(), i * 10);
}
let (_, log) = wal.next().unwrap().unwrap();
assert!(log.is_mark());
assert_eq!(log.slot().unwrap(), 20);
// ensure chain stops here
assert!(wal.next().is_none());
});
}
//TODO: test rollback beyond K
//TODO: test rollback with unknown slot
#[test]
fn test_prune_linear() {
with_tmp_db(30, |mut db| {
for i in 0..100 {
let (slot, hash, body) = dummy_block(i * 10);
db.roll_forward(slot, hash, body).unwrap();
}
// db contains slots: 0, 10, ..., 980, 990
// this should prune slots less than (990 - 30) = 960
db.prune_wal().unwrap();
let mut wal = db.crawl_after(None);
for i in 96..100 {
let (_, val) = wal.next().unwrap().unwrap();
assert_eq!(val.slot().unwrap(), i * 10);
}
assert!(wal.next().is_none());
});
}
#[test]
fn test_prune_linear_with_overlap() {
with_tmp_db_overlap(30, 20, |mut db| {
for i in 0..100 {
let (slot, hash, body) = dummy_block(i * 10);
db.roll_forward(slot, hash, body).unwrap();
}
// db contains slots: 0, 10, ..., 980, 990
// this should prune slots less than (990 - 30 - 20) = 940
db.prune_wal().unwrap();
let mut wal = db.crawl_after(None);
for i in 94..100 {
let (_, val) = wal.next().unwrap().unwrap();
assert_eq!(val.slot().unwrap(), i * 10);
}
assert!(wal.next().is_none());
});
}
#[test]
fn test_prune_with_rollback() {
with_tmp_db(30, |mut db| {
for i in 0..100 {
let (slot, hash, body) = dummy_block(i * 10);
db.roll_forward(slot, hash, body).unwrap();
}
db.roll_back(800).unwrap();
// tip is 800 (Mark)
db.prune_wal().unwrap();
let mut wal = db.crawl_after(None);
for i in 77..100 {
let (_, val) = wal.next().unwrap().unwrap();
assert!(val.is_apply());
assert_eq!(val.slot().unwrap(), i * 10);
}
for i in (81..100).rev() {
let (_, val) = wal.next().unwrap().unwrap();
assert!(val.is_undo());
assert_eq!(val.slot().unwrap(), i * 10);
}
let (_, val) = wal.next().unwrap().unwrap();
assert!(val.is_mark());
assert_eq!(val.slot().unwrap(), 800);
assert!(wal.next().is_none());
});
}
#[test]
fn test_intersect_options() {
with_tmp_db(1000, |mut db| {
for i in 0..200 {
let (slot, hash, body) = dummy_block(i * 10);
db.roll_forward(slot, hash, body).unwrap();
}
db.prune_wal().unwrap();
let intersect = db.intersect_options(10).unwrap();
let expected = vec![1990, 1970, 1930, 1850, 1690, 1370, 980];
for (out, exp) in intersect.iter().zip(expected) {
assert_eq!(out.0, exp);
}
});
}

View file

@ -22,14 +22,12 @@ pallas-configs = { version = "=0.30.2", path = "../pallas-configs/" }
pallas-txbuilder = { version = "=0.30.2", path = "../pallas-txbuilder/" }
pallas-math = { version = "=0.30.2", path = "../pallas-math/", optional = true }
pallas-applying = { version = "=0.30.2", path = "../pallas-applying/", optional = true }
pallas-rolldb = { version = "=0.30.2", path = "../pallas-rolldb/", optional = true }
pallas-wallet = { version = "=0.30.2", path = "../pallas-wallet/", optional = true }
pallas-hardano = { version = "=0.30.2", path = "../pallas-hardano/", optional = true }
[features]
rolldb = ["pallas-rolldb"]
hardano = ["pallas-hardano"]
wallet = ["pallas-wallet"]
applying = ["pallas-applying"]
math = ["pallas-math"]
unstable = ["rolldb", "hardano", "wallet", "applying"]
unstable = ["hardano", "wallet", "applying"]