support batch/multi-import execution in ingestion Cloud Workflow#134
Conversation
There was a problem hiding this comment.
Code Review
This pull request updates the ingestion workflow in infra/dcp/modules/ingestion/workflow/main.tf to handle a list of imports (input.importList) instead of a single import, building lists for status, history, and versions. However, the reviewer points out that the ingestion workflow currently only supports a single import, and recommends using input.importName directly to avoid unnecessary complexity and nested template directives inside the workflow definition.
| - sanitized_import: '$${text.replace_all(text.replace_all(text.to_lower(input.importName), "/", "-"), "_", "-")}' | ||
| - decoded_imports: '$${json.decode(input.importList)}' | ||
| - num_imports: '$${len(decoded_imports)}' | ||
| - first_import_name: '$${decoded_imports[0].importName}' |
There was a problem hiding this comment.
The ingestion workflow currently supports only a single import. Please use input.importName directly instead of attempting to decode and access a list of imports (e.g., decoded_imports[0]). This also avoids complex nested template directives or conditional logic inside the workflow definition.
- first_import_name: '$$${input.importName}'
References
- The ingestion workflow currently supports only a single import. Ensure that any changes are consistent with this limitation, using
input.importNamerather thaninput.importList. - Avoid complex nested template directives or conditional logic inside resource definitions (such as inline scripts or workflow definitions). Instead, simplify the code by extracting the conditional logic into Terraform local variables (
locals).
gmechali
left a comment
There was a problem hiding this comment.
Thank you for handling this!
| - decoded_imports: '$${json.decode(input.importList)}' | ||
| - num_imports: '$${len(decoded_imports)}' | ||
| - first_import_name: '$${decoded_imports[0].importName}' | ||
| - sanitized_import: '$${text.replace_all(text.replace_all(text.to_lower(first_import_name), "/", "-"), "_", "-")}' |
There was a problem hiding this comment.
So this was using the importName because at the time we were only just processing that one properly.
I think the right approach here is to sanitize every import in decoded_imports. and then join them.
The sanitized import name goes to dataflow, for which the jobs have annoying name requirements. Could you modify this piece so that if importNames = ds1/ilo, ds2/who, UNICC we end up with:
ds1_ilo_ds2_who_unicc
(note we dont care about uniqueness here, it's just a way to make the dataflow job descriptive so they know what's running! )
There was a problem hiding this comment.
Updated to give the import a legible name using dashes (at jetski's recommendation)
| - substring_end: '$${if(import_name_len < 30, import_name_len, 30)}' | ||
| - sanitized_short_import: '$${text.substring(sanitized_import, 0, substring_end)}' | ||
| - dataflow_job_name: '$${"${local.clean_namespace_prefix}" + sanitized_short_import + "-" + string(int(sys.now()))}' | ||
| - suffix: '$${if(num_imports > 1, "-multi", "")}' |
There was a problem hiding this comment.
Could you confifm you ran this on sample imports with multi-provenances?
This workflow was already getting really complicated. I think we need to take a TODO to clean this up, but it doesnt have to be for current milestone :)
There was a problem hiding this comment.
I was able to successfully run the workflow with a frog dataset containing two imports/provenances, and a UN data load with two provenances.
…cts preserving isBaseDc
… of multiple imports (#563) This enables the DCP ingestion workflow to update multiple imports at once Submit with: datacommonsorg/datacommons#134 * API Payload Schema Change: Updated UpdateImportStatusRequest and UpdateImportVersionRequest models to accept lists of imports under an imports parameter instead of single-import fields. * Service Endpoint Processing: Refactored the /imports/status and /imports/version route handlers to iterate and update GCS file version/provenance logs and Spanner status records for multiple imports in a single call. * Workflow & Helper Updates: Adjusted caller definitions in import-automation-workflow.yaml and import_helper.py to match the new list-based HTTP request structures. * Test Coverage: Added new unit test scenarios in app_test.py mock-validating the updated endpoints with lists of multiple imports. * Documentation: Updated the API overview table in the ingestion-helper README.md to document the list support.
Submit with this PR: datacommonsorg/import#563