From d64e3a3aad933b698beacf9b227195a7472c001e Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Fri, 24 Apr 2026 18:48:28 +0000 Subject: [PATCH 1/2] FileBasedSinkTest failure --- .../java/org/apache/beam/sdk/io/FileBasedSinkTest.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java index c4f83954e66c..52d4786ee615 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java @@ -57,6 +57,7 @@ import org.apache.beam.sdk.io.FileBasedSink.Writer; import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.values.KV; @@ -65,6 +66,7 @@ import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; import org.apache.commons.compress.compressors.deflate.DeflateCompressorInputStream; import org.apache.commons.lang3.SystemUtils; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -79,6 +81,11 @@ public class FileBasedSinkTest { @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); + @Before + public void setUp() { + FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create()); + } + private final String tempDirectoryName = "temp"; private ResourceId getTemporaryFolder() { From ce88a25f8e6c26e39c90958b3306d211c7e3ea3f Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Fri, 24 Apr 2026 19:09:12 +0000 Subject: [PATCH 2/2] revert fileBasedSinkTest change and initialize source and sink if null --- .../org/apache/beam/sdk/metrics/Lineage.java | 21 +++++++++++++------ .../apache/beam/sdk/io/FileBasedSinkTest.java | 7 ------- 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java index 1a193ec006e0..dc30d82adcf4 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java @@ -30,6 +30,7 @@ import org.apache.beam.sdk.lineage.LineageOptions; import org.apache.beam.sdk.metrics.Metrics.MetricsFlag; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter; import org.checkerframework.checker.nullness.qual.Nullable; @@ -122,16 +123,24 @@ private static Lineage createLineage(PipelineOptions options, LineageDirection d /** {@link Lineage} representing sources and optionally side inputs. */ public static Lineage getSources() { - return checkNotNull( - sources, - "Lineage not initialized. FileSystems.setDefaultPipelineOptions must be called first."); + Lineage localSources = sources; + if (localSources == null) { + return createDefaultLineage(LineageDirection.SOURCE); + } + return localSources; } /** {@link Lineage} representing sinks. */ public static Lineage getSinks() { - return checkNotNull( - sinks, - "Lineage not initialized. FileSystems.setDefaultPipelineOptions must be called first."); + Lineage localSinks = sinks; + if (localSinks == null) { + return createDefaultLineage(LineageDirection.SINK); + } + return localSinks; + } + + private static Lineage createDefaultLineage(LineageDirection direction) { + return createLineage(PipelineOptionsFactory.create(), direction); } @VisibleForTesting diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java index 52d4786ee615..c4f83954e66c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java @@ -57,7 +57,6 @@ import org.apache.beam.sdk.io.FileBasedSink.Writer; import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; import org.apache.beam.sdk.io.fs.ResourceId; -import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.values.KV; @@ -66,7 +65,6 @@ import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; import org.apache.commons.compress.compressors.deflate.DeflateCompressorInputStream; import org.apache.commons.lang3.SystemUtils; -import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -81,11 +79,6 @@ public class FileBasedSinkTest { @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); - @Before - public void setUp() { - FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create()); - } - private final String tempDirectoryName = "temp"; private ResourceId getTemporaryFolder() {