Add WebSocket support#1034
Conversation
… AsyncClient.websocket() Vendor httpx-ws (MIT, by @frankie567) into an isolated httpx2/_websockets package, exposing WebSocketSession, AsyncWebSocketSession, ASGIWebSocketTransport and the WebSocket exception hierarchy from the top-level httpx2 namespace.
Replace the wsproto state machine with the websockets sans-IO Protocol in both the sessions and the ASGI transport. The raw event-based send()/receive() API is replaced by a message-level API: receive() now returns str | bytes, keeping the sans-IO library out of the public surface. Also set ws="none" on the HTTP test server: with websockets installed, uvicorn's ws="auto" selects the deprecated websockets.legacy implementation, whose import-time DeprecationWarning is an error under filterwarnings=error and silently kills the server thread.
|
Docs preview: |
Merging this PR will not alter performance
Comparing Footnotes
|
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: d37fd914d9
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| if response.status_code != 101: | ||
| raise WebSocketUpgradeError(response) |
There was a problem hiding this comment.
Validate the WebSocket accept key before opening
When a peer returns HTTP 101 with a missing or incorrect Sec-WebSocket-Accept header (for example, a misconfigured proxy or non-WebSocket endpoint), this accepts the upgrade solely on the status code and starts a session over a stream that never completed the WebSocket handshake. The client generates a key for the request, so connect_ws/aconnect_ws should verify the response Upgrade/Connection headers and accept key before yielding the session.
Useful? React with 👍 / 👎.
| message = self._assembler.feed(frame) | ||
| if message is not None: | ||
| self._events.put(message) |
There was a problem hiding this comment.
Avoid blocking forever when the sync queue is full
If the server sends more than queue_size messages and the caller exits the context without draining them, the background thread can block forever in this put(). __exit__ then calls close() and immediately join(), but closing the stream does not unblock a thread already stuck on the full Python queue, so the context manager can hang indefinitely for bursty servers or small queue sizes.
Useful? React with 👍 / 👎.
| @contextlib.asynccontextmanager | ||
| async def __asynccontextmanager__(self) -> AsyncGenerator[AsyncWebSocketSession]: | ||
| self._send_event, self._receive_event = anyio.create_memory_object_stream[str | bytes | WebSocketException]() |
There was a problem hiding this comment.
Honor queue_size for async WebSocket sessions
anyio.create_memory_object_stream() defaults to an unbuffered stream, so the async path ignores the requested queue_size and effectively has no receive queue. With AsyncClient.websocket(...), a server that sends messages while user code is not already awaiting receive() will cause the background receive task to stop reading immediately, contrary to the documented/default 512-message buffer and potentially causing avoidable backpressure or keepalive failures.
Useful? React with 👍 / 👎.
| subprotocols: list[str] = [] | ||
| if (subprotocols_header := headers.get("sec-websocket-protocol")) is not None: | ||
| subprotocols = subprotocols_header.split(",") |
There was a problem hiding this comment.
Strip spaces when parsing offered subprotocols
For ASGI tests/apps using multiple subprotocols, the client sends a comma+space separated header, but this split leaves leading spaces on every value after the first. An app checking scope["subprotocols"] for the exact second protocol (for example "v2" from subprotocols=["v1", "v2"]) will see " v2" instead and fail negotiation even though the client offered it.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
6 issues found across 16 files
Prompt for AI agents (unresolved issues)
Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.
<file name="src/httpx2/httpx2/_websockets/_ping.py">
<violation number="1" location="src/httpx2/httpx2/_websockets/_ping.py:20">
P1: `ack()` can raise `KeyError` on unmatched PONG payloads, which can crash the background receive loop.</violation>
</file>
<file name="src/httpx2/httpx2/_websockets/_session.py">
<violation number="1" location="src/httpx2/httpx2/_websockets/_session.py:353">
P2: `self._events.put(message)` blocks indefinitely when the queue is full. If the caller exits the context manager without draining all messages, `__exit__` calls `close()` then `join()`, but closing the stream does not unblock a thread stuck in `put()`. This can cause the context manager to hang. Consider using `put` with a timeout in a loop that checks `_should_close`, or use `queue.Queue.shutdown()` (Python 3.13+) / `put_nowait` with overflow handling.</violation>
<violation number="2" location="src/httpx2/httpx2/_websockets/_session.py:480">
P2: Async WebSocket sessions ignore `queue_size`, creating a zero-buffer channel that can block the background receiver and violate documented buffering behavior.</violation>
</file>
<file name="src/httpx2/httpx2/_websockets/_transport.py">
<violation number="1" location="src/httpx2/httpx2/_websockets/_transport.py:54">
P2: Request scope is stored in shared mutable state (`self.scope`), creating a race where concurrent websocket requests can bind the wrong scope.</violation>
<violation number="2" location="src/httpx2/httpx2/_websockets/_transport.py:261">
P2: Subprotocol header parsing doesn't strip whitespace, so multi-value `Sec-WebSocket-Protocol` entries are passed with leading spaces.</violation>
<violation number="3" location="src/httpx2/httpx2/_websockets/_transport.py:266">
P2: WebSocket ASGI scope sets `raw_path` to path+query instead of path-only, violating ASGI scope semantics.</violation>
</file>
Reply with feedback, questions, or to request a fix.
Re-trigger cubic
| return ping_id, event | ||
|
|
||
| def ack(self, ping_id: bytes | bytearray | memoryview) -> None: | ||
| event = self._pings.pop(bytes(ping_id)) |
There was a problem hiding this comment.
P1: ack() can raise KeyError on unmatched PONG payloads, which can crash the background receive loop.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/httpx2/httpx2/_websockets/_ping.py, line 20:
<comment>`ack()` can raise `KeyError` on unmatched PONG payloads, which can crash the background receive loop.</comment>
<file context>
@@ -0,0 +1,36 @@
+ return ping_id, event
+
+ def ack(self, ping_id: bytes | bytearray | memoryview) -> None:
+ event = self._pings.pop(bytes(ping_id))
+ event.set()
+
</file context>
|
|
||
| @contextlib.asynccontextmanager | ||
| async def __asynccontextmanager__(self) -> AsyncGenerator[AsyncWebSocketSession]: | ||
| self._send_event, self._receive_event = anyio.create_memory_object_stream[str | bytes | WebSocketException]() |
There was a problem hiding this comment.
P2: Async WebSocket sessions ignore queue_size, creating a zero-buffer channel that can block the background receiver and violate documented buffering behavior.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/httpx2/httpx2/_websockets/_session.py, line 480:
<comment>Async WebSocket sessions ignore `queue_size`, creating a zero-buffer channel that can block the background receiver and violate documented buffering behavior.</comment>
<file context>
@@ -0,0 +1,826 @@
+
+ @contextlib.asynccontextmanager
+ async def __asynccontextmanager__(self) -> AsyncGenerator[AsyncWebSocketSession]:
+ self._send_event, self._receive_event = anyio.create_memory_object_stream[str | bytes | WebSocketException]()
+ self._background_task_group = anyio.create_task_group()
+
</file context>
| if scheme in {"ws", "wss"} or headers.get("upgrade") == "websocket": | ||
| subprotocols: list[str] = [] | ||
| if (subprotocols_header := headers.get("sec-websocket-protocol")) is not None: | ||
| subprotocols = subprotocols_header.split(",") |
There was a problem hiding this comment.
P2: Subprotocol header parsing doesn't strip whitespace, so multi-value Sec-WebSocket-Protocol entries are passed with leading spaces.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/httpx2/httpx2/_websockets/_transport.py, line 261:
<comment>Subprotocol header parsing doesn't strip whitespace, so multi-value `Sec-WebSocket-Protocol` entries are passed with leading spaces.</comment>
<file context>
@@ -0,0 +1,308 @@
+ if scheme in {"ws", "wss"} or headers.get("upgrade") == "websocket":
+ subprotocols: list[str] = []
+ if (subprotocols_header := headers.get("sec-websocket-protocol")) is not None:
+ subprotocols = subprotocols_header.split(",")
+
+ scope: Scope = {
</file context>
| subprotocols = subprotocols_header.split(",") | |
| subprotocols = [item.strip() for item in subprotocols_header.split(",") if item.strip()] |
| scope: Scope = { | ||
| "type": "websocket", | ||
| "path": request.url.path, | ||
| "raw_path": request.url.raw_path, |
There was a problem hiding this comment.
P2: WebSocket ASGI scope sets raw_path to path+query instead of path-only, violating ASGI scope semantics.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/httpx2/httpx2/_websockets/_transport.py, line 266:
<comment>WebSocket ASGI scope sets `raw_path` to path+query instead of path-only, violating ASGI scope semantics.</comment>
<file context>
@@ -0,0 +1,308 @@
+ scope: Scope = {
+ "type": "websocket",
+ "path": request.url.path,
+ "raw_path": request.url.raw_path,
+ "root_path": self.root_path,
+ "scheme": {"http": "ws", "https": "wss"}.get(scheme, scheme),
</file context>
| "raw_path": request.url.raw_path, | |
| "raw_path": request.url.raw_path.split(b"?", 1)[0], |
| initial_receive_timeout: float = 1.0, | ||
| ) -> None: | ||
| self.app = app | ||
| self.scope = scope |
There was a problem hiding this comment.
P2: Request scope is stored in shared mutable state (self.scope), creating a race where concurrent websocket requests can bind the wrong scope.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/httpx2/httpx2/_websockets/_transport.py, line 54:
<comment>Request scope is stored in shared mutable state (`self.scope`), creating a race where concurrent websocket requests can bind the wrong scope.</comment>
<file context>
@@ -0,0 +1,308 @@
+ initial_receive_timeout: float = 1.0,
+ ) -> None:
+ self.app = app
+ self.scope = scope
+ self._receive_queue = anyio.streams.stapled.StapledObjectStream(
+ *anyio.create_memory_object_stream[Message](max_buffer_size=math.inf)
</file context>
| continue | ||
| message = self._assembler.feed(frame) | ||
| if message is not None: | ||
| self._events.put(message) |
There was a problem hiding this comment.
P2: self._events.put(message) blocks indefinitely when the queue is full. If the caller exits the context manager without draining all messages, __exit__ calls close() then join(), but closing the stream does not unblock a thread stuck in put(). This can cause the context manager to hang. Consider using put with a timeout in a loop that checks _should_close, or use queue.Queue.shutdown() (Python 3.13+) / put_nowait with overflow handling.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/httpx2/httpx2/_websockets/_session.py, line 353:
<comment>`self._events.put(message)` blocks indefinitely when the queue is full. If the caller exits the context manager without draining all messages, `__exit__` calls `close()` then `join()`, but closing the stream does not unblock a thread stuck in `put()`. This can cause the context manager to hang. Consider using `put` with a timeout in a loop that checks `_should_close`, or use `queue.Queue.shutdown()` (Python 3.13+) / `put_nowait` with overflow handling.</comment>
<file context>
@@ -0,0 +1,826 @@
+ continue
+ message = self._assembler.feed(frame)
+ if message is not None:
+ self._events.put(message)
+ except (httpcore2.ReadError, httpcore2.WriteError, EndOfStream):
+ self.close(INTERNAL_ERROR, "Stream error")
</file context>
The _websockets modules imported httpcore2 eagerly, defeating httpx2's lazy loading of httpcore2; import it inside the methods that need its exceptions and drop the AsyncNetworkStream base class from the ASGI stream. The websockets Protocol is not thread-safe: the sync session's background thread called receive_data() outside the write lock, racing send_close() in close() and tripping an assertion inside the protocol. All protocol access now happens under the write lock. Also add websockets to the dev dependency group, avoid attribute traversal when patching in test_top_level_websocket (test_httpcore_lazy_loading re-imports httpx2, leaving the fresh module without submodule attributes), and make the thread-leak test assert that session threads terminate instead of comparing exact thread counts.
|
Excited for this, missing websockets is what's keeping me from being able to migrate to httpx2 at the moment. What's the rationale for using websockets' sans I/O over wsproto's? |
Closes #105.
Adds native WebSocket support to httpx2, based on httpx-ws by @frankie567 (MIT), vendored into an isolated
httpx2/_websocketspackage and then migrated fromwsprototo thewebsocketssans-IO protocol.API
ws:///wss://URLs are accepted and mapped tohttp(s)for the transport.send_text/send_bytes/send_json,receive/receive_text/receive_bytes/receive_json,pingandclose, with a background keepalive ping.receive()returnsstr | bytes- the sans-IO library doesn't leak into the public API.ASGIWebSocketTransportallows testing WebSocket endpoints of ASGI apps without a server.WebSocketException(base),WebSocketDisconnect,WebSocketUpgradeError,WebSocketNetworkError,WebSocketInvalidTypeReceived.websockets>=15becomes a dependency of httpx2.Notes
AsyncClient.websocket()manages background tasks with an anyio task group, so exceptions escaping theasync withblock arrive wrapped in anExceptionGroup(useexcept*).ws="auto": withwebsocketsinstalled it selects the deprecatedwebsockets.legacyimplementation, whose import-timeDeprecationWarningis an error underfilterwarnings = ["error"]and silently kills the server thread.AI Disclaimer
This PR was developed with the assistance of either Claude or Codex. I've reviewed and verified the changes.