-
Notifications
You must be signed in to change notification settings - Fork 653
Cog Server Rewrite #2641
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Cog Server Rewrite #2641
Conversation
|
in an effort to keep this as small as it can be (lololol) can we leave the coglet dir in the repo for now. Or if necessary, remove the go code and leave the python code? Would be nice to merge this as an addition only and then start wiring it up to the build/wheel selection system. I'd also like to salvage the python sdk portion of coglet since the complex type system and schema worked well for pipelines. |
The python section of the code is the problem as it conflicts with the new wheel. I'll get some rename I. Place instead of removal. |
Tasks for coglet development: - build:rust / build:rust:release - Build Rust crates - build:coglet - Build Python wheel with maturin (dev) - build:coglet:wheel* - Build release wheels for various platforms - test:rust - Run Rust tests with nextest - test:coglet-python - Run Python integration tests - fmt:rust / fmt:rust:check - Format Rust code - clippy - Run lints - deny - Check licenses/advisories Tools added: cargo-binstall, cargo-deny, cargo-insta, cargo-nextest, maturin, ruff, ty
Workspace structure: - crates/Cargo.toml - workspace root with shared dependencies - crates/coglet/ - main Rust library (empty scaffold) - crates/deny.toml - cargo-deny license/advisory config - crates/.gitignore - ignore build artifacts Shared workspace dependencies defined for async runtime (tokio), HTTP (axum, reqwest), serialization (serde), and error handling.
Runtime: tokio, tokio-util (codec), futures, async-trait Serialization: serde, serde_json HTTP: axum (server), reqwest + ureq (webhooks) Utils: uuid, chrono, thiserror, anyhow, dashmap, tracing Unix: nix (signal handling) Dev: insta, wiremock, tower, http-body-util
Health enum: Unknown, Starting, Ready, Busy, SetupFailed, Defunct SetupStatus enum with SetupResult for tracking setup phase VersionInfo for runtime version reporting Includes serde serialization with appropriate casing and tests.
Protocol types for parent-worker communication: - ControlRequest/Response for control channel (cancel, shutdown, ready) - SlotRequest/Response for per-slot data channel (predict, logs, output) - SlotId using UUID for unique slot identification - SlotOutcome for type-safe completion handling JsonCodec wraps LengthDelimitedCodec with serde_json serialization.
NamedSocketTransport: filesystem sockets (all Unix platforms) AbstractSocketTransport: Linux abstract namespace (no filesystem) Adds ControlRequest::Init variant for worker initialization with transport info, predictor ref, slot count, and async/train flags.
PredictionStatus enum with terminal state detection. PredictionOutput for single values or streamed chunks. Prediction struct tracks lifecycle: status, logs, outputs, timing. Completion notification via tokio Notify for async waiting. CancellationToken re-exported from tokio_util for cancel propagation.
PredictionResult aggregates output, timing, and logs. PredictionGuard provides RAII timing with cancellation support. PredictionMetrics for timing data collection. PredictionError for typed prediction failures. PredictFn/AsyncPredictFn type aliases for predictor function signatures. Also fixes PredictionOutput to use untagged serde serialization.
PermitInUse/PermitIdle/PermitPoisoned enforce valid state transitions at compile time - poisoned permits cannot become idle. PermitPool manages slot permits with RAII semantics: - Idle permits return to pool on drop - Poisoned permits are orphaned, reducing capacity AnyPermit enum for dynamic state storage.
PredictionSlot holds both together with separate concerns: - Prediction behind Mutex for concurrent updates - Permit for RAII pool return on drop Slot transitions: into_idle() returns permit, into_poisoned() orphans it. Drop handler fails non-terminal predictions if slot leaked.
WebhookEventType enum for filtering (start, output, logs, completed). WebhookConfig with throttle interval, retry settings, status codes. TraceContext for W3C distributed tracing headers. WebhookSender implements: - send(): fire-and-forget for non-terminal events with throttling - send_terminal(): async with exponential backoff retries - send_terminal_sync(): blocking version for Drop contexts (uses ureq) Bearer auth via WEBHOOK_AUTH_TOKEN env var.
PredictionState snapshot for API responses with to_response() builder. PredictionHandle for waiting, state queries, and cancellation. SyncPredictionGuard for cancel-on-drop behavior in sync predictions. DashMap provides lock-free concurrent access - no deadlock risks. Terminal status triggers webhook send via spawned task.
- input.rs: Runtime detection (Pydantic vs Coglet), URLPath download with parallel ThreadPoolExecutor, PreparedInput RAII for temp file cleanup - output.rs: Output processing using cog.json.make_encodeable() and upload_files() for base64 data URL conversion - audit.rs: TeeWriter for protecting sys.stdout/stderr with audit hooks that wrap user replacements while preserving slot routing - log_writer.rs: Full SlotLogWriter with ContextVar-based prediction routing, line buffering, setup log sender for health-check logs - cancel.rs: SIGUSR1 signal handling with CancelationException, CancelableGuard RAII for sync prediction cancellation
- predictor.rs: Complete PythonPredictor with runtime detection, async/sync support, generator handling, input preparation, output processing, and schema generation - worker_bridge.rs: Full PredictHandler with asyncio event loop, per-slot cancellation tracking, log context management, SIGUSR1 signal cancellation for sync, future.cancel() for async
- coglet.pyi: Type stubs for active(), serve(), _run_worker(), _is_cancelable() - tests/test_coglet.py: Integration tests for sync/async/generator predictors, health check, cancellation endpoints
- Setup log hook for routing setup logs via SlotLogWriter - Export audit helpers (_is_slot_log_writer, _is_tee_writer, etc.) - Export SlotLogWriter and TeeWriter classes for isinstance checks
- Use abi3-py38 feature for stable ABI compatibility - Replace Python::with_gil with Python::attach (0.27 API) - Replace py.allow_threads with py.detach (0.27 API) - Add auto-initialize feature for standalone testing
Add missing install_slot_log_writers() and install_audit_hook() calls in _run_worker() to enable stdout/stderr routing and protection. Remove module-level #![allow(dead_code)] since code is now integrated.
Replace Result<(), String> in PredictHandler::setup() with typed SetupError enum. Variants distinguish load vs setup vs internal errors, enabling future error code mapping without changing behavior.
- crates/README.md: Overall architecture, prediction flow, startup sequence, bridge protocol, directory structure - crates/coglet/README.md: Detailed coglet internals, components, health states, behaviors (shutdown, cancellation, slot poisoning) - crates/coglet-python/README.md: Python bindings details, active() flag, single async loop, stdout/stderr routing, audit hook, cancel
Production always uses orchestrator. The predict_fn/async_predict_fn path was only used in tests and never ran in production. - Remove PredictFn, AsyncPredictFn, PredictFuture types - Remove predict_via_function method - Remove legacy_pool field - Remove PredictionService::new(pool) constructor - Enforce READY requires orchestrator (silently ignores otherwise) - Update tests to not create impossible states
Introduce Orchestrator trait with register_prediction() method: - Enables mocking orchestrator in service unit tests - OrchestratorHandle implements trait, keeps cancel/shutdown as inherent methods - Service now takes Arc<dyn Orchestrator> instead of Arc<OrchestratorHandle> - Export trait from coglet crate for downstream use
- Add MockOrchestrator in both service and routes test modules - Service tests: orchestrator integration, prediction flow, capacity - HTTP routes tests: full request/response cycle with mock predictor - Fix race in predict() by checking terminal before waiting on notify - 15 new tests covering prediction lifecycle end-to-end
- into_idle() now returns Option<IdleToken> instead of panicking - into_poisoned() now returns bool instead of panicking - Invalid state transitions log errors and use debug_assert for dev builds - Callers updated to handle new return types gracefully
- stdin/stdout take() now returns OrchestratorError instead of panicking - Prediction mutex locks use try_lock_prediction() helper that: - On poison: fails the prediction, logs error, returns None - Caller removes poisoned predictions from tracking - Further data from poisoned predictions is ignored
- Add try_lock_prediction() helper that fails prediction on mutex poison - predict() returns PredictionError on mutex poison instead of panicking
- worker_bridge: init_async_loop() and PythonPredictHandler::new() return Result - input: detect_runtime() returns Result, PreparedInput uses Py<PyDict> - log_writer: mutex locks use expect() with clear messages - predictor: handle detect_runtime Result propagation Worker mutex poisoning now panics with clear message - orchestrator handles worker exit and fails predictions appropriately.
- webhook: WebhookSender::new() returns Result for HTTP client creation - webhook: Mutex locks use into_inner() to recover from poison - routes: Document timestamp unwrap safety (can't fail after UNIX_EPOCH) - server: Improve signal handler expect messages for clarity Callers handle webhook creation failure by logging and continuing without webhooks, which is acceptable graceful degradation.
- input.rs: get_item().unwrap() -> ok_or_else(PyKeyError) for missing keys - log_writer.rs: OnceLock.get().unwrap() -> ok_or_else(PyRuntimeError) Both are now proper Python exceptions rather than Rust panics.
- Add .github/workflows/rust.yaml for Rust CI (fmt, clippy, nextest) - Add mise tasks: stubs:generate, stubs:check, stubs:typecheck - Add mise tasks: ci:rust, ci:coglet for local CI simulation - Add scripts/generate_stubs.py to auto-generate coglet.pyi from module - Update coglet.pyi with all module exports (classes + internal functions) CI runs: - Pure Rust checks (fmt, clippy, tests) without Python - coglet-python checks on Python 3.9/3.12/3.13 (build, stub check, typecheck)
- Install cargo-binstall before mise so mise uses binstall for cargo tools - Add uv sync to setup Python environment before maturin build - Order: Rust -> cargo-binstall -> rust-cache -> mise -> uv sync
- Add #[allow(dead_code)] on AbstractSocketTransport.prefix (kept for debugging) - Add 'uv venv' step before maturin build in CI - Move virtualenv PATH setup to individual steps after checkout to prevent "Unable to locate executable file: tar" error during actions/checkout@v4
markphelps
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
most of my comments are enum cleanup related because I just read that chapter in the Book of Rust
|
Yes the enum change was lost in translation. A number of other places I tried to ensure we had enums. I'll get an addon patch to make those conversions you highlighted unless you're interested in doing it. |
Replace boolean soup with strongly-typed enums across four areas: - PredictorKind enum (Class/StandaloneFunction) replacing 5 boolean flags in predictor.rs (is_async, is_async_gen, has_train, is_train_async, is_standalone_function) - PredictionOutcome enum (Success/Failed/Cancelled) replacing success bool and error string matching in worker.rs - HandlerMode enum (Predict/Train) replacing is_train boolean in worker_bridge.rs - SlotState enum (Idle/SyncPrediction/AsyncPrediction) replacing 4 fields with state machine in worker_bridge.rs Benefits: - Invalid states impossible at compile time - Eliminates string matching for cancellation - Clearer semantics and self-documenting code - Easier to extend with new variants Addresses PR #2641 review feedback.
* refactor: replace boolean flags with Rust enums Replace boolean soup with strongly-typed enums across four areas: - PredictorKind enum (Class/StandaloneFunction) replacing 5 boolean flags in predictor.rs (is_async, is_async_gen, has_train, is_train_async, is_standalone_function) - PredictionOutcome enum (Success/Failed/Cancelled) replacing success bool and error string matching in worker.rs - HandlerMode enum (Predict/Train) replacing is_train boolean in worker_bridge.rs - SlotState enum (Idle/SyncPrediction/AsyncPrediction) replacing 4 fields with state machine in worker_bridge.rs Benefits: - Invalid states impossible at compile time - Eliminates string matching for cancellation - Clearer semantics and self-documenting code - Easier to extend with new variants Addresses PR #2641 review feedback. * fix: apply clippy suggestions Use derive(Default) with #[default] attribute instead of manual impl. * style: apply rustfmt formatting * test: update tests to use PredictionOutcome enum Replace direct field access (.success, .error) with pattern matching on the PredictionOutcome enum. * chore: rm deadcode, simplify start_sync_prediction logic Signed-off-by: Mark Phelps <[email protected]> --------- Signed-off-by: Mark Phelps <[email protected]>
This is a complete rewrite of the cog http server moving away from python and to a rust pyo3, ABI3 wheel. The aim is to provide a significant uplift in control of how we run predictors and isolate the predictor run from the core interface used to run the predictor.