Add Databricks Serverless Compute Support #3392
Conversation
Signed-off-by: Rohit Sharma <rohitrsh@gmail.com>
Codecov Report ✅ All modified and coverable lines are covered by tests.
Additional details and impacted files:

```
@@            Coverage Diff             @@
##           master    #3392      +/-   ##
==========================================
- Coverage   82.87%   73.09%    -9.78%
==========================================
  Files         356      216     -140
  Lines       29579    22779    -6800
  Branches     2997     2997
==========================================
- Hits        24513    16651    -7862
- Misses       4218     5298    +1080
+ Partials      848      830      -18
```

View full report in Codecov by Sentry.
machichima
left a comment
Did a quick pass over the `connector.py` file.
```python
# Determine compute mode and configure accordingly
is_serverless = _is_serverless_config(databricks_job)

if notebook_path:
```
I understand that notebooks do not need to handle `default_git_source`, so they go through a different code path. Could we extract the notebook and non-notebook logic into separate functions to make this cleaner? For example:

```python
if notebook_path:
    return _build_notebook_job_spec(databricks_job, ...)
else:
    return _build_python_file_job_spec(databricks_job, ...)
```
Done, extracted into two functions:

- `_build_notebook_job_spec()` handles notebook task logic for both classic and serverless
- `_build_python_file_job_spec()` handles `spark_python_task` logic for both classic and serverless

`_get_databricks_job_spec()` is now a clean dispatcher:

```python
if custom.get("notebookPath"):
    return _build_notebook_job_spec(...)
return _build_python_file_job_spec(...)
```

```python
task_def = {
    "task_key": "flyte_notebook_task",
    "notebook_task": notebook_task,
    "environment_key": environment_key,
```
Based on the API 2.1 docs: https://docs.databricks.com/api/workspace/jobs_21/submit

I think `environment_key` is not in the task def? We already put it under `databricks_job["environments"]` in `_configure_serverless`. In that case, I think `_configure_serverless` does not need to return anything?
Thanks for flagging this. I checked the Databricks Jobs API 2.1 submit docs; `environment_key` in the task definition is actually correct and required. There are two separate usages:

- In the `environments` array (job level): defines the environment with its spec
- In the task definition: references the environment the task should use

This is analogous to how `job_cluster_key` in a task references a shared job cluster defined in the `job_clusters` array. So `_configure_serverless` needs to return the key so we can place it in the task definition. I've added a clarifying comment to the docstring that explains this relationship.
Oh! I was only looking at the "Request samples" in the docs and thought they included all fields; `environment_key` is indeed in the task definition. Thank you for the clarification!
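To make the two usages concrete, here is a hedged sketch of the submit payload shape (key names follow the Jobs API 2.1 submit docs linked above; all values are illustrative, not the plugin's actual output):

```python
# Illustrative Jobs API 2.1 submit payload showing the two usages of
# environment_key discussed above (values are made up for the example).
databricks_job = {
    "environments": [
        {
            # Job level: defines the environment and its spec
            "environment_key": "flyte_env",
            "spec": {"client": "1", "environment_vars": {"FLYTE_INTERNAL_X": "1"}},
        }
    ],
    "tasks": [
        {
            "task_key": "flyte_notebook_task",
            "notebook_task": {"notebook_path": "/Workspace/example"},
            # Task level: references which environment the task should use,
            # analogous to job_cluster_key referencing job_clusters.
            "environment_key": "flyte_env",
        }
    ],
}
```

The task-level key must match a job-level definition, which is why `_configure_serverless` returns it to the caller.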
```python
Databricks serverless requires the 'environments' array to be defined in the job
submission. This function ensures the environments array exists and injects
Flyte environment variables.
```
Could you please help me add:

- The link to the API docs
- The expected configuration format here, e.g.

```
{
    "environments": {
        "environment_key": ...,
        "spec": ...
    }
}
```

So that it can be easier to maintain in the future.
Done. Updated the `_configure_serverless` docstring to include:

- API docs link: https://docs.databricks.com/api/workspace/jobs/submit
- The expected `environments` format showing the full structure (`environment_key`, `spec`, `client`, `dependencies`, `environment_vars`)
- An explanation of how `environment_key` in a task references which environment to use
```python
if new_cluster is None:
    raise ValueError(
        "Either existing_cluster_id, new_cluster, environment_key, or environments must be specified"
    )
```
I think it would be better to add a validation function at the beginning of `_get_databricks_job_spec()` and raise the ValueError there with this message. Otherwise we need to navigate to another function like `_is_serverless_config` to understand why "environment_key, environments" appears here.
Done, moved the compute validation to the beginning of `_get_databricks_job_spec()` as a dedicated early check. It now fails fast with a clear error message before any processing begins. The old ValueError inside `_configure_classic_cluster()` has been removed since validation is now handled upfront.
- Extract `_build_notebook_job_spec()` and `_build_python_file_job_spec()`
- Move compute validation to beginning of `_get_databricks_job_spec()`
- Add API docs link and config format to `_configure_serverless` docstring
- Create shared `utils.py` with `is_serverless_config()` for reuse in `task.py`

Signed-off-by: Rohit Sharma <rohitrsh@gmail.com>
machichima
left a comment
Overall LGTM! Just two comments
```python
    }

    return databricks_job
```

```python
has_cluster = "existing_cluster_id" in databricks_job or "new_cluster" in databricks_job
```
Could we switch this validation to value-based checks (e.g. `databricks_job.get(...)`) so that `existing_cluster_id=None` / `new_cluster=None` don't pass as valid compute config?
Done, switched to value-based checks using `.get()` so that `existing_cluster_id=None` / `new_cluster=None` are correctly treated as absent. Applied consistently in both the `connector.py` validation and the shared `utils.py` detection function.
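The distinction being fixed here can be shown in two lines (a generic Python illustration, not the plugin code):

```python
# A key explicitly set to None should not count as valid compute config.
# Membership tests see the key; value-based checks see its (empty) value.
conf = {"existing_cluster_id": None}

membership = "existing_cluster_id" in conf                  # True, misleading
value_based = conf.get("existing_cluster_id") is not None   # False, correct
```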
```python
if isinstance(self.task_config, DatabricksV2):
    conf = self.task_config.databricks_conf or {}
    if is_serverless_config(conf):
        return True
```
For this, I think we also need to ensure `is_databricks` is True before returning. Could we set an `is_serverless_cfg` flag here and combine it with the following return logic, so that `"DATABRICKS_RUNTIME_VERSION"`, `"SPARK_HOME"`, and the serverless config are all considered?
Done, the serverless config check now sets an `is_serverless_cfg` flag and combines it with the `is_databricks` check: `return is_databricks and (is_serverless_cfg or "SPARK_HOME" not in os.environ)`. This ensures we only detect serverless when actually running on Databricks. Updated tests accordingly.
Hi @rohitrsh,
…lag, lint fixes

- Switch validation to value-based .get() checks so None values are rejected
- Combine serverless config flag with is_databricks check in task.py
- Fix ruff, ruff-format, and pydoclint violations
- Update tests for new serverless detection behavior
- Update pydoclint-errors-baseline.txt (fixed DatabricksV2 docstring)

Signed-off-by: Rohit Sharma <rohitrsh@gmail.com>
Hey @machichima, fixed all lint issues: ruff, ruff-format, and pydoclint violations. Updated
Congrats on merging your first pull request! 🎉
Samhita's GitHub merge dropped all serverless compute changes from flyteorg#3392. Re-apply serverless changes from master and combine with token auth from this branch.

Files re-merged:

- connector.py: serverless functions + token auth (create/get/delete)
- task.py: DatabricksV2 serverless fields + databricks_token_secret
- utils.py: shared is_serverless_config (was missing entirely)
- test_connector.py: all 14 serverless tests + auth_token in metadata
- test_spark_task.py: serverless detection + SparkSession retrieval tests

All 60 tests pass (14 connector + 15 task + 31 token).

Signed-off-by: Rohit Sharma <rohitrsh@gmail.com>
Resolve merge conflicts in task.py after flyteorg#3392 (serverless) was merged:

- Combined DatabricksV2 attributes: kept all serverless fields, added databricks_token_secret
- Combined get_custom() serialization for both feature sets
- Added auth_token to serverless test metadata assertion
- Removed emoji from error message

Signed-off-by: Rohit Sharma <rohitrsh@gmail.com>
Made-with: Cursor
Tracking issue
flyteorg/flyte#6911
Why are the changes needed?
Databricks Serverless Compute offers faster startup times (seconds vs. minutes), automatic scaling, and zero infrastructure management. However, the existing flytekit-spark connector only supports classic compute (clusters). This PR enables teams to use serverless without changing their task code.
Users switch between classic and serverless by changing only the `databricks_conf`; task code stays identical.

What changes were proposed in this pull request?
Adds first-class support for running Flyte Spark tasks on Databricks Serverless Compute, alongside the existing classic compute (clusters) support.
- Auto-detection of compute mode from `databricks_conf` contents (no new task type needed)
- Serverless job spec generation (`environments` array)
- `flytekit.current_context().spark_session` works for both compute modes

Files modified
- `flytekitplugins/spark/connector.py`
- `flytekitplugins/spark/task.py`: `pre_execute()`, `DatabricksV2` config additions (credential provider, notebook support), docstring
- `tests/test_connector.py`
- `tests/test_spark_task.py`

No new files in the plugin. The serverless entrypoint (`entrypoint_serverless.py`) lives in the flytetools repository, same pattern as the classic entrypoint.

Technical details
1. Auto-detection of compute mode
New function `_is_serverless_config()` detects serverless based on `databricks_conf` keys:

- Classic: `existing_cluster_id` or `new_cluster`
- Serverless: `environment_key` or `environments` (no cluster keys)

2. Serverless job spec format
Databricks Serverless requires a different Jobs API payload (multi-task format with a `tasks` array and an `environments` array). New function `_configure_serverless()` handles the environments array creation and env var injection.

3. Entrypoint resolution
Both classic and serverless default to the same `flytetools` repository; only the `python_file` path differs. Users can override both `git_source` and `python_file` via `databricks_conf` for custom entrypoints.

4. SparkSession in serverless
The serverless entrypoint (in flytetools) pre-creates the SparkSession and stores it in `sys.modules` and `builtins`. New method `_get_databricks_serverless_spark_session()` in `task.py` retrieves it and exposes it via `flytekit.current_context().spark_session`, the same API as classic compute.

5. AWS credential provider
New `DatabricksV2` config field: `databricks_service_credential_provider`. Resolution order: task config → connector env var (`FLYTE_DATABRICKS_SERVICE_CREDENTIAL_PROVIDER`).

6. Notebook task support
New `DatabricksV2` config fields: `notebook_path`, `notebook_base_parameters`. Works with both classic and serverless compute.

Backward compatibility

- Existing `databricks_conf` configs with `new_cluster`/`existing_cluster_id` work as before
- `DatabricksV2` task type unchanged
- `entrypoint.py` unchanged; a new file is added alongside it

How was this patch tested?
Unit tests (14 connector tests, all passing)
Existing tests (unchanged):

- `test_databricks_agent`: classic compute full agent flow
- `test_agent_create_with_no_instance`: missing instance error
- `test_agent_create_with_default_instance`: instance from env var

New serverless tests:

- `test_is_serverless_config_detection`: 7 scenarios for compute mode detection
- `test_configure_serverless_with_env_key_only`: auto-creates environments array
- `test_configure_serverless_with_inline_env`: preserves user's environment spec
- `test_configure_serverless_creates_default_env`: default env when none specified
- `test_get_databricks_job_spec_serverless_with_env_key`: full spec for env_key config
- `test_get_databricks_job_spec_serverless_with_inline_env`: full spec for inline env config
- `test_get_databricks_job_spec_error_no_compute`: error when no compute config
- `test_databricks_agent_serverless`: full agent create/get flow for serverless
- `test_serverless_default_entrypoint_from_flytetools`: default flytetools entrypoint
- `test_serverless_task_git_source_overrides_default`: task-level override works
- `test_classic_and_serverless_use_same_repo`: same flytetools repo, different python_file

Task tests (`test_spark_task.py`):

- SparkSession retrieval from `sys.modules` and `builtins`
- `DatabricksV2` credential provider and notebook configuration

Manual testing
- `environment_key`
- `environments` spec
- `flytekit.current_context().spark_session`

Setup process
No additional setup needed. Run tests with:
Check all the applicable boxes
Related PRs
Docs link