Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/lint.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ jobs:
with:
cache-on-failure: true
- name: Run feature check
run: cargo hack check --feature-powerset
run: cargo hack check --feature-powerset --mutually-exclusive-features "zstd,ruzstd" --at-least-one-of "zstd,ruzstd"

no_std:
runs-on: ubuntu-latest
Expand Down
16 changes: 16 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion crates/codec/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,15 @@ bitvec.workspace = true
derive_more = { version = "2.0", default-features = false }
eyre = { workspace = true, optional = true }
thiserror = { version = "2.0", default-features = false }
zstd = "=0.13.3"
zstd = { version = "=0.13.3", optional = true }
ruzstd = { version = "0.8", optional = true }

[dev-dependencies]
eyre.workspace = true
serde_json = "1.0"

[features]
default = ["zstd"]
test-utils = ["dep:eyre", "scroll-l1/test-utils"]
zstd = ["dep:zstd"]
ruzstd = ["dep:ruzstd"]
2 changes: 1 addition & 1 deletion crates/codec/src/decoding/v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub fn decode_v2(calldata: &[u8], blob: &[u8]) -> Result<Batch, DecodingError> {

// get blob iterator and collect, skipping unused bytes.
let compressed_heap_blob = BlobSliceIter::from_blob_slice(blob).copied().collect::<Vec<_>>();
let uncompressed_heap_blob = decompress_blob_data(&compressed_heap_blob);
let uncompressed_heap_blob = decompress_blob_data(&compressed_heap_blob)?;
let buf = &mut (uncompressed_heap_blob.as_slice());

// check buf len.
Expand Down
54 changes: 50 additions & 4 deletions crates/codec/src/decoding/v2/zstd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,46 @@

use std::io::Read;

use zstd::Decoder;
#[cfg(not(any(feature = "zstd", feature = "ruzstd")))]
compile_error!("You must enable exactly one of the `zstd` or `ruzstd` features");
#[cfg(all(feature = "zstd", feature = "ruzstd"))]
compile_error!("Features `zstd` and `ruzstd` are mutually exclusive");

/// The ZSTD magic number for zstd compressed data header.
const ZSTD_MAGIC_NUMBER: [u8; 4] = [0x28, 0xb5, 0x2f, 0xfd];

/// Result type for Zstd operations.
type Result<T> = std::result::Result<T, ZstdError>;

/// Zstd error type.
#[derive(Debug)]
pub struct ZstdError(Box<dyn std::error::Error + Send + Sync>);

impl std::fmt::Display for ZstdError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "ZstdError: {}", self.0)
}
}

impl std::error::Error for ZstdError {}

impl ZstdError {
/// Consumes the error and returns the inner error.
pub fn into_inner(self) -> Box<dyn std::error::Error + Send + Sync> {
self.0
}
}

/// Uncompress the provided data.
pub fn decompress_blob_data(data: &[u8]) -> Vec<u8> {
#[cfg(feature = "zstd")]
pub fn decompress_blob_data(data: &[u8]) -> Result<Vec<u8>> {
use zstd::Decoder;
let mut header_data = ZSTD_MAGIC_NUMBER.to_vec();

header_data.extend_from_slice(data);

// init decoder and owned output data.
let mut decoder = Decoder::new(header_data.as_slice()).unwrap();
let mut decoder = Decoder::new(header_data.as_slice()).map_err(|e| ZstdError(Box::new(e)))?;
// heuristic: use data length as the allocated output capacity.
let mut output = Vec::with_capacity(header_data.len());

Expand All @@ -29,5 +57,23 @@ pub fn decompress_blob_data(data: &[u8]) -> Vec<u8> {
output.extend_from_slice(&dst[..size]);
}

output
Ok(output)
}

/// Uncompress the provided data.
#[cfg(feature = "ruzstd")]
pub fn decompress_blob_data(data: &[u8]) -> Result<Vec<u8>> {
use ruzstd::decoding::StreamingDecoder;

let mut header_data = ZSTD_MAGIC_NUMBER.to_vec();
header_data.extend_from_slice(data);

// init decoder and owned output data.
let mut decoder =
StreamingDecoder::new(header_data.as_slice()).map_err(|e| ZstdError(Box::new(e)))?;
// heuristic: use data length as the allocated output capacity.
let mut output = Vec::with_capacity(header_data.len());
decoder.read_to_end(&mut output).map_err(|e| ZstdError(Box::new(e)))?;

Ok(output)
}
2 changes: 1 addition & 1 deletion crates/codec/src/decoding/v4/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub fn decode_v4(calldata: &[u8], blob: &[u8]) -> Result<Batch, DecodingError> {
debug_assert!(is_compressed == 1 || is_compressed == 0, "incorrect compressed byte flag");

let buf = if is_compressed == 1 {
heap_blob = decompress_blob_data(&heap_blob[1..]);
heap_blob = decompress_blob_data(&heap_blob[1..])?;
&mut heap_blob.as_slice()
} else {
&mut (&heap_blob[1..])
Expand Down
2 changes: 1 addition & 1 deletion crates/codec/src/decoding/v7/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pub fn decode_v7(blob: &[u8]) -> Result<Batch, DecodingError> {

// uncompress if necessary.
let buf = if is_compressed == 1 {
heap_blob = decompress_blob_data(buf);
heap_blob = decompress_blob_data(buf)?;
&mut heap_blob.as_slice()
} else {
buf
Expand Down
8 changes: 8 additions & 0 deletions crates/codec/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ pub enum DecodingError {
InvalidCommitBatchCall(#[from] InvalidCommitBatchCall),
#[error("end of file")]
Eof,
#[error("zstd decompression error occurred: {0}")]
ZstdDecompression(Box<dyn std::error::Error + Send + Sync + 'static>),
#[error("decoding error occurred: {0}")]
Other(Box<dyn std::error::Error + Send + Sync + 'static>),
}
Expand All @@ -46,3 +48,9 @@ impl From<String> for DecodingError {
DecodingError::Other(value.into())
}
}

impl From<crate::decoding::v2::zstd::ZstdError> for DecodingError {
fn from(e: crate::decoding::v2::zstd::ZstdError) -> Self {
DecodingError::ZstdDecompression(e.into_inner())
}
}
Loading