Skip to content

Add WebSocket support#1034

Open
Kludex wants to merge 3 commits into
mainfrom
websocket-support
Open

Add WebSocket support#1034
Kludex wants to merge 3 commits into
mainfrom
websocket-support

Conversation

@Kludex

@Kludex Kludex commented Jun 12, 2026

Copy link
Copy Markdown
Member

Closes #105.

Adds native WebSocket support to httpx2, based on httpx-ws by @frankie567 (MIT), vendored into an isolated httpx2/_websockets package and then migrated from wsproto to the websockets sans-IO protocol.

API

with httpx2.websocket("wss://example.com/ws") as ws:
    ws.send_text("Hello!")
    message = ws.receive_text()

with httpx2.Client() as client:
    with client.websocket("wss://example.com/ws") as ws:
        ...

async with httpx2.AsyncClient() as client:
    async with client.websocket("wss://example.com/ws") as ws:
        await ws.send_text("Hello!")
        message = await ws.receive_text()
  • ws:///wss:// URLs are accepted and mapped to http(s) for the transport.
  • Sessions expose send_text/send_bytes/send_json, receive/receive_text/receive_bytes/receive_json, ping and close, with a background keepalive ping. receive() returns str | bytes - the sans-IO library doesn't leak into the public API.
  • ASGIWebSocketTransport allows testing WebSocket endpoints of ASGI apps without a server.
  • New exceptions: WebSocketException (base), WebSocketDisconnect, WebSocketUpgradeError, WebSocketNetworkError, WebSocketInvalidTypeReceived.
  • websockets>=15 becomes a dependency of httpx2.

Notes

  • AsyncClient.websocket() manages background tasks with an anyio task group, so exceptions escaping the async with block arrive wrapped in an ExceptionGroup (use except*).
  • Test servers must not use uvicorn's ws="auto": with websockets installed it selects the deprecated websockets.legacy implementation, whose import-time DeprecationWarning is an error under filterwarnings = ["error"] and silently kills the server thread.
  • Docs are not included yet; follow-up.

AI Disclaimer

This PR was developed with the assistance of either Claude or Codex. I've reviewed and verified the changes.

Review in cubic

Kludex added 2 commits June 12, 2026 10:34
… AsyncClient.websocket()

Vendor httpx-ws (MIT, by @frankie567) into an isolated httpx2/_websockets package,
exposing WebSocketSession, AsyncWebSocketSession, ASGIWebSocketTransport and the
WebSocket exception hierarchy from the top-level httpx2 namespace.
Replace the wsproto state machine with the websockets sans-IO Protocol in both
the sessions and the ASGI transport. The raw event-based send()/receive() API
is replaced by a message-level API: receive() now returns str | bytes, keeping
the sans-IO library out of the public surface.

Also set ws="none" on the HTTP test server: with websockets installed,
uvicorn's ws="auto" selects the deprecated websockets.legacy implementation,
whose import-time DeprecationWarning is an error under filterwarnings=error
and silently kills the server thread.
@github-actions

Copy link
Copy Markdown

Docs preview:

@codspeed-hq

codspeed-hq Bot commented Jun 12, 2026

Copy link
Copy Markdown

Merging this PR will not alter performance

✅ 15 untouched benchmarks
⏩ 7 skipped benchmarks1


Comparing websocket-support (890df71) with main (82b9e2d)

Open in CodSpeed

Footnotes

  1. 7 benchmarks were skipped, so the baseline results were used instead. If they were deleted from the codebase, click here and archive them to remove them from the performance reports.

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: d37fd914d9

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment on lines +767 to +768
if response.status_code != 101:
raise WebSocketUpgradeError(response)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Validate the WebSocket accept key before opening

When a peer returns HTTP 101 with a missing or incorrect Sec-WebSocket-Accept header (for example, a misconfigured proxy or non-WebSocket endpoint), this accepts the upgrade solely on the status code and starts a session over a stream that never completed the WebSocket handshake. The client generates a key for the request, so connect_ws/aconnect_ws should verify the response Upgrade/Connection headers and accept key before yielding the session.

Useful? React with 👍 / 👎.

Comment on lines +351 to +353
message = self._assembler.feed(frame)
if message is not None:
self._events.put(message)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Avoid blocking forever when the sync queue is full

If the server sends more than queue_size messages and the caller exits the context without draining them, the background thread can block forever in this put(). __exit__ then calls close() and immediately join(), but closing the stream does not unblock a thread already stuck on the full Python queue, so the context manager can hang indefinitely for bursty servers or small queue sizes.

Useful? React with 👍 / 👎.

Comment on lines +478 to +480
@contextlib.asynccontextmanager
async def __asynccontextmanager__(self) -> AsyncGenerator[AsyncWebSocketSession]:
self._send_event, self._receive_event = anyio.create_memory_object_stream[str | bytes | WebSocketException]()

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Honor queue_size for async WebSocket sessions

anyio.create_memory_object_stream() defaults to an unbuffered stream, so the async path ignores the requested queue_size and effectively has no receive queue. With AsyncClient.websocket(...), a server that sends messages while user code is not already awaiting receive() will cause the background receive task to stop reading immediately, contrary to the documented/default 512-message buffer and potentially causing avoidable backpressure or keepalive failures.

Useful? React with 👍 / 👎.

Comment on lines +259 to +261
subprotocols: list[str] = []
if (subprotocols_header := headers.get("sec-websocket-protocol")) is not None:
subprotocols = subprotocols_header.split(",")

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Strip spaces when parsing offered subprotocols

For ASGI tests/apps using multiple subprotocols, the client sends a comma+space separated header, but this split leaves leading spaces on every value after the first. An app checking scope["subprotocols"] for the exact second protocol (for example "v2" from subprotocols=["v1", "v2"]) will see " v2" instead and fail negotiation even though the client offered it.

Useful? React with 👍 / 👎.

@cubic-dev-ai cubic-dev-ai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

6 issues found across 16 files

Prompt for AI agents (unresolved issues)

Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.


<file name="src/httpx2/httpx2/_websockets/_ping.py">

<violation number="1" location="src/httpx2/httpx2/_websockets/_ping.py:20">
P1: `ack()` can raise `KeyError` on unmatched PONG payloads, which can crash the background receive loop.</violation>
</file>

<file name="src/httpx2/httpx2/_websockets/_session.py">

<violation number="1" location="src/httpx2/httpx2/_websockets/_session.py:353">
P2: `self._events.put(message)` blocks indefinitely when the queue is full. If the caller exits the context manager without draining all messages, `__exit__` calls `close()` then `join()`, but closing the stream does not unblock a thread stuck in `put()`. This can cause the context manager to hang. Consider using `put` with a timeout in a loop that checks `_should_close`, or use `queue.Queue.shutdown()` (Python 3.13+) / `put_nowait` with overflow handling.</violation>

<violation number="2" location="src/httpx2/httpx2/_websockets/_session.py:480">
P2: Async WebSocket sessions ignore `queue_size`, creating a zero-buffer channel that can block the background receiver and violate documented buffering behavior.</violation>
</file>

<file name="src/httpx2/httpx2/_websockets/_transport.py">

<violation number="1" location="src/httpx2/httpx2/_websockets/_transport.py:54">
P2: Request scope is stored in shared mutable state (`self.scope`), creating a race where concurrent websocket requests can bind the wrong scope.</violation>

<violation number="2" location="src/httpx2/httpx2/_websockets/_transport.py:261">
P2: Subprotocol header parsing doesn't strip whitespace, so multi-value `Sec-WebSocket-Protocol` entries are passed with leading spaces.</violation>

<violation number="3" location="src/httpx2/httpx2/_websockets/_transport.py:266">
P2: WebSocket ASGI scope sets `raw_path` to path+query instead of path-only, violating ASGI scope semantics.</violation>
</file>

Reply with feedback, questions, or to request a fix.

Re-trigger cubic

return ping_id, event

def ack(self, ping_id: bytes | bytearray | memoryview) -> None:
event = self._pings.pop(bytes(ping_id))

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1: ack() can raise KeyError on unmatched PONG payloads, which can crash the background receive loop.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/httpx2/httpx2/_websockets/_ping.py, line 20:

<comment>`ack()` can raise `KeyError` on unmatched PONG payloads, which can crash the background receive loop.</comment>

<file context>
@@ -0,0 +1,36 @@
+        return ping_id, event
+
+    def ack(self, ping_id: bytes | bytearray | memoryview) -> None:
+        event = self._pings.pop(bytes(ping_id))
+        event.set()
+
</file context>


@contextlib.asynccontextmanager
async def __asynccontextmanager__(self) -> AsyncGenerator[AsyncWebSocketSession]:
self._send_event, self._receive_event = anyio.create_memory_object_stream[str | bytes | WebSocketException]()

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: Async WebSocket sessions ignore queue_size, creating a zero-buffer channel that can block the background receiver and violate documented buffering behavior.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/httpx2/httpx2/_websockets/_session.py, line 480:

<comment>Async WebSocket sessions ignore `queue_size`, creating a zero-buffer channel that can block the background receiver and violate documented buffering behavior.</comment>

<file context>
@@ -0,0 +1,826 @@
+
+    @contextlib.asynccontextmanager
+    async def __asynccontextmanager__(self) -> AsyncGenerator[AsyncWebSocketSession]:
+        self._send_event, self._receive_event = anyio.create_memory_object_stream[str | bytes | WebSocketException]()
+        self._background_task_group = anyio.create_task_group()
+
</file context>

if scheme in {"ws", "wss"} or headers.get("upgrade") == "websocket":
subprotocols: list[str] = []
if (subprotocols_header := headers.get("sec-websocket-protocol")) is not None:
subprotocols = subprotocols_header.split(",")

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: Subprotocol header parsing doesn't strip whitespace, so multi-value Sec-WebSocket-Protocol entries are passed with leading spaces.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/httpx2/httpx2/_websockets/_transport.py, line 261:

<comment>Subprotocol header parsing doesn't strip whitespace, so multi-value `Sec-WebSocket-Protocol` entries are passed with leading spaces.</comment>

<file context>
@@ -0,0 +1,308 @@
+        if scheme in {"ws", "wss"} or headers.get("upgrade") == "websocket":
+            subprotocols: list[str] = []
+            if (subprotocols_header := headers.get("sec-websocket-protocol")) is not None:
+                subprotocols = subprotocols_header.split(",")
+
+            scope: Scope = {
</file context>
Suggested change
subprotocols = subprotocols_header.split(",")
subprotocols = [item.strip() for item in subprotocols_header.split(",") if item.strip()]

scope: Scope = {
"type": "websocket",
"path": request.url.path,
"raw_path": request.url.raw_path,

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: WebSocket ASGI scope sets raw_path to path+query instead of path-only, violating ASGI scope semantics.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/httpx2/httpx2/_websockets/_transport.py, line 266:

<comment>WebSocket ASGI scope sets `raw_path` to path+query instead of path-only, violating ASGI scope semantics.</comment>

<file context>
@@ -0,0 +1,308 @@
+            scope: Scope = {
+                "type": "websocket",
+                "path": request.url.path,
+                "raw_path": request.url.raw_path,
+                "root_path": self.root_path,
+                "scheme": {"http": "ws", "https": "wss"}.get(scheme, scheme),
</file context>
Suggested change
"raw_path": request.url.raw_path,
"raw_path": request.url.raw_path.split(b"?", 1)[0],

initial_receive_timeout: float = 1.0,
) -> None:
self.app = app
self.scope = scope

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: Request scope is stored in shared mutable state (self.scope), creating a race where concurrent websocket requests can bind the wrong scope.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/httpx2/httpx2/_websockets/_transport.py, line 54:

<comment>Request scope is stored in shared mutable state (`self.scope`), creating a race where concurrent websocket requests can bind the wrong scope.</comment>

<file context>
@@ -0,0 +1,308 @@
+        initial_receive_timeout: float = 1.0,
+    ) -> None:
+        self.app = app
+        self.scope = scope
+        self._receive_queue = anyio.streams.stapled.StapledObjectStream(
+            *anyio.create_memory_object_stream[Message](max_buffer_size=math.inf)
</file context>

continue
message = self._assembler.feed(frame)
if message is not None:
self._events.put(message)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: self._events.put(message) blocks indefinitely when the queue is full. If the caller exits the context manager without draining all messages, __exit__ calls close() then join(), but closing the stream does not unblock a thread stuck in put(). This can cause the context manager to hang. Consider using put with a timeout in a loop that checks _should_close, or use queue.Queue.shutdown() (Python 3.13+) / put_nowait with overflow handling.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/httpx2/httpx2/_websockets/_session.py, line 353:

<comment>`self._events.put(message)` blocks indefinitely when the queue is full. If the caller exits the context manager without draining all messages, `__exit__` calls `close()` then `join()`, but closing the stream does not unblock a thread stuck in `put()`. This can cause the context manager to hang. Consider using `put` with a timeout in a loop that checks `_should_close`, or use `queue.Queue.shutdown()` (Python 3.13+) / `put_nowait` with overflow handling.</comment>

<file context>
@@ -0,0 +1,826 @@
+                        continue
+                    message = self._assembler.feed(frame)
+                    if message is not None:
+                        self._events.put(message)
+        except (httpcore2.ReadError, httpcore2.WriteError, EndOfStream):
+            self.close(INTERNAL_ERROR, "Stream error")
</file context>

The _websockets modules imported httpcore2 eagerly, defeating httpx2's lazy
loading of httpcore2; import it inside the methods that need its exceptions
and drop the AsyncNetworkStream base class from the ASGI stream.

The websockets Protocol is not thread-safe: the sync session's background
thread called receive_data() outside the write lock, racing send_close()
in close() and tripping an assertion inside the protocol. All protocol
access now happens under the write lock.

Also add websockets to the dev dependency group, avoid attribute traversal
when patching in test_top_level_websocket (test_httpcore_lazy_loading
re-imports httpx2, leaving the fresh module without submodule attributes),
and make the thread-leak test assert that session threads terminate instead
of comparing exact thread counts.
@Kludex Kludex deployed to cloudflare June 12, 2026 11:29 — with GitHub Actions Active
@Graeme22

Copy link
Copy Markdown

Excited for this, missing websockets is what's keeping me from being able to migrate to httpx2 at the moment. What's the rationale for using websockets' sans I/O over wsproto's?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

WebSockets and connection upgrades.

2 participants