Discussed in #5812
Originally posted by haohuaijin December 23, 2025
I recently ran some benchmarks comparing Vortex to Parquet, and I found that when writing fewer than 1000 rows to a Vortex file, the write is about 30x slower than with Parquet. Maybe I'm doing something wrong.
Below are the benchmark and the results:

```rust
use arrow::array::{ArrayRef, Int64Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;
use std::sync::{Arc, LazyLock};
use tempfile::NamedTempFile;
use vortex::dtype::arrow::FromArrowType;
use vortex::dtype::DType;
use vortex::error::VortexError;
use vortex::file::WriteOptionsSessionExt;
use vortex::io::runtime::current::CurrentThreadRuntime;
use vortex::io::runtime::BlockingRuntime;
use vortex::io::session::RuntimeSessionExt;
use vortex::session::VortexSession;
use vortex::{array::ArrayRef as VortexArrayRef, VortexSessionDefault};
use vortex_array::arrow::FromArrowArray;
use vortex_array::iter::{ArrayIteratorAdapter, ArrayIteratorExt};
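
// Shared runtimes and sessions, created once and reused across all benchmark iterations.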
static RUNTIME: LazyLock<CurrentThreadRuntime> = LazyLock::new(CurrentThreadRuntime::new);
static SESSION: LazyLock<VortexSession> =
    LazyLock::new(|| VortexSession::default().with_handle(RUNTIME.handle()));
static TOKIO_RUNTIME: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
    tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap()
});
static TOKIO_SESSION: LazyLock<VortexSession> =
    LazyLock::new(|| VortexSession::default().with_tokio());
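
// Build `num_rows` rows of synthetic log-like data (20 columns) as Arrow
// RecordBatches of at most `batch_size` rows each (default 8192).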
fn create_test_batch(num_rows: usize, batch_size: Option<usize>) -> Vec<RecordBatch> {
    let batch_size = batch_size.unwrap_or(8192);
    let schema = Arc::new(Schema::new(vec![
        Field::new("_timestamp", DataType::Int64, false),
        Field::new("log", DataType::Utf8, false),
        Field::new("kubernetes_namespace_name", DataType::Utf8, false),
        Field::new("kubernetes_container_name", DataType::Utf8, false),
        Field::new("url", DataType::Utf8, false),
        Field::new("host", DataType::Utf8, false),
        Field::new("pod_name", DataType::Utf8, false),
        Field::new("service_name", DataType::Utf8, false),
        Field::new("level", DataType::Utf8, false),
        Field::new("thread", DataType::Utf8, false),
        Field::new("request_id", DataType::Utf8, false),
        Field::new("user_id", DataType::Utf8, false),
        Field::new("session_id", DataType::Utf8, false),
        Field::new("method", DataType::Utf8, false),
        Field::new("path", DataType::Utf8, false),
        Field::new("status_code", DataType::Int64, false),
        Field::new("response_time_ms", DataType::Int64, false),
        Field::new("region", DataType::Utf8, false),
        Field::new("environment", DataType::Utf8, false),
        Field::new("version", DataType::Utf8, false),
    ]));
    let num_batches = (num_rows + batch_size - 1) / batch_size;
    let mut batches = Vec::with_capacity(num_batches);
    for batch_idx in 0..num_batches {
        let start_idx = batch_idx * batch_size;
        let end_idx = std::cmp::min(start_idx + batch_size, num_rows);
        let timestamp_data: Vec<i64> = (start_idx..end_idx).map(|i| i as i64).collect();
        let log_data: Vec<String> = (start_idx..end_idx)
            .map(|i| {
                format!(
                    "This is log message number {} with some additional content to simulate real logs",
                    i
                )
            })
            .collect();
        let namespace_data: Vec<String> = (start_idx..end_idx)
            .map(|i| format!("namespace-{}", i % 10))
            .collect();
        let container_data: Vec<String> = (start_idx..end_idx)
            .map(|i| format!("container-{}", i % 20))
            .collect();
        let url_data: Vec<String> = (start_idx..end_idx)
            .map(|i| format!("https://example.com/path/{}/resource?query={}", i % 100, i))
            .collect();
        let host_data: Vec<String> = (start_idx..end_idx)
            .map(|i| format!("host-{}.example.com", i % 50))
            .collect();
        let pod_name_data: Vec<String> = (start_idx..end_idx)
            .map(|i| format!("pod-{}-{}", i % 30, (i / 30) % 5))
            .collect();
        let service_name_data: Vec<String> = (start_idx..end_idx)
            .map(|i| format!("service-{}", i % 15))
            .collect();
        let level_data: Vec<String> = (start_idx..end_idx)
            .map(|i| {
                match i % 5 {
                    0 => "ERROR",
                    1 => "WARN",
                    2 => "INFO",
                    3 => "DEBUG",
                    _ => "TRACE",
                }
                .to_string()
            })
            .collect();
        let thread_data: Vec<String> = (start_idx..end_idx)
            .map(|i| format!("thread-{}", i % 8))
            .collect();
        let request_id_data: Vec<String> = (start_idx..end_idx)
            .map(|i| format!("req-{:016x}", i))
            .collect();
        let user_id_data: Vec<String> = (start_idx..end_idx)
            .map(|i| format!("user-{}", i % 1000))
            .collect();
        let session_id_data: Vec<String> = (start_idx..end_idx)
            .map(|i| format!("session-{:016x}", i % 500))
            .collect();
        let method_data: Vec<String> = (start_idx..end_idx)
            .map(|i| {
                match i % 4 {
                    0 => "GET",
                    1 => "POST",
                    2 => "PUT",
                    _ => "DELETE",
                }
                .to_string()
            })
            .collect();
        let path_data: Vec<String> = (start_idx..end_idx)
            .map(|i| format!("/api/v1/resource/{}/action", i % 200))
            .collect();
        let status_code_data: Vec<i64> = (start_idx..end_idx)
            .map(|i| match i % 10 {
                0 | 1 | 2 | 3 | 4 => 200,
                5 | 6 => 201,
                7 => 400,
                8 => 404,
                _ => 500,
            })
            .collect();
        let response_time_data: Vec<i64> = (start_idx..end_idx)
            .map(|i| 10 + (i as i64 % 990))
            .collect();
        let region_data: Vec<String> = (start_idx..end_idx)
            .map(|i| {
                match i % 4 {
                    0 => "us-east-1",
                    1 => "us-west-2",
                    2 => "eu-west-1",
                    _ => "ap-southeast-1",
                }
                .to_string()
            })
            .collect();
        let environment_data: Vec<String> = (start_idx..end_idx)
            .map(|i| {
                match i % 3 {
                    0 => "production",
                    1 => "staging",
                    _ => "development",
                }
                .to_string()
            })
            .collect();
        let version_data: Vec<String> = (start_idx..end_idx)
            .map(|i| format!("v{}.{}.{}", (i % 3) + 1, (i % 10), (i % 5)))
            .collect();
        let timestamp_array: ArrayRef = Arc::new(Int64Array::from(timestamp_data));
        let log_array: ArrayRef = Arc::new(StringArray::from(log_data));
        let namespace_array: ArrayRef = Arc::new(StringArray::from(namespace_data));
        let container_array: ArrayRef = Arc::new(StringArray::from(container_data));
        let url_array: ArrayRef = Arc::new(StringArray::from(url_data));
        let host_array: ArrayRef = Arc::new(StringArray::from(host_data));
        let pod_name_array: ArrayRef = Arc::new(StringArray::from(pod_name_data));
        let service_name_array: ArrayRef = Arc::new(StringArray::from(service_name_data));
        let level_array: ArrayRef = Arc::new(StringArray::from(level_data));
        let thread_array: ArrayRef = Arc::new(StringArray::from(thread_data));
        let request_id_array: ArrayRef = Arc::new(StringArray::from(request_id_data));
        let user_id_array: ArrayRef = Arc::new(StringArray::from(user_id_data));
        let session_id_array: ArrayRef = Arc::new(StringArray::from(session_id_data));
        let method_array: ArrayRef = Arc::new(StringArray::from(method_data));
        let path_array: ArrayRef = Arc::new(StringArray::from(path_data));
        let status_code_array: ArrayRef = Arc::new(Int64Array::from(status_code_data));
        let response_time_array: ArrayRef = Arc::new(Int64Array::from(response_time_data));
        let region_array: ArrayRef = Arc::new(StringArray::from(region_data));
        let environment_array: ArrayRef = Arc::new(StringArray::from(environment_data));
        let version_array: ArrayRef = Arc::new(StringArray::from(version_data));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                timestamp_array,
                log_array,
                namespace_array,
                container_array,
                url_array,
                host_array,
                pod_name_array,
                service_name_array,
                level_array,
                thread_array,
                request_id_array,
                user_id_array,
                session_id_array,
                method_array,
                path_array,
                status_code_array,
                response_time_array,
                region_array,
                environment_array,
                version_array,
            ],
        )
        .unwrap();
        batches.push(batch);
    }
    batches
}
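
// Parquet baseline: write every batch to a fresh temporary file with default
// writer properties.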
fn bench_parquet_write(batches: &[RecordBatch]) {
    let file = NamedTempFile::new().unwrap();
    let props = WriterProperties::builder().build();
    let mut writer =
        ArrowWriter::try_new(file.reopen().unwrap(), batches[0].schema(), Some(props)).unwrap();
    for batch in batches {
        writer.write(batch).unwrap();
    }
    writer.close().unwrap();
}
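
// Vortex async path: adapt the batches into an array stream and write it to a
// temporary file on the tokio runtime.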
fn bench_vortex_write(batches: &[RecordBatch]) {
    let file = NamedTempFile::new().unwrap();
    TOKIO_RUNTIME.block_on(async {
        let schema = batches[0].schema();
        let batches_clone: Vec<RecordBatch> = batches.to_vec();
        let array_iter = ArrayIteratorAdapter::new(
            DType::from_arrow(schema),
            batches_clone
                .into_iter()
                .map(|batch| Ok::<RecordBatch, VortexError>(batch))
                .map(|batch_result| batch_result.map(|b| VortexArrayRef::from_arrow(b, false))),
        );
        let mut f = tokio::fs::File::create(file.path()).await.unwrap();
        TOKIO_SESSION
            .write_options()
            .write(&mut f, array_iter.into_array_stream())
            .await
            .unwrap();
    });
}
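
// Vortex blocking path: push batches one at a time through the blocking writer
// backed by the current-thread runtime.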
fn bench_vortex_blocking_write(batches: &[RecordBatch]) {
    let file = NamedTempFile::new().unwrap();
    let dtype = DType::from_arrow(batches[0].schema());
    let mut writer = SESSION
        .write_options()
        .blocking(&*RUNTIME)
        .writer(file.reopen().unwrap(), dtype);
    for batch in batches {
        let vortex_array = VortexArrayRef::from_arrow(batch, false);
        writer.push(vortex_array).unwrap();
    }
    writer.finish().unwrap();
}
fn write_benchmark(c: &mut Criterion) {
    // let sizes = vec![10, 1000, 10_000, 100_000, 1_000_000];
    let sizes = vec![10, 1000];
    let mut group = c.benchmark_group("recordbatch_write");
    for size in sizes {
        let batches = create_test_batch(size, None);
        let total_bytes: usize = batches.iter().map(|b| b.get_array_memory_size()).sum();
        group.throughput(Throughput::Bytes(total_bytes as u64));
        group.bench_with_input(BenchmarkId::new("parquet", size), &batches, |b, batches| {
            b.iter(|| bench_parquet_write(batches));
        });
        group.bench_with_input(
            BenchmarkId::new("vortex_async", size),
            &batches,
            |b, batches| {
                b.iter(|| bench_vortex_write(batches));
            },
        );
        group.bench_with_input(
            BenchmarkId::new("vortex_blocking", size),
            &batches,
            |b, batches| {
                b.iter(|| bench_vortex_blocking_write(batches));
            },
        );
    }
    group.finish();
}
criterion_group!(benches, write_benchmark);
criterion_main!(benches);
```

Results:

```
recordbatch_write/parquet/10
time: [285.16 µs 292.74 µs 301.14 µs]
thrpt: [23.334 MiB/s 24.003 MiB/s 24.641 MiB/s]
change:
time: [-1.5744% +2.7495% +7.8510%] (p = 0.27 > 0.05)
thrpt: [-7.2795% -2.6759% +1.5995%]
No change in performance detected.
Found 4 outliers among 100 measurements (4.00%)
1 (1.00%) high mild
3 (3.00%) high severe
recordbatch_write/vortex_async/10
time: [32.742 ms 32.829 ms 32.939 ms]
thrpt: [218.44 KiB/s 219.18 KiB/s 219.76 KiB/s]
change:
time: [+0.2577% +0.7567% +1.2332%] (p = 0.00 < 0.05)
thrpt: [-1.2182% -0.7510% -0.2570%]
Change within noise threshold.
Found 2 outliers among 100 measurements (2.00%)
2 (2.00%) high severe
recordbatch_write/vortex_blocking/10
time: [30.260 ms 30.465 ms 30.735 ms]
thrpt: [234.11 KiB/s 236.18 KiB/s 237.78 KiB/s]
change:
time: [+1.4255% +2.2166% +3.0887%] (p = 0.00 < 0.05)
thrpt: [-2.9962% -2.1685% -1.4054%]
Performance has regressed.
Found 5 outliers among 100 measurements (5.00%)
1 (1.00%) high mild
4 (4.00%) high severe
recordbatch_write/parquet/1000
time: [1.0045 ms 1.0218 ms 1.0419 ms]
thrpt: [503.02 MiB/s 512.91 MiB/s 521.71 MiB/s]
change:
time: [-4.5346% -0.3172% +3.7622%] (p = 0.89 > 0.05)
thrpt: [-3.6258% +0.3182% +4.7500%]
No change in performance detected.
Found 7 outliers among 100 measurements (7.00%)
3 (3.00%) high mild
4 (4.00%) high severe
recordbatch_write/vortex_async/1000
time: [48.021 ms 48.368 ms 48.816 ms]
thrpt: [10.736 MiB/s 10.835 MiB/s 10.913 MiB/s]
change:
time: [+0.8243% +1.6720% +2.6448%] (p = 0.00 < 0.05)
thrpt: [-2.5767% -1.6445% -0.8176%]
Change within noise threshold.
Found 4 outliers among 100 measurements (4.00%)
1 (1.00%) high mild
3 (3.00%) high severe
recordbatch_write/vortex_blocking/1000
time: [44.711 ms 44.876 ms 45.055 ms]
thrpt: [11.632 MiB/s 11.678 MiB/s 11.721 MiB/s]
change:
time: [-3.4914% -2.5618% -1.7624%] (p = 0.00 < 0.05)
thrpt: [+1.7940% +2.6291% +3.6177%]
Performance has improved.
Found 1 outliers among 100 measurements (1.00%)
1 (1.00%) high severe
```