diff --git a/.github/workflows/beam_Playground_CI_Nightly.yml b/.github/workflows/beam_Playground_CI_Nightly.yml index fc79c6fda9bc..d698fae4b1db 100644 --- a/.github/workflows/beam_Playground_CI_Nightly.yml +++ b/.github/workflows/beam_Playground_CI_Nightly.yml @@ -73,7 +73,16 @@ jobs: pip install -r requirements.txt - name: Get Beam latest release run: | - BEAM_VERSION=$(curl -s https://api.github.com/repos/apache/beam/releases/latest | jq -r '.tag_name') + BEAM_VERSION=$(curl -s -H "Authorization: Bearer ${{ secrets.GITHUB_TOKEN }}" https://api.github.com/repos/apache/beam/releases/latest | jq -r '.tag_name') + if [ "$BEAM_VERSION" = "null" ] || [ -z "$BEAM_VERSION" ]; then + echo "Failed to fetch latest release from GitHub API. Querying git tags as fallback." + git fetch --tags + BEAM_VERSION=$(git tag -l "v[0-9]*" | sort -V | tail -n 1) + if [ -z "$BEAM_VERSION" ]; then + echo "Fallback failed. Setting to a default stable version." + BEAM_VERSION="v2.54.0" + fi + fi echo "BEAM_VERSION=${BEAM_VERSION#v}" >> $GITHUB_ENV - name: Build PYTHON base if: ${{ matrix.sdk == 'python' }} @@ -92,6 +101,23 @@ jobs: run: | CONTAINER_IP=$(docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' ${{ env.container_id }}) echo "container_ip=$CONTAINER_IP" >> $GITHUB_ENV + - name: Wait for backend readiness + run: | + echo "Checking readiness of container ${{ env.container_id }} at ${{ env.container_ip }}:8080" + for i in {1..60}; do + if timeout 1 bash -c "cat < /dev/null > /dev/tcp/${{ env.container_ip }}/8080" 2>/dev/null; then + echo "Playground backend is ready after $i seconds!" + exit 0 + fi + echo "Waiting for playground backend to start (attempt $i/60)..." + sleep 1 + done + echo "Playground backend failed to start within 60 seconds." + echo "=== Container logs ===" + docker logs ${{ env.container_id }} + echo "=== Container inspect ===" + docker inspect ${{ env.container_id }} + exit 1 - name: Run CI env: SERVER_ADDRESS: ${{ env.container_ip }}:8080 diff --git a/.github/workflows/beam_PostCommit_Java_IO_Performance_Tests.yml b/.github/workflows/beam_PostCommit_Java_IO_Performance_Tests.yml index 8820e32812ea..018d07cd163c 100644 --- a/.github/workflows/beam_PostCommit_Java_IO_Performance_Tests.yml +++ b/.github/workflows/beam_PostCommit_Java_IO_Performance_Tests.yml @@ -76,7 +76,16 @@ jobs: - name: Get Beam latest release if: github.event_name == 'schedule' #This has scheduled runs run against the latest release run: | - BEAM_VERSION=$(curl -s https://api.github.com/repos/apache/beam/releases/latest | jq -r '.tag_name') + BEAM_VERSION=$(curl -s -H "Authorization: Bearer ${{ secrets.GITHUB_TOKEN }}" https://api.github.com/repos/apache/beam/releases/latest | jq -r '.tag_name') + if [ "$BEAM_VERSION" = "null" ] || [ -z "$BEAM_VERSION" ]; then + echo "Failed to fetch latest release from GitHub API. Querying git tags as fallback." + git fetch --tags + BEAM_VERSION=$(git tag -l "v[0-9]*" | sort -V | tail -n 1) + if [ -z "$BEAM_VERSION" ]; then + echo "Fallback failed. Setting to a default stable version tag." + BEAM_VERSION="v2.54.0" + fi + fi echo "BEAM_VERSION=${BEAM_VERSION}" >> $GITHUB_ENV - name: Checkout release branch if: github.event_name == 'schedule' #This has scheduled runs run against the latest release diff --git a/sdks/java/io/delta/build.gradle b/sdks/java/io/delta/build.gradle index 617965b3bc4e..68281a1a7087 100644 --- a/sdks/java/io/delta/build.gradle +++ b/sdks/java/io/delta/build.gradle @@ -36,4 +36,13 @@ dependencies { permitUnusedDeclared library.java.delta_kernel_defaults testImplementation library.java.junit + testImplementation project(path: ":sdks:java:core", configuration: "shadowTest") + testImplementation project(path: ":sdks:java:io:common") + testImplementation project(path: ":sdks:java:testing:test-utils") + testImplementation project(path: ":sdks:java:io:parquet") + testImplementation project(path: ":sdks:java:extensions:avro") + testImplementation library.java.hadoop_client + testImplementation library.java.hadoop_common + testImplementation library.java.slf4j_api } + diff --git a/sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/DeltaIO.java b/sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/DeltaIO.java index 6c5df4728b4e..80fa34efd1cb 100644 --- a/sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/DeltaIO.java +++ b/sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/DeltaIO.java @@ -17,10 +17,53 @@ */ package org.apache.beam.sdk.io.delta; +import static io.delta.kernel.internal.util.Utils.singletonCloseableIterator; + import com.google.auto.value.AutoValue; +import io.delta.kernel.Scan; +import io.delta.kernel.Snapshot; +import io.delta.kernel.Table; +import io.delta.kernel.data.ArrayValue; +import io.delta.kernel.data.ColumnVector; +import io.delta.kernel.data.ColumnarBatch; +import io.delta.kernel.data.FilteredColumnarBatch; +import io.delta.kernel.data.MapValue; +import io.delta.kernel.defaults.engine.DefaultEngine; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.internal.InternalScanFileUtils; +import io.delta.kernel.internal.data.ScanStateRow; +import io.delta.kernel.types.ArrayType; +import io.delta.kernel.types.BinaryType; +import io.delta.kernel.types.BooleanType; +import io.delta.kernel.types.ByteType; +import io.delta.kernel.types.DataType; +import io.delta.kernel.types.DateType; +import io.delta.kernel.types.DecimalType; +import io.delta.kernel.types.DoubleType; +import io.delta.kernel.types.FloatType; +import io.delta.kernel.types.IntegerType; +import io.delta.kernel.types.LongType; +import io.delta.kernel.types.MapType; +import io.delta.kernel.types.ShortType; +import io.delta.kernel.types.StringType; +import io.delta.kernel.types.StructField; +import io.delta.kernel.types.StructType; +import io.delta.kernel.types.TimestampType; +import io.delta.kernel.utils.CloseableIterator; +import io.delta.kernel.utils.FileStatus; +import java.io.Serializable; +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.Optional; import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; @@ -83,9 +126,351 @@ public ReadRows withConfig(Map config) { @Override public PCollection expand(PBegin input) { - // TODO(https://github.com/apache/beam/issues/38551): Implement expansion for - // Delta Lake ReadRows - throw new UnsupportedOperationException("Not implemented yet."); + if (getTablePath() == null) { + throw new IllegalArgumentException("Table path must be set."); + } + + org.apache.hadoop.conf.Configuration hadoopConfig = getHadoopConfiguration(getHadoopConfig()); + Engine engine = DefaultEngine.create(hadoopConfig); + Table table = Table.forPath(engine, getTablePath()); + + try { + Snapshot snapshot = buildSnapshot(engine, table); + Scan scan = snapshot.getScanBuilder(engine).build(); + StructType readSchema = scan.getSchema(engine); + org.apache.beam.sdk.schemas.Schema beamSchema = inferBeamSchema(readSchema); + + List fileDescriptors = buildFileDescriptors(engine, scan); + + return input + .apply("CreateFileDescriptors", Create.of(fileDescriptors) + .withCoder(SerializableCoder.of(DeltaFileDescriptor.class))) + .apply("ReadFile", ParDo.of(new ReadFileFn(beamSchema))) + .setRowSchema(beamSchema); + + } catch (Exception e) { + throw new RuntimeException("Failed to read Delta table: " + getTablePath(), e); + } + } + + private Snapshot buildSnapshot(Engine engine, Table table) throws Exception { + if (getVersion() != null) { + return table.getSnapshotAsOfVersion(engine, getVersion()); + } else if (getTimestamp() != null) { + long epochMillis = org.joda.time.Instant.parse(getTimestamp()).getMillis(); + return table.getSnapshotAsOfTimestamp(engine, epochMillis); + } else { + return table.getLatestSnapshot(engine); + } + } + + private List buildFileDescriptors(Engine engine, Scan scan) throws Exception { + List descriptors = new ArrayList<>(); + try (CloseableIterator scanFileIter = scan.getScanFiles(engine)) { + while (scanFileIter.hasNext()) { + FilteredColumnarBatch scanFilesBatch = scanFileIter.next(); + try (CloseableIterator scanFileRows = scanFilesBatch.getRows()) { + while (scanFileRows.hasNext()) { + io.delta.kernel.data.Row scanFileRow = scanFileRows.next(); + FileStatus fileStatus = InternalScanFileUtils.getAddFileStatus(scanFileRow); + descriptors.add(new DeltaFileDescriptor( + getTablePath(), + fileStatus.getPath(), + fileStatus.getSize(), + fileStatus.getModificationTime(), + getHadoopConfig(), + getVersion(), + getTimestamp() + )); + } + } + } + } + return descriptors; + } + } + + public static class DeltaFileDescriptor implements Serializable { + private final String tablePath; + private final String filePath; + private final long fileSize; + private final long modificationTime; + private final @Nullable Map hadoopConfig; + private final @Nullable Long version; + private final @Nullable String timestamp; + + public DeltaFileDescriptor( + String tablePath, + String filePath, + long fileSize, + long modificationTime, + @Nullable Map hadoopConfig, + @Nullable Long version, + @Nullable String timestamp) { + this.tablePath = tablePath; + this.filePath = filePath; + this.fileSize = fileSize; + this.modificationTime = modificationTime; + this.hadoopConfig = hadoopConfig; + this.version = version; + this.timestamp = timestamp; + } + + public String getTablePath() { + return tablePath; + } + + public String getFilePath() { + return filePath; + } + + public long getFileSize() { + return fileSize; + } + + public long getModificationTime() { + return modificationTime; + } + + public @Nullable Map getHadoopConfig() { + return hadoopConfig; + } + + public @Nullable Long getVersion() { + return version; + } + + public @Nullable String getTimestamp() { + return timestamp; + } + } + + public static class ReadFileFn extends DoFn { + private final org.apache.beam.sdk.schemas.Schema beamSchema; + + public ReadFileFn(org.apache.beam.sdk.schemas.Schema beamSchema) { + this.beamSchema = beamSchema; + } + + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + DeltaFileDescriptor desc = c.element(); + org.apache.hadoop.conf.Configuration hadoopConfig = getHadoopConfiguration(desc.getHadoopConfig()); + Engine engine = DefaultEngine.create(hadoopConfig); + Table table = Table.forPath(engine, desc.getTablePath()); + + Snapshot snapshot; + if (desc.getVersion() != null) { + snapshot = table.getSnapshotAsOfVersion(engine, desc.getVersion()); + } else if (desc.getTimestamp() != null) { + long epochMillis = org.joda.time.Instant.parse(desc.getTimestamp()).getMillis(); + snapshot = table.getSnapshotAsOfTimestamp(engine, epochMillis); + } else { + snapshot = table.getLatestSnapshot(engine); + } + + Scan scan = snapshot.getScanBuilder(engine).build(); + io.delta.kernel.data.Row scanState = scan.getScanState(engine); + + try (CloseableIterator scanFileIter = scan.getScanFiles(engine)) { + while (scanFileIter.hasNext()) { + FilteredColumnarBatch scanFilesBatch = scanFileIter.next(); + try (CloseableIterator scanFileRows = scanFilesBatch.getRows()) { + while (scanFileRows.hasNext()) { + io.delta.kernel.data.Row scanFileRow = scanFileRows.next(); + FileStatus fileStatus = InternalScanFileUtils.getAddFileStatus(scanFileRow); + if (fileStatus.getPath().equals(desc.getFilePath())) { + StructType physicalReadSchema = ScanStateRow.getPhysicalDataReadSchema(scanState); + CloseableIterator physicalDataIter = + engine.getParquetHandler().readParquetFiles( + singletonCloseableIterator(fileStatus), + physicalReadSchema, + Optional.empty()).map(FilteredColumnarBatch::getData); + try ( + CloseableIterator transformedData = + Scan.transformPhysicalData( + engine, + scanState, + scanFileRow, + physicalDataIter)) { + while (transformedData.hasNext()) { + FilteredColumnarBatch filteredData = transformedData.next(); + try (CloseableIterator rows = filteredData.getRows()) { + while (rows.hasNext()) { + io.delta.kernel.data.Row row = rows.next(); + c.output(convertKernelRowToBeamRow(row, beamSchema)); + } + } + } + } + } + } + } + } + } + } + } + + private static org.apache.hadoop.conf.Configuration getHadoopConfiguration( + @Nullable Map configMap) { + org.apache.hadoop.conf.Configuration config = new org.apache.hadoop.conf.Configuration(); + if (configMap != null) { + for (Map.Entry entry : configMap.entrySet()) { + config.set(entry.getKey(), entry.getValue()); + } + } + return config; + } + + private static org.apache.beam.sdk.schemas.Schema inferBeamSchema(StructType structType) { + org.apache.beam.sdk.schemas.Schema.Builder builder = org.apache.beam.sdk.schemas.Schema.builder(); + for (StructField field : structType.fields()) { + builder.addField(field.getName(), toBeamFieldType(field.getDataType())); + } + return builder.build(); + } + + private static org.apache.beam.sdk.schemas.Schema.FieldType toBeamFieldType(DataType dataType) { + if (dataType instanceof IntegerType) { + return org.apache.beam.sdk.schemas.Schema.FieldType.INT32; + } else if (dataType instanceof LongType) { + return org.apache.beam.sdk.schemas.Schema.FieldType.INT64; + } else if (dataType instanceof StringType) { + return org.apache.beam.sdk.schemas.Schema.FieldType.STRING; + } else if (dataType instanceof DoubleType) { + return org.apache.beam.sdk.schemas.Schema.FieldType.DOUBLE; + } else if (dataType instanceof FloatType) { + return org.apache.beam.sdk.schemas.Schema.FieldType.FLOAT; + } else if (dataType instanceof BooleanType) { + return org.apache.beam.sdk.schemas.Schema.FieldType.BOOLEAN; + } else if (dataType instanceof ShortType) { + return org.apache.beam.sdk.schemas.Schema.FieldType.INT16; + } else if (dataType instanceof ByteType) { + return org.apache.beam.sdk.schemas.Schema.FieldType.BYTE; + } else if (dataType instanceof BinaryType) { + return org.apache.beam.sdk.schemas.Schema.FieldType.BYTES; + } else if (dataType instanceof DecimalType) { + return org.apache.beam.sdk.schemas.Schema.FieldType.DECIMAL; + } else if (dataType instanceof TimestampType || dataType instanceof DateType) { + return org.apache.beam.sdk.schemas.Schema.FieldType.DATETIME; + } else if (dataType instanceof StructType) { + return org.apache.beam.sdk.schemas.Schema.FieldType.row(inferBeamSchema((StructType) dataType)); + } else if (dataType instanceof ArrayType) { + return org.apache.beam.sdk.schemas.Schema.FieldType.array(toBeamFieldType(((ArrayType) dataType).getElementType())); + } else if (dataType instanceof MapType) { + MapType mapType = (MapType) dataType; + return org.apache.beam.sdk.schemas.Schema.FieldType.map( + toBeamFieldType(mapType.getKeyType()), + toBeamFieldType(mapType.getValueType())); + } else { + throw new IllegalArgumentException("Unsupported Delta type: " + dataType); + } + } + + private static Row convertKernelRowToBeamRow( + io.delta.kernel.data.Row deltaRow, org.apache.beam.sdk.schemas.Schema beamSchema) { + List values = new ArrayList<>(); + StructType structType = deltaRow.getSchema(); + for (int i = 0; i < structType.length(); i++) { + if (deltaRow.isNullAt(i)) { + values.add(null); + } else { + DataType dataType = structType.at(i).getDataType(); + values.add(convertValue(deltaRow, i, dataType)); + } + } + return Row.withSchema(beamSchema).addValues(values).build(); + } + + private static Object convertValue(io.delta.kernel.data.Row deltaRow, int ordinal, DataType dataType) { + if (deltaRow.isNullAt(ordinal)) { + return null; + } + if (dataType instanceof IntegerType) { + return deltaRow.getInt(ordinal); + } else if (dataType instanceof LongType) { + return deltaRow.getLong(ordinal); + } else if (dataType instanceof StringType) { + return deltaRow.getString(ordinal); + } else if (dataType instanceof DoubleType) { + return deltaRow.getDouble(ordinal); + } else if (dataType instanceof FloatType) { + return deltaRow.getFloat(ordinal); + } else if (dataType instanceof BooleanType) { + return deltaRow.getBoolean(ordinal); + } else if (dataType instanceof ShortType) { + return deltaRow.getShort(ordinal); + } else if (dataType instanceof ByteType) { + return deltaRow.getByte(ordinal); + } else if (dataType instanceof BinaryType) { + return deltaRow.getBinary(ordinal); + } else if (dataType instanceof DecimalType) { + return deltaRow.getDecimal(ordinal); + } else if (dataType instanceof TimestampType) { + long micros = deltaRow.getLong(ordinal); + return org.joda.time.Instant.ofEpochMilli(micros / 1000); + } else if (dataType instanceof DateType) { + int days = deltaRow.getInt(ordinal); + return org.joda.time.Instant.ofEpochMilli(days * 24L * 60 * 60 * 1000); + } else if (dataType instanceof StructType) { + io.delta.kernel.data.Row structRow = deltaRow.getStruct(ordinal); + return convertKernelRowToBeamRow(structRow, inferBeamSchema((StructType) dataType)); + } else if (dataType instanceof ArrayType) { + ArrayValue arrayVal = deltaRow.getArray(ordinal); + int size = arrayVal.getSize(); + List list = new ArrayList<>(size); + DataType elemType = ((ArrayType) dataType).getElementType(); + ColumnVector vec = arrayVal.getElements(); + for (int j = 0; j < size; j++) { + if (vec.isNullAt(j)) { + list.add(null); + } else { + list.add(convertVectorValue(vec, j, elemType)); + } + } + return list; + } else if (dataType instanceof MapType) { + MapValue mapVal = deltaRow.getMap(ordinal); + int size = mapVal.getSize(); + Map map = new HashMap<>(size); + DataType keyType = ((MapType) dataType).getKeyType(); + DataType valueType = ((MapType) dataType).getValueType(); + ColumnVector keysVec = mapVal.getKeys(); + ColumnVector valuesVec = mapVal.getValues(); + for (int j = 0; j < size; j++) { + Object key = convertVectorValue(keysVec, j, keyType); + Object val = valuesVec.isNullAt(j) ? null : convertVectorValue(valuesVec, j, valueType); + map.put(key, val); + } + return map; + } else { + return deltaRow.toString(); + } + } + + private static Object convertVectorValue(ColumnVector vec, int rowId, DataType dataType) { + if (dataType instanceof IntegerType) { + return vec.getInt(rowId); + } else if (dataType instanceof LongType) { + return vec.getLong(rowId); + } else if (dataType instanceof StringType) { + return vec.getString(rowId); + } else if (dataType instanceof DoubleType) { + return vec.getDouble(rowId); + } else if (dataType instanceof FloatType) { + return vec.getFloat(rowId); + } else if (dataType instanceof BooleanType) { + return vec.getBoolean(rowId); + } else if (dataType instanceof ShortType) { + return vec.getShort(rowId); + } else if (dataType instanceof ByteType) { + return vec.getByte(rowId); + } else if (dataType instanceof BinaryType) { + return vec.getBinary(rowId); + } else if (dataType instanceof DecimalType) { + return vec.getDecimal(rowId); + } else { + return vec.toString(); } } } diff --git a/sdks/java/io/delta/src/test/java/org/apache/beam/sdk/io/delta/DeltaIOIT.java b/sdks/java/io/delta/src/test/java/org/apache/beam/sdk/io/delta/DeltaIOIT.java new file mode 100644 index 000000000000..b2af9ef17152 --- /dev/null +++ b/sdks/java/io/delta/src/test/java/org/apache/beam/sdk/io/delta/DeltaIOIT.java @@ -0,0 +1,346 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.delta; + +import static org.junit.Assert.assertNotEquals; + +import java.io.PrintWriter; +import java.io.Serializable; +import java.nio.channels.Channels; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.function.Function; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.GenericRecordBuilder; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.extensions.avro.coders.AvroCoder; +import org.apache.beam.sdk.io.DefaultFilenamePolicy; +import org.apache.beam.sdk.io.FileIO; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.GenerateSequence; +import org.apache.beam.sdk.io.common.HashingFn; +import org.apache.beam.sdk.io.common.IOITHelper; +import org.apache.beam.sdk.io.common.TestRow; +import org.apache.beam.sdk.io.fs.MatchResult; +import org.apache.beam.sdk.io.fs.MimeTypes; +import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.io.parquet.ParquetIO; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testutils.NamedTestResult; +import org.apache.beam.sdk.testutils.metrics.IOITMetrics; +import org.apache.beam.sdk.testutils.metrics.MetricsReader; +import org.apache.beam.sdk.testutils.metrics.TimeMonitor; +import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Performance and Integration tests for {@link DeltaIO}. + * + *

Run this test using the command below: + * + *

+ *  ./gradlew :sdks:java:io:delta:test --tests org.apache.beam.sdk.io.delta.DeltaIOIT
+ * 
+ */ +@RunWith(JUnit4.class) +public class DeltaIOIT implements Serializable { + + private static final Schema AVRO_SCHEMA = + new Schema.Parser() + .parse( + "{\n" + + " \"namespace\": \"ioitdelta\",\n" + + " \"type\": \"record\",\n" + + " \"name\": \"TestRowRecord\",\n" + + " \"fields\": [\n" + + " {\"name\": \"id\", \"type\": \"int\"},\n" + + " {\"name\": \"name\", \"type\": \"string\"}\n" + + " ]\n" + + "}"); + + private static final Schema PARTITIONED_AVRO_SCHEMA = + new Schema.Parser() + .parse( + "{\n" + + " \"namespace\": \"ioitdelta\",\n" + + " \"type\": \"record\",\n" + + " \"name\": \"TestRowRecord\",\n" + + " \"fields\": [\n" + + " {\"name\": \"id\", \"type\": \"int\"},\n" + + " {\"name\": \"name\", \"type\": \"string\"},\n" + + " {\"name\": \"part\", \"type\": \"string\"}\n" + + " ]\n" + + "}"); + + private static final String NAMESPACE = DeltaIOIT.class.getName(); + + private static String tablePathPrefix; + private static InfluxDBSettings settings; + + @Rule public transient TestPipeline pipelineWrite = TestPipeline.create(); + @Rule public transient TestPipeline pipelineRead = TestPipeline.create(); + + @BeforeClass + public static void setup() { + DeltaIOTestPipelineOptions options = null; + try { + options = IOITHelper.readIOTestPipelineOptions(DeltaIOTestPipelineOptions.class); + } catch (IllegalArgumentException e) { + // In local environments, fall back to target directory if not provided + } + if (options != null) { + tablePathPrefix = options.getTablePath(); + settings = + InfluxDBSettings.builder() + .withHost(options.getInfluxHost()) + .withDatabase(options.getInfluxDatabase()) + .withMeasurement(options.getInfluxMeasurement()) + .get(); + } else { + tablePathPrefix = "target/temp-delta-table"; + settings = null; + } + } + + @Test + public void testReadSmall() throws Exception { + runIntegrationTest(1000, false, "small"); + } + + @Test + public void testReadLarge() throws Exception { + runIntegrationTest(100000, false, "large"); + } + + @Test + public void testReadPartitioned() throws Exception { + runIntegrationTest(1000, true, "partitioned"); + } + + private void runIntegrationTest(int numRecords, boolean isPartitioned, String scenarioName) throws Exception { + String tablePath = appendTimestampSuffix(tablePathPrefix + "-" + scenarioName); + try { + // 1. Write Parquet files using Beam + writeParquetFiles(tablePath, numRecords, isPartitioned); + + // 2. Generate Delta Log + generateDeltaLog(tablePath, isPartitioned); + + // 3. Read Delta Table and assert + readAndVerify(tablePath, numRecords, isPartitioned, scenarioName); + + } finally { + cleanUp(tablePath); + } + } + + private static String appendTimestampSuffix(String text) { + return String.format("%s_%s", text, java.time.Instant.now().toEpochMilli()); + } + + private void writeParquetFiles(String tablePath, int numRecords, boolean isPartitioned) { + if (isPartitioned) { + pipelineWrite + .apply("Generate sequence", GenerateSequence.from(0).to(numRecords)) + .apply("Construct TestRows", ParDo.of(new TestRow.DeterministicallyConstructTestRowFn())) + .apply("Convert to Partitioned Avro", ParDo.of(new DoFn() { + @ProcessElement + public void processElement(ProcessContext c) { + TestRow row = c.element(); + String part = (row.id() % 2 == 0) ? "even" : "odd"; + c.output(new GenericRecordBuilder(PARTITIONED_AVRO_SCHEMA) + .set("id", row.id()) + .set("name", row.name()) + .set("part", part) + .build()); + } + })) + .setCoder(AvroCoder.of(PARTITIONED_AVRO_SCHEMA)) + .apply("Write Partitioned Parquet", FileIO.writeDynamic() + .by(record -> "part=" + record.get("part")) + .via(ParquetIO.sink(PARTITIONED_AVRO_SCHEMA)) + .to(tablePath) + .withNaming(key -> DefaultFilenamePolicy.fromStandardNaming( + key + "/part", null, ".parquet", false))); + } else { + pipelineWrite + .apply("Generate sequence", GenerateSequence.from(0).to(numRecords)) + .apply("Construct TestRows", ParDo.of(new TestRow.DeterministicallyConstructTestRowFn())) + .apply("Convert to Avro", ParDo.of(new DoFn() { + @ProcessElement + public void processElement(ProcessContext c) { + TestRow row = c.element(); + c.output(new GenericRecordBuilder(AVRO_SCHEMA) + .set("id", row.id()) + .set("name", row.name()) + .build()); + } + })) + .setCoder(AvroCoder.of(AVRO_SCHEMA)) + .apply("Write Parquet files", FileIO.write() + .via(ParquetIO.sink(AVRO_SCHEMA)) + .to(tablePath) + .withNumShards(2)); + } + pipelineWrite.run().waitUntilFinish(); + } + + private void generateDeltaLog(String tablePath, boolean isPartitioned) throws Exception { + List metadataList = FileSystems.match(tablePath + "/**/*.parquet").metadata(); + ResourceId logFileResourceId = FileSystems.matchNewResource(tablePath + "/_delta_log/00000000000000000000.json", false); + + try (PrintWriter writer = new PrintWriter( + Channels.newWriter(FileSystems.create(logFileResourceId, MimeTypes.TEXT), "UTF-8"))) { + writer.println("{\"protocol\":{\"minReaderVersion\":1,\"minWriterVersion\":1}}"); + if (isPartitioned) { + writer.println("{\"metaData\":{\"id\":\"test-uuid-partitioned\",\"format\":{\"provider\":\"parquet\",\"options\":{}},\"schemaString\":\"{\\\"type\\\":\\\"struct\\\",\\\"fields\\\":[{\\\"name\\\":\\\"id\\\",\\\"type\\\":\\\"integer\\\",\\\"nullable\\\":true,\\\"metadata\\\":{}},{\\\"name\\\":\\\"name\\\",\\\"type\\\":\\\"string\\\",\\\"nullable\\\":true,\\\"metadata\\\":{}},{\\\"name\\\":\\\"part\\\",\\\"type\\\":\\\"string\\\",\\\"nullable\\\":true,\\\"metadata\\\":{}}]}\",\"partitionBy\":[\"part\"],\"configuration\":{},\"createdTime\":1717081200000}}"); + } else { + writer.println("{\"metaData\":{\"id\":\"test-uuid-nonpartitioned\",\"format\":{\"provider\":\"parquet\",\"options\":{}},\"schemaString\":\"{\\\"type\\\":\\\"struct\\\",\\\"fields\\\":[{\\\"name\\\":\\\"id\\\",\\\"type\\\":\\\"integer\\\",\\\"nullable\\\":true,\\\"metadata\\\":{}},{\\\"name\\\":\\\"name\\\",\\\"type\\\":\\\"string\\\",\\\"nullable\\\":true,\\\"metadata\\\":{}}]}\",\"partitionBy\":[],\"configuration\":{},\"createdTime\":1717081200000}}"); + } + + for (MatchResult.Metadata metadata : metadataList) { + String fullPath = metadata.resourceId().toString(); + String tableRoot = FileSystems.matchNewResource(tablePath, true).toString(); + String relativePath = fullPath.substring(tableRoot.length()); + + if (relativePath.startsWith("/")) { + relativePath = relativePath.substring(1); + } + + String addAction; + if (isPartitioned) { + String partValue = relativePath.contains("part=even") ? "even" : "odd"; + addAction = String.format( + "{\"add\":{\"path\":\"%s\",\"partitionValues\":{\"part\":\"%s\"},\"size\":%d,\"modificationTime\":%d,\"dataChange\":true}}", + relativePath, partValue, metadata.sizeBytes(), metadata.lastModifiedMillis()); + } else { + addAction = String.format( + "{\"add\":{\"path\":\"%s\",\"partitionValues\":{},\"size\":%d,\"modificationTime\":%d,\"dataChange\":true}}", + relativePath, metadata.sizeBytes(), metadata.lastModifiedMillis()); + } + writer.println(addAction); + } + } + } + + private void readAndVerify(String tablePath, int numRecords, boolean isPartitioned, String scenarioName) { + PCollection deltaRows = + pipelineRead.apply("Read from Delta", DeltaIO.readRows().from(tablePath)); + + PCollection monitoredRows = + deltaRows.apply("TimeMonitor", ParDo.of(new TimeMonitor<>(NAMESPACE, scenarioName + "_read"))); + + PCollection namesAndIds; + if (isPartitioned) { + namesAndIds = monitoredRows.apply("Convert to TestRow", ParDo.of(new DoFn() { + @ProcessElement + public void processElement(ProcessContext c) { + org.apache.beam.sdk.values.Row r = c.element(); + int id = r.getInt32("id"); + String name = r.getString("name"); + String part = r.getString("part"); + String expectedPart = (id % 2 == 0) ? "even" : "odd"; + org.junit.Assert.assertEquals(expectedPart, part); + c.output(TestRow.create(id, name)); + } + })); + } else { + namesAndIds = monitoredRows.apply("Convert to TestRow", ParDo.of(new DoFn() { + @ProcessElement + public void processElement(ProcessContext c) { + org.apache.beam.sdk.values.Row r = c.element(); + c.output(TestRow.create(r.getInt32("id"), r.getString("name"))); + } + })); + } + + PCollection consolidatedHashcode = + namesAndIds + .apply(ParDo.of(new TestRow.SelectNameFn())) + .apply("Hash row contents", Combine.globally(new HashingFn()).withoutDefaults()); + + PAssert.that(consolidatedHashcode) + .containsInAnyOrder(TestRow.getExpectedHashForRowCount(numRecords)); + + PipelineResult result = pipelineRead.run(); + PipelineResult.State pipelineState = result.waitUntilFinish(); + assertNotEquals(PipelineResult.State.FAILED, pipelineState); + + collectAndPublishMetrics(result, scenarioName + "_read", numRecords); + } + + private void collectAndPublishMetrics(PipelineResult readResult, String metricName, int records) { + if (settings == null) { + return; + } + String uuid = UUID.randomUUID().toString(); + String timestamp = java.time.Instant.now().toString(); + + Set> suppliers = new HashSet<>(); + suppliers.add( + reader -> { + long start = reader.getStartTimeMetric(metricName); + long end = reader.getEndTimeMetric(metricName); + double duration = (end - start) / 1e3; + return NamedTestResult.create(uuid, timestamp, metricName + "_duration_sec", duration); + }); + suppliers.add( + reader -> { + long start = reader.getStartTimeMetric(metricName); + long end = reader.getEndTimeMetric(metricName); + double duration = (end - start) / 1e3; + double throughput = duration > 0 ? records / duration : 0.0; + return NamedTestResult.create(uuid, timestamp, metricName + "_throughput_ops_sec", throughput); + }); + + IOITMetrics metrics = new IOITMetrics(suppliers, readResult, NAMESPACE, uuid, timestamp); + metrics.publishToInflux(settings); + } + + private void cleanUp(String tablePath) { + try { + MatchResult matchResult = FileSystems.match(tablePath + "/**"); + List resourceIds = new ArrayList<>(); + for (MatchResult.Metadata metadata : matchResult.metadata()) { + resourceIds.add(metadata.resourceId()); + } + if (!resourceIds.isEmpty()) { + FileSystems.delete(resourceIds); + } + FileSystems.delete(Collections.singletonList(FileSystems.matchNewResource(tablePath, true))); + } catch (Exception e) { + // Ignore cleanup failures in test + } + } +} diff --git a/sdks/java/io/delta/src/test/java/org/apache/beam/sdk/io/delta/DeltaIOTestPipelineOptions.java b/sdks/java/io/delta/src/test/java/org/apache/beam/sdk/io/delta/DeltaIOTestPipelineOptions.java new file mode 100644 index 000000000000..b28d4fe03f08 --- /dev/null +++ b/sdks/java/io/delta/src/test/java/org/apache/beam/sdk/io/delta/DeltaIOTestPipelineOptions.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.delta; + +import org.apache.beam.sdk.io.common.IOTestPipelineOptions; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.Validation; + +/** Pipeline options for DeltaIO integration and performance tests. */ +public interface DeltaIOTestPipelineOptions extends IOTestPipelineOptions { + + @Description("Folder path where the Delta table will be created") + @Validation.Required + String getTablePath(); + + void setTablePath(String value); +}