Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
245 changes: 245 additions & 0 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,27 @@ void address_pool_remove(address_pool* pool, int64_t i)
*/
static void uv_thread(uv_async_t* handle);

/**
* @brief Initializes a fd injection queue.
*/
static void fd_queue_init(vws_fd_queue* q);

/**
* @brief Destroys a fd injection queue, closing any pending fds.
*/
static void fd_queue_destroy(vws_fd_queue* q);

/**
* @brief UV async callback that drains the server's fd injection queue.
*
* Runs on the network thread when vws_tcp_svr_inject_fd() signals the
* server->fd_inject_async handle. Pops every queued fd and adopts it into
* the loop as a uv_pipe_t.
*
* @param handle The async handle that triggered the callback.
*/
static void on_fd_inject(uv_async_t* handle);

/**
* @brief The entry point for a worker thread.
*
Expand Down Expand Up @@ -1710,6 +1731,20 @@ vws_tcp_svr* tcp_svr_ctor(vws_tcp_svr* svr, int nt, int backlog, int queue_size)
svr->peer_timer = vws.malloc(sizeof(uv_timer_t));
uv_timer_init(svr->loop, svr->peer_timer);

// FD injection queue and async wakeup. The async->data carries a cinfo
// with an invalid cid so svr_on_close() can free it on shutdown without
// touching connection state.
fd_queue_init(&svr->fd_queue);

vws_cinfo* fd_ci = vws.malloc(sizeof(vws_cinfo));
fd_ci->cnx = NULL;
fd_ci->server = svr;
fd_ci->cid.key = 0;

svr->fd_inject_async = vws.malloc(sizeof(uv_async_t));
svr->fd_inject_async->data = fd_ci;
uv_async_init(svr->loop, svr->fd_inject_async, on_fd_inject);

return svr;
}

Expand Down Expand Up @@ -1749,6 +1784,14 @@ void tcp_svr_dtor(vws_tcp_svr* svr)
// Close the server async handle
uv_close((uv_handle_t*)svr->wakeup, svr_on_close);

// Close the fd injection async handle. svr_on_close() will free the
// attached cinfo and the handle itself.
uv_close((uv_handle_t*)svr->fd_inject_async, svr_on_close);

// Drain and destroy the fd injection queue. Any fds still pending have
// not been adopted by the loop and are closed here.
fd_queue_destroy(&svr->fd_queue);

// Close peer timer
svr->peer_timer->data = NULL;
uv_close((uv_handle_t*)svr->peer_timer, svr_on_timer_close);
Expand Down Expand Up @@ -2297,6 +2340,208 @@ bool queue_empty(vws_svr_queue* queue)
return empty;
}

//------------------------------------------------------------------------------
// FD Injection Queue
//------------------------------------------------------------------------------

static void fd_queue_init(vws_fd_queue* q)
{
q->head = NULL;
q->tail = NULL;
uv_mutex_init(&q->mutex);
}

static void fd_queue_destroy(vws_fd_queue* q)
{
uv_mutex_lock(&q->mutex);

vws_fd_node* n = q->head;
while (n != NULL)
{
vws_fd_node* next = n->next;

// Close any fd that never got adopted into the loop.
if (n->fd >= 0)
{
close(n->fd);
}

vws.free(n);
n = next;
}
q->head = NULL;
q->tail = NULL;

uv_mutex_unlock(&q->mutex);
uv_mutex_destroy(&q->mutex);
}

static void fd_queue_push(vws_fd_queue* q, int fd)
{
vws_fd_node* n = vws.malloc(sizeof(vws_fd_node));
n->fd = fd;
n->next = NULL;

uv_mutex_lock(&q->mutex);

if (q->tail == NULL)
{
q->head = n;
q->tail = n;
}
else
{
q->tail->next = n;
q->tail = n;
}

uv_mutex_unlock(&q->mutex);
}

// Detach the entire chain so the loop thread can drain it without holding
// the mutex.
static vws_fd_node* fd_queue_take_all(vws_fd_queue* q)
{
uv_mutex_lock(&q->mutex);
vws_fd_node* head = q->head;
q->head = NULL;
q->tail = NULL;
uv_mutex_unlock(&q->mutex);

return head;
}

// Adopt an injected fd into the libuv loop as a uv_pipe_t and run it through
// the standard connection setup. Must be called on the loop thread.
static void adopt_injected_fd(vws_tcp_svr* server, int fd)
{
uv_pipe_t* p = (uv_pipe_t*)vws.malloc(sizeof(uv_pipe_t));

if (uv_pipe_init(server->loop, p, 0) != 0)
{
vws.error(VE_RT, "uv_pipe_init failed");
vws.free(p);
close(fd);
return;
}

if (uv_pipe_open(p, fd) != 0)
{
vws.error(VE_RT, "uv_pipe_open failed");
vws.free(p);
close(fd);
return;
}

vws_cinfo* ci = vws.malloc(sizeof(vws_cinfo));
ci->cnx = NULL;
ci->server = server;
p->data = ci;

if (uv_read_start((uv_stream_t*)p, svr_on_realloc, svr_on_read) != 0)
{
vws.error(VE_RT, "uv_read_start failed on injected fd");
vws.free(ci);
uv_close((uv_handle_t*)p, NULL);
return;
}

vws_svr_cnx* cnx = svr_cnx_new(server, (uv_stream_t*)p);
ci->cnx = cnx;
ci->cid = cnx->cid;

server->on_connect(cnx);
}

static void on_fd_inject(uv_async_t* handle)
{
vws_cinfo* ci = (vws_cinfo*)handle->data;
vws_tcp_svr* server = ci->server;

if (server->state != VS_RUNNING)
{
return;
}

vws_fd_node* n = fd_queue_take_all(&server->fd_queue);
while (n != NULL)
{
vws_fd_node* next = n->next;
adopt_injected_fd(server, n->fd);
vws.free(n);
n = next;
}
}

int vws_tcp_svr_inject_fd(vws_tcp_svr* server, int fd)
{
if (server == NULL || fd < 0)
{
return -1;
}

if (server->state != VS_RUNNING)
{
vws.error(VE_RT, "server not running");
return -1;
}

fd_queue_push(&server->fd_queue, fd);
uv_async_send(server->fd_inject_async);

return 0;
}

vws_cnx* vws_pipe_connect(vws_tcp_svr* server)
{
if (server == NULL)
{
vws.error(VE_RT, "vws_pipe_connect: NULL server");
return NULL;
}

uv_os_sock_t fds[2];
int rc = uv_socketpair( SOCK_STREAM,
0,
fds,
UV_NONBLOCK_PIPE,
UV_NONBLOCK_PIPE );

if (rc < 0)
{
vws.error(VE_RT, "uv_socketpair() failed: %s", uv_strerror(rc));
return NULL;
}

// Hand fds[0] to the server. After this call the server owns it.
if (vws_tcp_svr_inject_fd(server, (int)fds[0]) != 0)
{
close((int)fds[0]);
close((int)fds[1]);
return NULL;
}

// Wrap fds[1] as a client and run the websocket handshake. This blocks
// until the server-side has processed the upgrade and replied, which
// requires the server's loop to be running on a different thread.
vws_cnx* cnx = vws_cnx_new();

// Frame masking exists to defeat cache-poisoning attacks on
// intermediate proxies. There are no proxies on a socketpair, so
// skip the per-frame XOR and key generation.
vws_cnx_set_server_mode(cnx);

if (vws_cnx_from_fd(cnx, (int)fds[1]) == false)
{
// Handshake failed. The fd is owned by the cnx at this point so
// vws_cnx_free will close it.
vws_cnx_free(cnx);
return NULL;
}

return cnx;
}

//------------------------------------------------------------------------------
// Pure WebSocket Server
//------------------------------------------------------------------------------
Expand Down
79 changes: 79 additions & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,42 @@ typedef struct

} vws_svr_queue;

/**
* @struct vws_fd_node
* @brief Linked-list node holding a single file descriptor pending injection
* into a running server's libuv loop.
*/
typedef struct vws_fd_node
{
/**< File descriptor to be adopted by the loop */
int fd;

/**< Next node in the queue */
struct vws_fd_node* next;

} vws_fd_node;

/**
* @struct vws_fd_queue
* @brief Thread-safe FIFO of file descriptors awaiting injection.
*
* Producers (any thread) push fds via vws_tcp_svr_inject_fd(). The libuv
* network thread drains the queue in response to a uv_async_t signal and
* adopts each fd into the loop as a uv_pipe_t.
*/
typedef struct vws_fd_queue
{
/**< Head of the queue (oldest entry) */
vws_fd_node* head;

/**< Tail of the queue (newest entry) */
vws_fd_node* tail;

/**< Mutex protecting head/tail */
uv_mutex_t mutex;

} vws_fd_queue;

struct vws_tcp_svr;

/**
Expand Down Expand Up @@ -557,6 +593,13 @@ typedef struct vws_tcp_svr

/**< The peer timer */
uv_timer_t* peer_timer;

/**< Queue of file descriptors awaiting injection into the loop */
vws_fd_queue fd_queue;

/**< Async handle that wakes the loop to drain fd_queue */
uv_async_t* fd_inject_async;

} vws_tcp_svr;

/**
Expand Down Expand Up @@ -650,6 +693,42 @@ int vws_tcp_svr_inetd_run(vws_tcp_svr* server, int sockfd);
*/
void vws_tcp_svr_inetd_stop(vws_tcp_svr* server);

/**
* @brief Injects an already-connected file descriptor into a running server.
*
* This pushes the fd onto the server's injection queue and signals the libuv
* loop. The network thread will drain the queue, adopt the fd into the loop
* as a uv_pipe_t, and run it through the normal connection setup so that it
* is treated like any other client. Suitable for any stream-oriented fd
* (sockets from uv_socketpair(), Unix domain sockets, anonymous pipes, etc.).
*
* Safe to call from any thread. The server must be in VS_RUNNING state with
* its loop running on a different thread than the caller.
*
* @param server The server.
* @param fd A connected, stream-oriented file descriptor. The server takes
* ownership on success.
* @return 0 on success, -1 on failure (server not running).
*/
int vws_tcp_svr_inject_fd(vws_tcp_svr* server, int fd);

/**
* @brief Establishes an in-process WebSocket connection to a running server.
*
* Creates a connected socket pair via uv_socketpair(), injects one end into
* the server's loop, and wraps the other end as a vws_cnx client that has
* already completed the WebSocket handshake. The server must be running in a
* different thread; this call blocks the caller until the handshake completes.
*
* The returned client has frame masking disabled (server mode) since the
* socketpair has no intermediate proxies that masking is meant to protect
* against. This skips per-frame XOR and mask-key generation.
*
* @param server The running server to connect to.
* @return A connected vws_cnx on success, NULL on failure.
*/
vws_cnx* vws_pipe_connect(vws_tcp_svr* server);

/**
* @brief Check that peers are all online
*
Expand Down
1 change: 1 addition & 0 deletions src/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ if(BUILD_SERVER)
test_inetd_server
test_msg_server
test_peering
test_pipe_server
test_server
test_ws_server )
endif()
Expand Down
Loading