Write performance for small Arrow RecordBatch 20x slower than Parquet #5861

@joseph-isaacs

Discussed in #5812

Originally posted by haohuaijin December 23, 2025
I recently ran some benchmarks comparing Vortex to Parquet, and I found that when writing fewer than 1000 rows to a Vortex file, the write is about 30x slower than Parquet. Maybe I am doing something wrong. The benchmark and results are below.

```rust
use arrow::array::{ArrayRef, Int64Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;
use std::sync::{Arc, LazyLock};
use tempfile::NamedTempFile;
use vortex::dtype::arrow::FromArrowType;
use vortex::dtype::DType;
use vortex::error::VortexError;
use vortex::file::WriteOptionsSessionExt;
use vortex::io::runtime::current::CurrentThreadRuntime;
use vortex::io::runtime::BlockingRuntime;
use vortex::io::session::RuntimeSessionExt;
use vortex::session::VortexSession;
use vortex::{array::ArrayRef as VortexArrayRef, VortexSessionDefault};
use vortex_array::arrow::FromArrowArray;
use vortex_array::iter::{ArrayIteratorAdapter, ArrayIteratorExt};

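// Runtimes and sessions are created once and shared across iterations so that
// their setup cost is not measured inside the benchmark loop.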
static RUNTIME: LazyLock<CurrentThreadRuntime> = LazyLock::new(CurrentThreadRuntime::new);
static SESSION: LazyLock<VortexSession> =
    LazyLock::new(|| VortexSession::default().with_handle(RUNTIME.handle()));
static TOKIO_RUNTIME: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
    tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap()
});
static TOKIO_SESSION: LazyLock<VortexSession> =
    LazyLock::new(|| VortexSession::default().with_tokio());

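/// Builds `num_rows` rows of synthetic log data, split into Arrow
/// `RecordBatch`es of `batch_size` rows (default 8192).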
fn create_test_batch(num_rows: usize, batch_size: Option<usize>) -> Vec<RecordBatch> {
    let batch_size = batch_size.unwrap_or(8192);

    let schema = Arc::new(Schema::new(vec![
        Field::new("_timestamp", DataType::Int64, false),
        Field::new("log", DataType::Utf8, false),
        Field::new("kubernetes_namespace_name", DataType::Utf8, false),
        Field::new("kubernetes_container_name", DataType::Utf8, false),
        Field::new("url", DataType::Utf8, false),
        Field::new("host", DataType::Utf8, false),
        Field::new("pod_name", DataType::Utf8, false),
        Field::new("service_name", DataType::Utf8, false),
        Field::new("level", DataType::Utf8, false),
        Field::new("thread", DataType::Utf8, false),
        Field::new("request_id", DataType::Utf8, false),
        Field::new("user_id", DataType::Utf8, false),
        Field::new("session_id", DataType::Utf8, false),
        Field::new("method", DataType::Utf8, false),
        Field::new("path", DataType::Utf8, false),
        Field::new("status_code", DataType::Int64, false),
        Field::new("response_time_ms", DataType::Int64, false),
        Field::new("region", DataType::Utf8, false),
        Field::new("environment", DataType::Utf8, false),
        Field::new("version", DataType::Utf8, false),
    ]));

    // One batch per `batch_size` rows (ceiling division).
    let num_batches = num_rows.div_ceil(batch_size);
    let mut batches = Vec::with_capacity(num_batches);

    for batch_idx in 0..num_batches {
        let start_idx = batch_idx * batch_size;
        let end_idx = std::cmp::min(start_idx + batch_size, num_rows);

        let timestamp_data: Vec<i64> = (start_idx..end_idx).map(|i| i as i64).collect();
        let log_data: Vec<String> = (start_idx..end_idx)
            .map(|i| {
                format!(
                    "This is log message number {} with some additional content to simulate real logs",
                    i
                )
            })
            .collect();
        let namespace_data: Vec<String> = (start_idx..end_idx)
            .map(|i| format!("namespace-{}", i % 10))
            .collect();
        let container_data: Vec<String> = (start_idx..end_idx)
            .map(|i| format!("container-{}", i % 20))
            .collect();
        let url_data: Vec<String> = (start_idx..end_idx)
            .map(|i| format!("https://example.com/path/{}/resource?query={}", i % 100, i))
            .collect();
        let host_data: Vec<String> = (start_idx..end_idx)
            .map(|i| format!("host-{}.example.com", i % 50))
            .collect();
        let pod_name_data: Vec<String> = (start_idx..end_idx)
            .map(|i| format!("pod-{}-{}", i % 30, (i / 30) % 5))
            .collect();
        let service_name_data: Vec<String> = (start_idx..end_idx)
            .map(|i| format!("service-{}", i % 15))
            .collect();
        let level_data: Vec<String> = (start_idx..end_idx)
            .map(|i| {
                match i % 5 {
                    0 => "ERROR",
                    1 => "WARN",
                    2 => "INFO",
                    3 => "DEBUG",
                    _ => "TRACE",
                }
                .to_string()
            })
            .collect();
        let thread_data: Vec<String> = (start_idx..end_idx)
            .map(|i| format!("thread-{}", i % 8))
            .collect();
        let request_id_data: Vec<String> = (start_idx..end_idx)
            .map(|i| format!("req-{:016x}", i))
            .collect();
        let user_id_data: Vec<String> = (start_idx..end_idx)
            .map(|i| format!("user-{}", i % 1000))
            .collect();
        let session_id_data: Vec<String> = (start_idx..end_idx)
            .map(|i| format!("session-{:016x}", i % 500))
            .collect();
        let method_data: Vec<String> = (start_idx..end_idx)
            .map(|i| {
                match i % 4 {
                    0 => "GET",
                    1 => "POST",
                    2 => "PUT",
                    _ => "DELETE",
                }
                .to_string()
            })
            .collect();
        let path_data: Vec<String> = (start_idx..end_idx)
            .map(|i| format!("/api/v1/resource/{}/action", i % 200))
            .collect();
        let status_code_data: Vec<i64> = (start_idx..end_idx)
            .map(|i| match i % 10 {
                0 | 1 | 2 | 3 | 4 => 200,
                5 | 6 => 201,
                7 => 400,
                8 => 404,
                _ => 500,
            })
            .collect();
        let response_time_data: Vec<i64> = (start_idx..end_idx)
            .map(|i| 10 + (i as i64 % 990))
            .collect();
        let region_data: Vec<String> = (start_idx..end_idx)
            .map(|i| {
                match i % 4 {
                    0 => "us-east-1",
                    1 => "us-west-2",
                    2 => "eu-west-1",
                    _ => "ap-southeast-1",
                }
                .to_string()
            })
            .collect();
        let environment_data: Vec<String> = (start_idx..end_idx)
            .map(|i| {
                match i % 3 {
                    0 => "production",
                    1 => "staging",
                    _ => "development",
                }
                .to_string()
            })
            .collect();
        let version_data: Vec<String> = (start_idx..end_idx)
            .map(|i| format!("v{}.{}.{}", (i % 3) + 1, (i % 10), (i % 5)))
            .collect();

        let timestamp_array: ArrayRef = Arc::new(Int64Array::from(timestamp_data));
        let log_array: ArrayRef = Arc::new(StringArray::from(log_data));
        let namespace_array: ArrayRef = Arc::new(StringArray::from(namespace_data));
        let container_array: ArrayRef = Arc::new(StringArray::from(container_data));
        let url_array: ArrayRef = Arc::new(StringArray::from(url_data));
        let host_array: ArrayRef = Arc::new(StringArray::from(host_data));
        let pod_name_array: ArrayRef = Arc::new(StringArray::from(pod_name_data));
        let service_name_array: ArrayRef = Arc::new(StringArray::from(service_name_data));
        let level_array: ArrayRef = Arc::new(StringArray::from(level_data));
        let thread_array: ArrayRef = Arc::new(StringArray::from(thread_data));
        let request_id_array: ArrayRef = Arc::new(StringArray::from(request_id_data));
        let user_id_array: ArrayRef = Arc::new(StringArray::from(user_id_data));
        let session_id_array: ArrayRef = Arc::new(StringArray::from(session_id_data));
        let method_array: ArrayRef = Arc::new(StringArray::from(method_data));
        let path_array: ArrayRef = Arc::new(StringArray::from(path_data));
        let status_code_array: ArrayRef = Arc::new(Int64Array::from(status_code_data));
        let response_time_array: ArrayRef = Arc::new(Int64Array::from(response_time_data));
        let region_array: ArrayRef = Arc::new(StringArray::from(region_data));
        let environment_array: ArrayRef = Arc::new(StringArray::from(environment_data));
        let version_array: ArrayRef = Arc::new(StringArray::from(version_data));

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                timestamp_array,
                log_array,
                namespace_array,
                container_array,
                url_array,
                host_array,
                pod_name_array,
                service_name_array,
                level_array,
                thread_array,
                request_id_array,
                user_id_array,
                session_id_array,
                method_array,
                path_array,
                status_code_array,
                response_time_array,
                region_array,
                environment_array,
                version_array,
            ],
        )
        .unwrap();

        batches.push(batch);
    }

    batches
}

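/// Writes all batches to a temporary file with Parquet's `ArrowWriter` using
/// default writer properties.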
fn bench_parquet_write(batches: &[RecordBatch]) {
    let file = NamedTempFile::new().unwrap();
    let props = WriterProperties::builder().build();
    let mut writer =
        ArrowWriter::try_new(file.reopen().unwrap(), batches[0].schema(), Some(props)).unwrap();
    for batch in batches {
        writer.write(batch).unwrap();
    }
    writer.close().unwrap();
}

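/// Writes all batches to a temporary file via Vortex's async write path,
/// driven by a current-thread Tokio runtime.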
fn bench_vortex_write(batches: &[RecordBatch]) {
    let file = NamedTempFile::new().unwrap();

    TOKIO_RUNTIME.block_on(async {
        let schema = batches[0].schema();
        let batches_clone: Vec<RecordBatch> = batches.to_vec();

        // Wrap the batches in a Vortex array iterator, converting each Arrow
        // RecordBatch to a Vortex array as it is pulled.
        let array_iter = ArrayIteratorAdapter::new(
            DType::from_arrow(schema),
            batches_clone
                .into_iter()
                .map(|batch| Ok::<_, VortexError>(VortexArrayRef::from_arrow(batch, false))),
        );

        let mut f = tokio::fs::File::create(file.path()).await.unwrap();

        TOKIO_SESSION
            .write_options()
            .write(&mut f, array_iter.into_array_stream())
            .await
            .unwrap();
    });
}

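/// Writes all batches to a temporary file via Vortex's blocking writer API,
/// driven by the shared current-thread runtime.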
fn bench_vortex_blocking_write(batches: &[RecordBatch]) {
    let file = NamedTempFile::new().unwrap();

    let dtype = DType::from_arrow(batches[0].schema());

    let mut writer = SESSION
        .write_options()
        .blocking(&*RUNTIME)
        .writer(file.reopen().unwrap(), dtype);

    for batch in batches {
        let vortex_array = VortexArrayRef::from_arrow(batch, false);
        writer.push(vortex_array).unwrap();
    }

    writer.finish().unwrap();
}

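/// Benchmarks each writer at the given row counts, reporting throughput based
/// on the in-memory Arrow size of the batches.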
fn write_benchmark(c: &mut Criterion) {
    // let sizes = vec![10, 1000, 10_000, 100_000, 1_000_000];
    let sizes = vec![10, 1000];

    let mut group = c.benchmark_group("recordbatch_write");

    for size in sizes {
        let batches = create_test_batch(size, None);
        let total_bytes: usize = batches.iter().map(|b| b.get_array_memory_size()).sum();

        group.throughput(Throughput::Bytes(total_bytes as u64));

        group.bench_with_input(BenchmarkId::new("parquet", size), &batches, |b, batches| {
            b.iter(|| bench_parquet_write(batches));
        });

        group.bench_with_input(
            BenchmarkId::new("vortex_async", size),
            &batches,
            |b, batches| {
                b.iter(|| bench_vortex_write(batches));
            },
        );

        group.bench_with_input(
            BenchmarkId::new("vortex_blocking", size),
            &batches,
            |b, batches| {
                b.iter(|| bench_vortex_blocking_write(batches));
            },
        );
    }

    group.finish();
}

criterion_group!(benches, write_benchmark);
criterion_main!(benches);
```

```
recordbatch_write/parquet/10
                        time:   [285.16 µs 292.74 µs 301.14 µs]
                        thrpt:  [23.334 MiB/s 24.003 MiB/s 24.641 MiB/s]
                 change:
                        time:   [-1.5744% +2.7495% +7.8510%] (p = 0.27 > 0.05)
                        thrpt:  [-7.2795% -2.6759% +1.5995%]
                        No change in performance detected.
Found 4 outliers among 100 measurements (4.00%)
  1 (1.00%) high mild
  3 (3.00%) high severe
recordbatch_write/vortex_async/10
                        time:   [32.742 ms 32.829 ms 32.939 ms]
                        thrpt:  [218.44 KiB/s 219.18 KiB/s 219.76 KiB/s]
                 change:
                        time:   [+0.2577% +0.7567% +1.2332%] (p = 0.00 < 0.05)
                        thrpt:  [-1.2182% -0.7510% -0.2570%]
                        Change within noise threshold.
Found 2 outliers among 100 measurements (2.00%)
  2 (2.00%) high severe
recordbatch_write/vortex_blocking/10
                        time:   [30.260 ms 30.465 ms 30.735 ms]
                        thrpt:  [234.11 KiB/s 236.18 KiB/s 237.78 KiB/s]
                 change:
                        time:   [+1.4255% +2.2166% +3.0887%] (p = 0.00 < 0.05)
                        thrpt:  [-2.9962% -2.1685% -1.4054%]
                        Performance has regressed.
Found 5 outliers among 100 measurements (5.00%)
  1 (1.00%) high mild
  4 (4.00%) high severe
recordbatch_write/parquet/1000
                        time:   [1.0045 ms 1.0218 ms 1.0419 ms]
                        thrpt:  [503.02 MiB/s 512.91 MiB/s 521.71 MiB/s]
                 change:
                        time:   [-4.5346% -0.3172% +3.7622%] (p = 0.89 > 0.05)
                        thrpt:  [-3.6258% +0.3182% +4.7500%]
                        No change in performance detected.
Found 7 outliers among 100 measurements (7.00%)
  3 (3.00%) high mild
  4 (4.00%) high severe
recordbatch_write/vortex_async/1000
                        time:   [48.021 ms 48.368 ms 48.816 ms]
                        thrpt:  [10.736 MiB/s 10.835 MiB/s 10.913 MiB/s]
                 change:
                        time:   [+0.8243% +1.6720% +2.6448%] (p = 0.00 < 0.05)
                        thrpt:  [-2.5767% -1.6445% -0.8176%]
                        Change within noise threshold.
Found 4 outliers among 100 measurements (4.00%)
  1 (1.00%) high mild
  3 (3.00%) high severe
recordbatch_write/vortex_blocking/1000
                        time:   [44.711 ms 44.876 ms 45.055 ms]
                        thrpt:  [11.632 MiB/s 11.678 MiB/s 11.721 MiB/s]
                 change:
                        time:   [-3.4914% -2.5618% -1.7624%] (p = 0.00 < 0.05)
                        thrpt:  [+1.7940% +2.6291% +3.6177%]
                        Performance has improved.
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) high severe
```
