diff --git a/src/server.c b/src/server.c index 9892079..c3d7b01 100644 --- a/src/server.c +++ b/src/server.c @@ -154,6 +154,27 @@ void address_pool_remove(address_pool* pool, int64_t i) */ static void uv_thread(uv_async_t* handle); +/** + * @brief Initializes a fd injection queue. + */ +static void fd_queue_init(vws_fd_queue* q); + +/** + * @brief Destroys a fd injection queue, closing any pending fds. + */ +static void fd_queue_destroy(vws_fd_queue* q); + +/** + * @brief UV async callback that drains the server's fd injection queue. + * + * Runs on the network thread when vws_tcp_svr_inject_fd() signals the + * server->fd_inject_async handle. Pops every queued fd and adopts it into + * the loop as a uv_pipe_t. + * + * @param handle The async handle that triggered the callback. + */ +static void on_fd_inject(uv_async_t* handle); + /** * @brief The entry point for a worker thread. * @@ -1710,6 +1731,20 @@ vws_tcp_svr* tcp_svr_ctor(vws_tcp_svr* svr, int nt, int backlog, int queue_size) svr->peer_timer = vws.malloc(sizeof(uv_timer_t)); uv_timer_init(svr->loop, svr->peer_timer); + // FD injection queue and async wakeup. The async->data carries a cinfo + // with an invalid cid so svr_on_close() can free it on shutdown without + // touching connection state. + fd_queue_init(&svr->fd_queue); + + vws_cinfo* fd_ci = vws.malloc(sizeof(vws_cinfo)); + fd_ci->cnx = NULL; + fd_ci->server = svr; + fd_ci->cid.key = 0; + + svr->fd_inject_async = vws.malloc(sizeof(uv_async_t)); + svr->fd_inject_async->data = fd_ci; + uv_async_init(svr->loop, svr->fd_inject_async, on_fd_inject); + return svr; } @@ -1749,6 +1784,14 @@ void tcp_svr_dtor(vws_tcp_svr* svr) // Close the server async handle uv_close((uv_handle_t*)svr->wakeup, svr_on_close); + // Close the fd injection async handle. svr_on_close() will free the + // attached cinfo and the handle itself. + uv_close((uv_handle_t*)svr->fd_inject_async, svr_on_close); + + // Drain and destroy the fd injection queue. Any fds still pending have + // not been adopted by the loop and are closed here. + fd_queue_destroy(&svr->fd_queue); + // Close peer timer svr->peer_timer->data = NULL; uv_close((uv_handle_t*)svr->peer_timer, svr_on_timer_close); @@ -2297,6 +2340,208 @@ bool queue_empty(vws_svr_queue* queue) return empty; } +//------------------------------------------------------------------------------ +// FD Injection Queue +//------------------------------------------------------------------------------ + +static void fd_queue_init(vws_fd_queue* q) +{ + q->head = NULL; + q->tail = NULL; + uv_mutex_init(&q->mutex); +} + +static void fd_queue_destroy(vws_fd_queue* q) +{ + uv_mutex_lock(&q->mutex); + + vws_fd_node* n = q->head; + while (n != NULL) + { + vws_fd_node* next = n->next; + + // Close any fd that never got adopted into the loop. + if (n->fd >= 0) + { + close(n->fd); + } + + vws.free(n); + n = next; + } + q->head = NULL; + q->tail = NULL; + + uv_mutex_unlock(&q->mutex); + uv_mutex_destroy(&q->mutex); +} + +static void fd_queue_push(vws_fd_queue* q, int fd) +{ + vws_fd_node* n = vws.malloc(sizeof(vws_fd_node)); + n->fd = fd; + n->next = NULL; + + uv_mutex_lock(&q->mutex); + + if (q->tail == NULL) + { + q->head = n; + q->tail = n; + } + else + { + q->tail->next = n; + q->tail = n; + } + + uv_mutex_unlock(&q->mutex); +} + +// Detach the entire chain so the loop thread can drain it without holding +// the mutex. +static vws_fd_node* fd_queue_take_all(vws_fd_queue* q) +{ + uv_mutex_lock(&q->mutex); + vws_fd_node* head = q->head; + q->head = NULL; + q->tail = NULL; + uv_mutex_unlock(&q->mutex); + + return head; +} + +// Adopt an injected fd into the libuv loop as a uv_pipe_t and run it through +// the standard connection setup. Must be called on the loop thread. +static void adopt_injected_fd(vws_tcp_svr* server, int fd) +{ + uv_pipe_t* p = (uv_pipe_t*)vws.malloc(sizeof(uv_pipe_t)); + + if (uv_pipe_init(server->loop, p, 0) != 0) + { + vws.error(VE_RT, "uv_pipe_init failed"); + vws.free(p); + close(fd); + return; + } + + if (uv_pipe_open(p, fd) != 0) + { + vws.error(VE_RT, "uv_pipe_open failed"); + vws.free(p); + close(fd); + return; + } + + vws_cinfo* ci = vws.malloc(sizeof(vws_cinfo)); + ci->cnx = NULL; + ci->server = server; + p->data = ci; + + if (uv_read_start((uv_stream_t*)p, svr_on_realloc, svr_on_read) != 0) + { + vws.error(VE_RT, "uv_read_start failed on injected fd"); + vws.free(ci); + uv_close((uv_handle_t*)p, NULL); + return; + } + + vws_svr_cnx* cnx = svr_cnx_new(server, (uv_stream_t*)p); + ci->cnx = cnx; + ci->cid = cnx->cid; + + server->on_connect(cnx); +} + +static void on_fd_inject(uv_async_t* handle) +{ + vws_cinfo* ci = (vws_cinfo*)handle->data; + vws_tcp_svr* server = ci->server; + + if (server->state != VS_RUNNING) + { + return; + } + + vws_fd_node* n = fd_queue_take_all(&server->fd_queue); + while (n != NULL) + { + vws_fd_node* next = n->next; + adopt_injected_fd(server, n->fd); + vws.free(n); + n = next; + } +} + +int vws_tcp_svr_inject_fd(vws_tcp_svr* server, int fd) +{ + if (server == NULL || fd < 0) + { + return -1; + } + + if (server->state != VS_RUNNING) + { + vws.error(VE_RT, "server not running"); + return -1; + } + + fd_queue_push(&server->fd_queue, fd); + uv_async_send(server->fd_inject_async); + + return 0; +} + +vws_cnx* vws_pipe_connect(vws_tcp_svr* server) +{ + if (server == NULL) + { + vws.error(VE_RT, "vws_pipe_connect: NULL server"); + return NULL; + } + + uv_os_sock_t fds[2]; + int rc = uv_socketpair( SOCK_STREAM, + 0, + fds, + UV_NONBLOCK_PIPE, + UV_NONBLOCK_PIPE ); + + if (rc < 0) + { + vws.error(VE_RT, "uv_socketpair() failed: %s", uv_strerror(rc)); + return NULL; + } + + // Hand fds[0] to the server. After this call the server owns it. + if (vws_tcp_svr_inject_fd(server, (int)fds[0]) != 0) + { + close((int)fds[0]); + close((int)fds[1]); + return NULL; + } + + // Wrap fds[1] as a client and run the websocket handshake. This blocks + // until the server-side has processed the upgrade and replied, which + // requires the server's loop to be running on a different thread. + vws_cnx* cnx = vws_cnx_new(); + + // Frame masking exists to defeat cache-poisoning attacks on + // intermediate proxies. There are no proxies on a socketpair, so + // skip the per-frame XOR and key generation. + vws_cnx_set_server_mode(cnx); + + if (vws_cnx_from_fd(cnx, (int)fds[1]) == false) + { + // Handshake failed. The fd is owned by the cnx at this point so + // vws_cnx_free will close it. + vws_cnx_free(cnx); + return NULL; + } + + return cnx; +} + //------------------------------------------------------------------------------ // Pure WebSocket Server //------------------------------------------------------------------------------ diff --git a/src/server.h b/src/server.h index 0b59255..0f8b51e 100644 --- a/src/server.h +++ b/src/server.h @@ -320,6 +320,42 @@ typedef struct } vws_svr_queue; +/** + * @struct vws_fd_node + * @brief Linked-list node holding a single file descriptor pending injection + * into a running server's libuv loop. + */ +typedef struct vws_fd_node +{ + /**< File descriptor to be adopted by the loop */ + int fd; + + /**< Next node in the queue */ + struct vws_fd_node* next; + +} vws_fd_node; + +/** + * @struct vws_fd_queue + * @brief Thread-safe FIFO of file descriptors awaiting injection. + * + * Producers (any thread) push fds via vws_tcp_svr_inject_fd(). The libuv + * network thread drains the queue in response to a uv_async_t signal and + * adopts each fd into the loop as a uv_pipe_t. + */ +typedef struct vws_fd_queue +{ + /**< Head of the queue (oldest entry) */ + vws_fd_node* head; + + /**< Tail of the queue (newest entry) */ + vws_fd_node* tail; + + /**< Mutex protecting head/tail */ + uv_mutex_t mutex; + +} vws_fd_queue; + struct vws_tcp_svr; /** @@ -557,6 +593,13 @@ typedef struct vws_tcp_svr /**< The peer timer */ uv_timer_t* peer_timer; + + /**< Queue of file descriptors awaiting injection into the loop */ + vws_fd_queue fd_queue; + + /**< Async handle that wakes the loop to drain fd_queue */ + uv_async_t* fd_inject_async; + } vws_tcp_svr; /** @@ -650,6 +693,42 @@ int vws_tcp_svr_inetd_run(vws_tcp_svr* server, int sockfd); */ void vws_tcp_svr_inetd_stop(vws_tcp_svr* server); +/** + * @brief Injects an already-connected file descriptor into a running server. + * + * This pushes the fd onto the server's injection queue and signals the libuv + * loop. The network thread will drain the queue, adopt the fd into the loop + * as a uv_pipe_t, and run it through the normal connection setup so that it + * is treated like any other client. Suitable for any stream-oriented fd + * (sockets from uv_socketpair(), Unix domain sockets, anonymous pipes, etc.). + * + * Safe to call from any thread. The server must be in VS_RUNNING state with + * its loop running on a different thread than the caller. + * + * @param server The server. + * @param fd A connected, stream-oriented file descriptor. The server takes + * ownership on success. + * @return 0 on success, -1 on failure (server not running). + */ +int vws_tcp_svr_inject_fd(vws_tcp_svr* server, int fd); + +/** + * @brief Establishes an in-process WebSocket connection to a running server. + * + * Creates a connected socket pair via uv_socketpair(), injects one end into + * the server's loop, and wraps the other end as a vws_cnx client that has + * already completed the WebSocket handshake. The server must be running in a + * different thread; this call blocks the caller until the handshake completes. + * + * The returned client has frame masking disabled (server mode) since the + * socketpair has no intermediate proxies that masking is meant to protect + * against. This skips per-frame XOR and mask-key generation. + * + * @param server The running server to connect to. + * @return A connected vws_cnx on success, NULL on failure. + */ +vws_cnx* vws_pipe_connect(vws_tcp_svr* server); + /** * @brief Check that peers are all online * diff --git a/src/test/CMakeLists.txt b/src/test/CMakeLists.txt index c2ebf1c..135c1f1 100644 --- a/src/test/CMakeLists.txt +++ b/src/test/CMakeLists.txt @@ -39,6 +39,7 @@ if(BUILD_SERVER) test_inetd_server test_msg_server test_peering + test_pipe_server test_server test_ws_server ) endif() diff --git a/src/test/test_pipe_server.c b/src/test/test_pipe_server.c new file mode 100644 index 0000000..14d1824 --- /dev/null +++ b/src/test/test_pipe_server.c @@ -0,0 +1,86 @@ +// In-process WebSocket via uv_socketpair() + vws_pipe_connect(). +// +// Demonstrates how an arbitrary connected stream fd can be injected into a +// running vws server. The server side adopts the fd as a uv_pipe_t in its +// libuv loop; the client side wraps the other end as a fully-handshaken +// vws_cnx. + +#include "server.h" +#include "message.h" + +#define CTEST_MAIN +#include "ctest.h" +#include "common.h" + +// Echoes any message back to the sender. Runs on a worker thread. +static void process(vws_svr* s, vws_cid_t cid, vws_msg* m, void* ctx) +{ + vws_msg* reply = vws_msg_new(); + reply->opcode = m->opcode; + vws_buffer_append(reply->data, m->data->data, m->data->size); + + s->send(s, cid, reply, NULL); + vws_msg_free(m); +} + +static void server_thread(void* arg) +{ + vws_tcp_svr* server = (vws_tcp_svr*)arg; + vws.tracelevel = VT_THREAD; + server->trace = vws.tracelevel; + + // Bind to an ephemeral port. We won't connect over TCP, but + // vws_tcp_svr_run() requires a host/port to bind. The pipe path + // bypasses listen/accept entirely. + vws_tcp_svr_run(server, "127.0.0.1", 0); + + vws_cleanup(); +} + +CTEST(test_pipe_server, echo) +{ + vws_svr* server = vws_svr_new(4, 0, 0); + server->process_ws = process; + + uv_thread_t server_tid; + uv_thread_create(&server_tid, server_thread, server); + + while (vws_tcp_svr_state((vws_tcp_svr*)server) != VS_RUNNING) + { + vws_msleep(10); + } + + // Open an in-process connection. No TCP, no DNS, no listen socket -- + // just a connected socket pair handed to the server's loop. + vws_cnx* cnx = vws_pipe_connect((vws_tcp_svr*)server); + ASSERT_NOT_NULL(cnx); + ASSERT_TRUE(vws_cnx_is_connected(cnx)); + + cstr payload = "hello inproc"; + for (int i = 0; i < 5; i++) + { + ASSERT_TRUE(vws_msg_send_text(cnx, payload) > 0); + + vws_msg* reply = vws_msg_recv(cnx); + ASSERT_NOT_NULL(reply); + ASSERT_TRUE(strncmp( payload, + (cstr)reply->data->data, + reply->data->size ) == 0); + vws_msg_free(reply); + } + + vws_disconnect(cnx); + vws_cnx_free(cnx); + + // Give the server time to finish flushing CLOSE frames. + sleep(1); + + vws_tcp_svr_stop((vws_tcp_svr*)server); + uv_thread_join(&server_tid); + vws_svr_free(server); +} + +int main(int argc, const char* argv[]) +{ + return ctest_main(argc, argv); +} diff --git a/src/websocket.c b/src/websocket.c index 3f5732a..a575118 100644 --- a/src/websocket.c +++ b/src/websocket.c @@ -383,6 +383,58 @@ bool vws_reconnect(vws_cnx* c) return false; } +bool vws_cnx_from_fd(vws_cnx* c, int fd) +{ + if (c == NULL || fd < 0) + { + vws.error(VE_RT, "Invalid connection or fd"); + return false; + } + + vws_buffer_clear(c->base.buffer); + + // Adopt the fd. No SSL on injected fds; this is intended for in-process + // transports (socketpair, pipes) where there is no remote peer to + // authenticate. + c->base.sockfd = fd; + c->base.ssl = NULL; + + if (vws_socket_set_timeout((vws_socket*)c, c->base.timeout / 1000) == false) + { + vws_socket_close((vws_socket*)c); + return false; + } + + // Mirror vws_socket_connect(): put the fd in O_NONBLOCK so the + // poll()-driven read/write loops work. Caller-facing API stays + // synchronous (blocking with timeout). + if (vws_socket_set_nonblocking(c->base.sockfd) == false) + { + vws_socket_close((vws_socket*)c); + return false; + } + + // The handshake callback expects c->url to be populated for building the + // upgrade request. Provide a synthetic URL when one was not set. + if (c->url == NULL) + { + c->url = (vws_url_data*)url_parse("ws://inproc/"); + } + + if (c->base.hs != NULL) + { + if (c->base.hs((vws_socket*)c) == false) + { + vws.error(VE_SYS, "Handshake failed"); + vws_socket_close((vws_socket*)c); + return false; + } + } + + vws.success(); + return true; +} + bool vws_cnx_is_connected(vws_cnx* c) { if (vws_socket_is_connected((vws_socket*)c) == false) diff --git a/src/websocket.h b/src/websocket.h index 04bfb8e..17a4c73 100644 --- a/src/websocket.h +++ b/src/websocket.h @@ -298,6 +298,31 @@ bool vws_connect(vws_cnx* c, cstr uri); */ bool vws_reconnect(vws_cnx* c); +/** + * @brief Initializes a WebSocket connection from an already-connected fd. + * + * Adopts the fd as the connection's socket and runs the standard client + * handshake. Intended for in-process transports (socketpair, pipes) where + * TCP connect and DNS resolution should be bypassed. SSL is not used on the + * injected fd. + * + * The fd is placed in O_NONBLOCK mode internally so the existing poll()-based + * read/write loops can drive it -- this is the same setup vws_connect() does + * after a TCP connect. The client API itself remains synchronous: subsequent + * vws_msg_send/vws_msg_recv calls block (with the configured timeout) just + * like any other vws client. + * + * On success, the connection takes ownership of the fd and will close it on + * vws_disconnect()/vws_cnx_free(). + * + * @param c The websocket connection (allocated via vws_cnx_new()). + * @param fd A connected, stream-oriented file descriptor. + * @return true on successful handshake, false otherwise. + * + * @ingroup ConnectionFunctions + */ +bool vws_cnx_from_fd(vws_cnx* c, int fd); + /** * @brief Closes the connection to the host. *