Skip to content

support batch/multi-import execution in ingestion Cloud Workflow#134

Merged
dwnoble merged 3 commits into
datacommonsorg:mainfrom
dwnoble:update-ingestion-workflow
Jun 23, 2026
Merged

support batch/multi-import execution in ingestion Cloud Workflow#134
dwnoble merged 3 commits into
datacommonsorg:mainfrom
dwnoble:update-ingestion-workflow

Conversation

@dwnoble

@dwnoble dwnoble commented Jun 19, 2026

Copy link
Copy Markdown
Contributor

Submit with this PR: datacommonsorg/import#563

  • Decoded Batch Imports: Added JSON decoding of input.importList in the Cloud Workflow to process batch import executions.
  • Bulk Payload Construction: Added a loop (build_lists) to construct staging paths (imports_status_list), version history (imports_history_list), and update payload lists (imports_version_list) dynamically.
  • Refactored Ingestion Helper Integration: Updated HTTP steps for status updating (/imports/status), version promotion (/imports/version), and history logging (/imports/ingestion-status) to submit the batch payloads.
  • Dynamic Dataflow Job Naming: Suffixes multi-import Dataflow job executions with -multi to distinguish batch runs from single imports in console logs.

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request updates the ingestion workflow in infra/dcp/modules/ingestion/workflow/main.tf to handle a list of imports (input.importList) instead of a single import, building lists for status, history, and versions. However, the reviewer points out that the ingestion workflow currently only supports a single import, and recommends using input.importName directly to avoid unnecessary complexity and nested template directives inside the workflow definition.

- sanitized_import: '$${text.replace_all(text.replace_all(text.to_lower(input.importName), "/", "-"), "_", "-")}'
- decoded_imports: '$${json.decode(input.importList)}'
- num_imports: '$${len(decoded_imports)}'
- first_import_name: '$${decoded_imports[0].importName}'

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The ingestion workflow currently supports only a single import. Please use input.importName directly instead of attempting to decode and access a list of imports (e.g., decoded_imports[0]). This also avoids complex nested template directives or conditional logic inside the workflow definition.

            - first_import_name: '$$${input.importName}'
References
  1. The ingestion workflow currently supports only a single import. Ensure that any changes are consistent with this limitation, using input.importName rather than input.importList.
  2. Avoid complex nested template directives or conditional logic inside resource definitions (such as inline scripts or workflow definitions). Instead, simplify the code by extracting the conditional logic into Terraform local variables (locals).

@gmechali gmechali left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for handling this!

- decoded_imports: '$${json.decode(input.importList)}'
- num_imports: '$${len(decoded_imports)}'
- first_import_name: '$${decoded_imports[0].importName}'
- sanitized_import: '$${text.replace_all(text.replace_all(text.to_lower(first_import_name), "/", "-"), "_", "-")}'

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this was using the importName because at the time we were only just processing that one properly.

I think the right approach here is to sanitize every import in decoded_imports. and then join them.

The sanitized import name goes to dataflow, for which the jobs have annoying name requirements. Could you modify this piece so that if importNames = ds1/ilo, ds2/who, UNICC we end up with:
ds1_ilo_ds2_who_unicc

(note we dont care about uniqueness here, it's just a way to make the dataflow job descriptive so they know what's running! )

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated to give the import a legible name using dashes (at jetski's recommendation)

- substring_end: '$${if(import_name_len < 30, import_name_len, 30)}'
- sanitized_short_import: '$${text.substring(sanitized_import, 0, substring_end)}'
- dataflow_job_name: '$${"${local.clean_namespace_prefix}" + sanitized_short_import + "-" + string(int(sys.now()))}'
- suffix: '$${if(num_imports > 1, "-multi", "")}'

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you confifm you ran this on sample imports with multi-provenances?

This workflow was already getting really complicated. I think we need to take a TODO to clean this up, but it doesnt have to be for current milestone :)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was able to successfully run the workflow with a frog dataset containing two imports/provenances, and a UN data load with two provenances.

@dwnoble dwnoble added this pull request to the merge queue Jun 23, 2026
Merged via the queue into datacommonsorg:main with commit ac09b39 Jun 23, 2026
8 checks passed
dwnoble added a commit to datacommonsorg/import that referenced this pull request Jun 23, 2026
… of multiple imports (#563)

This enables the DCP ingestion workflow to update multiple imports at
once

Submit with: datacommonsorg/datacommons#134

* API Payload Schema Change: Updated UpdateImportStatusRequest and
UpdateImportVersionRequest models to accept lists of imports under an
imports parameter instead of single-import fields.
* Service Endpoint Processing: Refactored the /imports/status and
/imports/version route handlers to iterate and update GCS file
version/provenance logs and Spanner status records for multiple imports
in a single call.
* Workflow & Helper Updates: Adjusted caller definitions in
import-automation-workflow.yaml and import_helper.py to match the new
list-based HTTP request structures.
* Test Coverage: Added new unit test scenarios in app_test.py
mock-validating the updated endpoints with lists of multiple imports.
* Documentation: Updated the API overview table in the ingestion-helper
README.md to document the list support.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants