
Conversation

@t0mdavid-m (Member) commented Jan 16, 2026

Summary by CodeRabbit

  • New Features

    • Introduced background job queue system enabling asynchronous workflow execution with real-time progress tracking
    • Added queue status monitoring in the sidebar displaying worker activity, queued jobs, and utilization metrics
    • Enhanced execution UI with live job status updates, queue position visibility, and detailed progress information
  • Documentation

    • Added comprehensive implementation guide for queue system architecture and operations


This document outlines the architecture and implementation steps for
introducing a Redis-based job queueing system to online deployments:

- Uses RQ (Redis Queue) for task management
- Docker Compose additions for Redis, worker, and dashboard services
- New QueueManager class for queue interactions
- Worker tasks module for background execution
- Health monitoring utilities
- Backward compatible with local mode (multiprocessing fallback)
Update implementation plan to run Redis server and RQ worker within the
same Docker container as the Streamlit app:

- Redis server runs as background process via entrypoint script
- RQ worker runs in same container with identical environment
- No docker-compose orchestration complexity
- All communication via localhost
- Optional supervisord config for robust process management
- Simplified deployment and debugging

- Add "Design Principles" section explaining plug & play architecture
- Document offline mode (Windows installer) compatibility - zero changes needed
- Add comprehensive "Configuring Worker Count" section with multiple methods
- Add "User Experience: Queue Status Display" section showing what users see
- Add "Sidebar Metrics" section for queue monitoring alongside CPU/RAM
- Include code examples for all UI components
New files:
- src/workflow/QueueManager.py: Redis queue interaction layer
- src/workflow/tasks.py: Worker task definitions for RQ
- src/workflow/health.py: Queue health check utilities

Modified files:
- Dockerfile: Install Redis server, add entrypoint for Redis/RQ workers
- requirements.txt: Add redis and rq packages
- settings.json: Add queue_settings configuration
- src/workflow/WorkflowManager.py: Add queue support with local fallback
- src/workflow/StreamlitUI.py: Add queue status display in execution section
- src/common/common.py: Add sidebar queue metrics for online mode

Features:
- Automatic queue mode in online deployment, local mode unchanged
- Queue position and progress display when workflows are queued
- Sidebar metrics showing worker utilization and queue depth
- Configurable worker count via RQ_WORKER_COUNT env variable
- Graceful fallback to multiprocessing if Redis unavailable (sketched below)
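
Roughly, the dispatch works like this (illustrative sketch only; the methods named here are the ones added in this PR, but the body is simplified, not the actual diff):

# Sketch: queue-or-fallback dispatch as described above.
def start_workflow(self) -> None:
    if self._queue_manager is not None and self._queue_manager.is_available:
        self._start_workflow_queued()  # enqueue via Redis/RQ (online mode)
    else:
        self._start_workflow_local()   # multiprocessing fallback (local mode)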
@coderabbitai bot commented Jan 16, 2026

📝 Walkthrough

Introduces a Redis-backed job queue system (via RQ) for online workflow execution. Adds Docker setup with Redis and RQ workers, Python modules for queue management and health checks, and integrates queue status monitoring into the Streamlit UI and workflow orchestration layer. Configuration and dependency files updated accordingly.

Changes

• Docker & Runtime Setup (Dockerfile): Installs Redis and RQ dependencies; adds /app/entrypoint.sh script that initializes cron, Redis, multiple RQ workers (configurable via RQ_WORKER_COUNT), and Streamlit. Introduces REDIS_URL and RQ_WORKER_COUNT environment variables.
• Documentation (docs/REDIS_QUEUE_IMPLEMENTATION_PLAN.md): Comprehensive design and implementation plan for Redis queue integration, covering architecture, phased rollout, testing, monitoring, configuration, and operational considerations for online workflow execution.
• Dependencies & Configuration (requirements.txt, settings.json): Adds redis and rq packages to requirements. Introduces queue_settings object in settings.json with enabled flag, default timeout, and result TTL configuration.
• Queue System Core (src/workflow/QueueManager.py, src/workflow/tasks.py, src/workflow/health.py): New modules providing queue abstraction (QueueManager with job submission, status tracking, and cancellation), worker-side task execution (execute_workflow), and health monitoring (Redis and worker health checks, queue metrics).
• Workflow Integration (src/workflow/WorkflowManager.py, src/workflow/StreamlitUI.py): WorkflowManager extended to detect online mode, route execution to queue or local fallback, and expose get_workflow_status and stop_workflow methods. StreamlitUI execution_section signature updated to accept status and stop callbacks; adds queue status display with progress tracking.
• UI Monitoring (src/common/common.py): Adds monitor_queue fragment to display queue metrics (worker count, queued jobs, utilization) and high-queue-depth warnings in sidebar Resource Utilization section when online mode is enabled.

Sequence Diagrams

sequenceDiagram
    participant UI as Streamlit UI
    participant WM as WorkflowManager
    participant QM as QueueManager
    participant Redis
    participant RQWorker as RQ Worker
    
    UI->>WM: start_workflow()
    WM->>WM: detect online mode
    alt Online Mode Available
        WM->>QM: submit_job(execute_workflow, ...)
        QM->>Redis: enqueue job to queue
        Redis-->>QM: job_id
        QM->>QM: store_job_id(workflow_dir, job_id)
        QM-->>WM: job_id
        WM-->>UI: job submitted
        
        par Status Polling
            UI->>WM: get_workflow_status()
            WM->>QM: get_job_info(job_id)
            QM->>Redis: retrieve job metadata
            Redis-->>QM: job status, progress, queue_position
            QM-->>WM: JobInfo
            WM-->>UI: status {progress, queue_position, ...}
        and Worker Execution
            RQWorker->>Redis: pop job from queue
            RQWorker->>RQWorker: execute_workflow(workflow_dir, class, module)
            RQWorker->>RQWorker: _update_progress(job, progress, step)
            RQWorker->>Redis: store job metadata (progress, current_step)
        end
    else Fallback to Local
        WM->>WM: _start_workflow_local()
        WM-->>UI: workflow running locally
    end
sequenceDiagram
    participant UI as Streamlit UI
    participant WM as WorkflowManager
    participant QM as QueueManager
    participant Health as health module
    participant Redis
    
    UI->>UI: display sidebar Resource Utilization
    UI->>Health: get_queue_metrics()
    Health->>Redis: connect via REDIS_URL
    Redis-->>Health: ping response, worker info, job counts
    Health-->>UI: {available, total_workers, busy_workers, idle_workers, queued_jobs}
    
    UI->>UI: render monitor_queue fragment
    UI->>UI: display workers, queued count, utilization bar
    alt high queue depth
        UI->>UI: show warning
    end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Poem

🐰 A queue of jobs, in Redis so bright,
Workers hopping left and right,
No more blocking, tasks take flight,
Progress tracked with pure delight! 🚀

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
• Description Check: Passed. Check skipped because CodeRabbit's high-level summary is enabled.
• Title Check: Passed. The title 'Redis queue system for online mode' accurately summarizes the main change: introducing Redis-based job queuing for online workflows with comprehensive infrastructure integration.
• Docstring Coverage: Passed. Docstring coverage is 86.49%, which meets the required 80.00% threshold.



@coderabbitai bot left a comment

Actionable comments posted: 8

🤖 Fix all issues with AI agents
In `@Dockerfile`:
- Around line 166-173: The Redis readiness loop (the `until redis-cli ping`
section) can hang indefinitely; modify it to include a maximum retry count or
timeout by introducing a counter and MAX_RETRIES (or a TIMEOUT) and incrementing
the counter each iteration, breaking with a non-zero exit and an error message
if the max is reached; ensure you still echo progress messages and exit
successfully when `redis-cli ping` responds.
- Around line 152-155: The Dockerfile sets ENV REDIS_URL but the entrypoint
script still hardcodes "redis://localhost:6379/0", making the variable
ineffective; update the entrypoint invocation (the script or ENTRYPOINT that
launches the RQ worker) to use the REDIS_URL environment variable (e.g.,
substitute the literal URL with $REDIS_URL or ${REDIS_URL}) so the ENV REDIS_URL
defined alongside ENV RQ_WORKER_COUNT is honored at runtime.
- Around line 175-180: The startup script starts background RQ workers using
WORKER_COUNT (env RQ_WORKER_COUNT) and the command pattern "rq worker
openms-workflows --url redis://localhost:6379/0 --name worker-$i" but lacks
signal handling for graceful shutdown; add a trap for SIGTERM/SIGINT that
forwards these signals to the child worker PIDs, wait for them to exit, and on
timeout send TERM/QUIT/INT as appropriate—capture and store each background PID
when launching workers, implement a cleanup function (invoked by trap) that
iterates over those PIDs to kill/terminate gracefully and then wait for
completion, and ensure the script exits with the workers' exit status.

In `@requirements.txt`:
- Around line 140-143: The requirements.txt currently mixes pip-compile
generated content with manual additions (the lines for redis>=5.0.0 and
rq>=1.16.0) and lacks the expected pip-compile metadata; fix by either creating
a pyproject.toml that declares redis and rq and then regenerate requirements.txt
via pip-compile (so the added entries get proper “# via …” annotations), or keep
the manual additions and update the top-of-file header to explicitly state that
these two entries were manually appended (and add a short inline comment after
each dependency to indicate they are manual additions) so the file format and
provenance are consistent; refer to the dependency names "redis" and "rq" and
the requirements.txt header when making the change.

In `@settings.json`:
- Around line 19-24: The settings.json includes an unused queue_settings object;
either remove it or wire its enabled flag into the queue initialization logic.
Update the QueueManager initialization (constructor/init method in QueueManager)
to read settings.queue_settings?.enabled and treat false as "do not initialize
Redis" before the existing online_deployment and REDIS_URL checks, or remove
queue_settings from settings.json if you prefer to keep the current behavior;
ensure any references to queue initialization still use the final decision and
document that queue_settings.enabled controls queue activation.

In `@src/common/common.py`:
- Around line 43-85: Replace the silent blanket except in monitor_queue with a
debug-level log that preserves exception details: catch Exception as e and call
logging.getLogger(__name__).debug(...) (include a short message like
"monitor_queue: failed to fetch queue metrics" and pass exc_info=True or the
exception) so failures are recorded for diagnostics but not shown in the UI;
ensure logging is imported at top of the module and reference the monitor_queue
function and the get_queue_metrics call in the log message for context.

In `@src/workflow/WorkflowManager.py`:
- Around line 55-81: The job_id generation in _start_workflow_queued currently
uses int(time.time()) which can collide; replace that single-second timestamp
with a collision-resistant identifier (e.g., append or replace with
uuid.uuid4().hex or use time.time_ns() combined with a short random/UUID) when
building job_id in _start_workflow_queued; ensure you import uuid if using
uuid.uuid4(), keep the existing job_id format prefix
("workflow-{self.workflow_dir.name}-...") so downstream uses (e.g.,
_queue_manager.store_job_id and submit_job) continue to find the ID.
- Around line 26-42: The lint error arises because 'QueueManager' is referenced
as a forward string but not imported at module scope; update the module to
import QueueManager inside a TYPE_CHECKING block so the name exists for static
checkers without runtime import. Add "from typing import TYPE_CHECKING" at the
top and under "if TYPE_CHECKING:" add "from .QueueManager import QueueManager";
keep the runtime dynamic import inside _init_queue_manager() (the existing
function) and leave _is_online_mode() unchanged.
🧹 Nitpick comments (7)
docs/REDIS_QUEUE_IMPLEMENTATION_PLAN.md (2)

17-31: Add language specifiers to fenced code blocks.

Several ASCII diagram blocks lack language specifiers, which can affect rendering and linting. Consider adding text or plaintext as the language identifier for diagram blocks.

This applies to multiple blocks throughout the document (lines 17, 65, 100, 204, 1023, 1256, 1270, 1364).

Example fix
-```
+```text
 ┌─────────────────────────────────────────────────────────────────┐
 │                      WorkflowManager                             │

247-250: Consider non-root user for supervisord processes.

The supervisord configuration runs as user=root. For improved security posture in production deployments, consider running processes under a dedicated non-root user. This is especially relevant if the container processes untrusted workflow data.

src/workflow/tasks.py (2)

92-96: Destructive results directory cleanup without backup.

Deleting the entire results directory before execution could cause unexpected data loss if a user accidentally restarts a workflow. Consider archiving previous results or adding a confirmation mechanism.
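A possible mitigation, sketched here with an assumed helper name (archive_previous_results is not part of this PR):

import shutil
import time
from pathlib import Path

def archive_previous_results(workflow_dir: Path) -> None:
    # Hypothetical: move old results aside instead of deleting them outright.
    results = workflow_dir / "results"
    if results.exists():
        stamp = time.strftime("%Y%m%d-%H%M%S")
        shutil.move(str(results), str(workflow_dir / f"results-{stamp}"))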


120-147: Broad exception handling is appropriate but could benefit from more specific logging.

The static analysis flags about blind Exception catching are less concerning here since this is a top-level task handler that must catch all errors. However, the silent except: pass blocks (lines 133-134, 140-141) could hide important debugging information.

Consider using logging.debug() or similar to capture these secondary failures without affecting the main error flow.

Example improvement for cleanup logging
         except Exception:
-            pass
+            pass  # Intentionally silent - don't mask original error

Or with actual logging:

import logging
logger = logging.getLogger(__name__)
# ...
except Exception as cleanup_err:
    logger.debug(f"Cleanup failed: {cleanup_err}")
src/workflow/health.py (1)

92-128: Expose queue-metrics failures for debugging.
Returning only {"available": False} loses the cause. Consider including the exception message to help troubleshoot without breaking callers.

🔧 Suggested fix
-    except Exception:
-        return {"available": False}
+    except Exception as e:
+        return {"available": False, "error": str(e)}
src/workflow/StreamlitUI.py (1)

1088-1131: Remove unused msg_type mapping.
The tuple’s second value isn’t used; simplifying avoids lint noise.

♻️ Suggested cleanup
-        status_display = {
-            "queued": ("Queued", "info"),
-            "started": ("Running", "info"),
-            "finished": ("Completed", "success"),
-            "failed": ("Failed", "error"),
-            "canceled": ("Cancelled", "warning"),
-        }
-
-        label, msg_type = status_display.get(job_status, ("Unknown", "info"))
+        status_display = {
+            "queued": "Queued",
+            "started": "Running",
+            "finished": "Completed",
+            "failed": "Failed",
+            "canceled": "Cancelled",
+        }
+
+        label = status_display.get(job_status, "Unknown")
src/workflow/QueueManager.py (1)

55-104: Allow Redis init to recover after transient startup failures.
_init_attempted prevents retry; if Redis isn’t ready at first init (common in Docker), the queue may stay unavailable for the life of the instance. Consider lazy re-init in is_available (optionally with backoff).

🔧 Suggested fix
-        self._init_attempted = False
-
-        if self._is_online:
-            self._init_redis()
+        if self._is_online:
+            self._init_redis()
...
-        if self._init_attempted:
-            return
-        self._init_attempted = True
-
         try:
             from redis import Redis
             from rq import Queue
...
     @property
     def is_available(self) -> bool:
         """Check if queue system is available"""
-        return self._is_online and self._queue is not None
+        if self._is_online and self._queue is None:
+            self._init_redis()
+        return self._is_online and self._queue is not None
📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 8ab28ad and a54bcde.

📒 Files selected for processing (10)
  • Dockerfile
  • docs/REDIS_QUEUE_IMPLEMENTATION_PLAN.md
  • requirements.txt
  • settings.json
  • src/common/common.py
  • src/workflow/QueueManager.py
  • src/workflow/StreamlitUI.py
  • src/workflow/WorkflowManager.py
  • src/workflow/health.py
  • src/workflow/tasks.py
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-03-10T11:09:56.467Z
Learnt from: subCode321
Repo: OpenMS/streamlit-template PR: 160
File: content/run_example_workflow.py:83-88
Timestamp: 2025-03-10T11:09:56.467Z
Learning: In the OpenMS streamlit-template project, continuous real-time monitoring of CPU stats and workflow jobs is implemented using a while True loop, which is considered acceptable for this specific use case as confirmed by the maintainer.

Applied to files:

  • src/common/common.py
  • src/workflow/StreamlitUI.py
📚 Learning: 2025-03-10T11:09:56.468Z
Learnt from: subCode321
Repo: OpenMS/streamlit-template PR: 160
File: content/run_example_workflow.py:83-88
Timestamp: 2025-03-10T11:09:56.468Z
Learning: In the OpenMS streamlit-template project, continuous real-time monitoring of CPU stats and workflow jobs is implemented using a while True loop with a time.sleep(1) in the monitor_cpu_ram_stats function, which is considered acceptable for this specific use case as confirmed by the maintainer.

Applied to files:

  • src/common/common.py
🧬 Code graph analysis (3)
src/common/common.py (1)
src/workflow/health.py (1)
  • get_queue_metrics (92-129)
src/workflow/StreamlitUI.py (1)
src/workflow/CommandExecutor.py (1)
  • stop (248-262)
src/workflow/tasks.py (5)
src/workflow/CommandExecutor.py (1)
  • CommandExecutor (13-319)
src/workflow/FileManager.py (1)
  • FileManager (7-179)
src/workflow/ParameterManager.py (1)
  • ParameterManager (7-141)
src/workflow/Logger.py (2)
  • Logger (3-42)
  • log (16-42)
src/workflow/WorkflowManager.py (1)
  • execution (259-266)
🪛 LanguageTool
docs/REDIS_QUEUE_IMPLEMENTATION_PLAN.md

[style] ~1607-~1607: This adverb was used twice in the sentence. Consider removing one of them or replacing them with a synonym.
Context: ...ctions correctly - [ ] Logs are written correctly from worker - [ ] Multiple concurrent j...

(ADVERB_REPETITION_PREMIUM)

🪛 markdownlint-cli2 (0.18.1)
docs/REDIS_QUEUE_IMPLEMENTATION_PLAN.md

17-17: Fenced code blocks should have a language specified

(MD040, fenced-code-language)


65-65: Fenced code blocks should have a language specified

(MD040, fenced-code-language)


100-100: Fenced code blocks should have a language specified

(MD040, fenced-code-language)


204-204: Fenced code blocks should have a language specified

(MD040, fenced-code-language)


1023-1023: Fenced code blocks should have a language specified

(MD040, fenced-code-language)


1256-1256: Fenced code blocks should have a language specified

(MD040, fenced-code-language)


1270-1270: Fenced code blocks should have a language specified

(MD040, fenced-code-language)


1364-1364: Fenced code blocks should have a language specified

(MD040, fenced-code-language)

🪛 Ruff (0.14.11)
src/common/common.py

83-84: try-except-pass detected, consider logging the exception

(S110)


83-83: Do not catch blind exception: Exception

(BLE001)

src/workflow/WorkflowManager.py

27-27: Undefined name QueueManager

(F821)

src/workflow/StreamlitUI.py

1101-1101: Unpacked variable msg_type is never used

Prefix it with an underscore or any other dummy variable pattern

(RUF059)

src/workflow/tasks.py

37-37: Do not catch blind exception: Exception

(BLE001)


120-120: Do not catch blind exception: Exception

(BLE001)


121-121: Use explicit conversion flag

Replace with conversion flag

(RUF010)


131-131: Use explicit conversion flag

Replace with conversion flag

(RUF010)


133-134: try-except-pass detected, consider logging the exception

(S110)


133-133: Do not catch blind exception: Exception

(BLE001)


140-141: try-except-pass detected, consider logging the exception

(S110)


140-140: Do not catch blind exception: Exception

(BLE001)


157-158: try-except-pass detected, consider logging the exception

(S110)


157-157: Do not catch blind exception: Exception

(BLE001)

src/workflow/QueueManager.py

75-75: Do not catch blind exception: Exception

(BLE001)


95-95: Do not catch blind exception: Exception

(BLE001)


109-109: PEP 484 prohibits implicit Optional

Convert to T | None

(RUF013)


146-146: Consider moving this statement to an else block

(TRY300)


147-147: Do not catch blind exception: Exception

(BLE001)


205-205: Do not catch blind exception: Exception

(BLE001)


214-214: Consider moving this statement to an else block

(TRY300)


215-215: Do not catch blind exception: Exception

(BLE001)


236-236: Consider moving this statement to an else block

(TRY300)


237-237: Do not catch blind exception: Exception

(BLE001)


265-265: Do not catch blind exception: Exception

(BLE001)

src/workflow/health.py

38-38: Do not catch blind exception: Exception

(BLE001)


85-85: Do not catch blind exception: Exception

(BLE001)


128-128: Do not catch blind exception: Exception

(BLE001)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: build-simple-app
  • GitHub Check: build-full-app
  • GitHub Check: build-openms
🔇 Additional comments (19)
docs/REDIS_QUEUE_IMPLEMENTATION_PLAN.md (1)

1-9: Well-structured implementation plan.

The document provides comprehensive guidance covering architecture, migration steps, rollback procedures, and testing checklists. The dual-mode design (online/offline) with graceful fallback is well thought out.

Dockerfile (2)

121-128: Redis installation looks good.

The Redis server installation with --no-install-recommends and proper data directory setup with correct ownership is well done.


166-166: Redis persistence disabled despite plan documentation.

The entrypoint uses --appendonly no but the implementation plan (line 175 in docs) recommends --appendonly yes for job persistence across container restarts. If persistence is intentionally disabled, consider documenting this decision.

src/workflow/tasks.py (3)

1-6: Clear module documentation.

The docstring correctly notes that this module must be importable without Streamlit, which is crucial for RQ workers.


150-158: Progress update helper is well-designed.

The defensive approach with if job is not None and the try-except ensures progress updates never break workflow execution. This is appropriate for a non-critical feature.
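
For reference, the shape being described looks roughly like this (a sketch using RQ's job.meta / save_meta() API; the real helper lives in tasks.py):

def _update_progress(job, progress: float, step: str) -> None:
    # Defensive: progress reporting must never break workflow execution.
    if job is not None:
        try:
            job.meta["progress"] = progress
            job.meta["current_step"] = step
            job.save_meta()  # persist meta back to Redis
        except Exception:
            pass  # intentionally non-fatal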


78-90: Missing ui attribute concern is invalid for worker execution context.

The manual workflow construction omits the self.ui attribute from WorkflowManager.__init__, but this is not a problem. The worker execution path only calls workflow.execution(), which uses only self.params, self.logger, self.file_manager, and self.executor—all of which are properly set. The ui attribute is only needed for Streamlit UI methods (upload(), configure(), results()), which are not called in the worker context. The construction is appropriate and safe for this use case.

Likely an incorrect or invalid review comment.

src/common/common.py (1)

433-435: Queue metrics gated to online mode looks good.

src/workflow/health.py (2)

11-43: Redis health check output is well-structured.


45-89: Worker health aggregation looks solid.

src/workflow/WorkflowManager.py (3)

112-169: Queue-aware status reporting is clean and comprehensive.


171-212: Stop workflow flow looks consistent with local and queued modes.


229-233: Execution-section wiring to new callbacks looks correct.

src/workflow/StreamlitUI.py (1)

989-1087: Execution section integration with queue status looks solid.

src/workflow/QueueManager.py (6)

105-149: Job submission API is clear and well-scoped.


150-207: Job status mapping and metadata extraction look good.


208-217: Queue position helper is straightforward.


218-239: Cancel flow is clean and minimal.


240-267: Queue stats aggregation is solid.


268-284: Job ID persistence helpers are simple and effective.


Comment on lines +152 to +155
# Set default worker count (can be overridden via environment variable)
ENV RQ_WORKER_COUNT=1
ENV REDIS_URL=redis://localhost:6379/0


⚠️ Potential issue | 🟡 Minor

REDIS_URL environment variable not used consistently.

REDIS_URL is set here but the entrypoint script (line 179) hardcodes redis://localhost:6379/0 instead of using $REDIS_URL. This makes the environment variable ineffective.

Proposed fix
-    rq worker openms-workflows --url redis://localhost:6379/0 --name worker-$i &\n\
+    rq worker openms-workflows --url $REDIS_URL --name worker-$i &\n\

Comment on lines +166 to +173
redis-server --daemonize yes --dir /var/lib/redis --appendonly no\n\
\n\
# Wait for Redis to be ready\n\
until redis-cli ping > /dev/null 2>&1; do\n\
echo "Waiting for Redis..."\n\
sleep 1\n\
done\n\
echo "Redis is ready"\n\

⚠️ Potential issue | 🟠 Major

Add timeout to Redis readiness check to prevent infinite loop.

The until loop waiting for Redis has no timeout, which could hang indefinitely if Redis fails to start. Consider adding a maximum retry count or timeout.

Proposed fix
 # Wait for Redis to be ready\n\
+RETRY=0\n\
+MAX_RETRIES=30\n\
 until redis-cli ping > /dev/null 2>&1; do\n\
+    RETRY=$((RETRY+1))\n\
+    if [ $RETRY -ge $MAX_RETRIES ]; then\n\
+        echo "Redis failed to start after $MAX_RETRIES attempts"\n\
+        exit 1\n\
+    fi\n\
     echo "Waiting for Redis..."\n\
     sleep 1\n\
 done\n\
📝 Committable suggestion


Suggested change
redis-server --daemonize yes --dir /var/lib/redis --appendonly no\n\
\n\
# Wait for Redis to be ready\n\
until redis-cli ping > /dev/null 2>&1; do\n\
echo "Waiting for Redis..."\n\
sleep 1\n\
done\n\
echo "Redis is ready"\n\
redis-server --daemonize yes --dir /var/lib/redis --appendonly no\n\
\n\
# Wait for Redis to be ready\n\
RETRY=0\n\
MAX_RETRIES=30\n\
until redis-cli ping > /dev/null 2>&1; do\n\
RETRY=$((RETRY+1))\n\
if [ $RETRY -ge $MAX_RETRIES ]; then\n\
echo "Redis failed to start after $MAX_RETRIES attempts"\n\
exit 1\n\
fi\n\
echo "Waiting for Redis..."\n\
sleep 1\n\
done\n\
echo "Redis is ready"\n\

Comment on lines +175 to +180
# Start RQ worker(s) in background\n\
WORKER_COUNT=${RQ_WORKER_COUNT:-1}\n\
echo "Starting $WORKER_COUNT RQ worker(s)..."\n\
for i in $(seq 1 $WORKER_COUNT); do\n\
rq worker openms-workflows --url redis://localhost:6379/0 --name worker-$i &\n\
done\n\

⚠️ Potential issue | 🟠 Major

Background RQ workers lack signal handling for graceful shutdown.

RQ workers are started with & but there's no trap for signals to ensure graceful shutdown when the container stops. This could lead to interrupted jobs and data loss.

Proposed fix
+# Trap signals for graceful shutdown\n\
+cleanup() {\n\
+    echo "Shutting down..."\n\
+    pkill -TERM rq\n\
+    redis-cli shutdown nosave\n\
+    exit 0\n\
+}\n\
+trap cleanup SIGTERM SIGINT\n\
+\n\
 # Start RQ worker(s) in background\n\
 WORKER_COUNT=${RQ_WORKER_COUNT:-1}\n\

Comment on lines +140 to +143

# Redis Queue dependencies (for online mode)
redis>=5.0.0
rq>=1.16.0

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

# First, let's check the requirements.txt file header and structure
head -20 requirements.txt

Repository: OpenMS/streamlit-template

Length of output: 479


🏁 Script executed:

# Check if pyproject.toml exists and its content
if [ -f "pyproject.toml" ]; then
  echo "=== pyproject.toml exists ==="
  cat pyproject.toml
else
  echo "=== pyproject.toml does not exist ==="
fi

Repository: OpenMS/streamlit-template

Length of output: 106


🏁 Script executed:

# Check the exact lines mentioned in the review (140-143)
sed -n '135,145p' requirements.txt

Repository: OpenMS/streamlit-template

Length of output: 219


🏁 Script executed:

# Search for redis and rq references in the codebase to understand their usage
rg "redis|rq" --type-list | head -20

Repository: OpenMS/streamlit-template

Length of output: 458


🌐 Web query:

latest version of rq Python Redis Queue and when it was released

💡 Result:

Latest rq (Python Redis Queue) version: 2.6.1 — released/uploaded to PyPI on November 22, 2025. [1][2]

Sources:

  • PyPI project page for rq (release files / upload date). [1]
  • Safety DB package entry (lists v2.6.1, Nov 22, 2025). [2]

Manual additions to pip-compile generated file need reconciliation.

The file header indicates autogenerated content via pip-compile --output-file=requirements.txt pyproject.toml, but pyproject.toml does not exist in this repository. The manual redis>=5.0.0 and rq>=1.16.0 additions at the end also lack the dependency annotation comments (# via ...) present in other dependencies, breaking the expected format.

To resolve: Either create pyproject.toml with these dependencies and regenerate via pip-compile, or update the file header to document the manual additions. The version constraints themselves are appropriate.


Comment on lines +19 to +24
"workspaces_dir": "..",
"queue_settings": {
"enabled": true,
"default_timeout": 7200,
"result_ttl": 86400
}

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

rg -n "queue_settings" --type py

Repository: OpenMS/streamlit-template

Length of output: 51


🏁 Script executed:

rg -n "enabled" --type py | head -50

Repository: OpenMS/streamlit-template

Length of output: 1532


🏁 Script executed:

find . -name "QueueManager.py" -o -name "*queue*manager*.py" 2>/dev/null | head -20

Repository: OpenMS/streamlit-template

Length of output: 99


🏁 Script executed:

rg -n "online_deployment" --type py

Repository: OpenMS/streamlit-template

Length of output: 559


🏁 Script executed:

cat -n src/workflow/QueueManager.py | head -100

Repository: OpenMS/streamlit-template

Length of output: 3631


🏁 Script executed:

rg -n "queue_settings" . --type json

Repository: OpenMS/streamlit-template

Length of output: 111


🏁 Script executed:

cat -n src/workflow/QueueManager.py | tail -50

Repository: OpenMS/streamlit-template

Length of output: 2068


🏁 Script executed:

cat -n settings.json

Repository: OpenMS/streamlit-template

Length of output: 851


🏁 Script executed:

rg -n "queue_settings\." . --type py

Repository: OpenMS/streamlit-template

Length of output: 51


Remove queue_settings from settings.json or ensure it's actually used in code.

The queue_settings.enabled flag has no effect. QueueManager only checks the online_deployment flag and REDIS_URL environment variable to determine whether to initialize Redis; it never consults queue_settings.enabled or any other fields in the queue_settings object. Having this configuration present but unused creates confusion about what controls queue behavior.

Either integrate queue_settings.enabled into the queue initialization logic, or remove the unused configuration.
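
If the flag is kept, the wiring could look roughly like this (a sketch only; it mirrors the settings.json read already done in _check_online_mode and assumes json is imported at module level, as it is in QueueManager):

    def _queue_enabled(self) -> bool:
        # Sketch: honor queue_settings.enabled before the existing checks.
        try:
            with open("settings.json", "r") as f:
                settings = json.load(f)
            return settings.get("queue_settings", {}).get("enabled", True)
        except Exception:
            return True  # default to enabled if settings are unreadable

    # and in __init__:
    #     if self._is_online and self._queue_enabled():
    #         self._init_redis()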


Comment on lines +43 to +85
@st.fragment(run_every=5)
def monitor_queue():
    """Display queue metrics in sidebar (online mode only)"""
    try:
        from src.workflow.health import get_queue_metrics

        metrics = get_queue_metrics()
        if not metrics.get("available", False):
            return

        st.markdown("---")
        st.markdown("**Queue Status**")

        total_workers = metrics.get("total_workers", 0)
        busy_workers = metrics.get("busy_workers", 0)
        queued_jobs = metrics.get("queued_jobs", 0)

        col1, col2 = st.columns(2)
        col1.metric(
            "Workers",
            f"{busy_workers}/{total_workers}",
            help="Busy workers / Total workers"
        )
        col2.metric(
            "Queued",
            queued_jobs,
            help="Jobs waiting in queue"
        )

        # Utilization progress bar
        if total_workers > 0:
            utilization = busy_workers / total_workers
            st.progress(utilization, text=f"{int(utilization * 100)}% utilized")

        # Warning if queue is backing up
        if queued_jobs > total_workers * 2 and total_workers > 0:
            st.warning(f"High queue depth: {queued_jobs} jobs waiting")

        st.caption(f"Last fetched at: {time.strftime('%H:%M:%S')}")

    except Exception:
        pass  # Silently fail if queue not available


⚠️ Potential issue | 🟡 Minor

Don’t swallow queue-monitoring exceptions silently.
Silent except Exception: pass makes queue outages invisible and hard to debug. Consider logging at debug level to keep the UI clean while preserving diagnostics.

🔧 Suggested fix
+import logging
+
+LOGGER = logging.getLogger(__name__)
...
-    except Exception:
-        pass  # Silently fail if queue not available
+    except Exception as exc:
+        LOGGER.debug("Queue metrics unavailable", exc_info=exc)
🧰 Tools
🪛 Ruff (0.14.11)

83-84: try-except-pass detected, consider logging the exception

(S110)


83-83: Do not catch blind exception: Exception

(BLE001)


Comment on lines +26 to +42
        # Initialize queue manager for online mode
        self._queue_manager: Optional['QueueManager'] = None
        if self._is_online_mode():
            self._init_queue_manager()

    def _is_online_mode(self) -> bool:
        """Check if running in online deployment mode"""
        return st.session_state.get("settings", {}).get("online_deployment", False)

    def _init_queue_manager(self) -> None:
        """Initialize queue manager for online mode"""
        try:
            from .QueueManager import QueueManager
            self._queue_manager = QueueManager()
        except ImportError:
            pass  # Queue not available, will use fallback


⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

# First, let's check the current state of the WorkflowManager.py file
head -50 src/workflow/WorkflowManager.py

Repository: OpenMS/streamlit-template

Length of output: 2026


🏁 Script executed:

# Check imports section
head -30 src/workflow/WorkflowManager.py | cat -n

Repository: OpenMS/streamlit-template

Length of output: 1472


🏁 Script executed:

# Find ruff configuration
find . -name "pyproject.toml" -o -name "ruff.toml" -o -name ".ruff.toml" | head -5

Repository: OpenMS/streamlit-template

Length of output: 80


🏁 Script executed:

# Check if ruff is available and run it on the WorkflowManager.py file
if command -v ruff &> /dev/null; then
  ruff check src/workflow/WorkflowManager.py --select F821 2>&1 || true
else
  echo "ruff not available"
fi

Repository: OpenMS/streamlit-template

Length of output: 574


🏁 Script executed:

# Check Python version and if we can run a type checker simulation
# Let's look for any linting configuration in the repo
find . -maxdepth 2 -type f \( -name "pyproject.toml" -o -name "setup.py" -o -name ".flake8" -o -name "setup.cfg" \) | head -10

Repository: OpenMS/streamlit-template

Length of output: 51


🏁 Script executed:

# Check the ruff configuration
cat ruff.toml

Repository: OpenMS/streamlit-template

Length of output: 975


🏁 Script executed:

# Now let's verify the suggested fix works by creating a test
# Create a temporary file with the fix applied and check it
cat > /tmp/test_fix.py << 'EOF'
from pathlib import Path
from typing import Optional, TYPE_CHECKING
from .Logger import Logger
from .ParameterManager import ParameterManager
from .CommandExecutor import CommandExecutor
from .StreamlitUI import StreamlitUI
from .FileManager import FileManager
import multiprocessing
import streamlit as st
import shutil
import time
import traceback

if TYPE_CHECKING:
    from .QueueManager import QueueManager

class WorkflowManager:
    # Core workflow logic using the above classes
    def __init__(self, name: str, workspace: str):
        self.name = name
        self.workflow_dir = Path(workspace, name.replace(" ", "-").lower())
        self.file_manager = FileManager(self.workflow_dir)
        self.logger = Logger(self.workflow_dir)
        self.parameter_manager = ParameterManager(self.workflow_dir)
        self.executor = CommandExecutor(self.workflow_dir, self.logger, self.parameter_manager)
        self.ui = StreamlitUI(self.workflow_dir, self.logger, self.executor, self.parameter_manager)
        self.params = self.parameter_manager.get_parameters_from_json()

        # Initialize queue manager for online mode
        self._queue_manager: Optional["QueueManager"] = None
        if self._is_online_mode():
            self._init_queue_manager()
EOF

# Verify it with ruff
ruff check /tmp/test_fix.py --select F821 2>&1 || echo "No F821 errors found!"

Repository: OpenMS/streamlit-template

Length of output: 87


Add TYPE_CHECKING guard to resolve F821 linting error.
Ruff flags 'QueueManager' as undefined because the name is not bound in module scope, even with the string forward reference. Using TYPE_CHECKING avoids the lint error without runtime overhead.

🔧 Suggested fix
-from typing import Optional
+from typing import Optional, TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from .QueueManager import QueueManager
...
-self._queue_manager: Optional['QueueManager'] = None
+self._queue_manager: Optional["QueueManager"] = None
🧰 Tools
🪛 Ruff (0.14.11)

27-27: Undefined name QueueManager

(F821)


Comment on lines +55 to +81
    def _start_workflow_queued(self) -> None:
        """Submit workflow to Redis queue (online mode)"""
        from .tasks import execute_workflow

        # Generate unique job ID based on workflow directory
        job_id = f"workflow-{self.workflow_dir.name}-{int(time.time())}"

        # Submit job to queue
        submitted_id = self._queue_manager.submit_job(
            func=execute_workflow,
            kwargs={
                "workflow_dir": str(self.workflow_dir),
                "workflow_class": self.__class__.__name__,
                "workflow_module": self.__class__.__module__,
            },
            job_id=job_id,
            timeout=7200,  # 2 hour timeout
            description=f"Workflow: {self.name}"
        )

        if submitted_id:
            # Store job ID for status checking
            self._queue_manager.store_job_id(self.workflow_dir, submitted_id)
        else:
            # Fallback to local execution if queue submission fails
            st.warning("Queue submission failed, running locally...")
            self._start_workflow_local()

⚠️ Potential issue | 🟠 Major

Use collision-resistant job IDs for queued workflows.
int(time.time()) can collide when multiple starts occur within the same second, causing enqueue failures and unexpected local fallback.

🔧 Suggested fix
+import uuid
...
-        job_id = f"workflow-{self.workflow_dir.name}-{int(time.time())}"
+        job_id = f"workflow-{self.workflow_dir.name}-{uuid.uuid4().hex}"
📝 Committable suggestion


Suggested change
    def _start_workflow_queued(self) -> None:
        """Submit workflow to Redis queue (online mode)"""
        from .tasks import execute_workflow
        # Generate unique job ID based on workflow directory
        job_id = f"workflow-{self.workflow_dir.name}-{int(time.time())}"
        # Submit job to queue
        submitted_id = self._queue_manager.submit_job(
            func=execute_workflow,
            kwargs={
                "workflow_dir": str(self.workflow_dir),
                "workflow_class": self.__class__.__name__,
                "workflow_module": self.__class__.__module__,
            },
            job_id=job_id,
            timeout=7200,  # 2 hour timeout
            description=f"Workflow: {self.name}"
        )
        if submitted_id:
            # Store job ID for status checking
            self._queue_manager.store_job_id(self.workflow_dir, submitted_id)
        else:
            # Fallback to local execution if queue submission fails
            st.warning("Queue submission failed, running locally...")
            self._start_workflow_local()
    def _start_workflow_queued(self) -> None:
        """Submit workflow to Redis queue (online mode)"""
        from .tasks import execute_workflow
        # Generate unique job ID based on workflow directory
        job_id = f"workflow-{self.workflow_dir.name}-{uuid.uuid4().hex}"
        # Submit job to queue
        submitted_id = self._queue_manager.submit_job(
            func=execute_workflow,
            kwargs={
                "workflow_dir": str(self.workflow_dir),
                "workflow_class": self.__class__.__name__,
                "workflow_module": self.__class__.__module__,
            },
            job_id=job_id,
            timeout=7200,  # 2 hour timeout
            description=f"Workflow: {self.name}"
        )
        if submitted_id:
            # Store job ID for status checking
            self._queue_manager.store_job_id(self.workflow_dir, submitted_id)
        else:
            # Fallback to local execution if queue submission fails
            st.warning("Queue submission failed, running locally...")
            self._start_workflow_local()

Copilot AI left a comment

Pull request overview

This pull request introduces a Redis-based queue system for workflow execution in online deployment mode, while maintaining backward compatibility with the existing multiprocessing approach for local mode. The implementation adds job queuing, status tracking, and queue monitoring capabilities specifically for Docker deployments.

Changes:

  • Added Redis Queue (RQ) infrastructure for online mode workflow execution
  • Implemented QueueManager for job submission, tracking, and cancellation
  • Created worker tasks module for background workflow execution
  • Added health monitoring and queue metrics display
  • Updated Dockerfile to run Redis server and RQ workers alongside Streamlit

Reviewed changes

Copilot reviewed 9 out of 10 changed files in this pull request and generated 17 comments.

Summary per file:
• src/workflow/tasks.py: New worker task execution logic for RQ workers
• src/workflow/health.py: Redis and worker health check utilities
• src/workflow/QueueManager.py: Core queue management implementation
• src/workflow/WorkflowManager.py: Added queue submission and status tracking
• src/workflow/StreamlitUI.py: Queue status display and progress tracking UI
• src/common/common.py: Queue metrics monitoring in sidebar
• settings.json: Queue configuration settings
• requirements.txt: Added Redis and RQ dependencies
• Dockerfile: Redis server and RQ worker setup
• docs/REDIS_QUEUE_IMPLEMENTATION_PLAN.md: Comprehensive implementation documentation


        workflow.logger = logger
        workflow.parameter_manager = parameter_manager
        workflow.executor = executor
        workflow.params = params
Copilot AI commented Jan 16, 2026

The workflow instance created using object.__new__ bypasses the normal __init__ method and may miss important initialization. Consider verifying that all required workflow attributes are set before calling workflow.execution(). Specifically, check whether the workflow's __init__ sets any other attributes beyond those manually assigned here (lines 81-87), such as self.ui or other workflow-specific state.

Suggested change
        workflow.params = params
        workflow.params = params
        # Ensure attributes normally initialized in __init__ but not required
        # for headless execution are present to avoid AttributeError later.
        # In particular, many workflows expect `self.ui` to exist.
        if not hasattr(workflow, "ui"):
            workflow.ui = None

Comment on lines +43 to +284
class QueueManager:
    """
    Manages Redis Queue operations for workflow execution.
    Only active in online mode. Falls back to direct execution in local mode.
    Redis runs on localhost within the same container.
    """

    QUEUE_NAME = "openms-workflows"
    # Redis runs locally in the same container
    REDIS_URL = os.environ.get("REDIS_URL", "redis://localhost:6379/0")

    def __init__(self):
        self._redis = None
        self._queue = None
        self._is_online = self._check_online_mode()
        self._init_attempted = False

        if self._is_online:
            self._init_redis()

    def _check_online_mode(self) -> bool:
        """Check if running in online mode"""
        # Check environment variable first (set in Docker)
        if os.environ.get("REDIS_URL"):
            return True

        # Fallback: check settings file
        try:
            with open("settings.json", "r") as f:
                settings = json.load(f)
            return settings.get("online_deployment", False)
        except Exception:
            return False

    def _init_redis(self) -> None:
        """Initialize Redis connection and queue"""
        if self._init_attempted:
            return
        self._init_attempted = True

        try:
            from redis import Redis
            from rq import Queue

            self._redis = Redis.from_url(self.REDIS_URL)
            self._redis.ping()  # Test connection
            self._queue = Queue(self.QUEUE_NAME, connection=self._redis)
        except ImportError:
            # Redis/RQ packages not installed
            self._redis = None
            self._queue = None
        except Exception:
            # Redis server not available
            self._redis = None
            self._queue = None

    @property
    def is_available(self) -> bool:
        """Check if queue system is available"""
        return self._is_online and self._queue is not None

    def submit_job(
        self,
        func: Callable,
        args: tuple = (),
        kwargs: dict = None,
        job_id: Optional[str] = None,
        timeout: int = 7200,  # 2 hour default
        result_ttl: int = 86400,  # 24 hours
        description: str = ""
    ) -> Optional[str]:
        """
        Submit a job to the queue.
        Args:
            func: The function to execute
            args: Positional arguments for the function
            kwargs: Keyword arguments for the function
            job_id: Optional custom job ID (defaults to UUID)
            timeout: Job timeout in seconds
            result_ttl: How long to keep results
            description: Human-readable job description
        Returns:
            Job ID if successful, None otherwise
        """
        if not self.is_available:
            return None

        kwargs = kwargs or {}

        try:
            job = self._queue.enqueue(
                func,
                args=args,
                kwargs=kwargs,
                job_id=job_id,
                job_timeout=timeout,
                result_ttl=result_ttl,
                description=description,
                meta={"description": description, "progress": 0.0, "current_step": ""}
            )
            return job.id
        except Exception:
            return None

    def get_job_info(self, job_id: str) -> Optional[JobInfo]:
        """
        Get information about a job.
        Args:
            job_id: The job ID to query
        Returns:
            JobInfo object or None if not found
        """
        if not self.is_available:
            return None

        try:
            from rq.job import Job

            job = Job.fetch(job_id, connection=self._redis)

            # Map RQ status to our enum
            status_map = {
                "queued": JobStatus.QUEUED,
                "started": JobStatus.STARTED,
                "finished": JobStatus.FINISHED,
                "failed": JobStatus.FAILED,
                "deferred": JobStatus.DEFERRED,
                "canceled": JobStatus.CANCELED,
            }

            status = status_map.get(job.get_status(), JobStatus.QUEUED)

            # Get progress from job meta
            meta = job.meta or {}
            progress = meta.get("progress", 0.0)
            current_step = meta.get("current_step", "")

            # Calculate queue position if queued
            queue_position = None
            queue_length = None
            if status == JobStatus.QUEUED:
                queue_position = self._get_job_position(job_id)
                queue_length = len(self._queue)

            return JobInfo(
                job_id=job.id,
                status=status,
                progress=progress,
                current_step=current_step,
                queue_position=queue_position,
                queue_length=queue_length,
                result=job.result if status == JobStatus.FINISHED else None,
                error=str(job.exc_info) if job.exc_info else None,
                enqueued_at=str(job.enqueued_at) if job.enqueued_at else None,
                started_at=str(job.started_at) if job.started_at else None,
                ended_at=str(job.ended_at) if job.ended_at else None,
            )
        except Exception:
            return None

    def _get_job_position(self, job_id: str) -> Optional[int]:
        """Get position of a job in the queue (1-indexed)"""
        try:
            job_ids = self._queue.job_ids
            if job_id in job_ids:
                return job_ids.index(job_id) + 1
            return None
        except Exception:
            return None

    def cancel_job(self, job_id: str) -> bool:
        """
        Cancel a queued or running job.
        Args:
            job_id: The job ID to cancel
        Returns:
            True if successfully canceled
        """
        if not self.is_available:
            return False

        try:
            from rq.job import Job

            job = Job.fetch(job_id, connection=self._redis)
            job.cancel()
            return True
        except Exception:
            return False

    def get_queue_stats(self) -> dict:
        """
        Get queue statistics.
        Returns:
            Dictionary with queue stats
        """
        if not self.is_available:
            return {}

        try:
            from rq import Worker

            workers = Worker.all(connection=self._redis)
            busy_workers = len([w for w in workers if w.get_state() == "busy"])

            return {
                "queued": len(self._queue),
                "started": len(self._queue.started_job_registry),
                "finished": len(self._queue.finished_job_registry),
                "failed": len(self._queue.failed_job_registry),
                "workers": len(workers),
                "busy_workers": busy_workers,
                "idle_workers": len(workers) - busy_workers,
            }
        except Exception:
            return {}

    def store_job_id(self, workflow_dir: Path, job_id: str) -> None:
        """Store job ID in workflow directory for recovery"""
        job_file = Path(workflow_dir) / ".job_id"
        job_file.write_text(job_id)

    def load_job_id(self, workflow_dir: Path) -> Optional[str]:
        """Load job ID from workflow directory"""
        job_file = Path(workflow_dir) / ".job_id"
        if job_file.exists():
            return job_file.read_text().strip()
        return None

    def clear_job_id(self, workflow_dir: Path) -> None:
        """Clear stored job ID"""
        job_file = Path(workflow_dir) / ".job_id"
        if job_file.exists():
            job_file.unlink()
Copilot AI commented Jan 16, 2026

The new QueueManager class and Redis queue functionality lack automated test coverage. Consider adding tests to verify: queue initialization and availability detection, job submission and retrieval, job status tracking, job cancellation, queue statistics reporting, and graceful fallback to local execution when Redis is unavailable. This is particularly important for production reliability of the online deployment mode.
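
As a starting point, a fallback-path test might look like this (pytest-style sketch; import path per this PR, construction via __new__ to avoid touching Redis):

from src.workflow.QueueManager import QueueManager

def _offline_manager() -> QueueManager:
    # Build an instance without running __init__ (no Redis connection attempted).
    qm = QueueManager.__new__(QueueManager)
    qm._is_online = False
    qm._redis = None
    qm._queue = None
    return qm

def test_unavailable_without_redis():
    assert _offline_manager().is_available is False

def test_submit_job_returns_none_when_unavailable():
    # Callers treat None as "fall back to local multiprocessing".
    assert _offline_manager().submit_job(func=print) is None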

Comment on lines +53 to +55
        import importlib
        module = importlib.import_module(workflow_module)
        WorkflowClass = getattr(module, workflow_class)
Copilot AI commented Jan 16, 2026

The workflow_class and workflow_module parameters are used with importlib and getattr to dynamically load and instantiate classes (lines 54-55). If these values are derived from user input without validation, this could potentially allow arbitrary code execution. Ensure that workflow_class and workflow_module are validated against a whitelist of allowed workflows before dynamic loading, or verify they are only set internally by the WorkflowManager.
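
One hedged way to enforce that (ALLOWED_WORKFLOWS and load_workflow_class are hypothetical names, not part of this PR):

import importlib

# Hypothetical allow-list; populate with the (module, class) pairs this app ships.
ALLOWED_WORKFLOWS = {
    ("src.Workflow", "Workflow"),
}

def load_workflow_class(workflow_module: str, workflow_class: str):
    # Refuse anything not explicitly registered before importing it.
    if (workflow_module, workflow_class) not in ALLOWED_WORKFLOWS:
        raise ValueError(f"Workflow {workflow_module}.{workflow_class} is not allow-listed")
    module = importlib.import_module(workflow_module)
    return getattr(module, workflow_class)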

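One way to close that gap is a registry check ahead of the dynamic import; the `ALLOWED_WORKFLOWS` set and helper name below are hypothetical, standing in for however the app enumerates its shipped workflows:

    import importlib

    # Hypothetical whitelist; in practice this would be populated from the
    # workflows actually bundled with the app, not hard-coded like this.
    ALLOWED_WORKFLOWS = {
        ("content.Workflow", "Workflow"),
    }


    def load_workflow_class(workflow_module: str, workflow_class: str):
        """Resolve a workflow class only if the module/class pair is whitelisted."""
        if (workflow_module, workflow_class) not in ALLOWED_WORKFLOWS:
            raise ValueError(
                f"Refusing to load unlisted workflow: {workflow_module}.{workflow_class}"
            )
        module = importlib.import_module(workflow_module)
        return getattr(module, workflow_class)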
Comment on lines +59 to +60
        # Generate unique job ID based on workflow directory
        job_id = f"workflow-{self.workflow_dir.name}-{int(time.time())}"
Copilot AI Jan 16, 2026

The job_id is generated using the workflow directory name and timestamp (line 60), which could lead to collisions if the same workflow is started multiple times within the same second. Consider adding additional entropy (e.g., a random component or UUID suffix) to ensure uniqueness, or document that rapid successive starts of the same workflow are not supported.

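A small variant along those lines, appending a `uuid4` suffix for entropy (one option among several; the surrounding attribute names are taken from the snippet above):

    import time
    import uuid

    # Timestamp keeps IDs roughly sortable; the random suffix guarantees
    # uniqueness even for rapid successive starts of the same workflow.
    job_id = (
        f"workflow-{self.workflow_dir.name}-{int(time.time())}-{uuid.uuid4().hex[:8]}"
    )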
"workspaces_dir": ".."
"workspaces_dir": "..",
"queue_settings": {
"enabled": true,
Copilot AI Jan 16, 2026

The queue_settings configuration added to settings.json includes an 'enabled' flag, but this flag is never checked in the code. The queue availability is determined by REDIS_URL environment variable and online_deployment setting, not by queue_settings.enabled. Either implement checking of this flag in QueueManager initialization or remove it to avoid confusion.

Suggested change:
-    "enabled": true,

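If the flag is kept, the first option could look roughly like the helper below, gating queue initialization on both the setting and the environment; the settings path, key names beyond `queue_settings.enabled`, and the helper itself are assumptions:

    import json
    import os
    from pathlib import Path


    def _queue_enabled(settings_path: Path = Path("settings.json")) -> bool:
        """Opt in only when settings.json enables the queue AND Redis is configured."""
        try:
            settings = json.loads(settings_path.read_text())
        except (OSError, json.JSONDecodeError):
            return False
        if not settings.get("queue_settings", {}).get("enabled", False):
            return False
        return bool(os.environ.get("REDIS_URL"))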
Comment on lines +15 to +158
def execute_workflow(
    workflow_dir: str,
    workflow_class: str,
    workflow_module: str,
) -> dict:
    """
    Execute a workflow in the worker process.

    This function is called by the RQ worker to execute a workflow.
    It reconstructs the workflow object and calls its execution() method.

    Args:
        workflow_dir: Path to the workflow directory
        workflow_class: Name of the Workflow class
        workflow_module: Module path containing the Workflow class

    Returns:
        Dictionary with execution results
    """
    try:
        from rq import get_current_job

        job = get_current_job()
    except Exception:
        job = None

    workflow_path = Path(workflow_dir)

    try:
        # Update progress
        _update_progress(job, 0.0, "Initializing workflow...")

        # Import required modules
        from src.workflow.CommandExecutor import CommandExecutor
        from src.workflow.FileManager import FileManager
        from src.workflow.ParameterManager import ParameterManager
        from src.workflow.Logger import Logger

        # Load the workflow class dynamically
        import importlib
        module = importlib.import_module(workflow_module)
        WorkflowClass = getattr(module, workflow_class)

        _update_progress(job, 0.05, "Loading parameters...")

        # Delete the log file if it already exists
        shutil.rmtree(Path(workflow_path, "logs"), ignore_errors=True)

        # Load parameters from saved params.json
        params_file = workflow_path / "params.json"
        if params_file.exists():
            with open(params_file, "r") as f:
                params = json.load(f)
        else:
            params = {}

        # Initialize workflow components
        logger = Logger(workflow_path)
        file_manager = FileManager(workflow_path)
        parameter_manager = ParameterManager(workflow_path)
        executor = CommandExecutor(workflow_path, logger, parameter_manager)

        _update_progress(job, 0.1, "Starting workflow execution...")

        # Create workflow instance
        # We need to bypass the normal __init__ which requires Streamlit
        workflow = object.__new__(WorkflowClass)
        workflow.name = workflow_path.name
        workflow.workflow_dir = workflow_path
        workflow.file_manager = file_manager
        workflow.logger = logger
        workflow.parameter_manager = parameter_manager
        workflow.executor = executor
        workflow.params = params

        # Store job reference for progress updates
        workflow._rq_job = job

        # Clear results directory
        results_dir = workflow_path / "results"
        if results_dir.exists():
            shutil.rmtree(results_dir)
        results_dir.mkdir(parents=True)

        # Log workflow start
        logger.log("STARTING WORKFLOW")

        _update_progress(job, 0.15, "Executing workflow steps...")

        # Execute the workflow
        workflow.execution()

        # Log workflow completion
        logger.log("WORKFLOW FINISHED")

        _update_progress(job, 1.0, "Workflow completed")

        # Clean up pid directory (in case it was created by accident)
        shutil.rmtree(executor.pid_dir, ignore_errors=True)

        return {
            "success": True,
            "workflow_dir": str(workflow_path),
            "message": "Workflow completed successfully",
        }

    except Exception as e:
        error_msg = f"Workflow failed: {str(e)}\n{traceback.format_exc()}"

        # Log error to workflow logs
        try:
            log_dir = workflow_path / "logs"
            log_dir.mkdir(parents=True, exist_ok=True)

            for log_name in ["minimal.log", "commands-and-run-times.log", "all.log"]:
                log_file = log_dir / log_name
                with open(log_file, "a") as f:
                    f.write(f"\n\nERROR: {str(e)}\n")
                    f.write(traceback.format_exc())
        except Exception:
            pass

        # Clean up pid directory
        try:
            pid_dir = workflow_path / "pids"
            shutil.rmtree(pid_dir, ignore_errors=True)
        except Exception:
            pass

        return {
            "success": False,
            "workflow_dir": str(workflow_path),
            "error": error_msg,
        }


def _update_progress(job, progress: float, step: str) -> None:
    """Update job progress metadata"""
    if job is not None:
        try:
            job.meta["progress"] = progress
            job.meta["current_step"] = step
            job.save_meta()
        except Exception:
            pass  # Ignore errors updating progress
Copilot AI Jan 16, 2026

The execute_workflow function in tasks.py lacks automated test coverage. Consider adding tests to verify: workflow instance creation without Streamlit dependencies, parameter loading from params.json, progress tracking, error handling and logging, and cleanup of temporary directories. These tests should mock the RQ job context and verify the function works correctly in the worker environment.

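A sketch of one such test, exercising the error branch with a mocked RQ job context; it assumes `src.workflow.tasks` imports cleanly outside a worker and that pytest's `tmp_path` can stand in for a workflow directory:

    # test_tasks.py -- illustrative sketch only.
    from unittest.mock import MagicMock, patch

    from src.workflow import tasks


    def test_execute_workflow_failure_path(tmp_path):
        fake_job = MagicMock()
        with patch("rq.get_current_job", return_value=fake_job):
            result = tasks.execute_workflow(
                workflow_dir=str(tmp_path),
                workflow_class="Workflow",
                workflow_module="no.such.module",  # forces the except branch
            )
        assert result["success"] is False
        assert "No module named" in result["error"]
        fake_job.save_meta.assert_called()               # progress metadata written
        assert (tmp_path / "logs" / "all.log").exists()  # error appended to logs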
Comment on lines +78 to +81
        else:
            # Fallback to local execution if queue submission fails
            st.warning("Queue submission failed, running locally...")
            self._start_workflow_local()
Copilot AI Jan 16, 2026

When queue submission fails and falls back to local execution (line 81), a warning is shown but the logs are not deleted before starting the local workflow. In contrast, _start_workflow_local (line 86) deletes logs before starting. This inconsistency means that when falling back, old log files may persist and confuse users. Consider calling shutil.rmtree for logs before the fallback, or refactor to ensure consistent behavior.

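One way to make the two paths consistent is to clear the logs in the fallback branch as well, e.g. via a small helper method on the UI class; this sketch assumes the logs live under `self.workflow_dir` as elsewhere in this diff:

    import shutil
    from pathlib import Path

    import streamlit as st


    def _fallback_to_local(self) -> None:
        """Clear stale logs, warn, and run locally (mirrors _start_workflow_local)."""
        # Deleting logs here keeps the fallback consistent with the local path.
        shutil.rmtree(Path(self.workflow_dir, "logs"), ignore_errors=True)
        st.warning("Queue submission failed, running locally...")
        self._start_workflow_local()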
This module must be importable without Streamlit being available.
"""

import sys
Copilot AI Jan 16, 2026

Import of 'sys' is not used.

Suggested change:
-import sys

                with open(log_file, "a") as f:
                    f.write(f"\n\nERROR: {str(e)}\n")
                    f.write(traceback.format_exc())
        except Exception:
Copilot AI Jan 16, 2026

'except' clause does nothing but pass and there is no explanatory comment.

        try:
            pid_dir = workflow_path / "pids"
            shutil.rmtree(pid_dir, ignore_errors=True)
        except Exception:
Copilot AI Jan 16, 2026

'except' clause does nothing but pass and there is no explanatory comment.

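Both flagged handlers could carry a short justification so the intent is explicit; the wording below is illustrative:

        except Exception:
            # Best-effort cleanup only: the returned result dict already
            # records the original failure, so secondary errors while
            # writing logs or removing the pid directory are ignored.
            pass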