Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
9d490f5
Disabled some benchmarks and scaled
stanbrub Mar 13, 2026
47f066f
Scaled up basic math combo
stanbrub Mar 13, 2026
dea74d7
Merge branch 'deephaven:main' into gc-benchmarking
stanbrub Mar 19, 2026
15cf1f4
Added a Local Parquet Generator as opposed to going through Kafka
stanbrub Mar 20, 2026
8604111
Added local parquet generator and 1st training test
stanbrub Mar 24, 2026
83b1c11
Added more train benchmarks. Improved Local Parquet Generator
stanbrub Mar 25, 2026
c552c01
Revert BasicMathCombo
stanbrub Mar 26, 2026
62aa96a
Revert BasicMathCombo
stanbrub Mar 26, 2026
f78ca22
Reverted scale and disabled for pre-train standard tests used for pre…
stanbrub Mar 26, 2026
e5412e7
Parallelized local parquet. worked around directory link failures
stanbrub Mar 31, 2026
ff4d891
Added 1st pass at benchmark event retrieval with JFR
stanbrub Apr 1, 2026
f35ab4f
Merge branch 'deephaven:main' into gc-benchmarking
stanbrub Apr 7, 2026
25629cc
Added jfr events
stanbrub Apr 7, 2026
254cca0
Merge branch 'deephaven:main' into gc-benchmarking
stanbrub Apr 7, 2026
528c365
Added UGP events
stanbrub Apr 9, 2026
bd5ff02
Rescaled only static trained for 120 secs
stanbrub Apr 10, 2026
75449bb
Updated adhoc for local parquet env variables
stanbrub Apr 10, 2026
ec2d95e
Open up dh data dir so local parquet can work
stanbrub Apr 10, 2026
a402a54
More logging for benchmark runs
stanbrub Apr 10, 2026
4cf8357
Scaling back AggBy because of system lockup
stanbrub Apr 10, 2026
8507794
Restrict the number of parquet threads and memory for the runner
stanbrub Apr 10, 2026
c0b5e7a
Fixed NaturalJoin OOM
stanbrub Apr 11, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/resources/adhoc-benchmark-docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ services:
- ./minio:/minio
environment:
- "START_OPTS=-DAuthHandlers=io.deephaven.auth.AnonymousAuthenticationHandler ${CONFIG_OPTS}"
- "DEEPHAVEN_HOST_OS_DIR=${ENV_DEEPHAVEN_HOST_OS_DIR}"

redpanda:
command:
Expand Down
2 changes: 1 addition & 1 deletion .github/resources/adhoc-scale-benchmark.properties
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ schema.registry.addr=redpanda:8081
kafka.consumer.addr=redpanda:29092

# Default timeout to complete processes (Executing queries, generating records)
default.completion.timeout=10 minutes
default.completion.timeout=20 minutes

# Default data distribution for column data (random, ascending, descending, runlength)
default.data.distribution=${baseDistrib}
Expand Down
1 change: 1 addition & 0 deletions .github/scripts/manage-deephaven-remote.sh
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ if [[ ${CONFIG_OPTS} == "<default>" ]]; then
CONFIG_OPTS="-Xmx24g"
fi
echo "CONFIG_OPTS=${CONFIG_OPTS}" > .env
echo "ENV_DEEPHAVEN_HOST_OS_DIR=${DEEPHAVEN_DIR}" >> .env

IS_BRANCH="false"
if [[ ${DOCKER_IMG} == *"@sha"*":"* ]]; then
Expand Down
2 changes: 1 addition & 1 deletion .github/scripts/run-benchmarks-remote.sh
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ title "-- Running Benchmarks --"
set +f
cd ${RUN_DIR}
cat ${RUN_TYPE}-scale-benchmark.properties | sed 's|${baseRowCount}|'"${ROW_COUNT}|g" | sed 's|${baseDistrib}|'"${DISTRIB}|g" | sed 's|${userHome}|'"${HOME}|g" > scale-benchmark.properties
JAVA_OPTS=$(echo -Dbenchmark.profile=scale-benchmark.properties -jar deephaven-benchmark-*-standalone.jar -cp standard-tests.jar)
JAVA_OPTS=$(echo -Xmx4g -Dbenchmark.profile=scale-benchmark.properties -jar deephaven-benchmark-*-standalone.jar -cp standard-tests.jar)
set -f

if [ "${TAG_NAME}" = "Any" ]; then
Expand Down
3 changes: 2 additions & 1 deletion .github/scripts/setup-test-server-remote.sh
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ sudo docker system prune --volumes --force
sudo rm -rf ${DEEPHAVEN_DIR}

title "-- Staging Docker Resources --"
mkdir -p ${DEEPHAVEN_DIR}
mkdir -p ${DEEPHAVEN_DIR}/data
chmod 777 ${DEEPHAVEN_DIR}/data
cd ${DEEPHAVEN_DIR}
cp ${GIT_DIR}/benchmark/.github/resources/${RUN_TYPE}-benchmark-docker-compose.yml docker-compose.yml

Expand Down
7 changes: 6 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@
<file>${project.basedir}/eclipse-java-google-style.xml</file>
</eclipse>
<licenseHeader>
<content>/* Copyright (c) 2022-$YEAR Deephaven Data Labs and Patent Pending */</content>
<content>/* Copyright (c) $YEAR Deephaven Data Labs and Patent Pending */</content>
</licenseHeader>
</java>
</configuration>
Expand Down Expand Up @@ -271,6 +271,11 @@
<artifactId>kafka-protobuf-serializer</artifactId>
<version>8.1.1</version>
</dependency>
<dependency>
<groupId>blue.strategic.parquet</groupId>
<artifactId>parquet-floor</artifactId>
<version>1.64</version>
</dependency>
<dependency>
<groupId>io.deephaven</groupId>
<artifactId>deephaven-java-client-barrage-dagger</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
* practical purposes, though it is not ideal.
*/
public class CompareTestRunner {
static {
System.setProperty("root.test.package", "io.deephaven.benchmark.tests");
}
final Object testInst;
final Set<String> requiredPackages = new LinkedHashSet<>();
final Map<String, String> downloadFiles = new LinkedHashMap<>();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/* Copyright (c) 2022-2024 Deephaven Data Labs and Patent Pending */
/* Copyright (c) 2022-2026 Deephaven Data Labs and Patent Pending */
package io.deephaven.benchmark.tests.standard;

import static org.junit.jupiter.api.Assertions.assertTrue;
Expand All @@ -10,6 +10,7 @@
import io.deephaven.benchmark.controller.Controller;
import io.deephaven.benchmark.controller.DeephavenDockerController;
import io.deephaven.benchmark.metric.Metrics;
import io.deephaven.benchmark.util.Log;
import io.deephaven.benchmark.util.Timer;

/**
Expand All @@ -21,17 +22,23 @@
* conventions are followed (ex. main file is "source")
*/
final public class StandardTestRunner {
static {
System.setProperty("root.test.package", "io.deephaven.benchmark.tests");
}
final Object testInst;
final List<String> supportTables = new ArrayList<>();
final List<String> setupQueries = new ArrayList<>();
final List<String> preOpQueries = new ArrayList<>();
final List<String> teardownQueries = new ArrayList<>();
final Set<String> requiredServices = new TreeSet<>(List.of("deephaven"));
private String mainTable = "source";
private Bench api;
private Controller controller;
private int staticFactor = 1;
private int incFactor = 1;
private int rowCountFactor = 1;
private boolean useCachedSource = true;
private boolean useLocalParquet = false;

public StandardTestRunner(Object testInst) {
this.testInst = testInst;
Expand Down Expand Up @@ -96,6 +103,25 @@ public void setServices(String... services) {
requiredServices.addAll(Arrays.asList(services));
}

/**
 * Set whether the generated tables are fully loaded into memory (with <code>select</code>) before
 * running the test queries; when false, a <code>view</code> is used instead. Defaults to true.
 *
 * @param useMemorySource true to load the generated tables into memory, otherwise false
 */
public void useCachedSource(boolean useMemorySource) {
this.useCachedSource = useMemorySource;
}

/**
 * Set whether the generated tables are created through Deephaven (i.e. real client-server) or
 * written directly through the local file system (i.e. a local copy). The default of "false"
 * (generate through Deephaven) is preferred.
 *
 * @param useLocalParquet true to generate parquet files through the local file system, false to
 *        generate them through Deephaven
 */
public void useLocalParquet(boolean useLocalParquet) {
this.useLocalParquet = useLocalParquet;
}

/**
* Add a query to be run directly after the main table is loaded. It is not measured. This query can transform the
* main table or supporting table, set up aggregations or updateby operations, etc.
Expand All @@ -117,6 +143,16 @@ public void addPreOpQuery(String query) {
preOpQueries.add(query);
}

/**
 * Register a query to run once everything else has finished. Useful for cleaning up resources
 * created during the test run (e.g. logging, temporary files, perf table retrieval).
 *
 * @param query the query to run after the measured operation
 */
public void addTeardownQuery(String query) {
this.teardownQueries.add(query);
}

/**
* The {@code scale.row.count} property supplies a default for the number of rows generated for benchmark tests.
* Given that some operations use less memory than others, scaling up the generated rows per operation is more
Expand Down Expand Up @@ -193,40 +229,42 @@ public void test(String name, long maxExpectedRowCount, String operation, String
}
}

long getWarmupRowCount() {
return (long) (api.propertyAsIntegral("warmup.row.count", "0") * rowCountFactor);
public long getGeneratedRowCount() {
return (long) (api.propertyAsIntegral("scale.row.count", "100000") * rowCountFactor);
}

long getGeneratedRowCount() {
return (long) (api.propertyAsIntegral("scale.row.count", "100000") * rowCountFactor);
long getWarmupRowCount() {
return (long) (api.propertyAsIntegral("warmup.row.count", "0") * rowCountFactor);
}

/**
 * Translate the caller-supplied expected row count into a usable upper bound; values less than 1
 * mean "no limit" and become <code>Long.MAX_VALUE</code>.
 * <p>
 * NOTE(review): the <code>scaleFactor</code> parameter is currently unused — confirm whether the
 * expected count should be multiplied by it, or the parameter dropped.
 *
 * @param expectedRowCount the expected result row count (&lt; 1 means unlimited)
 * @param scaleFactor the table scale factor (presently ignored)
 * @return the maximum expected row count to validate against
 */
long getMaxExpectedRowCount(long expectedRowCount, long scaleFactor) {
return (expectedRowCount < 1) ? Long.MAX_VALUE : expectedRowCount;
}

String getReadOperation(int scaleFactor, long rowCount, String... loadColumns) {
var headRows = (rowCount >= getGeneratedRowCount())?"":".head(${rows})";
var headRows = (rowCount >= getGeneratedRowCount()) ? "" : ".head(${rows})";
var selectStr = useCachedSource ? "select" : "view";
if (scaleFactor > 1 && mainTable.equals("timed") && Arrays.asList(loadColumns).contains("timestamp")) {
var read = """
merge([
read('/data/timed.parquet').view(formulas=[${loadColumns}])${headRows}
bench_api_read('/data/timed.parquet').view(formulas=[${loadColumns}])${headRows}
] * ${scaleFactor}).update_view([
'timestamp=timestamp.plusMillis((long)(ii / ${rows}) * ${rows})'
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason we can't use the timestamp from the file? I have a few worries about doing rowset calculation as part of the benchmark (to come up with ii).

For the actual test benchmarks, without a select we would also just prefer more/bigger parquet files to avoid the overhead of going through the merge data structures. We might even be able to get away with symlinks to have the data just repeat itself.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the "train" benchmarks, since we don't use Scale Factors, that section of code will not be hit. This is only used when we are doing merges to simulate larger data sets. So for the nightly runs, this will happen BEFORE the "select" into memory, which is not included in the measurement. But for the "train" benchmarks, we only read timestamps directly from the parquet file(s), and that only if they are used in the benchmark (like for rollingtime).

]).select()
]).${selectStr}()
""";
read = read.replace("${headRows}",headRows);
read = read.replace("${headRows}", headRows).replace("${selectStr}", selectStr);
return read.replace("${scaleFactor}", "" + scaleFactor).replace("${rows}", "" + rowCount);
}

var read = "read('/data/${mainTable}.parquet')${headRows}.select(formulas=[${loadColumns}])";
var read = "bench_api_read('/data/${mainTable}.parquet')${headRows}.${selectStr}(formulas=[${loadColumns}])";
read = (loadColumns.length == 0) ? ("empty_table(${rows})") : read;

if (scaleFactor > 1) {
read = "merge([${readTable}] * ${scaleFactor})".replace("${readTable}", read);
read = read.replace("${scaleFactor}", "" + scaleFactor);
}
return read.replace("${headRows}",headRows).replace("${rows}", "" + rowCount);
read = read.replace("${headRows}", headRows).replace("${rows}", "" + rowCount);
return read.replace("${selectStr}", selectStr);
}

String getStaticQuery(String name, String operation, long rowCount, String... loadColumns) {
Expand All @@ -241,9 +279,11 @@ String getStaticQuery(String name, String operation, long rowCount, String... lo
bench_api_metrics_start()
print('${logOperationBegin}')

begin_clock = time.time_ns()
begin_time = time.perf_counter_ns()
result = ${operation}
end_time = time.perf_counter_ns()
end_clock = time.time_ns()

print('${logOperationEnd}')
bench_api_metrics_end()
Expand All @@ -253,7 +293,10 @@ String getStaticQuery(String name, String operation, long rowCount, String... lo
double_col("elapsed_nanos", [end_time - begin_time]),
long_col("processed_row_count", [loaded_tbl_size]),
long_col("result_row_count", [result.size]),
long_col("begin_clock_nanos", [begin_clock]),
long_col("end_clock_nanos", [end_clock]),
])
${teardownQueries}
""";
var read = getReadOperation(staticFactor, rowCount, loadColumns);
return populateQuery(name, staticQuery, operation, read, loadColumns);
Expand All @@ -278,6 +321,7 @@ String getIncQuery(String name, String operation, long rowCount, String... loadC
${preOpQueries}
bench_api_metrics_start()
print('${logOperationBegin}')
begin_clock = time.time_ns()
begin_time = time.perf_counter_ns()
result = ${operation}

Expand All @@ -291,15 +335,19 @@ String getIncQuery(String name, String operation, long rowCount, String... loadC
source_filter.waitForCompletion()

end_time = time.perf_counter_ns()
end_clock = time.time_ns()
print('${logOperationEnd}')
bench_api_metrics_end()
standard_metrics = bench_api_metrics_collect()

stats = new_table([
double_col("elapsed_nanos", [end_time - begin_time]),
long_col("processed_row_count", [loaded_tbl_size]),
long_col("result_row_count", [result.size])
long_col("result_row_count", [result.size]),
long_col("begin_clock_nanos", [begin_clock]),
long_col("end_clock_nanos", [end_clock]),
])
${teardownQueries}
""";
var read = getReadOperation(incFactor, rowCount, loadColumns);
return populateQuery(name, incQuery, operation, read, loadColumns);
Expand All @@ -313,6 +361,7 @@ String populateQuery(String name, String query, String operation, String read, S
query = query.replace("${setupQueries}", String.join("\n", setupQueries));
query = query.replace("${preOpQueries}", String.join("\n", preOpQueries));
query = query.replace("${operation}", operation);
query = query.replace("${teardownQueries}", String.join("\n", teardownQueries));
query = query.replace("${logOperationBegin}", getLogSnippet("Begin", name));
query = query.replace("${logOperationEnd}", getLogSnippet("End", name));
return query;
Expand All @@ -326,6 +375,7 @@ Result runTest(String name, String warmupQuery, String mainQuery) {
stopUnusedServices(requiredServices);

try {
Log.info("Running Test: %s", name);
if (getWarmupRowCount() > 0)
api.query(warmupQuery).execute();
var result = new AtomicReference<Result>();
Expand All @@ -342,6 +392,8 @@ Result runTest(String name, String warmupQuery, String mainQuery) {
metrics.set("inc.factor", incFactor);
metrics.set("row.factor", rowCountFactor);
api.metrics().add(metrics);
}).fetchAfter("standard_events", table -> {
api.events().add(table);
}).execute();
api.result().test("deephaven-engine", result.get().elapsedTime(), result.get().loadedRowCount());
return result.get();
Expand All @@ -356,7 +408,7 @@ String listStr(String... values) {
}

String loadSupportTables() {
return supportTables.stream().map(t -> t + " = read('/data/" + t + ".parquet').select()\n")
return supportTables.stream().map(t -> t + " = bench_api_read('/data/" + t + ".parquet').select()\n")
.collect(Collectors.joining(""));
}

Expand Down Expand Up @@ -435,7 +487,7 @@ boolean generateNamedTable(String name, String distribution, String[] groups) {
}

boolean generateSourceTable(String distribution, String[] groups) {
return api.table("source")
var t = api.table("source")
.add("num1", "double", "[0-4]", distribution)
.add("num2", "double", "[1-10]", distribution)
.add("key1", "string", "[1-100]", distribution)
Expand All @@ -444,8 +496,8 @@ boolean generateSourceTable(String distribution, String[] groups) {
.add("key4", "int", "[0-98]", distribution)
.add("key5", "string", "[1-1000000]", distribution)
.withRowCount(getGeneratedRowCount())
.withColumnGrouping(groups)
.generateParquet();
.withColumnGrouping(groups);
return useLocalParquet ? t.generateLocalParquet() : t.generateParquet();
}

boolean generateRightTable(String distribution, String[] groups) {
Expand All @@ -469,7 +521,7 @@ boolean generateRightTable(String distribution, String[] groups) {
boolean generateTimedTable(String distribution, String[] groups) {
long minTime = 1676557157537L;
long maxTime = minTime + getGeneratedRowCount() - 1;
return api.table("timed")
var t = api.table("timed")
.add("timestamp", "timestamp-millis", "[" + minTime + "-" + maxTime + "]", "ascending")
.add("num1", "double", "[0-4]", distribution)
.add("num2", "double", "[1-10]", distribution)
Expand All @@ -478,8 +530,8 @@ boolean generateTimedTable(String distribution, String[] groups) {
.add("key3", "int", "[0-8]", distribution)
.add("key4", "int", "[0-98]", distribution)
.withFixedRowCount(true)
.withColumnGrouping(groups)
.generateParquet();
.withColumnGrouping(groups);
return useLocalParquet ? t.generateLocalParquet() : t.generateParquet();
}

record Result(long loadedRowCount, Duration elapsedTime, long resultRowCount) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
* Test reading and writing parquet files with various data types and compression codecs.
*/
class FileTestRunner {
static {
System.setProperty("root.test.package", "io.deephaven.benchmark.tests");
}
final String parquetCfg = "max_dictionary_keys=1048576, max_dictionary_size=1048576, target_page_size=65536";
final Object testInst;
final Set<String> requiredServices = new TreeSet<>(List.of("deephaven"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
* append/blink table types. Results are checked to ensure the correct number of rows has been processed.
*/
class KafkaTestRunner {
static {
System.setProperty("root.test.package", "io.deephaven.benchmark.tests");
}
final Object testInst;
final Bench api;
final Controller controller;
Expand Down
41 changes: 41 additions & 0 deletions src/it/java/io/deephaven/benchmark/tests/train/AggByTrainTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/* Copyright (c) 2026-2026 Deephaven Data Labs and Patent Pending */
package io.deephaven.benchmark.tests.train;

import org.junit.jupiter.api.*;

/**
 * Training tests for the aggBy table operations that do aggregations (e.g. sum, std, min/max, var, avg). See
 * <code>TrainTestRunner</code> for more information.
 */
public class AggByTrainTest {
final TrainTestRunner runner = new TrainTestRunner(this);

// Stages the "timed" table scaled by the given row factor and registers the shared
// aggregation list ("aggs") used by each benchmark query below.
void setup(double rowFactor) {
runner.tables(rowFactor, "timed");

var setupStr = """
from deephaven import agg

aggs = [
agg.sum_('Sum=num1'), agg.std('Std=num2'), agg.min_('Min=num1'), agg.max_('Max=num2'),
agg.avg('Avg=num1'), agg.var('Var=num2'), agg.count_('num1')
]
""";
runner.addSetupQuery(setupStr);
}

// Aggregate the whole table into a single row (no grouping columns).
@Test
void aggBy0Groups() {
setup(572);
var q = "timed.agg_by(aggs)";
runner.test("AggBy- No Groups", 1, q, "num1", "num2");
}

// Aggregate grouped by two keys; key1 x key2 yields up to 10K unique combinations.
@Test
void aggBy2Groups() {
setup(66);
var q = "timed.agg_by(aggs, by=['key1', 'key2'])";
runner.test("AggBy- 2 Groups 10K Unique Combos", 10100, q, "key1", "key2", "num1", "num2");
}

}
Loading
Loading