
[SPARK-54055][CONNECT][PYSPARK] Clean up per-session PythonWorkerFactory#55131

Open
kumbham wants to merge 1 commit into apache:master from kumbham:skumbham/SPARK-54055-fix-python-worker-leak

Conversation


@kumbham kumbham commented Apr 1, 2026

What changes were proposed in this pull request?

Each Spark Connect session creates its own PythonWorkerFactory keyed by SPARK_JOB_ARTIFACT_UUID. These factories (and their daemon processes) were never cleaned up until SparkContext shutdown, causing unbounded process and thread leaks on long-running servers.

This change adds two cleanup mechanisms:

  1. Eager cleanup (driver-side): SessionHolder.close() now calls SparkSession.cleanupPythonWorkers(), which finds and stops all PythonWorkerFactory instances matching the session's artifact UUID in SparkEnv's cache. This follows the same pattern as the existing cleanupPythonWorkerLogs() in the same lifecycle hook.

  2. Idle-timeout eviction (executor-side safety net): A ScheduledExecutorService in SparkEnv periodically scans for PythonWorkerFactory instances with a non-default SPARK_JOB_ARTIFACT_UUID that have no active/idle workers and have been idle for >5 minutes, and evicts them. This handles executor-side cleanup where session close notifications from the driver cannot reach. The scheduler follows the same pattern used by ContextCleaner, Heartbeater, and other Spark core components.

Factories with a default artifact UUID (i.e., non-Connect workloads) are never evicted by the idle-timeout mechanism.
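The eviction check described above can be sketched in illustrative Python (the actual implementation is Scala inside SparkEnv; names like `is_idle_factory`, the default-UUID placeholder, and the `Factory` fields are hypothetical, chosen to mirror the behavior the PR describes):

```python
import time

DEFAULT_ARTIFACT_UUID = "default"   # placeholder for the non-Connect default
IDLE_TIMEOUT_SECONDS = 5 * 60       # >5 minutes, per the PR description

class Factory:
    """Stand-in for PythonWorkerFactory state relevant to eviction."""
    def __init__(self, artifact_uuid, last_activity, num_workers):
        self.artifact_uuid = artifact_uuid
        self.last_activity = last_activity   # epoch seconds of last use
        self.num_workers = num_workers       # active + idle workers

def is_idle_factory(factory, now=None):
    """Return True only for per-session factories that are safe to evict."""
    now = time.time() if now is None else now
    if factory.artifact_uuid == DEFAULT_ARTIFACT_UUID:
        return False                 # never evict non-Connect factories
    if factory.num_workers > 0:
        return False                 # workers still active or idle
    return now - factory.last_activity > IDLE_TIMEOUT_SECONDS

now = time.time()
assert not is_idle_factory(Factory(DEFAULT_ARTIFACT_UUID, now - 3600, 0), now)
assert not is_idle_factory(Factory("session-uuid", now - 10, 0), now)
assert is_idle_factory(Factory("session-uuid", now - 600, 0), now)
```

The three assertions mirror the conditions the PR lists: default-UUID factories are exempt, recently active factories survive, and only per-session factories idle past the timeout are candidates for eviction.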

Closes SPARK-54055

Why are the changes needed?

With Spark Connect, each session always has a unique SPARK_JOB_ARTIFACT_UUID, even if there are no artifacts. This makes the UDF environment built by BasePythonRunner.compute unique per session, so each session gets its own PythonWorkerFactory and daemon process. PythonWorkerFactory has a stop method, but no one called it except SparkEnv.stop (which only runs at full shutdown). On a long-running Spark Connect server, this causes unbounded accumulation of daemon processes, MonitorThreads, and stderr/stdout reader threads — eventually leading to OOM.
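The leak mechanism can be illustrated with a minimal sketch of a worker-factory cache keyed by the Python executable plus the worker environment (all names here are illustrative, not Spark's actual API; in Spark the cache lives in SparkEnv):

```python
class FakeWorkerFactory:
    """Stand-in for PythonWorkerFactory; in Spark this owns a daemon process."""
    def __init__(self, env):
        self.env = env

cache = {}

def get_factory(python_exec, env_vars):
    # The cache key includes the env vars, so any per-session variable
    # (such as SPARK_JOB_ARTIFACT_UUID) forces a distinct cache entry.
    key = (python_exec, frozenset(env_vars.items()))
    if key not in cache:
        cache[key] = FakeWorkerFactory(env_vars)
    return cache[key]

# Two Connect sessions differ only in SPARK_JOB_ARTIFACT_UUID, so each
# gets its own factory entry; without cleanup, entries accumulate forever.
f1 = get_factory("python3", {"SPARK_JOB_ARTIFACT_UUID": "session-1"})
f2 = get_factory("python3", {"SPARK_JOB_ARTIFACT_UUID": "session-2"})
assert f1 is not f2
assert len(cache) == 2
```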

Reproduction (from JIRA reporter):

```python
from pyspark.sql import SparkSession

def _udf(iterator):
    yield from iterator

spark = SparkSession.builder.remote("sc://...").getOrCreate()
df = spark.range(128)
df.mapInPandas(_udf, df.schema).count()
```

After 200 sessions, 200+ daemon processes and ~1000 threads are leaked.

Does this PR introduce any user-facing change?

No. This is a resource leak fix. Python UDF behavior is unchanged.

How was this patch tested?

Added 4 new unit tests in PythonWorkerFactoryIdleSuite:

- `isIdleFactory` returns false for default artifact UUID — factories without a session UUID are never evicted
- `isIdleFactory` returns false for session factory with recent activity — active factories are not evicted
- `isIdleFactory` returns true for session factory past timeout — idle session factories are correctly identified
- `destroyPythonWorkersByArtifactUUID` removes only matching factories — validates selective cleanup by UUID
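The selective cleanup that the last test validates can be sketched as follows (illustrative Python with hypothetical names; the real `destroyPythonWorkersByArtifactUUID` is Scala and also stops each removed factory):

```python
def destroy_workers_by_artifact_uuid(cache, uuid):
    """Remove only the cache entries whose env carries the given UUID."""
    matching = [key for key, env in cache.items()
                if env.get("SPARK_JOB_ARTIFACT_UUID") == uuid]
    for key in matching:
        cache.pop(key)   # the real code would also call factory.stop()
    return len(matching)

cache = {
    "f1": {"SPARK_JOB_ARTIFACT_UUID": "session-A"},
    "f2": {"SPARK_JOB_ARTIFACT_UUID": "session-B"},
    "f3": {"SPARK_JOB_ARTIFACT_UUID": "session-A"},
}
removed = destroy_workers_by_artifact_uuid(cache, "session-A")
assert removed == 2
assert set(cache) == {"f2"}   # unrelated sessions are untouched
```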
Also verified no regressions in existing test suites:

PythonWorkerFactorySuite (3/3 passed)
SparkConnectSessionHolderSuite (18/18 passed)
SparkConnectSessionManagerSuite (10/10 passed)
All api.python core tests (14/14 passed)

Was this patch authored or co-authored using generative AI tooling?

Cursor (Claude claude-4.6-opus-high-thinking)

@kumbham kumbham changed the title [SPARK-54055][CONNECT][PYSPARK] Clean up per-session PythonWorkerFact… [SPARK-54055][CONNECT][PYSPARK] Clean up per-session PythonWorkerFactory Apr 1, 2026
@kumbham kumbham force-pushed the skumbham/SPARK-54055-fix-python-worker-leak branch from 16b9db1 to 8d4843c Compare April 1, 2026 17:05

kumbham commented Apr 1, 2026

All of the failing tests fail with this exception: org.scalatest.exceptions.TestFailedDueToTimeoutException. These tests deploy Spark pods onto a Kubernetes cluster; they time out because my fork's GitHub Actions runner has no Kubernetes cluster configured.

https://github.com/kumbham/spark/actions/runs/23860864341/job/69567033253#step:11:8167

None of the failing tests are related to the changes in this PR (PythonWorkerFactory, SparkEnv, SparkSession, SessionHolder).

[SPARK-54055][CONNECT][PYSPARK] Clean up per-session PythonWorkerFactory on session close

Each Spark Connect session creates its own PythonWorkerFactory keyed by
SPARK_JOB_ARTIFACT_UUID. These factories (and their daemon processes)
were never cleaned up until SparkContext shutdown, causing unbounded
process and thread leaks on long-running servers.

This change adds two cleanup mechanisms:

1. Eager cleanup: SessionHolder.close() now calls
   SparkSession.cleanupPythonWorkers() which removes and stops all
   PythonWorkerFactory instances matching the session's artifact UUID
   from SparkEnv's cache.

2. Idle-timeout eviction: A background reaper thread in SparkEnv
   periodically scans for PythonWorkerFactory instances with a
   non-default artifact UUID that have been idle for >5 minutes,
   and evicts them. This handles executor-side cleanup where session
   close notifications are not received.

Closes #XXXXX

Made-with: Cursor
@kumbham kumbham force-pushed the skumbham/SPARK-54055-fix-python-worker-leak branch from 8d4843c to 8ce7463 Compare April 1, 2026 23:59
