diff --git a/.github/trigger_files/beam_PostCommit_Python_Versions.json b/.github/trigger_files/beam_PostCommit_Python_Versions.json index 9cc78c7d1c6c..8b2c8c445c1f 100644 --- a/.github/trigger_files/beam_PostCommit_Python_Versions.json +++ b/.github/trigger_files/beam_PostCommit_Python_Versions.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "revision": 4 + "revision": 5 } diff --git a/runners/prism/java/build.gradle b/runners/prism/java/build.gradle index c89974cb6ea5..9357515f36c2 100644 --- a/runners/prism/java/build.gradle +++ b/runners/prism/java/build.gradle @@ -185,6 +185,7 @@ def sickbayTests = [ def createPrismValidatesRunnerTask = { name, environmentType -> Task vrTask = tasks.create(name: name, type: Test, group: "Verification") { description "PrismRunner Java $environmentType ValidatesRunner suite" + outputs.upToDateWhen { false } classpath = configurations.validatesRunner var prismBuildTask = dependsOn(':runners:prism:build') diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go index f720be20e375..3949d1af3248 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go @@ -387,8 +387,11 @@ func (em *ElementManager) Bundles(ctx context.Context, upstreamCancelFn context. em.pendingElements.Wait() slog.Debug("no more pending elements: terminating pipeline") cancelFn(fmt.Errorf("elementManager out of elements, cleaning up")) - // Ensure the watermark evaluation goroutine exits. + // Ensure the watermark evaluation goroutine exits by locking the mutex + // before broadcasting, preventing a lost wake-up signal. + em.refreshCond.L.Lock() em.refreshCond.Broadcast() + em.refreshCond.L.Unlock() }() // Watermark evaluation goroutine. go func() { @@ -2496,7 +2499,9 @@ func (em *ElementManager) wakeUpAt(t mtime.Time) { // only create this goroutine if we have real-time clock enabled (also implying the pipeline does not have TestStream). go func(fireAt time.Time) { time.AfterFunc(time.Until(fireAt), func() { + em.refreshCond.L.Lock() em.refreshCond.Broadcast() + em.refreshCond.L.Unlock() }) }(t.ToTime()) }