PyLet is a distributed instance execution system that orchestrates commands across a cluster of worker nodes. Instances expose IP:port for HTTP communication.
import pylet
# Initialize connection to head node
pylet.init() # Default: localhost:8000
# Submit an instance
instance = pylet.submit("echo hello", cpu=1)
instance.wait()
print(f"Exit code: {instance.exit_code}")
# Submit a service
vllm = pylet.submit(
"vllm serve Qwen/Qwen2.5-1.5B-Instruct --port $PORT",
name="my-vllm",
gpu=1,
memory=4096,
)
vllm.wait_running()
print(f"vLLM ready at: {vllm.endpoint}")
# Use the service
import httpx
response = httpx.post(
f"http://{vllm.endpoint}/v1/completions",
json={"model": "Qwen/Qwen2.5-1.5B-Instruct", "prompt": "Hello", "max_tokens": 10}
)
# Clean up
vllm.cancel()
vllm.wait()

| Decision | Choice |
|---|---|
| Sync or async first? | Sync. Async in pylet.aio |
| Context managers? | No. Module state + atexit |
| Auto-init? | No. Explicit pylet.init() required |
def init(address: str = "http://localhost:8000") -> None

Initialize connection to PyLet head node. Must be called before any other client API.

Args:
    address: Head node URL. Default: "http://localhost:8000"

Raises:
    ConnectionError: Cannot reach head node

Example:
    pylet.init()
    pylet.init("http://192.168.1.10:8000")

def shutdown() -> None

Close connection to head node. Optional - called automatically via atexit.

def is_initialized() -> bool

Check if init() has been called.
def submit(
command: Union[str, List[str]],
*,
name: Optional[str] = None,
gpu: int = 0,
cpu: int = 1,
memory: int = 512,
target_worker: Optional[str] = None,
gpu_indices: Optional[List[int]] = None,
exclusive: bool = True,
labels: Optional[Dict[str, str]] = None,
env: Optional[Dict[str, str]] = None,
venv: Optional[str] = None,
) -> Instance

Submit a new instance.

Args:
    command: Shell command string, or list of args (auto shell-escaped)
    name: Optional instance name for service discovery
    gpu: GPU units required (default 0, ignored if gpu_indices specified)
    cpu: CPU cores required (default 1)
    memory: Memory in MB required (default 512)
    target_worker: Place on specific worker node
    gpu_indices: Request specific physical GPU indices
    exclusive: If False, GPUs don't block allocation pool (default True)
    labels: Custom metadata dict
    env: Environment variables to set
    venv: Path to pre-existing virtualenv (must be absolute path)

Returns:
    Instance handle for the submitted instance.

Raises:
    NotInitializedError: init() not called
    ValueError: Invalid command or resources
Example:
# Basic usage
instance = pylet.submit("echo hello", cpu=1)
instance = pylet.submit("vllm serve model --port $PORT", name="vllm", gpu=1, memory=4096)
instance = pylet.submit(["python", "-c", "print('hello')"], cpu=1)
# Target specific worker and GPU indices
instance = pylet.submit(
"sllm-store start",
target_worker="gpu-0",
gpu_indices=[0, 1, 2, 3],
exclusive=False,
labels={"type": "sllm-store"},
)
# Use a virtualenv
instance = pylet.submit(
"python train.py",
venv="/home/user/my-venv",
gpu=1,
)
# Deploy multiple instances (use a loop)
instances = []
for i in range(3):
inst = pylet.submit("python worker.py", name=f"worker-{i}", gpu=1)
instances.append(inst)

def get(
name: Optional[str] = None,
*,
id: Optional[str] = None,
) -> Instance

Get an existing instance by name or ID.

Args:
    name: Instance name (positional or keyword)
    id: Instance ID (keyword only)

Returns: Instance handle

Raises:
    NotInitializedError: init() not called
    NotFoundError: Instance not found
    ValueError: Neither name nor id provided
Example:
instance = pylet.get("my-vllm")
instance = pylet.get(id="abc-123-def")

def instances(
*,
status: Optional[str] = None,
labels: Optional[Dict[str, str]] = None,
) -> List[Instance]

List all instances.

Args:
    status: Filter by status (e.g., "RUNNING", "PENDING")
    labels: Filter by labels (all specified labels must match)

Returns: List of Instance handles

Raises:
    NotInitializedError: init() not called
Example:
all_instances = pylet.instances()
running = pylet.instances(status="RUNNING")
gpu_instances = pylet.instances(labels={"type": "gpu-worker"})

def workers() -> List[WorkerInfo]

List all registered workers.

Returns: List of WorkerInfo objects

Raises:
    NotInitializedError: init() not called
def delete(
name: Optional[str] = None,
*,
id: Optional[str] = None,
) -> None

Delete an instance by name or ID.

Args:
    name: Instance name (positional or keyword)
    id: Instance ID (keyword only)

Raises:
    NotInitializedError: init() not called
    NotFoundError: Instance not found
    ValueError: Neither name nor id provided
Example:
pylet.delete("my-instance")
pylet.delete(id="abc-123-def")

def delete_all(*, status: Optional[str] = None) -> int

Delete all instances, optionally filtered by status.

Args:
    status: Only delete instances with this status (e.g., "COMPLETED", "FAILED", "CANCELLED")

Returns: Number of instances deleted

Raises:
    NotInitializedError: init() not called
Example:
# Delete all completed instances
count = pylet.delete_all(status="COMPLETED")
print(f"Deleted {count} instances")
# Delete all instances (use with caution!)
count = pylet.delete_all()

def delete_worker(worker_id: str) -> None

Delete a worker by ID. Only OFFLINE workers can be deleted.

Args:
    worker_id: Worker ID to delete

Raises:
    NotInitializedError: init() not called
    NotFoundError: Worker not found
    ValueError: Worker is not OFFLINE (only OFFLINE workers can be deleted)

Example:
    pylet.delete_worker("worker-123")

def delete_all_offline_workers() -> int

Delete all workers with OFFLINE status.

Returns: Number of workers deleted

Raises:
    NotInitializedError: init() not called

Example:
    count = pylet.delete_all_offline_workers()
    print(f"Deleted {count} offline workers")

Returned by pylet.submit() and pylet.get(). Represents a handle to an instance.
@property
def id(self) -> str

Instance UUID.

@property
def name(self) -> Optional[str]

User-provided name, or None.

@property
def status(self) -> str

Current status: "PENDING", "ASSIGNED", "RUNNING", "COMPLETED", "FAILED", "CANCELLED", "UNKNOWN".

@property
def endpoint(self) -> Optional[str]

"host:port" when running, None otherwise.

@property
def exit_code(self) -> Optional[int]

Process exit code when terminal, None otherwise.

@property
def display_status(self) -> str

User-facing status. Returns "CANCELLING" while cancellation is in progress, otherwise same as status.

@property
def gpu_indices(self) -> Optional[List[int]]

Allocated GPU indices when assigned/running, None otherwise.

@property
def exclusive(self) -> bool

Whether instance has exclusive GPU access. Default True.

@property
def labels(self) -> Dict[str, str]

User-defined labels. Returns empty dict if none set.

@property
def env(self) -> Dict[str, str]

User-defined environment variables. Returns empty dict if none set.

@property
def target_worker(self) -> Optional[str]

Target worker constraint if set, None otherwise.
def wait_running(self, timeout: float = 300) -> None

Block until instance reaches RUNNING status.

Args:
    timeout: Maximum seconds to wait (default 300)

Raises:
    TimeoutError: Instance not running within timeout
    InstanceFailedError: Instance entered FAILED or CANCELLED state

def wait(self, timeout: Optional[float] = None) -> None

Block until instance reaches terminal state (COMPLETED, FAILED, CANCELLED).

Args:
    timeout: Maximum seconds to wait, or None for no limit

Raises:
    TimeoutError: Instance not terminal within timeout

def cancel(self, delete: bool = False) -> None

Request instance cancellation. Returns immediately (cancellation is async).

Args:
    delete: If True, delete the instance after cancellation completes (default False)

Raises:
    InstanceTerminatedError: Instance already in terminal state

def logs(self, tail: Optional[int] = None) -> str

Get instance logs.

Args:
    tail: If provided, return only last N bytes

Returns: Log content as string

def refresh(self) -> None

Fetch latest state from server. Updates all properties.

def __repr__(self) -> str

Returns: Instance(id='abc', name='vllm', status='RUNNING', endpoint='192.168.1.5:15600')
Returned by pylet.workers(). Read-only data object.

@property
def id(self) -> str

Worker UUID.

@property
def host(self) -> str

Worker IP address.

@property
def status(self) -> str

"ONLINE", "SUSPECT", or "OFFLINE".

@property
def gpu(self) -> int

Total GPU units.

@property
def gpu_available(self) -> int

Available GPU units.

@property
def cpu(self) -> int

Total CPU cores.

@property
def cpu_available(self) -> int

Available CPU cores.

@property
def memory(self) -> int

Total memory in MB.

@property
def memory_available(self) -> int

Available memory in MB.

@property
def gpu_indices_available(self) -> List[int]

List of available GPU indices.
def start(
*,
address: Optional[str] = None,
port: int = 8000,
gpu: int = 0,
cpu: int = 4,
memory: int = 4096,
block: bool = False,
) -> Union[Head, Worker, NoReturn]

Start head node or worker.

- If address is None: start head node
- If address is provided: start worker and join cluster

Args:
    address: Head node URL. If None, start as head. If provided, start as worker.
    port: Port for head node (only used when starting head, default 8000)
    gpu: GPU units to offer (only used when starting worker, default 0)
    cpu: CPU cores to offer (only used when starting worker, default 4)
    memory: Memory in MB to offer (only used when starting worker, default 4096)
    block: If True, run in foreground and block forever. If False, run in background thread.

Returns:
    - Head if starting head with block=False
    - Worker if starting worker with block=False
    - Does not return if block=True
Example:
# Start head in background
head = pylet.start(port=8000)
head.stop()
# Start head in foreground (blocks forever)
pylet.start(port=8000, block=True)
# Start worker in background
worker = pylet.start(address="http://head:8000", gpu=1, cpu=4)
worker.stop()
# Start worker in foreground (blocks forever)
pylet.start(address="http://head:8000", gpu=1, block=True)

def local_cluster(
workers: int = 1,
*,
gpu_per_worker: int = 0,
cpu_per_worker: int = 4,
memory_per_worker: int = 4096,
port: int = 8000,
) -> Cluster

Start a local cluster (head + workers) for testing.

Args:
    workers: Number of workers to start (default 1)
    gpu_per_worker: GPU units per worker (default 0)
    cpu_per_worker: CPU cores per worker (default 4)
    memory_per_worker: Memory in MB per worker (default 4096)
    port: Head node port (default 8000)

Returns: Cluster context manager
Example:
with pylet.local_cluster(workers=2, gpu_per_worker=1) as cluster:
# pylet is auto-initialized to this cluster
instance = pylet.submit("nvidia-smi", gpu=1)
instance.wait()

Returned by pylet.start() when starting head with block=False.

def stop(self) -> None

Stop the head node.

@property
def address(self) -> str

Head node URL (e.g., "http://localhost:8000").

Returned by pylet.start() when starting worker with block=False.

def stop(self) -> None

Stop the worker.

Returned by pylet.local_cluster(). Context manager.

def __enter__(self) -> Cluster

Start cluster and auto-call pylet.init().

def __exit__(self, ...) -> None

Stop all workers and head, call pylet.shutdown().

def shutdown(self) -> None

Manually stop cluster.

@property
def address(self) -> str

Head node URL.
class PyletError(Exception)
"""Base exception for all PyLet errors."""
class NotInitializedError(PyletError)
"""pylet.init() not called."""
class NotFoundError(PyletError)
"""Instance or worker not found."""
class TimeoutError(PyletError)
"""Operation timed out."""
class InstanceFailedError(PyletError)
"""Instance entered FAILED/CANCELLED state unexpectedly."""
instance: Instance # The failed instance
class InstanceTerminatedError(PyletError)
"""Operation invalid on terminated instance."""

Async versions of all APIs. Same signatures, but async def and await.

import pylet.aio as pylet

async def main():
    await pylet.init()
    instance = await pylet.submit("echo hello", cpu=1)
    await instance.wait_running()
    await instance.cancel()
    await pylet.shutdown()

- async pylet.aio.init(address: str = "http://localhost:8000") -> None
- async pylet.aio.shutdown() -> None
- pylet.aio.is_initialized() -> bool (sync, no I/O)
- async pylet.aio.submit(...) -> Instance — same parameters as sync version
- async pylet.aio.get(...) -> Instance
- async pylet.aio.instances(*, status: Optional[str] = None) -> List[Instance] — note: does not support the labels parameter
- async pylet.aio.workers() -> List[WorkerInfo]
- async pylet.aio.delete(name=None, *, id=None) -> None
- async pylet.aio.delete_all(*, status=None) -> int
- async pylet.aio.delete_worker(worker_id) -> None
- async pylet.aio.delete_all_offline_workers() -> int
- async Instance.wait_running(timeout: float = 300) -> None
- async Instance.wait(timeout: Optional[float] = None) -> None
- async Instance.cancel(delete: bool = False) -> None
- async Instance.logs(tail: Optional[int] = None) -> str
- async Instance.refresh() -> None
import pylet
# Initialize
pylet.init("http://head:8000")
# Submit instance
instance = pylet.submit(
"vllm serve Qwen/Qwen2.5-1.5B-Instruct --port $PORT",
name="my-vllm",
gpu=1,
memory=4096,
)
# Wait for running
instance.wait_running()
print(f"vLLM ready at: {instance.endpoint}")
# Use it (external HTTP client)
import httpx
response = httpx.post(
f"http://{instance.endpoint}/v1/completions",
json={"model": "Qwen/Qwen2.5-1.5B-Instruct", "prompt": "Hello", "max_tokens": 10}
)
print(response.json())
# Get logs
print(instance.logs(tail=1000))
# Stop
instance.cancel()
instance.wait() # Wait for cancellation to complete
print(f"Final status: {instance.status}")

import pylet
with pylet.local_cluster(workers=2, cpu_per_worker=2) as cluster:
# Submit two instances
i1 = pylet.submit("sleep 10", name="sleeper-1", cpu=1)
i2 = pylet.submit("sleep 10", name="sleeper-2", cpu=1)
# Wait for both
i1.wait_running()
i2.wait_running()
print(pylet.instances(status="RUNNING")) # [Instance(...), Instance(...)]
# Cancel both
i1.cancel()
i2.cancel()
# Cluster auto-shutdown on exit

| Function | Purpose |
|---|---|
| pylet.init(address) | Connect to head |
| pylet.shutdown() | Disconnect (optional) |
| pylet.is_initialized() | Check if connected |
| pylet.submit(command, *, name, gpu, cpu, memory, ...) | Submit instance |
| pylet.get(name, *, id) | Get instance |
| pylet.instances(*, status, labels) | List instances |
| pylet.workers() | List workers |
| pylet.delete(name, *, id) | Delete instance |
| pylet.delete_all(*, status) | Delete all instances |
| pylet.delete_worker(worker_id) | Delete OFFLINE worker |
| pylet.delete_all_offline_workers() | Delete all OFFLINE workers |
| pylet.start(*, address, port, gpu, cpu, memory, block) | Start head/worker |
| pylet.local_cluster(workers, *, ...) | Test cluster |
| Instance Property | Purpose |
|---|---|
| instance.id | Instance UUID |
| instance.name | User-provided name |
| instance.status | Current status |
| instance.display_status | User-facing status (shows CANCELLING) |
| instance.endpoint | host:port when running |
| instance.exit_code | Exit code when terminal |
| instance.gpu_indices | Allocated GPU indices |
| instance.exclusive | Exclusive GPU access |
| instance.labels | User-defined labels |
| instance.env | Environment variables |
| instance.target_worker | Target worker constraint |
| Instance Method | Purpose |
|---|---|
| instance.wait_running(timeout) | Block until RUNNING |
| instance.wait(timeout) | Block until terminal |
| instance.cancel(delete) | Request cancellation |
| instance.logs(tail) | Get logs |
| instance.refresh() | Update from server |