PyLet Python API Reference

PyLet is a distributed instance execution system: it runs commands as instances across a cluster of worker nodes. Each running instance exposes an IP:port endpoint that clients can reach over HTTP.

Quick Start

import pylet

# Initialize connection to head node
pylet.init()  # Default: http://localhost:8000

# Submit an instance
instance = pylet.submit("echo hello", cpu=1)
instance.wait()
print(f"Exit code: {instance.exit_code}")

# Submit a service
vllm = pylet.submit(
    "vllm serve Qwen/Qwen2.5-1.5B-Instruct --port $PORT",
    name="my-vllm",
    gpu=1,
    memory=4096,
)
vllm.wait_running()
print(f"vLLM ready at: {vllm.endpoint}")

# Use the service
import httpx
response = httpx.post(
    f"http://{vllm.endpoint}/v1/completions",
    json={"model": "Qwen/Qwen2.5-1.5B-Instruct", "prompt": "Hello", "max_tokens": 10}
)

# Clean up
vllm.cancel()
vllm.wait()
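The service command above passes `--port $PORT`. This suggests that PyLet exports the instance's assigned port as a `PORT` environment variable and reports that port in `endpoint`. Under that assumption, a Python service would read `PORT` and bind to it. The sketch below uses only the standard library. The fallback port 8080, the `/health` path, and the names `get_port`, `HealthHandler`, and `serve` are illustrative choices, not PyLet behaviour.

```python
import os
from http.server import BaseHTTPRequestHandler, HTTPServer


def get_port(default: int = 8080) -> int:
    # Assumption: PyLet exports the assigned port as $PORT.
    return int(os.environ.get("PORT", default))


class HealthHandler(BaseHTTPRequestHandler):
    """Answers GET /health with 200 "ok" and any other path with 404."""

    def do_GET(self) -> None:
        ok = self.path == "/health"
        body = b"ok" if ok else b"not found"
        self.send_response(200 if ok else 404)
        self.send_header("Content-Type", "text/plain")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)


def serve() -> None:
    # Bind on all interfaces so the endpoint is reachable from other nodes.
    HTTPServer(("0.0.0.0", get_port()), HealthHandler).serve_forever()
```

A script built on this would end with a call to `serve()`. If it were saved as `server.py` on the worker, it could be submitted with `pylet.submit("python server.py", name="health-demo")`.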

Design Decisions

| Decision | Choice |
| --- | --- |
| Sync or async first? | Sync. Async in pylet.aio |
| Context managers? | No. Module state + atexit |
| Auto-init? | No. Explicit pylet.init() required |

Module: pylet

pylet.init

def init(address: str = "http://localhost:8000") -> None

Initialize connection to PyLet head node. Must be called before any other client API.

Args:

  • address: Head node URL. Default "http://localhost:8000"

Raises:

  • ConnectionError: Cannot reach head node

Example:

pylet.init()
pylet.init("http://192.168.1.10:8000")

pylet.shutdown

def shutdown() -> None

Close the connection to the head node. Calling it is optional: it is registered with atexit and runs automatically at interpreter exit.


pylet.is_initialized

def is_initialized() -> bool

Check if init() has been called.


pylet.submit

def submit(
    command: Union[str, List[str]],
    *,
    name: Optional[str] = None,
    gpu: int = 0,
    cpu: int = 1,
    memory: int = 512,
    target_worker: Optional[str] = None,
    gpu_indices: Optional[List[int]] = None,
    exclusive: bool = True,
    labels: Optional[Dict[str, str]] = None,
    env: Optional[Dict[str, str]] = None,
    venv: Optional[str] = None,
) -> Instance

Submit a new instance.

Args:

  • command: Shell command string, or list of args (auto shell-escaped)
  • name: Optional instance name for service discovery
  • gpu: GPU units required (default 0, ignored if gpu_indices specified)
  • cpu: CPU cores required (default 1)
  • memory: Memory in MB required (default 512)
  • target_worker: Place on specific worker node
  • gpu_indices: Request specific physical GPU indices
  • exclusive: If False, the allocated GPUs are not removed from the allocation pool, so other instances can also be placed on them (default True)
  • labels: Custom metadata dict
  • env: Environment variables to set
  • venv: Path to pre-existing virtualenv (must be absolute path)

Returns:

  • Instance handle for the submitted instance.

Raises:

  • NotInitializedError: init() not called
  • ValueError: Invalid command or resources

Example:

# Basic usage
instance = pylet.submit("echo hello", cpu=1)
instance = pylet.submit("vllm serve model --port $PORT", name="vllm", gpu=1, memory=4096)
instance = pylet.submit(["python", "-c", "print('hello')"], cpu=1)

# Target specific worker and GPU indices
instance = pylet.submit(
    "sllm-store start",
    target_worker="gpu-0",
    gpu_indices=[0, 1, 2, 3],
    exclusive=False,
    labels={"type": "sllm-store"},
)

# Use a virtualenv
instance = pylet.submit(
    "python train.py",
    venv="/home/user/my-venv",
    gpu=1,
)

# Deploy multiple instances (use a loop)
instances = []
for i in range(3):
    inst = pylet.submit("python worker.py", name=f"worker-{i}", gpu=1)
    instances.append(inst)
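`command` also accepts a list of args, which is "auto shell-escaped". The sketch below assumes this behaves like the standard library's `shlex.join`; PyLet may quote differently. Under that assumption, each list element becomes one shell word. `to_shell_command` is a hypothetical helper.

```python
import shlex
from typing import List, Union


def to_shell_command(command: Union[str, List[str]]) -> str:
    # Strings pass through unchanged; lists are quoted argument by argument.
    if isinstance(command, str):
        return command
    return shlex.join(command)


args = ["python", "-c", "print('hello')"]
cmd = to_shell_command(args)
print(cmd)
```

One consequence of this quoting deserves attention. A list element such as `"$PORT"` is single-quoted, so the shell will not expand it. If the assumption holds, commands that rely on `$PORT` should use the string form, as the vLLM examples do.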

pylet.get

def get(
    name: Optional[str] = None,
    *,
    id: Optional[str] = None,
) -> Instance

Get an existing instance by name or ID.

Args:

  • name: Instance name (positional or keyword)
  • id: Instance ID (keyword only)

Returns: Instance handle

Raises:

  • NotInitializedError: init() not called
  • NotFoundError: Instance not found
  • ValueError: Neither name nor id provided

Example:

instance = pylet.get("my-vllm")
instance = pylet.get(id="abc-123-def")

pylet.instances

def instances(
    *,
    status: Optional[str] = None,
    labels: Optional[Dict[str, str]] = None,
) -> List[Instance]

List all instances.

Args:

  • status: Filter by status (e.g., "RUNNING", "PENDING")
  • labels: Filter by labels (all specified labels must match)

Returns: List of Instance handles

Raises:

  • NotInitializedError: init() not called

Example:

all_instances = pylet.instances()
running = pylet.instances(status="RUNNING")
gpu_instances = pylet.instances(labels={"type": "gpu-worker"})
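The `labels` filter requires that all specified labels match. That is a subset test on key/value pairs, sketched below on plain dicts. `labels_match` is a hypothetical helper and the sample labels are made up.

```python
from typing import Dict


def labels_match(instance_labels: Dict[str, str], selector: Dict[str, str]) -> bool:
    # Every selector key must be present with an equal value;
    # extra labels on the instance are ignored, and {} matches everything.
    return all(instance_labels.get(k) == v for k, v in selector.items())
```

`pylet.aio.instances()` does not accept `labels`, so an async client could apply the same predicate itself. For example: `[i for i in await pylet.instances() if labels_match(i.labels, {"type": "gpu-worker"})]`.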

pylet.workers

def workers() -> List[WorkerInfo]

List all registered workers.

Returns: List of WorkerInfo objects

Raises:

  • NotInitializedError: init() not called

pylet.delete

def delete(
    name: Optional[str] = None,
    *,
    id: Optional[str] = None,
) -> None

Delete an instance by name or ID.

Args:

  • name: Instance name (positional or keyword)
  • id: Instance ID (keyword only)

Raises:

  • NotInitializedError: init() not called
  • NotFoundError: Instance not found
  • ValueError: Neither name nor id provided

Example:

pylet.delete("my-instance")
pylet.delete(id="abc-123-def")

pylet.delete_all

def delete_all(*, status: Optional[str] = None) -> int

Delete all instances, optionally filtered by status.

Args:

  • status: Only delete instances with this status (e.g., "COMPLETED", "FAILED", "CANCELLED")

Returns: Number of instances deleted

Raises:

  • NotInitializedError: init() not called

Example:

# Delete all completed instances
count = pylet.delete_all(status="COMPLETED")
print(f"Deleted {count} instances")

# Delete all instances (use with caution!)
count = pylet.delete_all()

pylet.delete_worker

def delete_worker(worker_id: str) -> None

Delete a worker by ID. Only OFFLINE workers can be deleted.

Args:

  • worker_id: Worker ID to delete

Raises:

  • NotInitializedError: init() not called
  • NotFoundError: Worker not found
  • ValueError: Worker is not OFFLINE (only OFFLINE workers can be deleted)

Example:

pylet.delete_worker("worker-123")

pylet.delete_all_offline_workers

def delete_all_offline_workers() -> int

Delete all workers with OFFLINE status.

Returns: Number of workers deleted

Raises:

  • NotInitializedError: init() not called

Example:

count = pylet.delete_all_offline_workers()
print(f"Deleted {count} offline workers")

Class: Instance

Returned by pylet.submit(), pylet.get(), and pylet.instances(). Represents a handle to an instance.

Properties

@property
def id(self) -> str

Instance UUID.

@property
def name(self) -> Optional[str]

User-provided name, or None.

@property
def status(self) -> str

Current status: "PENDING", "ASSIGNED", "RUNNING", "COMPLETED", "FAILED", "CANCELLED", "UNKNOWN".

@property
def endpoint(self) -> Optional[str]

"host:port" when running, None otherwise.

@property
def exit_code(self) -> Optional[int]

Process exit code when terminal, None otherwise.

@property
def display_status(self) -> str

User-facing status. Returns "CANCELLING" while cancellation is in progress, otherwise same as status.

@property
def gpu_indices(self) -> Optional[List[int]]

Allocated GPU indices when assigned/running, None otherwise.

@property
def exclusive(self) -> bool

Whether instance has exclusive GPU access. Default True.

@property
def labels(self) -> Dict[str, str]

User-defined labels. Returns empty dict if none set.

@property
def env(self) -> Dict[str, str]

User-defined environment variables. Returns empty dict if none set.

@property
def target_worker(self) -> Optional[str]

Target worker constraint if set, None otherwise.

Methods

Instance.wait_running

def wait_running(self, timeout: float = 300) -> None

Block until instance reaches RUNNING status.

Args:

  • timeout: Maximum seconds to wait (default 300)

Raises:

  • TimeoutError: Instance not running within timeout
  • InstanceFailedError: Instance entered FAILED or CANCELLED state
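wait_running() returns once the status is RUNNING. A server such as vLLM may still be loading weights at that point, however. This reference does not say whether RUNNING also means the server is accepting requests, so a client may want to probe `endpoint` before sending traffic. The sketch below uses only the standard library. `wait_http_ready` is a hypothetical helper, not part of PyLet, and the probe path depends on the service (vLLM's OpenAI-compatible server exposes `/health`).

```python
import time
import urllib.request


def wait_http_ready(endpoint: str, path: str = "/health",
                    timeout: float = 300.0, interval: float = 1.0) -> None:
    """Poll http://{endpoint}{path} until it answers 2xx, else raise TimeoutError."""
    url = f"http://{endpoint}{path}"
    deadline = time.monotonic() + timeout
    while True:
        try:
            with urllib.request.urlopen(url, timeout=interval) as resp:
                if 200 <= resp.status < 300:
                    return
        except OSError:
            # Connection refused, socket timeout, or HTTPError (non-2xx).
            pass
        if time.monotonic() >= deadline:
            raise TimeoutError(f"{url} not ready after {timeout}s")
        time.sleep(interval)
```

Usage after the Quick Start submit: `vllm.wait_running()` followed by `wait_http_ready(vllm.endpoint)`.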

Instance.wait

def wait(self, timeout: Optional[float] = None) -> None

Block until instance reaches terminal state (COMPLETED, FAILED, CANCELLED).

Args:

  • timeout: Maximum seconds to wait, or None for no limit

Raises:

  • TimeoutError: Instance not terminal within timeout
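Both wait methods block until the status reaches a target set. The sketch below polls to honour that contract. It is written against a hypothetical `fetch_status` callable rather than PyLet's HTTP client, and the real implementation may use a different interval or mechanism. `TERMINAL` comes from the states listed for wait(); the local `InstanceFailedError` is a stand-in.

```python
import time
from typing import Callable, Iterable, Optional

TERMINAL = {"COMPLETED", "FAILED", "CANCELLED"}


class InstanceFailedError(Exception):
    """Hypothetical stand-in for pylet.InstanceFailedError."""


def wait_for_status(fetch_status: Callable[[], str], targets: Iterable[str],
                    fail_on: Iterable[str] = (), timeout: Optional[float] = None,
                    interval: float = 0.5) -> str:
    target_set, fail_set = set(targets), set(fail_on)
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        status = fetch_status()
        if status in target_set:
            return status
        if status in fail_set:
            raise InstanceFailedError(status)
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(f"still {status} after {timeout}s")
        time.sleep(interval)


# wait_running(): targets={"RUNNING"}, fail_on={"FAILED", "CANCELLED"}, timeout=300
# wait():         targets=TERMINAL, no fail_on, timeout=None (no limit)
```

This reference does not specify how wait_running() treats an instance that goes straight to COMPLETED. In the sketch, that case waits until the timeout expires.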

Instance.cancel

def cancel(self, delete: bool = False) -> None

Request instance cancellation. Returns immediately because cancellation completes asynchronously; call wait() to block until the instance reaches a terminal state.

Args:

  • delete: If True, delete the instance after cancellation completes (default False)

Raises:

  • InstanceTerminatedError: Instance already in terminal state

Instance.logs

def logs(self, tail: Optional[int] = None) -> str

Get instance logs.

Args:

  • tail: If provided, return only the last N bytes of the log

Returns: Log content as string
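`tail` counts bytes, not lines, so the first line of a tailed chunk may be cut mid-line. One client-side way to show the last N complete lines is to request a generous byte window and drop the leading fragment. This is not a PyLet feature. `last_complete_lines` is a hypothetical helper that assumes the log text is UTF-8.

```python
from typing import List


def last_complete_lines(chunk: str, tail_bytes: int, n: int) -> List[str]:
    lines = chunk.splitlines()
    # If the chunk filled the whole byte window, its first line probably
    # started mid-line, so drop it. A shorter chunk is the entire log.
    if len(chunk.encode("utf-8")) >= tail_bytes and len(lines) > 1:
        lines = lines[1:]
    return lines[-n:] if n > 0 else []
```

Usage: `last_complete_lines(instance.logs(tail=8192), 8192, 20)`.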


Instance.refresh

def refresh(self) -> None

Fetch latest state from server. Updates all properties.


Instance.__repr__

def __repr__(self) -> str

Returns: Instance(id='abc', name='vllm', status='RUNNING', endpoint='192.168.1.5:15600')


Class: WorkerInfo

Returned by pylet.workers(). Read-only data object.

Properties

@property
def id(self) -> str

Worker UUID.

@property
def host(self) -> str

Worker IP address.

@property
def status(self) -> str

"ONLINE", "SUSPECT", or "OFFLINE".

@property
def gpu(self) -> int

Total GPU units.

@property
def gpu_available(self) -> int

Available GPU units.

@property
def cpu(self) -> int

Total CPU cores.

@property
def cpu_available(self) -> int

Available CPU cores.

@property
def memory(self) -> int

Total memory in MB.

@property
def memory_available(self) -> int

Available memory in MB.

@property
def gpu_indices_available(self) -> List[int]

List of available GPU indices.
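The `*_available` properties give a client enough to estimate whether a request could fit on a given worker, for example before choosing a `target_worker`. The sketch below is hypothetical. The head node's scheduler may apply other rules, such as how SUSPECT workers or `exclusive=False` allocations are handled. `Worker` is a plain dataclass that mirrors a subset of the documented `WorkerInfo` properties, and `fits` uses the same resource defaults as `submit()`.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Worker:
    # Mirrors a subset of WorkerInfo's documented properties.
    id: str
    status: str
    gpu_available: int
    cpu_available: int
    memory_available: int
    gpu_indices_available: List[int] = field(default_factory=list)


def fits(w: Worker, *, gpu: int = 0, cpu: int = 1, memory: int = 512,
         gpu_indices: Optional[List[int]] = None) -> bool:
    # Assumption: only ONLINE workers are eligible.
    if w.status != "ONLINE":
        return False
    if cpu > w.cpu_available or memory > w.memory_available:
        return False
    if gpu_indices is not None:
        # Specific indices requested: each must be free; the gpu count is ignored.
        return set(gpu_indices) <= set(w.gpu_indices_available)
    return gpu <= w.gpu_available
```

`WorkerInfo` exposes the same attribute names, so the check can also run directly on live data: `[w for w in pylet.workers() if fits(w, gpu=1, memory=4096)]`.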


Cluster Management

pylet.start

def start(
    *,
    address: Optional[str] = None,
    port: int = 8000,
    gpu: int = 0,
    cpu: int = 4,
    memory: int = 4096,
    block: bool = False,
) -> Union[Head, Worker, NoReturn]

Start head node or worker.

  • If address is None: start head node
  • If address is provided: start worker and join cluster

Args:

  • address: Head node URL. If None, start as head. If provided, start as worker.
  • port: Port for head node (only used when starting head, default 8000)
  • gpu: GPU units to offer (only used when starting worker, default 0)
  • cpu: CPU cores to offer (only used when starting worker, default 4)
  • memory: Memory in MB to offer (only used when starting worker, default 4096)
  • block: If True, run in the foreground and block forever. If False, run in a background thread and return a handle.

Returns:

  • Head if starting head with block=False
  • Worker if starting worker with block=False
  • Does not return if block=True

Example:

# Start head in background
head = pylet.start(port=8000)
head.stop()

# Start head in foreground (blocks forever)
pylet.start(port=8000, block=True)

# Start worker in background
worker = pylet.start(address="http://head:8000", gpu=1, cpu=4)
worker.stop()

# Start worker in foreground (blocks forever)
pylet.start(address="http://head:8000", gpu=1, block=True)
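With `block=False`, the node runs in a background thread and `start()` returns a handle with `stop()`. The sketch below shows the general shape of that pattern using the standard library. `BackgroundNode` and the `run_loop` callable are hypothetical, since this reference does not document the internals of PyLet's `Head` and `Worker`.

```python
import threading
from typing import Callable


class BackgroundNode:
    """Runs run_loop(stop_event) in a daemon thread until stop() is called."""

    def __init__(self, run_loop: Callable[[threading.Event], None]) -> None:
        self._stop = threading.Event()
        # daemon=True: the thread does not keep the interpreter alive on exit.
        self._thread = threading.Thread(target=run_loop, args=(self._stop,), daemon=True)
        self._thread.start()

    def stop(self, timeout: float = 5.0) -> None:
        # Signal the loop, then wait for it to finish.
        self._stop.set()
        self._thread.join(timeout)

    @property
    def running(self) -> bool:
        return self._thread.is_alive()
```

The loop is expected to check the event regularly, for example with `while not stop.wait(1.0): ...`, so that `stop()` returns promptly.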

pylet.local_cluster

def local_cluster(
    workers: int = 1,
    *,
    gpu_per_worker: int = 0,
    cpu_per_worker: int = 4,
    memory_per_worker: int = 4096,
    port: int = 8000,
) -> Cluster

Start a local cluster (head + workers) for testing.

Args:

  • workers: Number of workers to start (default 1)
  • gpu_per_worker: GPU units per worker (default 0)
  • cpu_per_worker: CPU cores per worker (default 4)
  • memory_per_worker: Memory in MB per worker (default 4096)
  • port: Head node port (default 8000)

Returns: Cluster context manager

Example:

with pylet.local_cluster(workers=2, gpu_per_worker=1) as cluster:
    # pylet is auto-initialized to this cluster
    instance = pylet.submit("nvidia-smi", gpu=1)
    instance.wait()

Class: Head

Returned by pylet.start() when starting head with block=False.

def stop(self) -> None

Stop the head node.

@property
def address(self) -> str

Head node URL (e.g., "http://localhost:8000").


Class: Worker

Returned by pylet.start() when starting worker with block=False.

def stop(self) -> None

Stop the worker.


Class: Cluster

Returned by pylet.local_cluster(). Context manager.

def __enter__(self) -> Cluster

Start the cluster and call pylet.init() with its address.

def __exit__(self, exc_type, exc_value, traceback) -> None

Stop all workers and the head, and call pylet.shutdown().

def shutdown(self) -> None

Manually stop cluster.

@property
def address(self) -> str

Head node URL.


Exceptions

class PyletError(Exception):
    """Base exception for all PyLet errors."""

class NotInitializedError(PyletError):
    """pylet.init() not called."""

class NotFoundError(PyletError):
    """Instance or worker not found."""

class TimeoutError(PyletError):
    """Operation timed out."""
    # Subclasses PyletError, not the built-in TimeoutError, so a bare
    # `except TimeoutError:` catches only the built-in unless this class is imported.

class InstanceFailedError(PyletError):
    """Instance entered FAILED/CANCELLED state unexpectedly."""
    instance: Instance  # The failed instance

class InstanceTerminatedError(PyletError):
    """Operation invalid on terminated instance."""

Module: pylet.aio

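Async versions of the client APIs listed below. Signatures match the sync API except where noted; functions that perform I/O are coroutines and must be awaited.

import asyncio
import pylet.aio as pylet

async def main():
    await pylet.init()
    # Long-running command, so the instance is still RUNNING when cancelled
    # (cancel() raises InstanceTerminatedError on an already-finished instance).
    instance = await pylet.submit("sleep 60", cpu=1)
    await instance.wait_running()
    await instance.cancel()
    await instance.wait()  # Wait for cancellation to complete
    await pylet.shutdown()

asyncio.run(main())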

Async Functions

  • async pylet.aio.init(address: str = "http://localhost:8000") -> None
  • async pylet.aio.shutdown() -> None
  • pylet.aio.is_initialized() -> bool (sync, no I/O)
  • async pylet.aio.submit(...) -> Instance - Same parameters as sync version
  • async pylet.aio.get(...) -> Instance
  • async pylet.aio.instances(*, status: Optional[str] = None) -> List[Instance] - Note: does not support labels parameter
  • async pylet.aio.workers() -> List[WorkerInfo]
  • async pylet.aio.delete(name=None, *, id=None) -> None
  • async pylet.aio.delete_all(*, status=None) -> int
  • async pylet.aio.delete_worker(worker_id) -> None
  • async pylet.aio.delete_all_offline_workers() -> int

Async Instance Methods

  • async Instance.wait_running(timeout: float = 300) -> None
  • async Instance.wait(timeout: Optional[float] = None) -> None
  • async Instance.cancel(delete: bool = False) -> None
  • async Instance.logs(tail: Optional[int] = None) -> str
  • async Instance.refresh() -> None
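Every I/O call in `pylet.aio` is a coroutine, so waits on several instances can run concurrently with `asyncio.gather` rather than one after another. So that the sketch runs without a cluster, it uses a hypothetical `FakeInstance` stand-in whose `wait_running()` has the same shape as the async method above. With PyLet, the objects would come from `await pylet.submit(...)`.

```python
import asyncio
from typing import List


class FakeInstance:
    """Hypothetical stand-in exposing an async wait_running()."""

    def __init__(self, name: str, startup: float) -> None:
        self.name = name
        self.startup = startup
        self.status = "PENDING"

    async def wait_running(self, timeout: float = 300) -> None:
        # Simulate startup time; wait_for enforces the timeout.
        await asyncio.wait_for(asyncio.sleep(self.startup), timeout)
        self.status = "RUNNING"


async def wait_all_running(instances: List[FakeInstance], timeout: float = 300) -> None:
    # All waits run concurrently; total time is roughly the slowest startup.
    await asyncio.gather(*(i.wait_running(timeout) for i in instances))


async def main() -> List[str]:
    insts = [FakeInstance(f"worker-{i}", 0.05 * (i + 1)) for i in range(3)]
    await wait_all_running(insts)
    return [i.status for i in insts]


print(asyncio.run(main()))
```

With three instances, the sequential version takes the sum of the startup times, while `gather` takes roughly the longest one.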

Complete Example

import pylet

# Initialize
pylet.init("http://head:8000")

# Submit instance
instance = pylet.submit(
    "vllm serve Qwen/Qwen2.5-1.5B-Instruct --port $PORT",
    name="my-vllm",
    gpu=1,
    memory=4096,
)

# Wait for running
instance.wait_running()
print(f"vLLM ready at: {instance.endpoint}")

# Use it (external HTTP client)
import httpx
response = httpx.post(
    f"http://{instance.endpoint}/v1/completions",
    json={"model": "Qwen/Qwen2.5-1.5B-Instruct", "prompt": "Hello", "max_tokens": 10}
)
print(response.json())

# Print the last 1000 bytes of the log
print(instance.logs(tail=1000))

# Stop
instance.cancel()
instance.wait()  # Wait for cancellation to complete
print(f"Final status: {instance.status}")

Local Testing Example

import pylet

with pylet.local_cluster(workers=2, cpu_per_worker=2) as cluster:
    # Submit two instances
    i1 = pylet.submit("sleep 10", name="sleeper-1", cpu=1)
    i2 = pylet.submit("sleep 10", name="sleeper-2", cpu=1)

    # Wait for both
    i1.wait_running()
    i2.wait_running()

    print(pylet.instances(status="RUNNING"))  # [Instance(...), Instance(...)]

    # Cancel both
    i1.cancel()
    i2.cancel()

# Cluster auto-shutdown on exit

API Summary

| Function | Purpose |
| --- | --- |
| pylet.init(address) | Connect to head |
| pylet.shutdown() | Disconnect (optional) |
| pylet.is_initialized() | Check if connected |
| pylet.submit(command, *, name, gpu, cpu, memory, ...) | Submit instance |
| pylet.get(name, *, id) | Get instance |
| pylet.instances(*, status, labels) | List instances |
| pylet.workers() | List workers |
| pylet.delete(name, *, id) | Delete instance |
| pylet.delete_all(*, status) | Delete all instances |
| pylet.delete_worker(worker_id) | Delete OFFLINE worker |
| pylet.delete_all_offline_workers() | Delete all OFFLINE workers |
| pylet.start(*, address, port, gpu, cpu, memory, block) | Start head/worker |
| pylet.local_cluster(workers, *, ...) | Test cluster |

| Instance Property | Purpose |
| --- | --- |
| instance.id | Instance UUID |
| instance.name | User-provided name |
| instance.status | Current status |
| instance.display_status | User-facing status (shows CANCELLING) |
| instance.endpoint | host:port when running |
| instance.exit_code | Exit code when terminal |
| instance.gpu_indices | Allocated GPU indices |
| instance.exclusive | Exclusive GPU access |
| instance.labels | User-defined labels |
| instance.env | Environment variables |
| instance.target_worker | Target worker constraint |

| Instance Method | Purpose |
| --- | --- |
| instance.wait_running(timeout) | Block until RUNNING |
| instance.wait(timeout) | Block until terminal |
| instance.cancel(delete) | Request cancellation |
| instance.logs(tail) | Get logs |
| instance.refresh() | Update from server |