Redis queue system for online mode #305

Conversation
This document outlines the architecture and implementation steps for introducing a Redis-based job queueing system to online deployments:
- Uses RQ (Redis Queue) for task management
- Docker Compose additions for Redis, worker, and dashboard services
- New QueueManager class for queue interactions
- Worker tasks module for background execution
- Health monitoring utilities
- Backward compatible with local mode (multiprocessing fallback)

Update implementation plan to run Redis server and RQ worker within the same Docker container as the Streamlit app:
- Redis server runs as background process via entrypoint script
- RQ worker runs in same container with identical environment
- No docker-compose orchestration complexity
- All communication via localhost
- Optional supervisord config for robust process management
- Simplified deployment and debugging

…etails
- Add "Design Principles" section explaining plug & play architecture
- Document offline mode (Windows installer) compatibility - zero changes needed
- Add comprehensive "Configuring Worker Count" section with multiple methods
- Add "User Experience: Queue Status Display" section showing what users see
- Add "Sidebar Metrics" section for queue monitoring alongside CPU/RAM
- Include code examples for all UI components

New files:
- src/workflow/QueueManager.py: Redis queue interaction layer
- src/workflow/tasks.py: Worker task definitions for RQ
- src/workflow/health.py: Queue health check utilities

Modified files:
- Dockerfile: Install Redis server, add entrypoint for Redis/RQ workers
- requirements.txt: Add redis and rq packages
- settings.json: Add queue_settings configuration
- src/workflow/WorkflowManager.py: Add queue support with local fallback
- src/workflow/StreamlitUI.py: Add queue status display in execution section
- src/common/common.py: Add sidebar queue metrics for online mode

Features:
- Automatic queue mode in online deployment, local mode unchanged
- Queue position and progress display when workflows are queued
- Sidebar metrics showing worker utilization and queue depth
- Configurable worker count via RQ_WORKER_COUNT env variable
- Graceful fallback to multiprocessing if Redis unavailable
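The graceful-fallback behavior summarized above can be pictured with a short sketch (illustrative only: `start_workflow` here is a hypothetical free function, while the real dispatch lives in `WorkflowManager` and is shown in the review below):

```python
import os
from multiprocessing import Process


def start_workflow(target, *args) -> str:
    """Enqueue via RQ when online, otherwise fall back to multiprocessing.

    Sketch of the described behavior, assuming online mode is signaled by
    the REDIS_URL environment variable as in the Dockerfile changes.
    """
    if os.environ.get("REDIS_URL"):
        try:
            from redis import Redis
            from rq import Queue

            queue = Queue("openms-workflows",
                          connection=Redis.from_url(os.environ["REDIS_URL"]))
            return queue.enqueue(target, *args).id  # job ID used for polling
        except Exception:
            pass  # Redis unreachable: fall through to local execution
    Process(target=target, args=args).start()  # local mode / fallback
    return "local"
```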
📝 Walkthrough

Introduces a Redis-backed job queue system (via RQ) for online workflow execution. Adds Docker setup with Redis and RQ workers, Python modules for queue management and health checks, and integrates queue status monitoring into the Streamlit UI and workflow orchestration layer. Configuration and dependency files updated accordingly.
Sequence Diagrams

```mermaid
sequenceDiagram
    participant UI as Streamlit UI
    participant WM as WorkflowManager
    participant QM as QueueManager
    participant Redis
    participant RQWorker as RQ Worker
    UI->>WM: start_workflow()
    WM->>WM: detect online mode
    alt Online Mode Available
        WM->>QM: submit_job(execute_workflow, ...)
        QM->>Redis: enqueue job to queue
        Redis-->>QM: job_id
        QM->>QM: store_job_id(workflow_dir, job_id)
        QM-->>WM: job_id
        WM-->>UI: job submitted
        par Status Polling
            UI->>WM: get_workflow_status()
            WM->>QM: get_job_info(job_id)
            QM->>Redis: retrieve job metadata
            Redis-->>QM: job status, progress, queue_position
            QM-->>WM: JobInfo
            WM-->>UI: status {progress, queue_position, ...}
        and Worker Execution
            RQWorker->>Redis: pop job from queue
            RQWorker->>RQWorker: execute_workflow(workflow_dir, class, module)
            RQWorker->>RQWorker: _update_progress(job, progress, step)
            RQWorker->>Redis: store job metadata (progress, current_step)
        end
    else Fallback to Local
        WM->>WM: _start_workflow_local()
        WM-->>UI: workflow running locally
    end
```

```mermaid
sequenceDiagram
    participant UI as Streamlit UI
    participant WM as WorkflowManager
    participant QM as QueueManager
    participant Health as health module
    participant Redis
    UI->>UI: display sidebar Resource Utilization
    UI->>Health: get_queue_metrics()
    Health->>Redis: connect via REDIS_URL
    Redis-->>Health: ping response, worker info, job counts
    Health-->>UI: {available, total_workers, busy_workers, idle_workers, queued_jobs}
    UI->>UI: render monitor_queue fragment
    UI->>UI: display workers, queued count, utilization bar
    alt high queue depth
        UI->>UI: show warning
    end
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ Passed checks (3 passed)
Actionable comments posted: 8
🤖 Fix all issues with AI agents
In `@Dockerfile`:
- Around line 166-173: The Redis readiness loop (the `until redis-cli ping`
section) can hang indefinitely; modify it to include a maximum retry count or
timeout by introducing a counter and MAX_RETRIES (or a TIMEOUT) and incrementing
the counter each iteration, breaking with a non-zero exit and an error message
if the max is reached; ensure you still echo progress messages and exit
successfully when `redis-cli ping` responds.
- Around line 152-155: The Dockerfile sets ENV REDIS_URL but the entrypoint
script still hardcodes "redis://localhost:6379/0", making the variable
ineffective; update the entrypoint invocation (the script or ENTRYPOINT that
launches the RQ worker) to use the REDIS_URL environment variable (e.g.,
substitute the literal URL with $REDIS_URL or ${REDIS_URL}) so the ENV REDIS_URL
defined alongside ENV RQ_WORKER_COUNT is honored at runtime.
- Around line 175-180: The startup script starts background RQ workers using
WORKER_COUNT (env RQ_WORKER_COUNT) and the command pattern "rq worker
openms-workflows --url redis://localhost:6379/0 --name worker-$i" but lacks
signal handling for graceful shutdown; add a trap for SIGTERM/SIGINT that
forwards these signals to the child worker PIDs, wait for them to exit, and on
timeout send TERM/QUIT/INT as appropriate—capture and store each background PID
when launching workers, implement a cleanup function (invoked by trap) that
iterates over those PIDs to kill/terminate gracefully and then wait for
completion, and ensure the script exits with the workers' exit status.
In `@requirements.txt`:
- Around line 140-143: The requirements.txt currently mixes pip-compile
generated content with manual additions (the lines for redis>=5.0.0 and
rq>=1.16.0) and lacks the expected pip-compile metadata; fix by either creating
a pyproject.toml that declares redis and rq and then regenerate requirements.txt
via pip-compile (so the added entries get proper “# via …” annotations), or keep
the manual additions and update the top-of-file header to explicitly state that
these two entries were manually appended (and add a short inline comment after
each dependency to indicate they are manual additions) so the file format and
provenance are consistent; refer to the dependency names "redis" and "rq" and
the requirements.txt header when making the change.
In `@settings.json`:
- Around line 19-24: The settings.json includes an unused queue_settings object;
either remove it or wire its enabled flag into the queue initialization logic.
Update the QueueManager initialization (constructor/init method in QueueManager)
to read settings.queue_settings?.enabled and treat false as "do not initialize
Redis" before the existing online_deployment and REDIS_URL checks, or remove
queue_settings from settings.json if you prefer to keep the current behavior;
ensure any references to queue initialization still use the final decision and
document that queue_settings.enabled controls queue activation.
In `@src/common/common.py`:
- Around line 43-85: Replace the silent blanket except in monitor_queue with a
debug-level log that preserves exception details: catch Exception as e and call
logging.getLogger(__name__).debug(...) (include a short message like
"monitor_queue: failed to fetch queue metrics" and pass exc_info=True or the
exception) so failures are recorded for diagnostics but not shown in the UI;
ensure logging is imported at top of the module and reference the monitor_queue
function and the get_queue_metrics call in the log message for context.
In `@src/workflow/WorkflowManager.py`:
- Around line 55-81: The job_id generation in _start_workflow_queued currently
uses int(time.time()) which can collide; replace that single-second timestamp
with a collision-resistant identifier (e.g., append or replace with
uuid.uuid4().hex or use time.time_ns() combined with a short random/UUID) when
building job_id in _start_workflow_queued; ensure you import uuid if using
uuid.uuid4(), keep the existing job_id format prefix
("workflow-{self.workflow_dir.name}-...") so downstream uses (e.g.,
_queue_manager.store_job_id and submit_job) continue to find the ID.
- Around line 26-42: The lint error arises because 'QueueManager' is referenced
as a forward string but not imported at module scope; update the module to
import QueueManager inside a TYPE_CHECKING block so the name exists for static
checkers without runtime import. Add "from typing import TYPE_CHECKING" at the
top and under "if TYPE_CHECKING:" add "from .QueueManager import QueueManager";
keep the runtime dynamic import inside _init_queue_manager() (the existing
function) and leave _is_online_mode() unchanged.
🧹 Nitpick comments (7)
docs/REDIS_QUEUE_IMPLEMENTATION_PLAN.md (2)
17-31: Add language specifiers to fenced code blocks. Several ASCII diagram blocks lack language specifiers, which can affect rendering and linting. Consider adding `text` or `plaintext` as the language identifier for diagram blocks. This applies to multiple blocks throughout the document (lines 17, 65, 100, 204, 1023, 1256, 1270, 1364).
Example fix

````diff
-```
+```text
 ┌─────────────────────────────────────────────────────────────────┐
 │                         WorkflowManager                         │
````
247-250: Consider non-root user for supervisord processes. The supervisord configuration runs as `user=root`. For improved security posture in production deployments, consider running processes under a dedicated non-root user. This is especially relevant if the container processes untrusted workflow data.

src/workflow/tasks.py (2)
92-96: Destructive results directory cleanup without backup. Deleting the entire results directory before execution could cause unexpected data loss if a user accidentally restarts a workflow. Consider archiving previous results or adding a confirmation mechanism.
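One possible shape for that safeguard, as a hedged sketch (the function name and backup layout are illustrative, not part of the PR):

```python
import shutil
import time
from pathlib import Path


def clear_results_with_backup(workflow_path: Path, keep: int = 3) -> None:
    """Archive the previous results directory instead of deleting it outright."""
    results_dir = workflow_path / "results"
    if results_dir.exists() and any(results_dir.iterdir()):
        stamp = time.strftime("%Y%m%d-%H%M%S")
        results_dir.rename(workflow_path / f"results-backup-{stamp}")  # cheap move
        # Prune old backups so repeated reruns don't fill the disk
        for old in sorted(workflow_path.glob("results-backup-*"))[:-keep]:
            shutil.rmtree(old, ignore_errors=True)
    results_dir.mkdir(parents=True, exist_ok=True)
```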
120-147: Broad exception handling is appropriate but could benefit from more specific logging. The static analysis flags about blind `Exception` catching are less concerning here since this is a top-level task handler that must catch all errors. However, the silent `except: pass` blocks (lines 133-134, 140-141) could hide important debugging information. Consider using `logging.debug()` or similar to capture these secondary failures without affecting the main error flow.

Example improvement for cleanup logging

```diff
         except Exception:
-            pass
+            pass  # Intentionally silent - don't mask original error
```

Or with actual logging:

```python
import logging

logger = logging.getLogger(__name__)

# ...
except Exception as cleanup_err:
    logger.debug(f"Cleanup failed: {cleanup_err}")
```

src/workflow/health.py (1)
92-128: Expose queue-metrics failures for debugging. Returning only `{"available": False}` loses the cause. Consider including the exception message to help troubleshoot without breaking callers.

🔧 Suggested fix

```diff
-    except Exception:
-        return {"available": False}
+    except Exception as e:
+        return {"available": False, "error": str(e)}
```

src/workflow/StreamlitUI.py (1)
1088-1131: Remove unused `msg_type` mapping. The tuple's second value isn't used; simplifying avoids lint noise.

♻️ Suggested cleanup

```diff
-        status_display = {
-            "queued": ("Queued", "info"),
-            "started": ("Running", "info"),
-            "finished": ("Completed", "success"),
-            "failed": ("Failed", "error"),
-            "canceled": ("Cancelled", "warning"),
-        }
-
-        label, msg_type = status_display.get(job_status, ("Unknown", "info"))
+        status_display = {
+            "queued": "Queued",
+            "started": "Running",
+            "finished": "Completed",
+            "failed": "Failed",
+            "canceled": "Cancelled",
+        }
+
+        label = status_display.get(job_status, "Unknown")
```

src/workflow/QueueManager.py (1)
55-104: Allow Redis init to recover after transient startup failures. `_init_attempted` prevents retry; if Redis isn't ready at first init (common in Docker), the queue may stay unavailable for the life of the instance. Consider lazy re-init in `is_available` (optionally with backoff).

🔧 Suggested fix

```diff
-        self._init_attempted = False
-
         if self._is_online:
             self._init_redis()
 ...
-        if self._init_attempted:
-            return
-        self._init_attempted = True
-
         try:
             from redis import Redis
             from rq import Queue
 ...
     @property
     def is_available(self) -> bool:
         """Check if queue system is available"""
-        return self._is_online and self._queue is not None
+        if self._is_online and self._queue is None:
+            self._init_redis()
+        return self._is_online and self._queue is not None
```
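Spelled out as a self-contained sketch (the retry interval and the stubbed `_init_redis` are assumptions layered on the diff above):

```python
import time


class LazyInitSketch:
    """Lazy Redis re-init with a simple backoff, rechecked on each access."""

    _RETRY_INTERVAL = 10.0  # seconds between reconnection attempts (illustrative)

    def __init__(self):
        self._is_online = True
        self._queue = None
        self._last_attempt = 0.0

    def _init_redis(self) -> None:
        # Stub standing in for QueueManager._init_redis(), which sets
        # self._queue when the connection succeeds.
        pass

    @property
    def is_available(self) -> bool:
        if self._is_online and self._queue is None:
            now = time.monotonic()
            if now - self._last_attempt >= self._RETRY_INTERVAL:
                self._last_attempt = now
                self._init_redis()  # may succeed now that Redis is up
        return self._is_online and self._queue is not None
```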
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (10)
- Dockerfile
- docs/REDIS_QUEUE_IMPLEMENTATION_PLAN.md
- requirements.txt
- settings.json
- src/common/common.py
- src/workflow/QueueManager.py
- src/workflow/StreamlitUI.py
- src/workflow/WorkflowManager.py
- src/workflow/health.py
- src/workflow/tasks.py
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-03-10T11:09:56.467Z
Learnt from: subCode321
Repo: OpenMS/streamlit-template PR: 160
File: content/run_example_workflow.py:83-88
Timestamp: 2025-03-10T11:09:56.467Z
Learning: In the OpenMS streamlit-template project, continuous real-time monitoring of CPU stats and workflow jobs is implemented using a while True loop, which is considered acceptable for this specific use case as confirmed by the maintainer.
Applied to files:
src/common/common.py
src/workflow/StreamlitUI.py
📚 Learning: 2025-03-10T11:09:56.468Z
Learnt from: subCode321
Repo: OpenMS/streamlit-template PR: 160
File: content/run_example_workflow.py:83-88
Timestamp: 2025-03-10T11:09:56.468Z
Learning: In the OpenMS streamlit-template project, continuous real-time monitoring of CPU stats and workflow jobs is implemented using a while True loop with a time.sleep(1) in the monitor_cpu_ram_stats function, which is considered acceptable for this specific use case as confirmed by the maintainer.
Applied to files:
src/common/common.py
🧬 Code graph analysis (3)
src/common/common.py (1)
- src/workflow/health.py (1): get_queue_metrics (92-129)

src/workflow/StreamlitUI.py (1)
- src/workflow/CommandExecutor.py (1): stop (248-262)

src/workflow/tasks.py (5)
- src/workflow/CommandExecutor.py (1): CommandExecutor (13-319)
- src/workflow/FileManager.py (1): FileManager (7-179)
- src/workflow/ParameterManager.py (1): ParameterManager (7-141)
- src/workflow/Logger.py (2): Logger (3-42), log (16-42)
- src/workflow/WorkflowManager.py (1): execution (259-266)
🪛 LanguageTool
docs/REDIS_QUEUE_IMPLEMENTATION_PLAN.md
[style] ~1607-~1607: This adverb was used twice in the sentence. Consider removing one of them or replacing them with a synonym.
Context: ...ctions correctly - [ ] Logs are written correctly from worker - [ ] Multiple concurrent j...
(ADVERB_REPETITION_PREMIUM)
🪛 markdownlint-cli2 (0.18.1)
docs/REDIS_QUEUE_IMPLEMENTATION_PLAN.md
17-17: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
65-65: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
100-100: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
204-204: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
1023-1023: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
1256-1256: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
1270-1270: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
1364-1364: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
🪛 Ruff (0.14.11)
src/common/common.py
83-84: try-except-pass detected, consider logging the exception
(S110)
83-83: Do not catch blind exception: Exception
(BLE001)
src/workflow/WorkflowManager.py
27-27: Undefined name QueueManager
(F821)
src/workflow/StreamlitUI.py
1101-1101: Unpacked variable msg_type is never used
Prefix it with an underscore or any other dummy variable pattern
(RUF059)
src/workflow/tasks.py
37-37: Do not catch blind exception: Exception
(BLE001)
120-120: Do not catch blind exception: Exception
(BLE001)
121-121: Use explicit conversion flag
Replace with conversion flag
(RUF010)
131-131: Use explicit conversion flag
Replace with conversion flag
(RUF010)
133-134: try-except-pass detected, consider logging the exception
(S110)
133-133: Do not catch blind exception: Exception
(BLE001)
140-141: try-except-pass detected, consider logging the exception
(S110)
140-140: Do not catch blind exception: Exception
(BLE001)
157-158: try-except-pass detected, consider logging the exception
(S110)
157-157: Do not catch blind exception: Exception
(BLE001)
src/workflow/QueueManager.py
75-75: Do not catch blind exception: Exception
(BLE001)
95-95: Do not catch blind exception: Exception
(BLE001)
109-109: PEP 484 prohibits implicit Optional
Convert to T | None
(RUF013)
146-146: Consider moving this statement to an else block
(TRY300)
147-147: Do not catch blind exception: Exception
(BLE001)
205-205: Do not catch blind exception: Exception
(BLE001)
214-214: Consider moving this statement to an else block
(TRY300)
215-215: Do not catch blind exception: Exception
(BLE001)
236-236: Consider moving this statement to an else block
(TRY300)
237-237: Do not catch blind exception: Exception
(BLE001)
265-265: Do not catch blind exception: Exception
(BLE001)
src/workflow/health.py
38-38: Do not catch blind exception: Exception
(BLE001)
85-85: Do not catch blind exception: Exception
(BLE001)
128-128: Do not catch blind exception: Exception
(BLE001)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: build-simple-app
- GitHub Check: build-full-app
- GitHub Check: build-openms
🔇 Additional comments (19)
docs/REDIS_QUEUE_IMPLEMENTATION_PLAN.md (1)
1-9: Well-structured implementation plan. The document provides comprehensive guidance covering architecture, migration steps, rollback procedures, and testing checklists. The dual-mode design (online/offline) with graceful fallback is well thought out.
Dockerfile (2)
121-128: Redis installation looks good. The Redis server installation with `--no-install-recommends` and proper data directory setup with correct ownership is well done.

166-166: Redis persistence disabled despite plan documentation. The entrypoint uses `--appendonly no` but the implementation plan (line 175 in docs) recommends `--appendonly yes` for job persistence across container restarts. If persistence is intentionally disabled, consider documenting this decision.

src/workflow/tasks.py (3)
1-6: Clear module documentation. The docstring correctly notes that this module must be importable without Streamlit, which is crucial for RQ workers.

150-158: Progress update helper is well-designed. The defensive approach with `if job is not None` and the try-except ensures progress updates never break workflow execution. This is appropriate for a non-critical feature.

78-90: Missing `ui` attribute concern is invalid for worker execution context. The manual workflow construction omits the `self.ui` attribute from `WorkflowManager.__init__`, but this is not a problem. The worker execution path only calls `workflow.execution()`, which uses only `self.params`, `self.logger`, `self.file_manager`, and `self.executor`, all of which are properly set. The `ui` attribute is only needed for Streamlit UI methods (`upload()`, `configure()`, `results()`), which are not called in the worker context. The construction is appropriate and safe for this use case. Likely an incorrect or invalid review comment.
src/common/common.py (1)
433-435: Queue metrics gated to online mode looks good.

src/workflow/health.py (2)
11-43: Redis health check output is well-structured.
45-89: Worker health aggregation looks solid.

src/workflow/WorkflowManager.py (3)
112-169: Queue-aware status reporting is clean and comprehensive.
171-212: Stop workflow flow looks consistent with local and queued modes.
229-233: Execution-section wiring to new callbacks looks correct.

src/workflow/StreamlitUI.py (1)
989-1087: Execution section integration with queue status looks solid.

src/workflow/QueueManager.py (6)
105-149: Job submission API is clear and well-scoped.
150-207: Job status mapping and metadata extraction look good.
208-217: Queue position helper is straightforward.
218-239: Cancel flow is clean and minimal.
240-267: Queue stats aggregation is solid.
268-284: Job ID persistence helpers are simple and effective.
```dockerfile
# Set default worker count (can be overridden via environment variable)
ENV RQ_WORKER_COUNT=1
ENV REDIS_URL=redis://localhost:6379/0
```
REDIS_URL environment variable not used consistently.
REDIS_URL is set here but the entrypoint script (line 179) hardcodes redis://localhost:6379/0 instead of using $REDIS_URL. This makes the environment variable ineffective.
Proposed fix
```diff
-  rq worker openms-workflows --url redis://localhost:6379/0 --name worker-$i &\n\
+  rq worker openms-workflows --url $REDIS_URL --name worker-$i &\n\
```

🤖 Prompt for AI Agents
In `@Dockerfile` around lines 152 - 155, The Dockerfile sets ENV REDIS_URL but the
entrypoint script still hardcodes "redis://localhost:6379/0", making the
variable ineffective; update the entrypoint invocation (the script or ENTRYPOINT
that launches the RQ worker) to use the REDIS_URL environment variable (e.g.,
substitute the literal URL with $REDIS_URL or ${REDIS_URL}) so the ENV REDIS_URL
defined alongside ENV RQ_WORKER_COUNT is honored at runtime.
```shell
redis-server --daemonize yes --dir /var/lib/redis --appendonly no\n\
\n\
# Wait for Redis to be ready\n\
until redis-cli ping > /dev/null 2>&1; do\n\
  echo "Waiting for Redis..."\n\
  sleep 1\n\
done\n\
echo "Redis is ready"\n\
```
Add timeout to Redis readiness check to prevent infinite loop.
The until loop waiting for Redis has no timeout, which could hang indefinitely if Redis fails to start. Consider adding a maximum retry count or timeout.
Proposed fix
```diff
 # Wait for Redis to be ready\n\
+RETRY=0\n\
+MAX_RETRIES=30\n\
 until redis-cli ping > /dev/null 2>&1; do\n\
+  RETRY=$((RETRY+1))\n\
+  if [ $RETRY -ge $MAX_RETRIES ]; then\n\
+    echo "Redis failed to start after $MAX_RETRIES attempts"\n\
+    exit 1\n\
+  fi\n\
   echo "Waiting for Redis..."\n\
   sleep 1\n\
 done\n\
```
done\n\📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| redis-server --daemonize yes --dir /var/lib/redis --appendonly no\n\ | |
| \n\ | |
| # Wait for Redis to be ready\n\ | |
| until redis-cli ping > /dev/null 2>&1; do\n\ | |
| echo "Waiting for Redis..."\n\ | |
| sleep 1\n\ | |
| done\n\ | |
| echo "Redis is ready"\n\ | |
| redis-server --daemonize yes --dir /var/lib/redis --appendonly no\n\ | |
| \n\ | |
| # Wait for Redis to be ready\n\ | |
| RETRY=0\n\ | |
| MAX_RETRIES=30\n\ | |
| until redis-cli ping > /dev/null 2>&1; do\n\ | |
| RETRY=$((RETRY+1))\n\ | |
| if [ $RETRY -ge $MAX_RETRIES ]; then\n\ | |
| echo "Redis failed to start after $MAX_RETRIES attempts"\n\ | |
| exit 1\n\ | |
| fi\n\ | |
| echo "Waiting for Redis..."\n\ | |
| sleep 1\n\ | |
| done\n\ | |
| echo "Redis is ready"\n\ |
🤖 Prompt for AI Agents
In `@Dockerfile` around lines 166 - 173, The Redis readiness loop (the `until
redis-cli ping` section) can hang indefinitely; modify it to include a maximum
retry count or timeout by introducing a counter and MAX_RETRIES (or a TIMEOUT)
and incrementing the counter each iteration, breaking with a non-zero exit and
an error message if the max is reached; ensure you still echo progress messages
and exit successfully when `redis-cli ping` responds.
```shell
# Start RQ worker(s) in background\n\
WORKER_COUNT=${RQ_WORKER_COUNT:-1}\n\
echo "Starting $WORKER_COUNT RQ worker(s)..."\n\
for i in $(seq 1 $WORKER_COUNT); do\n\
  rq worker openms-workflows --url redis://localhost:6379/0 --name worker-$i &\n\
done\n\
```
Background RQ workers lack signal handling for graceful shutdown.
RQ workers are started with & but there's no trap for signals to ensure graceful shutdown when the container stops. This could lead to interrupted jobs and data loss.
Proposed fix
```diff
+# Trap signals for graceful shutdown\n\
+cleanup() {\n\
+  echo "Shutting down..."\n\
+  pkill -TERM rq\n\
+  redis-cli shutdown nosave\n\
+  exit 0\n\
+}\n\
+trap cleanup SIGTERM SIGINT\n\
+\n\
 # Start RQ worker(s) in background\n\
 WORKER_COUNT=${RQ_WORKER_COUNT:-1}\n\
```

🤖 Prompt for AI Agents
In `@Dockerfile` around lines 175 - 180, The startup script starts background RQ
workers using WORKER_COUNT (env RQ_WORKER_COUNT) and the command pattern "rq
worker openms-workflows --url redis://localhost:6379/0 --name worker-$i" but
lacks signal handling for graceful shutdown; add a trap for SIGTERM/SIGINT that
forwards these signals to the child worker PIDs, wait for them to exit, and on
timeout send TERM/QUIT/INT as appropriate—capture and store each background PID
when launching workers, implement a cleanup function (invoked by trap) that
iterates over those PIDs to kill/terminate gracefully and then wait for
completion, and ensure the script exits with the workers' exit status.
```text
# Redis Queue dependencies (for online mode)
redis>=5.0.0
rq>=1.16.0
```
🧩 Analysis chain
🏁 Script executed:

```shell
# First, let's check the requirements.txt file header and structure
head -20 requirements.txt
```

Repository: OpenMS/streamlit-template
Length of output: 479

🏁 Script executed:

```shell
# Check if pyproject.toml exists and its content
if [ -f "pyproject.toml" ]; then
  echo "=== pyproject.toml exists ==="
  cat pyproject.toml
else
  echo "=== pyproject.toml does not exist ==="
fi
```

Repository: OpenMS/streamlit-template
Length of output: 106

🏁 Script executed:

```shell
# Check the exact lines mentioned in the review (140-143)
sed -n '135,145p' requirements.txt
```

Repository: OpenMS/streamlit-template
Length of output: 219

🏁 Script executed:

```shell
# Search for redis and rq references in the codebase to understand their usage
rg "redis|rq" --type-list | head -20
```

Repository: OpenMS/streamlit-template
Length of output: 458

🌐 Web query: latest version of rq Python Redis Queue and when it was released

💡 Result:
Latest rq (Python Redis Queue) version: 2.6.1 — released/uploaded to PyPI on November 22, 2025. [1][2]
Sources:
- PyPI project page for rq (release files / upload date). [1]
- Safety DB package entry (lists v2.6.1, Nov 22, 2025). [2]
Manual additions to pip-compile generated file need reconciliation.
The file header indicates autogenerated content via pip-compile --output-file=requirements.txt pyproject.toml, but pyproject.toml does not exist in this repository. The manual redis>=5.0.0 and rq>=1.16.0 additions at the end also lack the dependency annotation comments (# via ...) present in other dependencies, breaking the expected format.
To resolve: Either create pyproject.toml with these dependencies and regenerate via pip-compile, or update the file header to document the manual additions. The version constraints themselves are appropriate.
🤖 Prompt for AI Agents
In `@requirements.txt` around lines 140 - 143, The requirements.txt currently
mixes pip-compile generated content with manual additions (the lines for
redis>=5.0.0 and rq>=1.16.0) and lacks the expected pip-compile metadata; fix by
either creating a pyproject.toml that declares redis and rq and then regenerate
requirements.txt via pip-compile (so the added entries get proper “# via …”
annotations), or keep the manual additions and update the top-of-file header to
explicitly state that these two entries were manually appended (and add a short
inline comment after each dependency to indicate they are manual additions) so
the file format and provenance are consistent; refer to the dependency names
"redis" and "rq" and the requirements.txt header when making the change.
| "workspaces_dir": "..", | ||
| "queue_settings": { | ||
| "enabled": true, | ||
| "default_timeout": 7200, | ||
| "result_ttl": 86400 | ||
| } |
🧩 Analysis chain
🏁 Script executed:

```shell
rg -n "queue_settings" --type py
```

Repository: OpenMS/streamlit-template
Length of output: 51

🏁 Script executed:

```shell
rg -n "enabled" --type py | head -50
```

Repository: OpenMS/streamlit-template
Length of output: 1532

🏁 Script executed:

```shell
find . -name "QueueManager.py" -o -name "*queue*manager*.py" 2>/dev/null | head -20
```

Repository: OpenMS/streamlit-template
Length of output: 99

🏁 Script executed:

```shell
rg -n "online_deployment" --type py
```

Repository: OpenMS/streamlit-template
Length of output: 559

🏁 Script executed:

```shell
cat -n src/workflow/QueueManager.py | head -100
```

Repository: OpenMS/streamlit-template
Length of output: 3631

🏁 Script executed:

```shell
rg -n "queue_settings" . --type json
```

Repository: OpenMS/streamlit-template
Length of output: 111

🏁 Script executed:

```shell
cat -n src/workflow/QueueManager.py | tail -50
```

Repository: OpenMS/streamlit-template
Length of output: 2068

🏁 Script executed:

```shell
cat -n settings.json
```

Repository: OpenMS/streamlit-template
Length of output: 851

🏁 Script executed:

```shell
rg -n "queue_settings\." . --type py
```

Repository: OpenMS/streamlit-template
Length of output: 51
Remove queue_settings from settings.json or ensure it's actually used in code.
The queue_settings.enabled flag has no effect. QueueManager only checks the online_deployment flag and REDIS_URL environment variable to determine whether to initialize Redis; it never consults queue_settings.enabled or any other fields in the queue_settings object. Having this configuration present but unused creates confusion about what controls queue behavior.
Either integrate queue_settings.enabled into the queue initialization logic, or remove the unused configuration.
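If the flag is wired in rather than removed, the check could look like the following sketch (key names follow settings.json above; the helper name is illustrative):

```python
import json
import os
from pathlib import Path


def queue_enabled(settings_path: str = "settings.json") -> bool:
    """Honor queue_settings.enabled before the existing online/REDIS_URL checks."""
    try:
        settings = json.loads(Path(settings_path).read_text())
    except Exception:
        return False
    if not settings.get("queue_settings", {}).get("enabled", True):
        return False  # explicitly disabled in settings.json
    # Existing activation logic: online deployment or REDIS_URL present
    return bool(os.environ.get("REDIS_URL")
                or settings.get("online_deployment", False))
```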
🤖 Prompt for AI Agents
In `@settings.json` around lines 19 - 24, The settings.json includes a unused
queue_settings object; either remove it or wire its enabled flag into the queue
initialization logic. Update the QueueManager initialization (constructor/init
method in QueueManager) to read settings.queue_settings?.enabled and treat false
as "do not initialize Redis" before the existing online_deployment and REDIS_URL
checks, or remove queue_settings from settings.json if you prefer to keep the
current behavior; ensure any references to queue initialization still use the
final decision and document that queue_settings.enabled controls queue
activation.
```python
@st.fragment(run_every=5)
def monitor_queue():
    """Display queue metrics in sidebar (online mode only)"""
    try:
        from src.workflow.health import get_queue_metrics

        metrics = get_queue_metrics()
        if not metrics.get("available", False):
            return

        st.markdown("---")
        st.markdown("**Queue Status**")

        total_workers = metrics.get("total_workers", 0)
        busy_workers = metrics.get("busy_workers", 0)
        queued_jobs = metrics.get("queued_jobs", 0)

        col1, col2 = st.columns(2)
        col1.metric(
            "Workers",
            f"{busy_workers}/{total_workers}",
            help="Busy workers / Total workers"
        )
        col2.metric(
            "Queued",
            queued_jobs,
            help="Jobs waiting in queue"
        )

        # Utilization progress bar
        if total_workers > 0:
            utilization = busy_workers / total_workers
            st.progress(utilization, text=f"{int(utilization * 100)}% utilized")

        # Warning if queue is backing up
        if queued_jobs > total_workers * 2 and total_workers > 0:
            st.warning(f"High queue depth: {queued_jobs} jobs waiting")

        st.caption(f"Last fetched at: {time.strftime('%H:%M:%S')}")

    except Exception:
        pass  # Silently fail if queue not available
```
Don’t swallow queue-monitoring exceptions silently.
Silent `except Exception: pass` makes queue outages invisible and hard to debug. Consider logging at debug level to keep the UI clean while preserving diagnostics.
🔧 Suggested fix

```diff
+import logging
+
+LOGGER = logging.getLogger(__name__)
 ...
-    except Exception:
-        pass  # Silently fail if queue not available
+    except Exception as exc:
+        LOGGER.debug("Queue metrics unavailable", exc_info=exc)
```

🧰 Tools
🪛 Ruff (0.14.11)
83-84: try-except-pass detected, consider logging the exception
(S110)
83-83: Do not catch blind exception: Exception
(BLE001)
🤖 Prompt for AI Agents
In `@src/common/common.py` around lines 43 - 85, Replace the silent blanket except
in monitor_queue with a debug-level log that preserves exception details: catch
Exception as e and call logging.getLogger(__name__).debug(...) (include a short
message like "monitor_queue: failed to fetch queue metrics" and pass
exc_info=True or the exception) so failures are recorded for diagnostics but not
shown in the UI; ensure logging is imported at top of the module and reference
the monitor_queue function and the get_queue_metrics call in the log message for
context.
```python
        # Initialize queue manager for online mode
        self._queue_manager: Optional['QueueManager'] = None
        if self._is_online_mode():
            self._init_queue_manager()

    def _is_online_mode(self) -> bool:
        """Check if running in online deployment mode"""
        return st.session_state.get("settings", {}).get("online_deployment", False)

    def _init_queue_manager(self) -> None:
        """Initialize queue manager for online mode"""
        try:
            from .QueueManager import QueueManager
            self._queue_manager = QueueManager()
        except ImportError:
            pass  # Queue not available, will use fallback
```
🧩 Analysis chain
🏁 Script executed:

```shell
# First, let's check the current state of the WorkflowManager.py file
head -50 src/workflow/WorkflowManager.py
```

Repository: OpenMS/streamlit-template
Length of output: 2026

🏁 Script executed:

```shell
# Check imports section
head -30 src/workflow/WorkflowManager.py | cat -n
```

Repository: OpenMS/streamlit-template
Length of output: 1472

🏁 Script executed:

```shell
# Find ruff configuration
find . -name "pyproject.toml" -o -name "ruff.toml" -o -name ".ruff.toml" | head -5
```

Repository: OpenMS/streamlit-template
Length of output: 80

🏁 Script executed:

```shell
# Check if ruff is available and run it on the WorkflowManager.py file
if command -v ruff &> /dev/null; then
  ruff check src/workflow/WorkflowManager.py --select F821 2>&1 || true
else
  echo "ruff not available"
fi
```

Repository: OpenMS/streamlit-template
Length of output: 574

🏁 Script executed:

```shell
# Check Python version and if we can run a type checker simulation
# Let's look for any linting configuration in the repo
find . -maxdepth 2 -type f \( -name "pyproject.toml" -o -name "setup.py" -o -name ".flake8" -o -name "setup.cfg" \) | head -10
```

Repository: OpenMS/streamlit-template
Length of output: 51

🏁 Script executed:

```shell
# Check the ruff configuration
cat ruff.toml
```

Repository: OpenMS/streamlit-template
Length of output: 975

🏁 Script executed:

```shell
# Now let's verify the suggested fix works by creating a test
# Create a temporary file with the fix applied and check it
cat > /tmp/test_fix.py << 'EOF'
from pathlib import Path
from typing import Optional, TYPE_CHECKING
from .Logger import Logger
from .ParameterManager import ParameterManager
from .CommandExecutor import CommandExecutor
from .StreamlitUI import StreamlitUI
from .FileManager import FileManager
import multiprocessing
import streamlit as st
import shutil
import time
import traceback

if TYPE_CHECKING:
    from .QueueManager import QueueManager


class WorkflowManager:
    # Core workflow logic using the above classes
    def __init__(self, name: str, workspace: str):
        self.name = name
        self.workflow_dir = Path(workspace, name.replace(" ", "-").lower())
        self.file_manager = FileManager(self.workflow_dir)
        self.logger = Logger(self.workflow_dir)
        self.parameter_manager = ParameterManager(self.workflow_dir)
        self.executor = CommandExecutor(self.workflow_dir, self.logger, self.parameter_manager)
        self.ui = StreamlitUI(self.workflow_dir, self.logger, self.executor, self.parameter_manager)
        self.params = self.parameter_manager.get_parameters_from_json()
        # Initialize queue manager for online mode
        self._queue_manager: Optional["QueueManager"] = None
        if self._is_online_mode():
            self._init_queue_manager()
EOF

# Verify it with ruff
ruff check /tmp/test_fix.py --select F821 2>&1 || echo "No F821 errors found!"
```

Repository: OpenMS/streamlit-template
Length of output: 87
Add TYPE_CHECKING guard to resolve F821 linting error.
Ruff flags 'QueueManager' as undefined because the name is not bound in module scope, even with the string forward reference. Using TYPE_CHECKING avoids the lint error without runtime overhead.
🔧 Suggested fix

```diff
-from typing import Optional
+from typing import Optional, TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from .QueueManager import QueueManager
 ...
-        self._queue_manager: Optional['QueueManager'] = None
+        self._queue_manager: Optional["QueueManager"] = None
```

🧰 Tools
+self._queue_manager: Optional["QueueManager"] = None🧰 Tools
🪛 Ruff (0.14.11)
27-27: Undefined name QueueManager
(F821)
🤖 Prompt for AI Agents
In `@src/workflow/WorkflowManager.py` around lines 26 - 42, The lint error arises
because 'QueueManager' is referenced as a forward string but not imported at
module scope; update the module to import QueueManager inside a TYPE_CHECKING
block so the name exists for static checkers without runtime import. Add "from
typing import TYPE_CHECKING" at the top and under "if TYPE_CHECKING:" add "from
.QueueManager import QueueManager"; keep the runtime dynamic import inside
_init_queue_manager() (the existing function) and leave _is_online_mode()
unchanged.
```python
    def _start_workflow_queued(self) -> None:
        """Submit workflow to Redis queue (online mode)"""
        from .tasks import execute_workflow

        # Generate unique job ID based on workflow directory
        job_id = f"workflow-{self.workflow_dir.name}-{int(time.time())}"

        # Submit job to queue
        submitted_id = self._queue_manager.submit_job(
            func=execute_workflow,
            kwargs={
                "workflow_dir": str(self.workflow_dir),
                "workflow_class": self.__class__.__name__,
                "workflow_module": self.__class__.__module__,
            },
            job_id=job_id,
            timeout=7200,  # 2 hour timeout
            description=f"Workflow: {self.name}"
        )

        if submitted_id:
            # Store job ID for status checking
            self._queue_manager.store_job_id(self.workflow_dir, submitted_id)
        else:
            # Fallback to local execution if queue submission fails
            st.warning("Queue submission failed, running locally...")
            self._start_workflow_local()
```
Use collision-resistant job IDs for queued workflows.
int(time.time()) can collide when multiple starts occur within the same second, causing enqueue failures and unexpected local fallback.
🔧 Suggested fix

```diff
+import uuid
 ...
-        job_id = f"workflow-{self.workflow_dir.name}-{int(time.time())}"
+        job_id = f"workflow-{self.workflow_dir.name}-{uuid.uuid4().hex}"
```
+ job_id = f"workflow-{self.workflow_dir.name}-{uuid.uuid4().hex}"📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| def _start_workflow_queued(self) -> None: | |
| """Submit workflow to Redis queue (online mode)""" | |
| from .tasks import execute_workflow | |
| # Generate unique job ID based on workflow directory | |
| job_id = f"workflow-{self.workflow_dir.name}-{int(time.time())}" | |
| # Submit job to queue | |
| submitted_id = self._queue_manager.submit_job( | |
| func=execute_workflow, | |
| kwargs={ | |
| "workflow_dir": str(self.workflow_dir), | |
| "workflow_class": self.__class__.__name__, | |
| "workflow_module": self.__class__.__module__, | |
| }, | |
| job_id=job_id, | |
| timeout=7200, # 2 hour timeout | |
| description=f"Workflow: {self.name}" | |
| ) | |
| if submitted_id: | |
| # Store job ID for status checking | |
| self._queue_manager.store_job_id(self.workflow_dir, submitted_id) | |
| else: | |
| # Fallback to local execution if queue submission fails | |
| st.warning("Queue submission failed, running locally...") | |
| self._start_workflow_local() | |
| def _start_workflow_queued(self) -> None: | |
| """Submit workflow to Redis queue (online mode)""" | |
| from .tasks import execute_workflow | |
| # Generate unique job ID based on workflow directory | |
| job_id = f"workflow-{self.workflow_dir.name}-{uuid.uuid4().hex}" | |
| # Submit job to queue | |
| submitted_id = self._queue_manager.submit_job( | |
| func=execute_workflow, | |
| kwargs={ | |
| "workflow_dir": str(self.workflow_dir), | |
| "workflow_class": self.__class__.__name__, | |
| "workflow_module": self.__class__.__module__, | |
| }, | |
| job_id=job_id, | |
| timeout=7200, # 2 hour timeout | |
| description=f"Workflow: {self.name}" | |
| ) | |
| if submitted_id: | |
| # Store job ID for status checking | |
| self._queue_manager.store_job_id(self.workflow_dir, submitted_id) | |
| else: | |
| # Fallback to local execution if queue submission fails | |
| st.warning("Queue submission failed, running locally...") | |
| self._start_workflow_local() |
🤖 Prompt for AI Agents
In `@src/workflow/WorkflowManager.py` around lines 55 - 81, The job_id generation
in _start_workflow_queued currently uses int(time.time()) which can collide;
replace that single-second timestamp with a collision-resistant identifier
(e.g., append or replace with uuid.uuid4().hex or use time.time_ns() combined
with a short random/UUID) when building job_id in _start_workflow_queued; ensure
you import uuid if using uuid.uuid4(), keep the existing job_id format prefix
("workflow-{self.workflow_dir.name}-...") so downstream uses (e.g.,
_queue_manager.store_job_id and submit_job) continue to find the ID.
Pull request overview
This pull request introduces a Redis-based queue system for workflow execution in online deployment mode, while maintaining backward compatibility with the existing multiprocessing approach for local mode. The implementation adds job queuing, status tracking, and queue monitoring capabilities specifically for Docker deployments.
Changes:
- Added Redis Queue (RQ) infrastructure for online mode workflow execution
- Implemented QueueManager for job submission, tracking, and cancellation
- Created worker tasks module for background workflow execution
- Added health monitoring and queue metrics display
- Updated Dockerfile to run Redis server and RQ workers alongside Streamlit
Reviewed changes
Copilot reviewed 9 out of 10 changed files in this pull request and generated 17 comments.
Show a summary per file
| File | Description |
|---|---|
| src/workflow/tasks.py | New worker task execution logic for RQ workers |
| src/workflow/health.py | Redis and worker health check utilities |
| src/workflow/QueueManager.py | Core queue management implementation |
| src/workflow/WorkflowManager.py | Added queue submission and status tracking |
| src/workflow/StreamlitUI.py | Queue status display and progress tracking UI |
| src/common/common.py | Queue metrics monitoring in sidebar |
| settings.json | Queue configuration settings |
| requirements.txt | Added Redis and RQ dependencies |
| Dockerfile | Redis server and RQ worker setup |
| docs/REDIS_QUEUE_IMPLEMENTATION_PLAN.md | Comprehensive implementation documentation |
```python
        workflow.logger = logger
        workflow.parameter_manager = parameter_manager
        workflow.executor = executor
        workflow.params = params
```
Copilot AI · Jan 16, 2026
The workflow instance created using `object.__new__` bypasses the normal `__init__` method and may miss important initialization. Consider verifying that all required workflow attributes are set before calling `workflow.execution()`. Specifically, check whether the workflow's `__init__` sets any other attributes beyond those manually assigned here (lines 81-87), such as `self.ui` or other workflow-specific state.
Suggested change

```diff
         workflow.params = params
+        # Ensure attributes normally initialized in __init__ but not required
+        # for headless execution are present to avoid AttributeError later.
+        # In particular, many workflows expect `self.ui` to exist.
+        if not hasattr(workflow, "ui"):
+            workflow.ui = None
```
```python
class QueueManager:
    """
    Manages Redis Queue operations for workflow execution.

    Only active in online mode. Falls back to direct execution in local mode.
    Redis runs on localhost within the same container.
    """

    QUEUE_NAME = "openms-workflows"
    # Redis runs locally in the same container
    REDIS_URL = os.environ.get("REDIS_URL", "redis://localhost:6379/0")

    def __init__(self):
        self._redis = None
        self._queue = None
        self._is_online = self._check_online_mode()
        self._init_attempted = False

        if self._is_online:
            self._init_redis()

    def _check_online_mode(self) -> bool:
        """Check if running in online mode"""
        # Check environment variable first (set in Docker)
        if os.environ.get("REDIS_URL"):
            return True

        # Fallback: check settings file
        try:
            with open("settings.json", "r") as f:
                settings = json.load(f)
            return settings.get("online_deployment", False)
        except Exception:
            return False

    def _init_redis(self) -> None:
        """Initialize Redis connection and queue"""
        if self._init_attempted:
            return
        self._init_attempted = True

        try:
            from redis import Redis
            from rq import Queue

            self._redis = Redis.from_url(self.REDIS_URL)
            self._redis.ping()  # Test connection
            self._queue = Queue(self.QUEUE_NAME, connection=self._redis)
        except ImportError:
            # Redis/RQ packages not installed
            self._redis = None
            self._queue = None
        except Exception:
            # Redis server not available
            self._redis = None
            self._queue = None

    @property
    def is_available(self) -> bool:
        """Check if queue system is available"""
        return self._is_online and self._queue is not None

    def submit_job(
        self,
        func: Callable,
        args: tuple = (),
        kwargs: dict = None,
        job_id: Optional[str] = None,
        timeout: int = 7200,  # 2 hour default
        result_ttl: int = 86400,  # 24 hours
        description: str = ""
    ) -> Optional[str]:
        """
        Submit a job to the queue.

        Args:
            func: The function to execute
            args: Positional arguments for the function
            kwargs: Keyword arguments for the function
            job_id: Optional custom job ID (defaults to UUID)
            timeout: Job timeout in seconds
            result_ttl: How long to keep results
            description: Human-readable job description

        Returns:
            Job ID if successful, None otherwise
        """
        if not self.is_available:
            return None

        kwargs = kwargs or {}

        try:
            job = self._queue.enqueue(
                func,
                args=args,
                kwargs=kwargs,
                job_id=job_id,
                job_timeout=timeout,
                result_ttl=result_ttl,
                description=description,
                meta={"description": description, "progress": 0.0, "current_step": ""}
            )
            return job.id
        except Exception:
            return None

    def get_job_info(self, job_id: str) -> Optional[JobInfo]:
        """
        Get information about a job.

        Args:
            job_id: The job ID to query

        Returns:
            JobInfo object or None if not found
        """
        if not self.is_available:
            return None

        try:
            from rq.job import Job

            job = Job.fetch(job_id, connection=self._redis)

            # Map RQ status to our enum
            status_map = {
                "queued": JobStatus.QUEUED,
                "started": JobStatus.STARTED,
                "finished": JobStatus.FINISHED,
                "failed": JobStatus.FAILED,
                "deferred": JobStatus.DEFERRED,
                "canceled": JobStatus.CANCELED,
            }

            status = status_map.get(job.get_status(), JobStatus.QUEUED)

            # Get progress from job meta
            meta = job.meta or {}
            progress = meta.get("progress", 0.0)
            current_step = meta.get("current_step", "")

            # Calculate queue position if queued
            queue_position = None
            queue_length = None
            if status == JobStatus.QUEUED:
                queue_position = self._get_job_position(job_id)
                queue_length = len(self._queue)

            return JobInfo(
                job_id=job.id,
                status=status,
                progress=progress,
                current_step=current_step,
                queue_position=queue_position,
                queue_length=queue_length,
                result=job.result if status == JobStatus.FINISHED else None,
                error=str(job.exc_info) if job.exc_info else None,
                enqueued_at=str(job.enqueued_at) if job.enqueued_at else None,
                started_at=str(job.started_at) if job.started_at else None,
                ended_at=str(job.ended_at) if job.ended_at else None,
            )
        except Exception:
            return None

    def _get_job_position(self, job_id: str) -> Optional[int]:
        """Get position of a job in the queue (1-indexed)"""
        try:
            job_ids = self._queue.job_ids
            if job_id in job_ids:
                return job_ids.index(job_id) + 1
            return None
        except Exception:
            return None

    def cancel_job(self, job_id: str) -> bool:
        """
        Cancel a queued or running job.

        Args:
            job_id: The job ID to cancel

        Returns:
            True if successfully canceled
        """
        if not self.is_available:
            return False

        try:
            from rq.job import Job

            job = Job.fetch(job_id, connection=self._redis)
            job.cancel()
            return True
        except Exception:
            return False

    def get_queue_stats(self) -> dict:
        """
        Get queue statistics.

        Returns:
            Dictionary with queue stats
        """
        if not self.is_available:
            return {}

        try:
            from rq import Worker

            workers = Worker.all(connection=self._redis)
            busy_workers = len([w for w in workers if w.get_state() == "busy"])

            return {
                "queued": len(self._queue),
                "started": len(self._queue.started_job_registry),
                "finished": len(self._queue.finished_job_registry),
                "failed": len(self._queue.failed_job_registry),
                "workers": len(workers),
                "busy_workers": busy_workers,
                "idle_workers": len(workers) - busy_workers,
            }
        except Exception:
            return {}

    def store_job_id(self, workflow_dir: Path, job_id: str) -> None:
        """Store job ID in workflow directory for recovery"""
        job_file = Path(workflow_dir) / ".job_id"
        job_file.write_text(job_id)

    def load_job_id(self, workflow_dir: Path) -> Optional[str]:
        """Load job ID from workflow directory"""
        job_file = Path(workflow_dir) / ".job_id"
        if job_file.exists():
            return job_file.read_text().strip()
        return None

    def clear_job_id(self, workflow_dir: Path) -> None:
        """Clear stored job ID"""
        job_file = Path(workflow_dir) / ".job_id"
        if job_file.exists():
            job_file.unlink()
```
Copilot AI · Jan 16, 2026
The new QueueManager class and Redis queue functionality lack automated test coverage. Consider adding tests to verify: queue initialization and availability detection, job submission and retrieval, job status tracking, job cancellation, queue statistics reporting, and graceful fallback to local execution when Redis is unavailable. This is particularly important for production reliability of the online deployment mode.
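A starting point could be a pytest sketch along these lines (assumes the QueueManager shown above; `__init__` is bypassed so no Redis server is required):

```python
from src.workflow.QueueManager import QueueManager


def _offline_manager() -> QueueManager:
    """Build a QueueManager in offline state without running __init__."""
    qm = QueueManager.__new__(QueueManager)
    qm._redis = None
    qm._queue = None
    qm._is_online = False
    return qm


def test_graceful_degradation_without_redis():
    qm = _offline_manager()
    assert not qm.is_available
    assert qm.submit_job(print) is None      # no-op instead of raising
    assert qm.get_job_info("missing") is None
    assert qm.get_queue_stats() == {}


def test_job_id_roundtrip(tmp_path):
    qm = _offline_manager()
    qm.store_job_id(tmp_path, "job-123")
    assert qm.load_job_id(tmp_path) == "job-123"
    qm.clear_job_id(tmp_path)
    assert qm.load_job_id(tmp_path) is None
```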
```python
        import importlib
        module = importlib.import_module(workflow_module)
        WorkflowClass = getattr(module, workflow_class)
```
Copilot AI · Jan 16, 2026
The workflow_class and workflow_module parameters are used with importlib and getattr to dynamically load and instantiate classes (lines 54-55). If these values are derived from user input without validation, this could potentially allow arbitrary code execution. Ensure that workflow_class and workflow_module are validated against a whitelist of allowed workflows before dynamic loading, or verify they are only set internally by the WorkflowManager.
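A minimal allow-list guard might look like this (the allow-list contents are illustrative; real entries would name the project's workflow classes):

```python
import importlib

# Illustrative allow-list of (module, class) pairs permitted for dynamic load
ALLOWED_WORKFLOWS = {
    ("src.Workflow", "Workflow"),
}


def load_workflow_class(workflow_module: str, workflow_class: str):
    """Resolve a workflow class only if it is explicitly allow-listed."""
    if (workflow_module, workflow_class) not in ALLOWED_WORKFLOWS:
        raise ValueError(
            f"Workflow not allowed: {workflow_module}.{workflow_class}"
        )
    module = importlib.import_module(workflow_module)
    return getattr(module, workflow_class)
```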
```python
        # Generate unique job ID based on workflow directory
        job_id = f"workflow-{self.workflow_dir.name}-{int(time.time())}"
```
Copilot AI · Jan 16, 2026
The job_id is generated using the workflow directory name and timestamp (line 60), which could lead to collisions if the same workflow is started multiple times within the same second. Consider adding additional entropy (e.g., a random component or UUID suffix) to ensure uniqueness, or document that rapid successive starts of the same workflow are not supported.
| "workspaces_dir": ".." | ||
| "workspaces_dir": "..", | ||
| "queue_settings": { | ||
| "enabled": true, |
Copilot AI · Jan 16, 2026
The queue_settings configuration added to settings.json includes an 'enabled' flag, but this flag is never checked in the code. The queue availability is determined by REDIS_URL environment variable and online_deployment setting, not by queue_settings.enabled. Either implement checking of this flag in QueueManager initialization or remove it to avoid confusion.
| "enabled": true, |
    def execute_workflow(
        workflow_dir: str,
        workflow_class: str,
        workflow_module: str,
    ) -> dict:
        """
        Execute a workflow in the worker process.

        This function is called by the RQ worker to execute a workflow.
        It reconstructs the workflow object and calls its execution() method.

        Args:
            workflow_dir: Path to the workflow directory
            workflow_class: Name of the Workflow class
            workflow_module: Module path containing the Workflow class

        Returns:
            Dictionary with execution results
        """
        try:
            from rq import get_current_job
            job = get_current_job()
        except Exception:
            job = None

        workflow_path = Path(workflow_dir)

        try:
            # Update progress
            _update_progress(job, 0.0, "Initializing workflow...")

            # Import required modules
            from src.workflow.CommandExecutor import CommandExecutor
            from src.workflow.FileManager import FileManager
            from src.workflow.ParameterManager import ParameterManager
            from src.workflow.Logger import Logger

            # Load the workflow class dynamically
            import importlib
            module = importlib.import_module(workflow_module)
            WorkflowClass = getattr(module, workflow_class)

            _update_progress(job, 0.05, "Loading parameters...")

            # Delete the log file if it already exists
            shutil.rmtree(Path(workflow_path, "logs"), ignore_errors=True)

            # Load parameters from saved params.json
            params_file = workflow_path / "params.json"
            if params_file.exists():
                with open(params_file, "r") as f:
                    params = json.load(f)
            else:
                params = {}

            # Initialize workflow components
            logger = Logger(workflow_path)
            file_manager = FileManager(workflow_path)
            parameter_manager = ParameterManager(workflow_path)
            executor = CommandExecutor(workflow_path, logger, parameter_manager)

            _update_progress(job, 0.1, "Starting workflow execution...")

            # Create workflow instance
            # We need to bypass the normal __init__ which requires Streamlit
            workflow = object.__new__(WorkflowClass)
            workflow.name = workflow_path.name
            workflow.workflow_dir = workflow_path
            workflow.file_manager = file_manager
            workflow.logger = logger
            workflow.parameter_manager = parameter_manager
            workflow.executor = executor
            workflow.params = params

            # Store job reference for progress updates
            workflow._rq_job = job

            # Clear results directory
            results_dir = workflow_path / "results"
            if results_dir.exists():
                shutil.rmtree(results_dir)
            results_dir.mkdir(parents=True)

            # Log workflow start
            logger.log("STARTING WORKFLOW")

            _update_progress(job, 0.15, "Executing workflow steps...")

            # Execute the workflow
            workflow.execution()

            # Log workflow completion
            logger.log("WORKFLOW FINISHED")

            _update_progress(job, 1.0, "Workflow completed")

            # Clean up pid directory (in case it was created by accident)
            shutil.rmtree(executor.pid_dir, ignore_errors=True)

            return {
                "success": True,
                "workflow_dir": str(workflow_path),
                "message": "Workflow completed successfully"
            }

        except Exception as e:
            error_msg = f"Workflow failed: {str(e)}\n{traceback.format_exc()}"

            # Log error to workflow logs
            try:
                log_dir = workflow_path / "logs"
                log_dir.mkdir(parents=True, exist_ok=True)

                for log_name in ["minimal.log", "commands-and-run-times.log", "all.log"]:
                    log_file = log_dir / log_name
                    with open(log_file, "a") as f:
                        f.write(f"\n\nERROR: {str(e)}\n")
                        f.write(traceback.format_exc())
            except Exception:
                pass

            # Clean up pid directory
            try:
                pid_dir = workflow_path / "pids"
                shutil.rmtree(pid_dir, ignore_errors=True)
            except Exception:
                pass

            return {
                "success": False,
                "workflow_dir": str(workflow_path),
                "error": error_msg
            }


    def _update_progress(job, progress: float, step: str) -> None:
        """Update job progress metadata"""
        if job is not None:
            try:
                job.meta["progress"] = progress
                job.meta["current_step"] = step
                job.save_meta()
            except Exception:
                pass  # Ignore errors updating progress
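On the consumer side, the Streamlit status view can poll this metadata by re-fetching the job from its stored ID; roughly (a sketch, with `read_progress` a hypothetical helper):

    from rq.job import Job

    def read_progress(redis_conn, job_id: str):
        """Return the (progress, current_step) pair written by _update_progress."""
        job = Job.fetch(job_id, connection=redis_conn)  # loads meta from Redis
        return job.meta.get("progress", 0.0), job.meta.get("current_step", "")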
Copilot AI · Jan 16, 2026
The execute_workflow function in tasks.py lacks automated test coverage. Consider adding tests to verify: workflow instance creation without Streamlit dependencies, parameter loading from params.json, progress tracking, error handling and logging, and cleanup of temporary directories. These tests should mock the RQ job context and verify the function works correctly in the worker environment.
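Since the function already degrades to `job = None` outside an RQ worker, a first test can call it directly against a temporary directory and assert on the returned dict (a sketch; the bogus module name deliberately forces the error path):

    # test_tasks.py (illustrative)
    from src.workflow.tasks import execute_workflow

    def test_execute_workflow_reports_failure(tmp_path):
        # A non-importable module should fail cleanly: error logs written,
        # success=False returned, and no exception raised to the caller
        result = execute_workflow(
            workflow_dir=str(tmp_path),
            workflow_class="DoesNotExist",
            workflow_module="no.such.module",
        )
        assert result["success"] is False
        assert "Workflow failed" in result["error"]
        assert (tmp_path / "logs" / "minimal.log").exists()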
        else:
            # Fallback to local execution if queue submission fails
            st.warning("Queue submission failed, running locally...")
            self._start_workflow_local()
Copilot AI · Jan 16, 2026
When queue submission fails and falls back to local execution (line 81), a warning is shown but the logs are not deleted before starting the local workflow. In contrast, _start_workflow_local (line 86) deletes logs before starting. This inconsistency means that when falling back, old log files may persist and confuse users. Consider calling shutil.rmtree for logs before the fallback, or refactor to ensure consistent behavior.
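Deleting the logs directory just before the fallback keeps the two paths consistent (a sketch of the suggested fix):

    else:
        # Fallback to local execution if queue submission fails;
        # clear stale logs first so the local run starts clean
        shutil.rmtree(Path(self.workflow_dir, "logs"), ignore_errors=True)
        st.warning("Queue submission failed, running locally...")
        self._start_workflow_local()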
    This module must be importable without Streamlit being available.
    """

    import sys
Copilot AI · Jan 16, 2026
Import of 'sys' is not used.
                with open(log_file, "a") as f:
                    f.write(f"\n\nERROR: {str(e)}\n")
                    f.write(traceback.format_exc())
        except Exception:
Copilot AI · Jan 16, 2026
'except' clause does nothing but pass and there is no explanatory comment.
        try:
            pid_dir = workflow_path / "pids"
            shutil.rmtree(pid_dir, ignore_errors=True)
        except Exception:
Copilot AI · Jan 16, 2026
'except' clause does nothing but pass and there is no explanatory comment.
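Both clauses should either log the failure or carry a comment explaining why it is safe to swallow; for example (a sketch for the pid cleanup, equally applicable to the log-writing clause):

    try:
        pid_dir = workflow_path / "pids"
        shutil.rmtree(pid_dir, ignore_errors=True)
    except Exception:
        # Best-effort cleanup: a leftover pids directory is harmless, and
        # raising here would mask the original workflow error
        pass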