Skip to content

Commit e86eb9b

Browse files
videos: improve post migration script
1 parent a845665 commit e86eb9b

1 file changed

Lines changed: 93 additions & 60 deletions

File tree

cds_migrator_kit/videos/scripts/run_failed_tasks.py

Lines changed: 93 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,33 @@
1616
TranscodeVideoTask,
1717
sync_records_with_deposit_files,
1818
)
19+
from cds.modules.records.utils import parse_video_chapters
1920
from invenio_db import db
2021
from invenio_files_rest.models import ObjectVersion, ObjectVersionTag
2122

# Paths of the run's log files; initialized by main() before any logging happens.
SUCCESS_LOG_PATH = None
ERROR_LOG_PATH = None


def _append_log(path, message, missing_error):
    """Append a timestamped *message* to the log file at *path* and echo it.

    :param path: log file path (module global set by ``main``); falsy means
        the run was not initialized yet.
    :param message: text to record.
    :param missing_error: error text raised when *path* is not initialized.
    :raises RuntimeError: if *path* is falsy.
    """
    if not path:
        raise RuntimeError(missing_error)
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    # Open in append mode on every call so concurrent/partial runs never truncate.
    with open(path, "a") as log_file:
        log_file.write(f"[{timestamp}] {message}\n")
    print(message)


def log_success(message):
    """Write a success message to the success log file."""
    _append_log(SUCCESS_LOG_PATH, message, "SUCCESS_LOG_PATH is not initialized.")


def log_error(message):
    """Write an error message to the error log file."""
    _append_log(ERROR_LOG_PATH, message, "ERROR_LOG_PATH is not initialized.")
2246

2347
def copy_master_tags_between_buckets(src_bucket, dst_bucket):
2448
"""Copy tags of the master ObjectVersion from src_bucket to dst_bucket."""
@@ -72,20 +96,22 @@ def run_failed_tasks(failed_tasks, flow, deposit_id, record_id):
7296
task.status = FlowTaskStatus.PENDING
7397
db.session.commit()
7498

75-
print(f"Re-running ExtractMetadataTask for record {record_id}")
99+
log_success(f"Re-running ExtractMetadataTask for record {record_id}")
76100
payload["task_id"] = str(task.id)
77101

78102
celery_task = ExtractMetadataTask()
79103
celery_task.clean(deposit_id=deposit_id, version_id=payload["version_id"])
80104
celery_task.s(**payload).apply_async()
81105
db.session.commit()
82-
fetch_tasks_status(flow_id, ExtractMetadataTask.name, timeout_seconds=120)
106+
fetch_tasks_status(flow_id, ExtractMetadataTask.name, timeout_seconds=300)
83107

84108
# Remove from failed list so we don't run it twice
85109
failed_tasks = [t for t in failed_tasks if t[0] != ExtractMetadataTask.name]
86110

87111
db.session.expire_all()
88112
try:
113+
# Make sure it finished
114+
fetch_tasks_status(flow_id, ExtractMetadataTask.name, timeout_seconds=100)
89115
flow = db.session.query(FlowMetadata).get(flow_id)
90116
deposit = deposit_video_resolver(deposit_id)
91117
extracted_metadata = deposit["_cds"]["extracted_metadata"]
@@ -98,7 +124,7 @@ def run_failed_tasks(failed_tasks, flow, deposit_id, record_id):
98124
dst_bucket=record["_buckets"]["record"],
99125
)
100126
except Exception as e:
101-
print(f"An error occurred while handling ExtractMetadataTask: {e}")
127+
log_error(f"ERROR: ExtractMetadataTask: {e}")
102128

103129
# --- Handle ExtractFramesTask separately ---
104130
if ExtractFramesTask.name in task_names:
@@ -108,7 +134,7 @@ def run_failed_tasks(failed_tasks, flow, deposit_id, record_id):
108134
task.status = FlowTaskStatus.PENDING
109135
db.session.commit()
110136

111-
print(f"Re-running ExtractFramesTask for record {record_id}")
137+
log_success(f"Re-running ExtractFramesTask for record {record_id}")
112138
payload["task_id"] = str(task.id)
113139

114140
celery_task = ExtractFramesTask()
@@ -124,11 +150,11 @@ def run_failed_tasks(failed_tasks, flow, deposit_id, record_id):
124150

125151
# --- Handle if other task failed ---
126152
for task_name, task_id in failed_tasks:
127-
print(f"Re-running failed task: {task_name} for record {record_id}")
153+
log_success(f"Re-running failed task: {task_name} for record {record_id}")
128154

129155
task_cls = _find_celery_task_by_name(task_name)
130156
if not task_cls:
131-
print(f"No Celery task class found for {task_name}. Skipping.")
157+
log_success(f"No Celery task class found for {task_name}. Skipping.")
132158
continue
133159

134160
task = db.session.query(FlowTaskMetadata).get(task_id)
@@ -142,8 +168,6 @@ def run_failed_tasks(failed_tasks, flow, deposit_id, record_id):
142168
celery_task.s(**payload).apply_async()
143169
db.session.commit()
144170

145-
fetch_tasks_status(flow_id, task.name, timeout_seconds=120)
146-
147171

148172
def fetch_tasks_status(flow_id, task_name, timeout_seconds=60):
149173
"""Wait for a specific task in a flow to finish (SUCCESS or FAILURE)."""
@@ -152,7 +176,7 @@ def fetch_tasks_status(flow_id, task_name, timeout_seconds=60):
152176
while True:
153177
elapsed_time = time.time() - start_time
154178
if elapsed_time >= timeout_seconds:
155-
print(f"Timeout reached after {timeout_seconds} seconds. Exiting.")
179+
log_error(f"Timeout reached after {timeout_seconds} seconds. Exiting.")
156180
break
157181

158182
# Force SQLAlchemy to fetch fresh data from the DB
@@ -173,20 +197,35 @@ def fetch_tasks_status(flow_id, task_name, timeout_seconds=60):
173197
else:
174198
# Explicit success/failure logs
175199
if task.status == FlowTaskStatus.SUCCESS:
176-
print(f" Task '{task.name}' completed successfully.")
200+
log_success(f"SUCCESS Task '{task.name}' completed successfully.")
177201
elif task.status == FlowTaskStatus.FAILURE:
178-
print(f" Task '{task.name}' failed.")
202+
log_error(f"ERROR: Task '{task.name}' failed.")
179203
else:
180-
print(f"ℹTask '{task.name}' finished with status: {task.status.name}.")
204+
log_success(
205+
f"Task '{task.name}' finished with status: {task.status.name}."
206+
)
181207
break
182208

183209
time.sleep(5) # Poll every 5 seconds
184210

185211

186-
def finalize_tasks(deposit_id):
212+
def run_chapters_task(deposit_id, record_id, flow_id):
213+
"""Run the chapters task for a given deposit and flow."""
214+
# Make sure all ExtractMetadataTask and ExtractFramesTask finished
215+
fetch_tasks_status(flow_id, ExtractMetadataTask.name, timeout_seconds=60)
216+
fetch_tasks_status(flow_id, ExtractFramesTask.name, timeout_seconds=60)
217+
187218
# Always work on a clean session to avoid cached data
188219
db.session.expire_all()
189220

221+
# Get the record to see if it has chapters
222+
record = record_video_resolver(record_id)
223+
description = record.get("description", "")
224+
chapters = parse_video_chapters(description)
225+
if not chapters:
226+
log_success(f"No chapters found for record {record_id}. Skipping chapters.")
227+
return
228+
190229
flow = FlowMetadata.get_by_deposit(deposit_id)
191230
flow_id = flow.id
192231
payload = flow.payload.copy()
@@ -199,12 +238,16 @@ def finalize_tasks(deposit_id):
199238
and task.status == FlowTaskStatus.SUCCESS
200239
):
201240
run_chapters_task = False
241+
if (
242+
task.name == ExtractMetadataTask.name or task.name == ExtractFramesTask.name
243+
) and task.status == FlowTaskStatus.FAILURE:
244+
run_chapters_task = False
245+
log_error(
246+
f"ERROR: ExtractMetadataTask/ExtractFramesTask failed for deposit {deposit_id}. Chapter task will not run."
247+
)
202248

203249
if run_chapters_task:
204-
# Wait for ExtractFramesTask to finish to run ExtractChapterFramesTask
205-
print("Waiting for ExtractFramesTask to complete...")
206-
fetch_tasks_status(flow_id, ExtractFramesTask.name, timeout_seconds=240)
207-
print("Running ExtractChapterFramesTask...")
250+
log_success("Running ExtractChapterFramesTask...")
208251

209252
# Create a FlowTaskMetadata
210253
new_task = FlowTaskMetadata(
@@ -218,72 +261,62 @@ def finalize_tasks(deposit_id):
218261
payload["task_id"] = str(new_task.id)
219262
ExtractChapterFramesTask().s(**payload).apply_async()
220263

221-
# Poll for task completion
222-
fetch_tasks_status(flow_id, ExtractChapterFramesTask.name, timeout_seconds=240)
223-
224-
225-
def fetch_flow_and_log(record_id, deposit_id, flow_id, failed_tasks, log_file_path):
226-
"""Fetch the latest flow and write detailed info to the log file."""
227-
# Ensure we read the latest DB state
228-
db.session.expire_all()
229-
flow = db.session.query(FlowMetadata).get(flow_id)
230-
231-
with open(log_file_path, "a") as log_file:
232-
log_file.write("\n" + "=" * 80 + "\n")
233-
log_file.write(f"Record ID: {record_id}\n")
234-
log_file.write(f"Deposit ID: {deposit_id}\n")
235-
log_file.write(f"Flow ID: {flow_id}\n")
236-
log_file.write("-" * 80 + "\n")
237-
238-
# Log previously failed tasks
239-
if failed_tasks:
240-
log_file.write("Previously failed tasks:\n")
241-
for task_name, task_id in failed_tasks:
242-
log_file.write(f" - {task_name} (ID: {task_id})\n")
243-
else:
244-
log_file.write("No previously failed tasks.\n")
245-
246-
log_file.write("-" * 80 + "\n")
247-
log_file.write("Latest task statuses:\n")
248-
249-
# Iterate all tasks in the flow and log their current statuses
250-
for task in flow.tasks:
251-
task_obj = db.session.query(FlowTaskMetadata).get(task.id)
252-
log_file.write(f" • {task_obj.name:<30} | Status: {task_obj.status}\n")
253-
254-
log_file.write("=" * 80 + "\n\n")
264+
log_success("ExtractChapterFramesTask started async.")
265+
else:
266+
log_success("Skipped ExtractChapterFramesTask.")
255267

256268

257269
def load_record_ids(redirections_file_path):
    """Return the ``cds_videos_recid`` of every entry in the redirections file.

    :param redirections_file_path: path to a JSON file containing a list of
        redirection objects, each carrying a ``cds_videos_recid`` key.
    :return: list of record ids, in file order.
    """
    with open(redirections_file_path, "r") as redirections_file:
        entries = json.load(redirections_file)

    # Each entry maps an old record to its migrated CDS Videos record id.
    return [entry["cds_videos_recid"] for entry in entries]
264276

265277

266278
def main():
    """Re-run failed flow tasks for migrated video records, then run chapters.

    Reads record ids from ``/tmp/record_redirections.json``, retries any
    failed flow tasks per record (first pass), and then triggers the chapter
    extraction task per record (second pass) once metadata/frames retries
    have been dispatched everywhere.  Progress is appended to a timestamped
    success log and error log under ``/tmp``.
    """
    global SUCCESS_LOG_PATH, ERROR_LOG_PATH

    # Create success and error log files, one pair per run.
    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    SUCCESS_LOG_PATH = f"/tmp/task_recovery_log_success_{timestamp}.txt"
    ERROR_LOG_PATH = f"/tmp/task_recovery_log_error_{timestamp}.txt"

    started_at = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    for path, title in ((SUCCESS_LOG_PATH, "Success"), (ERROR_LOG_PATH, "Error")):
        with open(path, "w") as log_file:
            log_file.write(f"{title} Log - Started at {started_at}\n")
            log_file.write("=" * 80 + "\n\n")

    redirections_file_path = "/tmp/record_redirections.json"
    all_record_ids = load_record_ids(redirections_file_path)
    record_ids = all_record_ids[:100]  # any subset

    # First pass: retry the failed tasks of each record's flow.
    for record_id in record_ids:
        log_success(f"Processing record {record_id}")
        try:
            record = record_video_resolver(record_id)
            deposit_id = record["_deposit"]["id"]

            flow, failed_tasks = find_failed_tasks(deposit_id)
            if not failed_tasks:
                log_success(f"No failed tasks found for record {record_id}.")
            else:
                task_names = [task[0] for task in failed_tasks]
                log_success(f"Failed tasks: {task_names}")
                run_failed_tasks(failed_tasks, flow, deposit_id, record_id)
        except Exception as e:
            # One broken record must not abort the whole migration run.
            log_error(f"ERROR: processing record {record_id} failed: {e}")

    # Second pass: after all records are processed for metadata and frames
    # we can run the chapters task.
    for record_id in record_ids:
        log_success(f"Processing record {record_id}")
        try:
            record = record_video_resolver(record_id)
            deposit_id = record["_deposit"]["id"]

            flow, _failed_tasks = find_failed_tasks(deposit_id)
            run_chapters_task(deposit_id, record_id, flow.id)
        except Exception as e:
            log_error(f"ERROR: chapters for record {record_id} failed: {e}")

0 commit comments

Comments (0)