
agentexec

Production-ready task orchestration for OpenAI Agents SDK

agentexec is a Python library that provides robust, scalable infrastructure for running AI agents in production environments. It combines Redis-backed task queues, multi-process worker pools, and comprehensive activity tracking to make deploying and monitoring AI agents straightforward and reliable.

Key Features

  • Multi-Process Worker Pool - Spawn N worker processes that poll a Redis queue and execute tasks in parallel
  • Redis Task Queue - Reliable job distribution with priority support (HIGH/LOW)
  • Automatic Activity Tracking - Full lifecycle management (QUEUED → RUNNING → COMPLETE/ERROR) with progress logging
  • OpenAI Agents Integration - Drop-in runner with max turns recovery and status reporting
  • Agent Self-Reporting - Built-in tools for agents to report progress during execution
  • Type-Safe Contexts - Pydantic BaseModel for task context with IDE autocomplete
  • Graceful Shutdown - Timeout-based worker shutdown with signal handling
  • Pipelines - Multi-step workflow orchestration with parallel task execution
  • Result Storage - Results cached in Redis with configurable TTL for pipeline coordination
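The activity lifecycle named above (QUEUED → RUNNING → COMPLETE/ERROR) can be sketched as a small state machine. This is purely illustrative; it is not agentexec's internal representation:

```python
from enum import Enum


class Status(str, Enum):
    QUEUED = "queued"
    RUNNING = "running"
    COMPLETE = "complete"
    ERROR = "error"


# Legal transitions: a task is queued, picked up by a worker,
# then finishes either successfully or with an error.
TRANSITIONS = {
    Status.QUEUED: {Status.RUNNING},
    Status.RUNNING: {Status.COMPLETE, Status.ERROR},
    Status.COMPLETE: set(),
    Status.ERROR: set(),
}


def advance(current: Status, target: Status) -> Status:
    """Move a task to `target`, rejecting illegal jumps."""
    if target not in TRANSITIONS[current]:
        raise ValueError(f"illegal transition {current.value} -> {target.value}")
    return target
```

Terminal states have no outgoing transitions, so a completed or errored task can never be silently restarted without being re-enqueued.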

Why agentexec?

Building production AI agent systems requires more than just calling an LLM API. You need:

  • Reliable task execution - Tasks should survive process restarts and be retried on failure
  • Observability - Know what your agents are doing, when they started, and how they're progressing
  • Scalability - Run multiple agents in parallel across multiple worker processes
  • Type safety - Catch errors at development time, not in production
  • Graceful degradation - Handle failures without bringing down your entire system

agentexec provides all of this out of the box, letting you focus on building your agents rather than infrastructure.

Quick Example

# worker.py — shared module imported by both workers and producers.
from uuid import UUID
from pydantic import BaseModel
from sqlalchemy.ext.asyncio import create_async_engine
from agents import Agent
import agentexec as ax


class ResearchContext(BaseModel):
    company: str
    focus_areas: list[str]

class ResearchResult(BaseModel):
    summary: str
    insights: list[str]


engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/mydb")
pool = ax.Pool(engine=engine)


@pool.task("research_company")
async def research_company(agent_id: UUID, context: ResearchContext) -> ResearchResult:
    runner = ax.OpenAIRunner(agent_id=agent_id)

    agent = Agent(
        name="Research Agent",
        instructions=(
            f"Research {context.company}, focusing on {', '.join(context.focus_areas)}. "
            f"{runner.prompts.report_status}"
        ),
        tools=[runner.tools.report_status],
        model="gpt-5",
        output_type=ResearchResult,
    )

    result = await runner.run(agent, input="Begin research", max_turns=15)
    return result.final_output_as(ResearchResult)

Start the workers via the CLI:

agentexec run worker:pool --create-tables
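For intuition about the graceful-shutdown feature, a worker loop of this shape typically sets a stop flag on SIGTERM, drains outstanding work, and exits by a deadline. The sketch below is a generic pattern with made-up names (`poll`, `execute`, `drain_seconds`), not agentexec's actual implementation:

```python
import signal
import time

stop = False  # flipped by the signal handler


def _handle_sigterm(signum, frame):
    """Ask the poll loop to exit after the current task."""
    global stop
    stop = True


def worker_loop(poll, execute, drain_seconds=10.0):
    """Poll for tasks until asked to stop, then finish within a deadline.

    `poll()` returns the next task or None when the queue is empty;
    `execute(task)` runs one task to completion.
    """
    signal.signal(signal.SIGTERM, _handle_sigterm)
    deadline = None
    while True:
        if stop and deadline is None:
            deadline = time.monotonic() + drain_seconds
        if deadline is not None and time.monotonic() >= deadline:
            break  # timed out waiting for the queue to drain
        task = poll()
        if task is None:
            if stop:
                break  # queue drained, safe to exit
            time.sleep(0.1)
            continue
        execute(task)
```

The timeout matters in practice: an orchestrator (e.g. Kubernetes) sends SIGTERM and then SIGKILL after a grace period, so the drain deadline should be shorter than that grace period.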

Enqueue tasks from anywhere (e.g. a FastAPI handler) — importing the worker module configures the engine so ax.enqueue() works:

from worker import ResearchContext
import agentexec as ax

task = await ax.enqueue(
    "research_company",
    ResearchContext(company="Anthropic", focus_areas=["AI safety"]),
)
result = await ax.get_result(task)
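To make the enqueue/result round-trip concrete, here is a self-contained in-memory analogue of that flow, including the HIGH/LOW priority ordering from the feature list. It only mimics the shape of the API with hypothetical helpers; the real library distributes work through Redis across worker processes:

```python
import heapq
import itertools

HIGH, LOW = 0, 1  # lower number is served first
_counter = itertools.count()  # also breaks ties FIFO within a priority
_queue = []
_results = {}


def enqueue(name, context, priority=LOW):
    """Queue a named task and return its id."""
    task_id = next(_counter)
    heapq.heappush(_queue, (priority, task_id, name, context))
    return task_id


def run_next(handlers):
    """Pop the highest-priority task, run it, and store the result."""
    priority, task_id, name, context = heapq.heappop(_queue)
    _results[task_id] = handlers[name](context)
    return task_id


def get_result(task_id):
    """Fetch a stored result by task id."""
    return _results[task_id]
```

Because the heap orders on `(priority, task_id)`, HIGH tasks always jump ahead of LOW ones while tasks of equal priority run in the order they were enqueued.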

Documentation

Getting Started

Concepts

Guides

API Reference

Deployment

Community

Requirements

  • Python 3.11+
  • Redis 7.0+
  • SQLAlchemy-compatible database (PostgreSQL, MySQL, SQLite)

License

agentexec is released under the MIT License.