From fc57d3c1a66a4feffe9dc3fd4da10ee8bc224a0d Mon Sep 17 00:00:00 2001 From: Nathan Flurry Date: Sun, 3 May 2026 17:42:40 -0700 Subject: [PATCH] feat(rivetkit): support gateway bypass client requests --- .../rivetkit/src/client/actor-common.ts | 12 ++++ .../rivetkit/src/client/actor-handle.ts | 22 +++++-- .../packages/rivetkit/src/client/raw-utils.ts | 13 ++++- .../src/common/actor-router-consts.ts | 4 ++ .../src/engine-client/actor-http-client.ts | 23 +++++++- .../engine-client/actor-websocket-client.ts | 25 +++++++- .../rivetkit/src/engine-client/driver.ts | 11 +++- .../rivetkit/src/engine-client/mod.ts | 58 ++++++++++++++++--- .../rivetkit/tests/actor-gateway-url.test.ts | 23 ++++++++ 9 files changed, 171 insertions(+), 20 deletions(-) diff --git a/rivetkit-typescript/packages/rivetkit/src/client/actor-common.ts b/rivetkit-typescript/packages/rivetkit/src/client/actor-common.ts index 2cc2247d70..e1c42dab61 100644 --- a/rivetkit-typescript/packages/rivetkit/src/client/actor-common.ts +++ b/rivetkit-typescript/packages/rivetkit/src/client/actor-common.ts @@ -31,6 +31,18 @@ export type ActorActionFunction< ...args: Args extends [unknown, ...infer Rest] ? Rest : Args ) => Promise; +export interface ActorGatewayOptions { + bypassConnectable?: boolean; +} + +export interface ActorFetchInit extends RequestInit { + gateway?: ActorGatewayOptions; +} + +export interface ActorWebSocketOptions { + gateway?: ActorGatewayOptions; +} + /** * Maps action methods from actor definition to typed function signatures. */ diff --git a/rivetkit-typescript/packages/rivetkit/src/client/actor-handle.ts b/rivetkit-typescript/packages/rivetkit/src/client/actor-handle.ts index 905a6461c0..33c1c30ea2 100644 --- a/rivetkit-typescript/packages/rivetkit/src/client/actor-handle.ts +++ b/rivetkit-typescript/packages/rivetkit/src/client/actor-handle.ts @@ -26,7 +26,9 @@ import { decodeCborCompat, deserializeWithEncoding, encodeCborCompat } from "@/s import { bufferToArrayBuffer } from "@/utils"; import type { ActorDefinitionActions, + ActorFetchInit, ActorDefinitionQueueSend, + ActorWebSocketOptions, } from "./actor-common"; import { type ActorConn, ActorConnRaw } from "./actor-conn"; import { @@ -575,16 +577,17 @@ export class ActorHandleRaw { * Fetches a resource from this actor via the /request endpoint. This is a * convenience wrapper around the raw HTTP API. */ - fetch(input: string | URL | Request, init?: RequestInit) { + fetch(input: string | URL | Request, init?: ActorFetchInit) { return this.#fetchWithResolvedActor(input, init); } async #fetchWithResolvedActor( input: string | URL | Request, - init?: RequestInit, + init?: ActorFetchInit, ) { const maxAttempts = this.#getDynamicQueryMaxAttempts(); let useQueryTarget = false; + const { gateway, ...requestInit } = init ?? {}; for (let attempt = 0; attempt < maxAttempts; attempt++) { let actorId: string | undefined; @@ -596,7 +599,8 @@ export class ActorHandleRaw { target, this.#params, input, - init, + requestInit, + gateway, ); const retry = await this.#shouldRetryRawFetchResponse( response, @@ -783,14 +787,22 @@ export class ActorHandleRaw { /** * Opens a raw WebSocket connection to this actor. */ - async webSocket(path?: string, protocols?: string | string[]) { + async webSocket( + path?: string, + protocols?: string | string[], + options: ActorWebSocketOptions = {}, + ) { const params = await this.#resolveConnectionParams(); + const target = options.gateway?.bypassConnectable + ? await this.#resolveActionTarget(false) + : getGatewayTarget(this.#actorResolutionState); return await rawWebSocket( this.#driver, - getGatewayTarget(this.#actorResolutionState), + target, params, path, protocols, + options.gateway, ); } diff --git a/rivetkit-typescript/packages/rivetkit/src/client/raw-utils.ts b/rivetkit-typescript/packages/rivetkit/src/client/raw-utils.ts index e04d060aa3..7896a5bb02 100644 --- a/rivetkit-typescript/packages/rivetkit/src/client/raw-utils.ts +++ b/rivetkit-typescript/packages/rivetkit/src/client/raw-utils.ts @@ -3,6 +3,7 @@ import { deconstructError } from "@/common/utils"; import { type GatewayTarget, type EngineControlClient, + type GatewayRequestOptions, } from "@/engine-client/driver"; import { HEADER_CONN_PARAMS } from "@/common/actor-router-consts"; import { ActorError } from "./errors"; @@ -17,6 +18,7 @@ export async function rawHttpFetch( params: unknown, input: string | URL | Request, init?: RequestInit, + options: GatewayRequestOptions = {}, ): Promise { // Extract path and merge init options let path: string; @@ -91,7 +93,7 @@ export async function rawHttpFetch( headers: proxyRequestHeaders, }); - return driver.sendRequest(target, proxyRequest); + return driver.sendRequest(target, proxyRequest, options); } catch (err) { // Standardize to ClientActorError instead of the native backend error const { group, code, message, metadata } = deconstructError( @@ -114,6 +116,7 @@ export async function rawWebSocket( path?: string, // TODO: Supportp rotocols _protocols?: string | string[], + options: GatewayRequestOptions = {}, ): Promise { // TODO: Do we need encoding in rawWebSocket? const encoding = "bare"; @@ -145,7 +148,13 @@ export async function rawWebSocket( }); // Open WebSocket - const ws = await driver.openWebSocket(fullPath, target, encoding, params); + const ws = await driver.openWebSocket( + fullPath, + target, + encoding, + params, + options, + ); // Node & browser WebSocket types are incompatible return ws as any; diff --git a/rivetkit-typescript/packages/rivetkit/src/common/actor-router-consts.ts b/rivetkit-typescript/packages/rivetkit/src/common/actor-router-consts.ts index 91ef1a6cd3..2194a2fecd 100644 --- a/rivetkit-typescript/packages/rivetkit/src/common/actor-router-consts.ts +++ b/rivetkit-typescript/packages/rivetkit/src/common/actor-router-consts.ts @@ -21,6 +21,8 @@ export const HEADER_RIVET_TOKEN = "x-rivet-token"; export const HEADER_RIVET_TARGET = "x-rivet-target"; export const HEADER_RIVET_ACTOR = "x-rivet-actor"; export const HEADER_RIVET_NAMESPACE = "x-rivet-namespace"; +export const HEADER_RIVET_BYPASS_CONNECTABLE = + "x-rivet-bypass-connectable"; // MARK: WebSocket Protocol Prefixes /** Some servers (such as node-ws & Cloudflare) require explicitly match a certain WebSocket protocol. This gives us a static protocol to match against. */ @@ -30,6 +32,7 @@ export const WS_PROTOCOL_ACTOR = "rivet_actor."; export const WS_PROTOCOL_ENCODING = "rivet_encoding."; export const WS_PROTOCOL_CONN_PARAMS = "rivet_conn_params."; export const WS_PROTOCOL_TOKEN = "rivet_token."; +export const WS_PROTOCOL_BYPASS_CONNECTABLE = "rivet_bypass_connectable"; export const WS_PROTOCOL_TEST_ACK_HOOK = "rivet_test_ack_hook."; // MARK: WebSocket Inline Test Protocol Prefixes @@ -51,4 +54,5 @@ export const ALLOWED_PUBLIC_HEADERS = [ HEADER_RIVET_ACTOR, HEADER_RIVET_NAMESPACE, HEADER_RIVET_TOKEN, + HEADER_RIVET_BYPASS_CONNECTABLE, ]; diff --git a/rivetkit-typescript/packages/rivetkit/src/engine-client/actor-http-client.ts b/rivetkit-typescript/packages/rivetkit/src/engine-client/actor-http-client.ts index 06363a9d5d..4220e25e0c 100644 --- a/rivetkit-typescript/packages/rivetkit/src/engine-client/actor-http-client.ts +++ b/rivetkit-typescript/packages/rivetkit/src/engine-client/actor-http-client.ts @@ -1,14 +1,25 @@ import type { ClientConfig } from "@/client/config"; -import { HEADER_RIVET_TOKEN } from "@/common/actor-router-consts"; +import { + HEADER_RIVET_ACTOR, + HEADER_RIVET_BYPASS_CONNECTABLE, + HEADER_RIVET_TARGET, + HEADER_RIVET_TOKEN, +} from "@/common/actor-router-consts"; +import type { GatewayRequestOptions } from "./driver"; + +export interface HttpGatewayRequestOptions extends GatewayRequestOptions { + directActorId?: string; +} export async function sendHttpRequestToGateway( runConfig: ClientConfig, gatewayUrl: string, actorRequest: Request, + options: HttpGatewayRequestOptions = {}, ): Promise { // Handle body properly based on method and presence let bodyToSend: ArrayBuffer | null = null; - const guardHeaders = buildGuardHeaders(runConfig, actorRequest); + const guardHeaders = buildGuardHeaders(runConfig, actorRequest, options); if (actorRequest.method !== "GET" && actorRequest.method !== "HEAD") { if (actorRequest.bodyUsed) { @@ -49,6 +60,7 @@ function mutableResponse(fetchRes: Response): Response { function buildGuardHeaders( runConfig: ClientConfig, actorRequest: Request, + options: HttpGatewayRequestOptions, ): Headers { const headers = new Headers(); // Copy all headers from the original request @@ -63,5 +75,12 @@ function buildGuardHeaders( if (runConfig.token) { headers.set(HEADER_RIVET_TOKEN, runConfig.token); } + if (options.directActorId !== undefined) { + headers.set(HEADER_RIVET_TARGET, "actor"); + headers.set(HEADER_RIVET_ACTOR, options.directActorId); + } + if (options.bypassConnectable) { + headers.set(HEADER_RIVET_BYPASS_CONNECTABLE, "1"); + } return headers; } diff --git a/rivetkit-typescript/packages/rivetkit/src/engine-client/actor-websocket-client.ts b/rivetkit-typescript/packages/rivetkit/src/engine-client/actor-websocket-client.ts index 4f37807f9a..d4ffe13a0d 100644 --- a/rivetkit-typescript/packages/rivetkit/src/engine-client/actor-websocket-client.ts +++ b/rivetkit-typescript/packages/rivetkit/src/engine-client/actor-websocket-client.ts @@ -8,6 +8,7 @@ import { WS_PROTOCOL_STANDARD as WS_PROTOCOL_RIVETKIT, WS_PROTOCOL_TARGET, WS_PROTOCOL_ACTOR, + WS_PROTOCOL_BYPASS_CONNECTABLE, WS_PROTOCOL_TEST_ACK_HOOK, WS_PROTOCOL_TOKEN, } from "@/common/actor-router-consts"; @@ -17,6 +18,7 @@ import type { ActorGatewayQuery, CrashPolicy } from "@/client/query"; import type { Encoding, UniversalWebSocket } from "@/mod"; import { encodeCborCompat, uint8ArrayToBase64 } from "@/serde"; import { combineUrlPath } from "@/utils"; +import type { GatewayRequestOptions } from "./driver"; import { logger } from "./log"; class BufferedRemoteWebSocket implements UniversalWebSocket { @@ -211,6 +213,7 @@ export function buildActorQueryGatewayUrl( maxInputSize = DEFAULT_MAX_QUERY_INPUT_SIZE, crashPolicy: CrashPolicy | undefined = undefined, runnerName?: string, + options: GatewayRequestOptions = {}, ): string { if (namespace.length === 0) { throw new Error("actor query namespace must not be empty"); @@ -266,6 +269,9 @@ export function buildActorQueryGatewayUrl( if (token !== undefined) { params.append("rvt-token", token); } + if (options.bypassConnectable) { + params.append("rvt-bypass_connectable", "true"); + } const queryString = params.toString(); let separator: string; @@ -318,6 +324,7 @@ export async function openWebSocketToGateway( gatewayUrl: string, encoding: Encoding, params: unknown, + options: GatewayRequestOptions & { directActorId?: string } = {}, ): Promise { const WebSocket = await importWebSocket(); @@ -334,7 +341,19 @@ export async function openWebSocketToGateway( // Create WebSocket connection const ws = new WebSocket( gatewayUrl, - buildWebSocketProtocols(runConfig, encoding, params, ackHookToken), + buildWebSocketProtocols( + runConfig, + encoding, + params, + ackHookToken, + options.directActorId + ? { + target: "actor", + actorId: options.directActorId, + } + : undefined, + options, + ), ); // The WebSocket is returned before the connection is open. This follows @@ -364,6 +383,7 @@ export function buildWebSocketProtocols( target: "actor"; actorId: string; }, + options: GatewayRequestOptions = {}, ): string[] { const protocols: string[] = []; protocols.push(WS_PROTOCOL_RIVETKIT); @@ -372,6 +392,9 @@ export function buildWebSocketProtocols( protocols.push(`${WS_PROTOCOL_TARGET}${target.target}`); protocols.push(`${WS_PROTOCOL_ACTOR}${target.actorId}`); } + if (options.bypassConnectable) { + protocols.push(WS_PROTOCOL_BYPASS_CONNECTABLE); + } if (params) { protocols.push( `${WS_PROTOCOL_CONN_PARAMS}${encodeURIComponent(JSON.stringify(params))}`, diff --git a/rivetkit-typescript/packages/rivetkit/src/engine-client/driver.ts b/rivetkit-typescript/packages/rivetkit/src/engine-client/driver.ts index e2ed179dae..c2c65c6264 100644 --- a/rivetkit-typescript/packages/rivetkit/src/engine-client/driver.ts +++ b/rivetkit-typescript/packages/rivetkit/src/engine-client/driver.ts @@ -6,6 +6,10 @@ import type { ActorQuery, CrashPolicy } from "@/client/query"; export type GatewayTarget = { directId: string } | ActorQuery; +export interface GatewayRequestOptions { + bypassConnectable?: boolean; +} + export interface EngineControlClient { getForId(input: GetForIdInput): Promise; getWithKey(input: GetWithKeyInput): Promise; @@ -16,12 +20,14 @@ export interface EngineControlClient { sendRequest( target: GatewayTarget, actorRequest: Request, + options?: GatewayRequestOptions, ): Promise; openWebSocket( path: string, target: GatewayTarget, encoding: Encoding, params: unknown, + options?: GatewayRequestOptions, ): Promise; proxyRequest( c: HonoContext, @@ -35,7 +41,10 @@ export interface EngineControlClient { encoding: Encoding, params: unknown, ): Promise; - buildGatewayUrl(target: GatewayTarget): Promise; + buildGatewayUrl( + target: GatewayTarget, + options?: GatewayRequestOptions, + ): Promise; displayInformation(): RuntimeDisplayInformation; extraStartupLog?: () => Record; modifyRuntimeRouter?: (config: RegistryConfig, router: Hono) => void; diff --git a/rivetkit-typescript/packages/rivetkit/src/engine-client/mod.ts b/rivetkit-typescript/packages/rivetkit/src/engine-client/mod.ts index 0cc3df41f6..a81ba9adcb 100644 --- a/rivetkit-typescript/packages/rivetkit/src/engine-client/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/engine-client/mod.ts @@ -7,6 +7,7 @@ import { type ActorOutput, type CreateInput, type GatewayTarget, + type GatewayRequestOptions, type GetForIdInput, type GetOrCreateWithKeyInput, type GetWithKeyInput, @@ -246,15 +247,25 @@ export class RemoteEngineControlClient implements EngineControlClient { async sendRequest( target: GatewayTarget, actorRequest: Request, + options: GatewayRequestOptions = {}, ): Promise { await this.#metadataPromise; - const gatewayUrl = this.#buildGatewayUrlForTarget( - target, - requestPath(actorRequest), - ); + const path = requestPath(actorRequest); + const gatewayUrl = this.#buildGatewayUrlForTarget(target, path, options); + const httpOptions = { + ...options, + directActorId: options.bypassConnectable + ? directActorIdFromTarget(target) + : undefined, + }; - return sendHttpRequestToGateway(this.#config, gatewayUrl, actorRequest); + return sendHttpRequestToGateway( + this.#config, + gatewayUrl, + actorRequest, + httpOptions, + ); } async openWebSocket( @@ -262,22 +273,32 @@ export class RemoteEngineControlClient implements EngineControlClient { target: GatewayTarget, encoding: Encoding, params: unknown, + options: GatewayRequestOptions = {}, ): Promise { await this.#metadataPromise; - const gatewayUrl = this.#buildGatewayUrlForTarget(target, path); + const gatewayUrl = this.#buildGatewayUrlForTarget(target, path, options); return openWebSocketToGateway( this.#config, gatewayUrl, encoding, params, + { + ...options, + directActorId: options.bypassConnectable + ? directActorIdFromTarget(target) + : undefined, + }, ); } - async buildGatewayUrl(target: GatewayTarget): Promise { + async buildGatewayUrl( + target: GatewayTarget, + options: GatewayRequestOptions = {}, + ): Promise { await this.#metadataPromise; - return this.#buildGatewayUrlForTarget(target, ""); + return this.#buildGatewayUrlForTarget(target, "", options); } async proxyRequest( @@ -382,9 +403,17 @@ export class RemoteEngineControlClient implements EngineControlClient { this.#config.getUpgradeWebSocket = getUpgradeWebSocket; } - #buildGatewayUrlForTarget(target: GatewayTarget, path: string): string { + #buildGatewayUrlForTarget( + target: GatewayTarget, + path: string, + options: GatewayRequestOptions = {}, + ): string { const endpoint = getEndpoint(this.#config); + if (options.bypassConnectable && directActorIdFromTarget(target)) { + return combineUrlPath(endpoint, path); + } + if ("directId" in target) { return buildActorGatewayUrl( endpoint, @@ -415,6 +444,7 @@ export class RemoteEngineControlClient implements EngineControlClient { "getOrCreateForKey" in target ? this.#config.poolName : undefined, + options, ); } @@ -428,6 +458,16 @@ export class RemoteEngineControlClient implements EngineControlClient { } } +function directActorIdFromTarget(target: GatewayTarget): string | undefined { + if ("directId" in target) { + return target.directId; + } + if ("getForId" in target) { + return target.getForId.actorId; + } + return undefined; +} + function requestPath(req: Request): string { const url = new URL(req.url); return `${url.pathname}${url.search}`; diff --git a/rivetkit-typescript/packages/rivetkit/tests/actor-gateway-url.test.ts b/rivetkit-typescript/packages/rivetkit/tests/actor-gateway-url.test.ts index f254d14f48..e198e3b3fb 100644 --- a/rivetkit-typescript/packages/rivetkit/tests/actor-gateway-url.test.ts +++ b/rivetkit-typescript/packages/rivetkit/tests/actor-gateway-url.test.ts @@ -56,6 +56,29 @@ describe("gateway URL builders", () => { expect(url).not.toContain("@"); }); + test("serializes gateway bypass for query routing", () => { + const url = buildActorQueryGatewayUrl( + "https://api.rivet.dev/manager", + "prod", + { + getForKey: { + name: "room", + key: ["alpha"], + }, + }, + undefined, + "/status", + undefined, + undefined, + undefined, + { bypassConnectable: true }, + ); + + expect(new URL(url).searchParams.get("rvt-bypass_connectable")).toBe( + "true", + ); + }); + test("serializes getOrCreate queries with rvt-* params", () => { const input = { hello: "world" }; const url = buildActorQueryGatewayUrl(