Add AssetAndTimeSchedule timetable#58543
Conversation
c752a20 to
760844c
Compare
AssetAndTimeScheduleAssetAndTimeSchedule
|
Copilot's feedback are helpful, I've fixed both issues.
I've updated the link in the PR description to a permalink.
I think that would require significant changes across the API and frontend. I'd prefer to keep this PR focused on the core functionality and address the UI improvements in a follow-up if needed. |
d5612a8 to
a94354b
Compare
7c8452e to
0d9487f
Compare
Lee-W
left a comment
There was a problem hiding this comment.
one last question, but mostly good
|
It's not a feature I would ever use, but I'm good with it, and it seems there's an actual use case. Would like to know how @uranusjr thinks. Thanks! |
b590149 to
9e42573
Compare
…with asset conditions
Co-authored-by: Wei Lee <weilee.rx@gmail.com>
Co-authored-by: Wei Lee <weilee.rx@gmail.com>
Co-authored-by: Wei Lee <weilee.rx@gmail.com>
Co-authored-by: Wei Lee <weilee.rx@gmail.com>
…il all required assets are queued
Co-authored-by: Wei Lee <weilee.rx@gmail.com>
The annotation on AssetAndTimeSchedule.__init__ was migrated to Collection[SerializedAsset] | SerializedAssetBase, leaving the TYPE_CHECKING-only Asset import unreferenced and breaking mypy on two test files that passed SDK Assets to the core class. The scheduler tests now import AssetAndTimeSchedule (and the matching CronTriggerTimetable) from airflow.sdk so the types align with how DAG authors use it; the timetable test fixture mirrors core_asset_timetable by constructing SerializedAsset directly.
9e42573 to
9b94c1b
Compare
|
@uranusjr — Lee-W asked for your take on this 25 days ago. When you have a moment, could you weigh in? It's the only thing blocking a decision on the PR right now. Thanks! Note: This comment was drafted by an AI-assisted triage tool and may contain mistakes. Drafted-by: Claude Code (Opus 4.7); reviewed by @potiuk before posting |
|
It's a less common but reasonable use case. so if we have another committer's good with this feature, I'm good with it as well :) |
|
@nailo2c — This PR is labeled ready for maintainer review, and a maintainer left feedback about 7 days ago that is still awaiting your response. Could you take a look and either push an update or reply in the thread? Once you have responded, a maintainer will pick it back up from the review queue. No rush — just flagging it so the PR does not stall. Note: This comment was drafted by an AI-assisted triage tool and may contain mistakes. Once you have addressed the points above, an Apache Airflow maintainer — a real person — will take the next look at your PR. We use this two-stage triage process so that our maintainers' limited time is spent where it matters most: the conversation with you. |
|
Thanks for the reminder. I believe the previous review comments have been addressed and all review threads are resolved. Based on the latest discussion, it seems the remaining point is waiting for another committer's opinion on the feature. Please let me know if there is anything else I should update :) |
related: #58056
Why
Use case: #58056
How
1. Implement
AssetAndTimeScheduleclassBasically almost the same as AssetOrTimeSchedule, but this class inherits from
Timetableand delegatesrun_idgeneration to the wrapped timetable.2. Assign
AssetAndTimeScheduleto thenon_asset_dagssetModifying DagModel.dags_needing_dagruns to prevent
AssetAndTimeSchedulefrom being assigned totriggered_date_by_dag.3. Add starting logic for
AssetAndTimeSchedulein the scheduler.Updating the DAG status from
QUEUEDtoRUNNINGwhen both the timetable and asset conditions are meet.What
Here are my test DAGs:
upstream_asset_producerdownstream_asset_and_time_consumerFrom the screenshots, the downstream DAG runs as expected: it runs only after both the timetable and the assets are ready.
Upstream dag
Asset production time
Downstream,
AssetAndTimeScheduleDiscussion
Since I added this logic in
dag.pyto guarantee that onlyAssetTriggeredTimetablecan be added totriggered_date_by_dag.Should I remove the extra guard logic in
SchedulerJobRunner._create_dag_runs_asset_triggered?airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py
Lines 1800 to 1805 in 9e7b36e