
Add Databricks Serverless Compute Support#3392

Merged
machichima merged 4 commits into flyteorg:master from rohitrsh:feat/databricks-serverless-support
Mar 10, 2026

Conversation

@rohitrsh
Contributor

@rohitrsh rohitrsh commented Feb 17, 2026

Tracking issue

flyteorg/flyte#6911

Why are the changes needed?

Databricks Serverless Compute offers faster startup times (seconds vs. minutes), automatic scaling, and zero infrastructure management. However, the existing flytekit-spark connector only supports classic compute (clusters). This PR enables teams to use serverless without changing their task code.

Users switch between classic and serverless by changing only the databricks_conf; the task code stays identical:

import flytekit
from flytekit import task
from flytekitplugins.spark import DatabricksV2

# Classic compute
@task(task_config=DatabricksV2(
    databricks_conf={
        "new_cluster": {
            "spark_version": "15.4.x-scala2.12",
            "node_type_id": "m5.xlarge",
            "num_workers": 2,
        },
    },
    databricks_instance="my-workspace.cloud.databricks.com",
))
def classic_task() -> float:
    spark = flytekit.current_context().spark_session
    return spark.range(100).count()

# Serverless compute: same task code, different config
@task(task_config=DatabricksV2(
    databricks_conf={
        "environment_key": "default",
        "environments": [{
            "environment_key": "default",
            "spec": {"client": "1"},
        }],
    },
    databricks_instance="my-workspace.cloud.databricks.com",
    databricks_service_credential_provider="my-s3-credential",
))
def serverless_task() -> float:
    spark = flytekit.current_context().spark_session  # same API
    return spark.range(100).count()

What changes were proposed in this pull request?

Adds first-class support for running Flyte Spark tasks on Databricks Serverless Compute, alongside the existing classic compute (clusters) support.

  • Auto-detect serverless vs. classic based on databricks_conf contents (no new task type needed)
  • Generate correct Databricks Jobs API payload for serverless (multi-task format with environments array)
  • SparkSession available via flytekit.current_context().spark_session for both compute modes
  • AWS credential forwarding via Databricks Service Credentials for S3 access in serverless
  • Notebook task support for both classic and serverless compute
  • Default entrypoint from flytetools (same pattern as classic); no user configuration needed

Files modified

| File | Description |
| --- | --- |
| flytekitplugins/spark/connector.py | Serverless detection, multi-task job format, env injection, credential forwarding, entrypoint resolution, notebook tasks |
| flytekitplugins/spark/task.py | Serverless SparkSession retrieval in pre_execute(), DatabricksV2 config additions (credential provider, notebook support), docstring |
| tests/test_connector.py | 11 new tests for serverless detection, configuration, job spec generation, entrypoint defaults |
| tests/test_spark_task.py | Tests for serverless detection, SparkSession retrieval, credential provider, notebook config |

No new files in the plugin. The serverless entrypoint (entrypoint_serverless.py) lives in the flytetools repository, following the same pattern as the classic entrypoint.

Technical details

1. Auto-detection of compute mode

New function _is_serverless_config() detects serverless based on databricks_conf keys:

| Keys present | Compute mode |
| --- | --- |
| existing_cluster_id | Classic (existing cluster) |
| new_cluster | Classic (new cluster) |
| environment_key or environments (no cluster keys) | Serverless |
| None of the above | Error |

2. Serverless job spec format

Databricks Serverless requires a different Jobs API payload (multi-task format with tasks array and environments array). New function _configure_serverless() handles the environments array creation and env var injection.

3. Entrypoint resolution

Both classic and serverless default to the same flytetools repository. Only the python_file path differs:

default_classic_python_file = "flytekitplugins/databricks/entrypoint.py"
default_serverless_python_file = "flytekitplugins/databricks/entrypoint_serverless.py"

Users can override both git_source and python_file via databricks_conf for custom entrypoints.

4. SparkSession in serverless

The serverless entrypoint (in flytetools) pre-creates the SparkSession and stores it in sys.modules and builtins. New method _get_databricks_serverless_spark_session() in task.py retrieves it and exposes it via flytekit.current_context().spark_session, the same API as classic compute.

5. AWS credential provider

New DatabricksV2 config field: databricks_service_credential_provider. Resolution order: task config → connector env var (FLYTE_DATABRICKS_SERVICE_CREDENTIAL_PROVIDER).

6. Notebook task support

New DatabricksV2 config fields: notebook_path, notebook_base_parameters. Works with both classic and serverless compute.
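An illustrative shape of the Jobs API payload a serverless notebook task might produce, assembled from the fields discussed in this PR; the exact keys and values are assumptions best verified against the Databricks Jobs API submit docs.

```python
# Illustrative serverless notebook-task payload (not actual connector output).
notebook_job = {
    "run_name": "flyte-notebook-run",
    # Job-level environments array defines the serverless environment.
    "environments": [
        {"environment_key": "default", "spec": {"client": "1"}}
    ],
    # The task references that environment via environment_key.
    "tasks": [
        {
            "task_key": "flyte_notebook_task",
            "environment_key": "default",
            "notebook_task": {
                "notebook_path": "/Workspace/Users/me/my_notebook",
                "base_parameters": {"param1": "value1"},
            },
        }
    ],
}
```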

Backward compatibility

| Aspect | Impact |
| --- | --- |
| Existing classic tasks | No change; detection is additive and the classic path is unchanged |
| Existing databricks_conf | No change; configs with new_cluster/existing_cluster_id work as before |
| API surface | Additive only: new optional fields on DatabricksV2 |
| flytetools entrypoint | Classic entrypoint.py unchanged; new file added alongside |

How was this patch tested?

Unit tests (14 connector tests, all passing)

Existing tests (unchanged):

  • test_databricks_agent: classic compute full agent flow
  • test_agent_create_with_no_instance: missing instance error
  • test_agent_create_with_default_instance: instance from env var

New serverless tests:

  • test_is_serverless_config_detection: 7 scenarios for compute mode detection
  • test_configure_serverless_with_env_key_only: auto-creates environments array
  • test_configure_serverless_with_inline_env: preserves user's environment spec
  • test_configure_serverless_creates_default_env: default env when none specified
  • test_get_databricks_job_spec_serverless_with_env_key: full spec for env_key config
  • test_get_databricks_job_spec_serverless_with_inline_env: full spec for inline env config
  • test_get_databricks_job_spec_error_no_compute: error when no compute config
  • test_databricks_agent_serverless: full agent create/get flow for serverless
  • test_serverless_default_entrypoint_from_flytetools: default flytetools entrypoint
  • test_serverless_task_git_source_overrides_default: task-level override works
  • test_classic_and_serverless_use_same_repo: same flytetools repo, different python_file

Task tests (test_spark_task.py):

  • Serverless environment detection
  • SparkSession retrieval from sys.modules and builtins
  • DatabricksV2 credential provider and notebook configuration

Manual testing

  • Classic compute tasks continue to work (no regression)
  • Serverless compute with pre-configured environment_key
  • Serverless compute with inline environments spec
  • AWS credentials from Databricks service credentials
  • SparkSession available via flytekit.current_context().spark_session
  • Complex Spark workloads (DataFrame operations, UDFs, aggregations)

Setup process

No additional setup needed. Run tests with:

pytest plugins/flytekit-spark/tests/test_connector.py -v
pytest plugins/flytekit-spark/tests/test_spark_task.py -v

Check all the applicable boxes

  • I updated the documentation accordingly.
  • All new and existing tests passed.
  • All commits are signed-off.

Related PRs

Docs link

Signed-off-by: Rohit Sharma <rohitrsh@gmail.com>
@codecov

codecov bot commented Feb 26, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 73.09%. Comparing base (ba7dc6f) to head (010c0bb).
⚠️ Report is 5 commits behind head on master.

❗ There is a different number of reports uploaded between BASE (ba7dc6f) and HEAD (010c0bb): HEAD has 48 fewer uploads than BASE (50 vs. 2).
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #3392      +/-   ##
==========================================
- Coverage   82.87%   73.09%   -9.78%     
==========================================
  Files         356      216     -140     
  Lines       29579    22779    -6800     
  Branches     2997     2997              
==========================================
- Hits        24513    16651    -7862     
- Misses       4218     5298    +1080     
+ Partials      848      830      -18     


@machichima machichima left a comment (Member)
Did a quick pass over the connector.py file.

# Determine compute mode and configure accordingly
is_serverless = _is_serverless_config(databricks_job)

if notebook_path:
machichima (Member):

I understand that notebook tasks do not need to handle default_git_source, so they go through a different code path. Could we extract the notebook and non-notebook logic into separate functions to make it cleaner? For example:

if notebook_path:
    return _build_notebook_job_spec(databricks_job, ...)
else:
    return _build_python_file_job_spec(databricks_job, ...)

rohitrsh (Author):

Done, extracted into two functions:

  • _build_notebook_job_spec() handles notebook task logic for both classic and serverless
  • _build_python_file_job_spec() handles spark_python_task logic for both classic and serverless

_get_databricks_job_spec() is now a clean dispatcher:

if custom.get("notebookPath"):
    return _build_notebook_job_spec(...)
return _build_python_file_job_spec(...)

task_def = {
    "task_key": "flyte_notebook_task",
    "notebook_task": notebook_task,
    "environment_key": environment_key,
machichima (Member):

Based on API 2.1 docs: https://docs.databricks.com/api/workspace/jobs_21/submit
I think environment_key is not part of the task definition? We already put it under databricks_job["environments"] in _configure_serverless.

In that case, I think _configure_serverless does not need to return anything?

rohitrsh (Author):

Thanks for flagging this. I checked the Databricks Jobs API 2.1 submit docs; environment_key in the task definition is actually correct and required. There are two separate usages:

  1. In the environments array (job level): defines the environment with its spec
  2. In the task definition: references the environment the task should use

This is analogous to how job_cluster_key in a task references a shared job cluster defined in the job_clusters array. So _configure_serverless needs to return the key so we can place it in the task definition. I've added a clarifying comment to the docstring that explains this relationship.

machichima (Member):

Oh! I was only looking at the "Request samples" in the docs and thought that they include all fields in there, environment_key is indeed in the task definition. Thank you for the clarification!


Databricks serverless requires the 'environments' array to be defined in the job
submission. This function ensures the environments array exists and injects
Flyte environment variables.
machichima (Member):

Could you please help me add:

  1. The docs link to the API docs
  2. What is the expected configuration format here, e.g.
{
    "environments": {
        "environment_key": ....,
        "spec": .....
    }
}

So that it can be easier to maintain in the future

rohitrsh (Author):

Done. Updated the _configure_serverless docstring to include:

  1. API docs link: https://docs.databricks.com/api/workspace/jobs/submit
  2. The expected environments format showing the full structure (environment_key, spec, client, dependencies, environment_vars)
  3. Explanation of how environment_key in a task references which environment to use

Comment on lines +106 to +109
if new_cluster is None:
    raise ValueError(
        "Either existing_cluster_id, new_cluster, environment_key, or environments must be specified"
    )
machichima (Member):

I think it would be better to add a validation function at the beginning of _get_databricks_job_spec() and raise the ValueError there with this message. Otherwise we need to navigate to another function like _is_serverless_config to understand why "environment_key, environments" appears here.

rohitrsh (Author):

Done, moved the compute validation to the beginning of _get_databricks_job_spec() as a dedicated early check. It now fails fast with a clear error message before any processing begins. The old ValueError inside _configure_classic_cluster() has been removed since validation is now handled upfront.

- Extract _build_notebook_job_spec() and _build_python_file_job_spec()
- Move compute validation to beginning of _get_databricks_job_spec()
- Add API docs link and config format to _configure_serverless docstring
- Create shared utils.py with is_serverless_config() for reuse in task.py

Signed-off-by: Rohit Sharma <rohitrsh@gmail.com>
@machichima machichima left a comment (Member)

Overall LGTM! Just two comments

}

return databricks_job
has_cluster = "existing_cluster_id" in databricks_job or "new_cluster" in databricks_job
machichima (Member):

Could we switch this validation to value-based checks (e.g. databricks_job.get(...)) so existing_cluster_id=None / new_cluster=None don’t pass as valid compute config?

rohitrsh (Author):

Done, switched to value-based checks using .get() so that existing_cluster_id=None / new_cluster=None are correctly treated as absent. Applied consistently in both the connector.py validation and the shared utils.py detection function.

if isinstance(self.task_config, DatabricksV2):
    conf = self.task_config.databricks_conf or {}
    if is_serverless_config(conf):
        return True
machichima (Member):

For this, I think we also need to ensure is_databricks is True before returning. Could we set an is_serverless_cfg flag here and combine it with the following return logic, so that "DATABRICKS_RUNTIME_VERSION", "SPARK_HOME", and the serverless config are all considered?

rohitrsh (Author):

Done, the serverless config check now sets an is_serverless_cfg flag and combines it with the is_databricks check: return is_databricks and (is_serverless_cfg or "SPARK_HOME" not in os.environ). This ensures we only detect serverless when actually running on Databricks. Updated tests accordingly.

@machichima machichima commented (Member)

Hi @rohitrsh ,
Could you fix the lint error reported in the CI? You can run make lint locally.
Thank you!

…lag, lint fixes

- Switch validation to value-based .get() checks so None values are rejected
- Combine serverless config flag with is_databricks check in task.py
- Fix ruff, ruff-format, and pydoclint violations
- Update tests for new serverless detection behavior
- Update pydoclint-errors-baseline.txt (fixed DatabricksV2 docstring)

Signed-off-by: Rohit Sharma <rohitrsh@gmail.com>
@rohitrsh

rohitrsh commented Mar 5, 2026

> Hi @rohitrsh, could you fix the lint error reported in the CI? You can run make lint locally. Thank you!

Hey @machichima, fixed all lint issues: ruff, ruff-format, and pydoclint violations. Updated pydoclint-errors-baseline.txt (4 lines removed; the DatabricksV2 DOC601/DOC603 violations are now resolved by a proper Attributes: docstring).

% make lint
mypy flytekit/core
Success: no issues found in 52 source files
mypy flytekit/types
Success: no issues found in 23 source files
mypy --allow-empty-bodies --disable-error-code="annotation-unchecked" tests/flytekit/unit/core
Success: no issues found in 102 source files
pre-commit run --all-files
ruff.....................................................................Passed
ruff-format..............................................................Passed
check yaml...............................................................Passed
fix end of files.........................................................Passed
trim trailing whitespace.................................................Passed
shellcheck...............................................................Passed
Check for exposed PDB statements.........................................Passed
codespell................................................................Passed
pydoclint................................................................Passed

@machichima machichima left a comment (Member)

LGTM!

@machichima machichima merged commit a735a62 into flyteorg:master Mar 10, 2026
56 checks passed
@welcome

welcome bot commented Mar 10, 2026

Congrats on merging your first pull request! 🎉

rohitrsh added a commit to rohitrsh/flytekit that referenced this pull request Mar 10, 2026
Samhita's GitHub merge dropped all serverless compute changes from
flyteorg#3392. Re-apply serverless changes from master and combine with
token auth from this branch.
Files re-merged:
- connector.py: serverless functions + token auth (create/get/delete)
- task.py: DatabricksV2 serverless fields + databricks_token_secret
- utils.py: shared is_serverless_config (was missing entirely)
- test_connector.py: all 14 serverless tests + auth_token in metadata
- test_spark_task.py: serverless detection + SparkSession retrieval tests
All 60 tests pass (14 connector + 15 task + 31 token).
Signed-off-by: Rohit Sharma <rohitrsh@gmail.com>
rohitrsh added a commit to rohitrsh/flytekit that referenced this pull request Mar 10, 2026
Resolve merge conflicts in task.py after flyteorg#3392 (serverless) was merged:
- Combined DatabricksV2 attributes: kept all serverless fields, added
  databricks_token_secret
- Combined get_custom() serialization for both feature sets
- Added auth_token to serverless test metadata assertion
- Removed emoji from error message

Signed-off-by: Rohit Sharma <rohitrsh@gmail.com>
Made-with: Cursor