diff --git a/pallas-hardano/src/storage/immutable/chunk.rs b/pallas-hardano/src/storage/immutable/chunk.rs index 190752d..9b689a2 100644 --- a/pallas-hardano/src/storage/immutable/chunk.rs +++ b/pallas-hardano/src/storage/immutable/chunk.rs @@ -15,52 +15,59 @@ pub type SecondaryEntry = super::secondary::Entry; pub struct Reader { inner: BufReader, index: SecondaryIndex, - current: Option>, - next: Option>, + current: Option>, + next: Option>, +} + +#[derive(thiserror::Error, Debug)] +pub enum Error { + #[error("Cannot open chunk file, error: {0}")] + CannotOpenChunkFile(std::io::Error), + #[error("Cannot read block, error: {0}")] + CannotReadBlock(std::io::Error), + #[error(transparent)] + SecondaryIndexError(secondary::Error), } impl Reader { - fn open(mut index: SecondaryIndex, chunks: File) -> Result { + fn open(mut index: SecondaryIndex, chunks: File) -> Self { let inner = BufReader::new(chunks); let current = index.next(); let next = index.next(); - Ok(Self { + Self { inner, index, current, next, - }) + } } - fn read_middle_block( - file: &mut BufReader, - next_offset: u64, - ) -> Result, std::io::Error> { - let start = file.stream_position()?; + fn read_middle_block(file: &mut BufReader, next_offset: u64) -> Result, Error> { + let start = file.stream_position().map_err(Error::CannotReadBlock)?; let delta = next_offset - start; trace!(start, delta, "reading chunk middle block"); let mut buf = vec![0u8; delta as usize]; - file.read_exact(&mut buf)?; + file.read_exact(&mut buf).map_err(Error::CannotReadBlock)?; Ok(buf) } - fn read_last_block(file: &mut BufReader) -> Result, std::io::Error> { - let start = file.stream_position()?; + fn read_last_block(file: &mut BufReader) -> Result, Error> { + let start = file.stream_position().map_err(Error::CannotReadBlock)?; trace!(start, "reading chunk last block"); let mut buf = vec![]; - file.read_to_end(&mut buf)?; + file.read_to_end(&mut buf).map_err(Error::CannotReadBlock)?; Ok(buf) } } impl Iterator for Reader { - type Item = Result, std::io::Error>; + type Item = Result, Error>; fn next(&mut self) -> Option { match (self.current.take(), self.next.take()) { @@ -69,7 +76,7 @@ impl Iterator for Reader { self.current = None; self.next = None; - Some(Err(next)) + Some(Err(Error::SecondaryIndexError(next))) } (Some(_), Some(Ok(next))) => { let block = Self::read_middle_block(&mut self.inner, next.block_offset); @@ -91,18 +98,12 @@ impl Iterator for Reader { } } -pub fn read_blocks(dir: &Path, name: &str) -> Result { - let primary = dir.join(name).with_extension("primary"); - let primary = std::fs::File::open(primary)?; - let primary = immutable::primary::Reader::open(primary)?; - - let secondary = dir.join(name).with_extension("secondary"); - let secondary = std::fs::File::open(secondary)?; - let secondary = secondary::Reader::open(primary, secondary)?; +pub fn read_blocks(dir: &Path, name: &str) -> Result { + let secondary = secondary::read_entries(dir, name).map_err(Error::SecondaryIndexError)?; let chunk = dir.join(name).with_extension("chunk"); - let chunk = std::fs::File::open(chunk)?; - Reader::open(secondary, chunk) + let chunk = std::fs::File::open(chunk).map_err(Error::CannotOpenChunkFile)?; + Ok(Reader::open(secondary, chunk)) } #[cfg(test)] diff --git a/pallas-hardano/src/storage/immutable/mod.rs b/pallas-hardano/src/storage/immutable/mod.rs index 58fe20e..d952355 100644 --- a/pallas-hardano/src/storage/immutable/mod.rs +++ b/pallas-hardano/src/storage/immutable/mod.rs @@ -6,7 +6,6 @@ use std::{ use pallas_traverse::MultiEraBlock; use tap::Tap; -use thiserror::Error; use tracing::debug; pub mod chunk; @@ -17,13 +16,18 @@ pub mod secondary; // `network`. pub type Point = pallas_network::miniprotocols::Point; -#[derive(Debug, Error, PartialEq, Eq)] +#[derive(thiserror::Error, Debug)] pub enum Error { #[error("Cannot find block by the provided point: {0:?}")] CannotFindBlock(Point), - #[error("Origin block is missing, provided truncated chain data")] OriginMissing, + #[error("Cannot read directory, error: {0}")] + CannotReadDir(std::io::Error), + #[error("Cannot decode block, error: {0}")] + CannotDecodeBlock(pallas_traverse::Error), + #[error(transparent)] + ChunkReadError(chunk::Error), } /// Performs a binary search of the given sorted chunks in descending order @@ -39,8 +43,8 @@ pub enum Error { fn chunk_binary_search( chunks: &[ChunkT], point: &PointT, - cmp: impl Fn(&ChunkT, &PointT) -> Result>, -) -> Result, Box> { + cmp: impl Fn(&ChunkT, &PointT) -> Result, +) -> Result, Error> { let mut size = chunks.len(); let mut left = 0; let mut right: usize = size; @@ -75,12 +79,12 @@ fn iterate_till_point( iter: impl Iterator, slot: u64, block_hash: &[u8], -) -> Result, Box> { +) -> Result, Error> { let mut iter = iter.peekable(); match iter.peek() { Some(Ok(block_data)) => { let mut block_data = block_data.clone(); - let mut block = MultiEraBlock::decode(&block_data)?; + let mut block = MultiEraBlock::decode(&block_data).map_err(Error::CannotDecodeBlock)?; while block.slot() < slot { iter.next(); @@ -88,7 +92,8 @@ fn iterate_till_point( match iter.peek() { Some(Ok(data)) => { block_data.clone_from(data); - block = MultiEraBlock::decode(&block_data)?; + block = + MultiEraBlock::decode(&block_data).map_err(Error::CannotDecodeBlock)?; } Some(Err(_)) | None => return Ok(iter), } @@ -104,8 +109,9 @@ fn iterate_till_point( } } -fn build_stack_of_chunk_names(dir: &Path) -> Result { - let mut chunks = std::fs::read_dir(dir)? +fn build_stack_of_chunk_names(dir: &Path) -> Result { + let mut chunks = std::fs::read_dir(dir) + .map_err(Error::CannotReadDir)? .map_while(|e| e.ok()) .filter(|e| { e.path() @@ -135,7 +141,7 @@ pub type ChunkNameSack = Vec; pub struct ChunkReaders(PathBuf, ChunkNameSack); impl Iterator for ChunkReaders { - type Item = Result; + type Item = Result; fn next(&mut self) -> Option { self.1 @@ -145,9 +151,9 @@ impl Iterator for ChunkReaders { } } -pub type FallibleBlock = Result; +pub type FallibleBlock = Result; -pub fn read_blocks(dir: &Path) -> Result, std::io::Error> { +pub fn read_blocks(dir: &Path) -> Result, Error> { let names = build_stack_of_chunk_names(dir)?; let iter = ChunkReaders(dir.to_owned(), names) @@ -166,8 +172,9 @@ pub fn read_blocks(dir: &Path) -> Result, st /// genesis block. /// * `Error::CannotFindBlock` - If the specific block indicated by the `Point` /// value is not found. -/// * `std::io::Error` - If an I/O error occurs. -/// * `pallas_traverse::Error` - If the block cannot be decoded. +/// * `Error::CannotReadDir` - If the directory cannot be read. +/// * `Error::ChunkReadError` - Chunk read error. +/// * `Error::CannotDecodeBlock` - If the block cannot be decoded. /// /// # Example /// @@ -204,7 +211,7 @@ pub fn read_blocks(dir: &Path) -> Result, st pub fn read_blocks_from_point( dir: &Path, point: Point, -) -> Result + Send + Sync>, Box> { +) -> Result + Send + Sync>, Error> { let names = build_stack_of_chunk_names(dir)?; match point { @@ -218,7 +225,8 @@ pub fn read_blocks_from_point( // check the first block match iter.peek() { Some(Ok(block_data)) => { - let block = MultiEraBlock::decode(block_data)?; + let block = + MultiEraBlock::decode(block_data).map_err(Error::CannotDecodeBlock)?; // check that the first block is genesis if block.slot() == 0 && block.number() == 0 { Ok(Box::new(iter)) @@ -236,13 +244,15 @@ pub fn read_blocks_from_point( // and compares block's slot with provided slot number let cmp = { |chunk_name: &String, point: &u64| { - let mut blocks = chunk::read_blocks(dir, chunk_name)?; + let mut blocks = + chunk::read_blocks(dir, chunk_name).map_err(Error::ChunkReadError)?; // Try to read the first block from the chunk if let Some(block_data) = blocks.next() { - let block_data = block_data?; + let block_data = block_data.map_err(Error::ChunkReadError)?; - let block = MultiEraBlock::decode(&block_data)?; + let block = + MultiEraBlock::decode(&block_data).map_err(Error::CannotDecodeBlock)?; Ok(block.slot().cmp(point)) } else { Ok(Ordering::Greater) @@ -254,9 +264,10 @@ pub fn read_blocks_from_point( // index. let names = chunk_binary_search(&names, &slot, cmp)? .map(|chunk_index| names[..chunk_index + 1].to_vec()) - .ok_or::>( - Error::CannotFindBlock(Point::Specific(slot, block_hash.clone())).into(), - )?; + .ok_or(Error::CannotFindBlock(Point::Specific( + slot, + block_hash.clone(), + )))?; let iter = ChunkReaders(dir.to_owned(), names.clone()) .map_while(Result::ok) @@ -277,8 +288,9 @@ pub fn read_blocks_from_point( /// /// # Errors /// -/// * `std::io::Error` - If an I/O error occurs. -/// * `pallas_traverse::Error` - If the block cannot be decoded. +/// * `Error::CannotReadDir` - If the directory cannot be read. +/// * `Error::ChunkReadError` - Chunk read error. +/// * `Error::CannotDecodeBlock` - If the block cannot be decoded. /// /// # Example /// @@ -303,19 +315,21 @@ pub fn read_blocks_from_point( /// Ok(()) /// } /// ``` -pub fn get_tip(dir: &Path) -> Result, Box> { +pub fn get_tip(dir: &Path) -> Result, Error> { match build_stack_of_chunk_names(dir)?.into_iter().next() { Some(name) => { let tip_point = ChunkReaders(dir.to_owned(), vec![name]) .map_while(Result::ok) .flatten() .last() - .transpose()? + .transpose() + .map_err(Error::ChunkReadError)? .map(|tip_data| { MultiEraBlock::decode(&tip_data) .map(|block| Point::Specific(block.slot(), block.hash().to_vec())) }) - .transpose()?; + .transpose() + .map_err(Error::CannotDecodeBlock)?; Ok(tip_point) } None => Ok(None), @@ -511,25 +525,21 @@ mod tests { assert_eq!(Point::Specific(block.slot(), block.hash().to_vec()), point); // Try to read an origin block - assert_eq!( + assert!(matches!( read_blocks_from_point(Path::new("../test_data"), Point::Origin) .err() - .unwrap() - .downcast::() .unwrap(), - Box::new(super::Error::OriginMissing) - ); + super::Error::OriginMissing + )); // Try to read from a non existing point let point = Point::Specific(0, vec![]); - assert_eq!( + assert!(matches!( read_blocks_from_point(Path::new("../test_data"), point.clone()) .err() - .unwrap() - .downcast::() .unwrap(), - Box::new(super::Error::CannotFindBlock(point)) - ); + super::Error::CannotFindBlock(_) + )); } fn read_full_snapshot(path: &Path) { diff --git a/pallas-hardano/src/storage/immutable/primary.rs b/pallas-hardano/src/storage/immutable/primary.rs index 88aa266..7471875 100644 --- a/pallas-hardano/src/storage/immutable/primary.rs +++ b/pallas-hardano/src/storage/immutable/primary.rs @@ -11,6 +11,14 @@ binary_layout::define_layout!(layout, BigEndian, { pub type RelativeSlot = u32; pub type SecondaryOffset = u32; +#[derive(thiserror::Error, Debug)] +pub enum Error { + #[error("Version missing, cannot read version from primary index file, error: {0}")] + VersionMissing(std::io::Error), + #[error("Cannot read offset from primary index file, error: {0}")] + CannotReadPrimaryIndex(std::io::Error), +} + #[derive(Debug)] pub enum Entry { Empty(RelativeSlot), @@ -30,20 +38,22 @@ pub struct Reader { inner: BufReader, version: u8, last_slot: Option, - last_offset: Option>, - next_offset: Option>, + last_offset: Option>, + next_offset: Option>, } impl Reader { - fn read_version(inner: &mut BufReader) -> Result { - let mut buf = vec![0u8; 1]; - inner.read_exact(&mut buf)?; - let version = buf.first().unwrap(); + fn read_version(inner: &mut BufReader) -> Result { + let mut buf = [0u8; 1]; + inner + .read_exact(&mut buf) + .map_err(|e| Error::VersionMissing(e))?; + let version = buf[0]; - Ok(*version) + Ok(version) } - pub fn open(file: File) -> Result { + pub fn open(file: File) -> Result { let mut inner = BufReader::new(file); let version = Reader::read_version(&mut inner)?; @@ -63,7 +73,7 @@ impl Reader { self.version } - pub fn next_occupied(&mut self) -> Option> { + pub fn next_occupied(&mut self) -> Option> { loop { let next = self.next(); @@ -78,12 +88,12 @@ impl Reader { } } - fn read_offset(file: &mut BufReader) -> Option> { + fn read_offset(file: &mut BufReader) -> Option> { let mut buf = vec![0u8; layout::SIZE.unwrap()]; match file.read_exact(&mut buf) { Err(err) if err.kind() == std::io::ErrorKind::UnexpectedEof => None, - Err(err) => Some(Err(err)), + Err(err) => Some(Err(Error::CannotReadPrimaryIndex(err))), Ok(_) => { let view = layout::View::new(&buf); let offset = view.secondary_offset().read(); @@ -94,7 +104,7 @@ impl Reader { } impl Iterator for Reader { - type Item = Result; + type Item = Result; fn next(&mut self) -> Option { match (self.last_offset.take(), self.next_offset.take()) { diff --git a/pallas-hardano/src/storage/immutable/secondary.rs b/pallas-hardano/src/storage/immutable/secondary.rs index 50bb61e..b0e1f74 100644 --- a/pallas-hardano/src/storage/immutable/secondary.rs +++ b/pallas-hardano/src/storage/immutable/secondary.rs @@ -18,6 +18,18 @@ binary_layout::define_layout!(layout, BigEndian, { block_or_ebb: [u8; 8], }); +#[derive(thiserror::Error, Debug)] +pub enum Error { + #[error("Cannot open file, error: {0}")] + CannotOpenFile(std::io::Error), + #[error("Cannot read secondary index, error: {0}")] + CannotReadSecondaryIndex(std::io::Error), + #[error("Inconsistent state between primary and secondary index")] + InconsistentState, + #[error(transparent)] + PrimaryIndexError(primary::Error), +} + #[derive(Debug)] pub struct Entry { pub block_offset: u64, @@ -49,32 +61,32 @@ pub type SecondaryOffset = u32; pub struct Reader { inner: BufReader, index: PrimaryIndex, - current: Option>, + current: Option>, } impl Reader { - pub fn open(mut index: PrimaryIndex, file: File) -> Result { + pub fn open(mut index: PrimaryIndex, file: File) -> Self { let inner = BufReader::new(file); let current = index.next_occupied(); - Ok(Self { + Self { inner, index, current, - }) + } } } impl Iterator for Reader { - type Item = Result; + type Item = Result; fn next(&mut self) -> Option { let current = match self.current.take()? { Ok(x) => x.offset()?, Err(err) => { self.current = None; - return Some(Err(err)); + return Some(Err(Error::PrimaryIndexError(err))); } }; @@ -82,7 +94,7 @@ impl Iterator for Reader { Ok(x) => x, Err(err) => { self.current = None; - return Some(Err(err)); + return Some(Err(Error::CannotReadSecondaryIndex(err))); } }; @@ -92,7 +104,7 @@ impl Iterator for Reader { Ok(_) => (), Err(err) => { self.current = None; - return Some(Err(err)); + return Some(Err(Error::CannotReadSecondaryIndex(err))); } } @@ -107,23 +119,27 @@ impl Iterator for Reader { Some(Ok(entry)) } + Err(err) if err.kind() == std::io::ErrorKind::UnexpectedEof => { + self.current = None; + Some(Err(Error::InconsistentState)) + } Err(err) => { self.current = None; - Some(Err(err)) + Some(Err(Error::CannotReadSecondaryIndex(err))) } } } } -pub fn read_entries(dir: &Path, name: &str) -> Result { +pub fn read_entries(dir: &Path, name: &str) -> Result { let primary = dir.join(name).with_extension("primary"); - let primary = std::fs::File::open(primary)?; - let primary = primary::Reader::open(primary)?; + let primary = std::fs::File::open(primary).map_err(Error::CannotOpenFile)?; + let primary = primary::Reader::open(primary).map_err(Error::PrimaryIndexError)?; let secondary = dir.join(name).with_extension("secondary"); - let secondary = std::fs::File::open(secondary)?; + let secondary = std::fs::File::open(secondary).map_err(Error::CannotOpenFile)?; - secondary::Reader::open(primary, secondary) + Ok(secondary::Reader::open(primary, secondary)) } #[cfg(test)] @@ -138,4 +154,14 @@ mod tests { entry.unwrap(); } } + + #[test] + fn can_parse_inconsistent_entries() { + let reader = + super::read_entries(Path::new("../test_data/inconsistent_indexes"), "10366").unwrap(); + + for entry in reader { + entry.unwrap(); + } + } } diff --git a/test_data/inconsistent_indexes/10366.chunk b/test_data/inconsistent_indexes/10366.chunk new file mode 100644 index 0000000..c598e9f Binary files /dev/null and b/test_data/inconsistent_indexes/10366.chunk differ diff --git a/test_data/inconsistent_indexes/10366.primary b/test_data/inconsistent_indexes/10366.primary new file mode 100644 index 0000000..2da7cda Binary files /dev/null and b/test_data/inconsistent_indexes/10366.primary differ diff --git a/test_data/inconsistent_indexes/10366.secondary b/test_data/inconsistent_indexes/10366.secondary new file mode 100644 index 0000000..f0e4f04 Binary files /dev/null and b/test_data/inconsistent_indexes/10366.secondary differ