[SPARK-54055][CONNECT][PYSPARK] Clean up per-session PythonWorkerFactory#55131
Open
kumbham wants to merge 1 commit into apache:master
Conversation
Author comment: all of the failed tests are failing with this exception: https://github.com/kumbham/spark/actions/runs/23860864341/job/69567033253#step:11:8167
None of the failing tests are related to the changes in this PR (PythonWorkerFactory, …).
What changes were proposed in this pull request?
Each Spark Connect session creates its own `PythonWorkerFactory`, keyed by `SPARK_JOB_ARTIFACT_UUID`. These factories (and their daemon processes) were never cleaned up until `SparkContext` shutdown, causing unbounded process and thread leaks on long-running servers. This change adds two cleanup mechanisms:
1. Eager cleanup (driver-side): `SessionHolder.close()` now calls `SparkSession.cleanupPythonWorkers()`, which finds and stops all `PythonWorkerFactory` instances matching the session's artifact UUID in `SparkEnv`'s cache. This follows the same pattern as the existing `cleanupPythonWorkerLogs()` in the same lifecycle hook.

2. Idle-timeout eviction (executor-side safety net): a `ScheduledExecutorService` in `SparkEnv` periodically scans for `PythonWorkerFactory` instances with a non-default `SPARK_JOB_ARTIFACT_UUID` that have no active/idle workers and have been idle for more than 5 minutes, and evicts them. This handles executor-side cleanup, where session-close notifications from the driver cannot reach. The scheduler follows the same pattern used by `ContextCleaner`, `Heartbeater`, and other Spark core components. Factories with a default artifact UUID (i.e., non-Connect workloads) are never evicted by the idle-timeout mechanism.
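The two mechanisms above can be sketched as a minimal, self-contained model. This is only an illustration of the pattern described in this PR, not Spark's actual code: `Factory`, `CACHE`, the `"default"` UUID marker, and the method names here are simplified stand-ins for `PythonWorkerFactory`, `SparkEnv`'s factory cache, and the real configuration.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class WorkerFactoryCleanupSketch {
    // Stand-in for PythonWorkerFactory; the real one owns a daemon process.
    static final class Factory {
        final String artifactUuid;
        final long lastActivityMs;
        Factory(String artifactUuid, long lastActivityMs) {
            this.artifactUuid = artifactUuid;
            this.lastActivityMs = lastActivityMs;
        }
        void stop() { /* the real factory would terminate its daemon here */ }
    }

    // Hypothetical marker for non-Connect workloads (no session UUID).
    static final String DEFAULT_UUID = "default";
    static final long IDLE_TIMEOUT_MS = 5 * 60 * 1000L; // 5 minutes, per the PR

    // Simplified cache; SparkEnv keys factories on the full worker config.
    static final Map<String, Factory> CACHE = new ConcurrentHashMap<>();

    // Mechanism 1, eager cleanup: on session close, stop and drop every
    // factory whose artifact UUID matches the closing session.
    static void cleanupPythonWorkers(String artifactUuid) {
        Iterator<Map.Entry<String, Factory>> it = CACHE.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Factory> e = it.next();
            if (e.getValue().artifactUuid.equals(artifactUuid)) {
                e.getValue().stop();
                it.remove();
            }
        }
    }

    // Mechanism 2, idle-timeout eviction: only session-scoped (non-default)
    // factories idle past the timeout qualify; default factories never do.
    static boolean isIdleFactory(Factory f, long nowMs) {
        return !f.artifactUuid.equals(DEFAULT_UUID)
            && nowMs - f.lastActivityMs > IDLE_TIMEOUT_MS;
    }

    static void reapIdleFactories(long nowMs) {
        Iterator<Map.Entry<String, Factory>> it = CACHE.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Factory> e = it.next();
            if (isIdleFactory(e.getValue(), nowMs)) {
                e.getValue().stop();
                it.remove();
            }
        }
    }

    // The PR runs the scan on a ScheduledExecutorService; same pattern here.
    static ScheduledExecutorService startReaper() {
        ScheduledExecutorService sched = Executors.newSingleThreadScheduledExecutor();
        sched.scheduleWithFixedDelay(
            () -> reapIdleFactories(System.currentTimeMillis()), 1, 1, TimeUnit.MINUTES);
        return sched;
    }

    public static void main(String[] args) {
        CACHE.put("a", new Factory("session-uuid-1", 0L));
        CACHE.put("b", new Factory(DEFAULT_UUID, 0L));
        cleanupPythonWorkers("session-uuid-1");
        System.out.println(CACHE.containsKey("a")); // false: session factory evicted
        System.out.println(CACHE.containsKey("b")); // true: default factory untouched
    }
}
```

The key design point mirrored here is that eviction is gated on the artifact UUID, so the long-lived factory used by classic (non-Connect) workloads is never reaped.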
Closes SPARK-54055
Why are the changes needed?
With Spark Connect, each session always has a unique `SPARK_JOB_ARTIFACT_UUID`, even if there are no artifacts. This makes the UDF environment built by `BasePythonRunner.compute` unique per session, so each session gets its own `PythonWorkerFactory` and daemon process. `PythonWorkerFactory` has a `stop` method, but nothing called it except `SparkEnv.stop` (which only runs at full shutdown). On a long-running Spark Connect server, this causes unbounded accumulation of daemon processes, `MonitorThread`s, and stderr/stdout reader threads, eventually leading to OOM.

Reproduction (from the JIRA reporter): after 200 sessions, 200+ daemon processes and ~1000 threads are leaked.
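The leak mechanics can be illustrated with a small stand-in for the factory cache (all names here are hypothetical; the real cache in `SparkEnv` is keyed by the worker's full configuration, including the environment variables that carry the session UUID):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Why each Connect session leaks a factory: the cache key includes the env
// vars, and every session injects a fresh SPARK_JOB_ARTIFACT_UUID there, so
// the key never repeats and a new factory is created (and kept) per session.
public class PerSessionFactorySketch {
    private static final Map<String, Integer> FACTORIES = new HashMap<>();

    // Hypothetical, simplified cache key: interpreter path + sorted env vars.
    static int factoryFor(String pythonExec, Map<String, String> envVars) {
        String key = pythonExec + new TreeMap<>(envVars);
        return FACTORIES.computeIfAbsent(key, k -> FACTORIES.size());
    }

    static int leakedFactoryCount() { return FACTORIES.size(); }

    public static void main(String[] args) {
        // Two sessions, same interpreter, same UDF: distinct cache keys,
        // hence two factories (two daemon processes) that are never removed.
        int f1 = factoryFor("python3", Map.of("SPARK_JOB_ARTIFACT_UUID", "session-1"));
        int f2 = factoryFor("python3", Map.of("SPARK_JOB_ARTIFACT_UUID", "session-2"));
        System.out.println(f1 != f2); // true
        System.out.println(leakedFactoryCount()); // 2
    }
}
```

Scale this to 200 sessions and the cache holds 200 factories, which matches the reporter's observation of 200+ leaked daemon processes.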
Does this PR introduce any user-facing change?
No. This is a resource leak fix. Python UDF behavior is unchanged.
How was this patch tested?
Added 4 new unit tests in `PythonWorkerFactoryIdleSuite`:
- `isIdleFactory` returns false for the default artifact UUID: factories without a session UUID are never evicted
- `isIdleFactory` returns false for a session factory with recent activity: active factories are not evicted
- `isIdleFactory` returns true for a session factory past the timeout: idle session factories are correctly identified
- `destroyPythonWorkersByArtifactUUID` removes only matching factories: validates selective cleanup by UUID
Also verified no regressions in existing test suites:
- `PythonWorkerFactorySuite` (3/3 passed)
- `SparkConnectSessionHolderSuite` (18/18 passed)
- `SparkConnectSessionManagerSuite` (10/10 passed)
- all `api.python` core tests (14/14 passed)
Was this patch authored or co-authored using generative AI tooling?
Yes. This patch was authored with Cursor (Claude claude-4.6-opus-high-thinking).