Skip to content

Commit 4e48aff

Browse files
authored
Merge pull request #1 from cgwalters/cleanups
Bug fixes and performance improvements
2 parents 7f756ce + a6ac3d9 commit 4e48aff

10 files changed

Lines changed: 1022 additions & 241 deletions

File tree

Cargo.toml

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
[package]
2-
name = "ndjson-rpc-fdpass"
2+
name = "jsonrpc-fdpass"
33
version = "0.1.0"
44
edition = "2021"
5-
description = "NDJSON JSON-RPC 2.0 with File Descriptor Passing implementation"
5+
description = "JSON-RPC 2.0 with Unix file descriptor passing"
66
authors = ["Colin Walters <walters@verbum.org>"]
77
license = "MIT OR Apache-2.0"
88
repository = "https://github.com/cgwalters/spec-json-rpc-fdpass"
@@ -15,5 +15,8 @@ thiserror = "1.0"
1515
tokio = { version = "1.40", features = ["full"] }
1616
tracing = "0.1"
1717
tracing-subscriber = "0.3"
18+
jsonrpsee = { version = "0.24", features = ["server", "client-core", "async-client"], default-features = false }
19+
20+
[dev-dependencies]
1821
tempfile = "3.0"
19-
jsonrpsee = { version = "0.24", features = ["server", "client-core"], default-features = false }
22+
async-trait = "0.1"

README.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
1-
# A Specification for NDJSON JSON-RPC 2.0 with File Descriptor Passing (NDJSON-RPC-FD)
1+
# JSON-RPC 2.0 with Unix File Descriptor Passing
2+
3+
This repository contains both a protocol specification and a Rust implementation
4+
(`jsonrpc-fdpass` crate) for JSON-RPC 2.0 with file descriptor passing over Unix
5+
domain sockets.
26

37
## 1. Overview
48

src/main.rs renamed to examples/demo.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use ndjson_rpc_fdpass::{Client, Result, Server};
1+
use jsonrpc_fdpass::{Client, Result, Server};
22
use serde_json::Value;
33
use std::fs::File;
44
use std::io::{Read, Write};
@@ -33,7 +33,7 @@ async fn run_server(listener: UnixListener) -> Result<()> {
3333
// Register a method that reads from a file descriptor
3434
server.register_method("read_file", |_method, _params, fds| {
3535
if fds.is_empty() {
36-
return Err(ndjson_rpc_fdpass::Error::InvalidMessage(
36+
return Err(jsonrpc_fdpass::Error::InvalidMessage(
3737
"Expected file descriptor".to_string(),
3838
));
3939
}
@@ -43,7 +43,7 @@ async fn run_server(listener: UnixListener) -> Result<()> {
4343
let mut contents = String::new();
4444

4545
file.read_to_string(&mut contents)
46-
.map_err(|e| ndjson_rpc_fdpass::Error::Io(e))?;
46+
.map_err(jsonrpc_fdpass::Error::Io)?;
4747

4848
info!("Server read from file: {}", contents.trim());
4949
Ok((Some(Value::String(contents)), Vec::new()))
@@ -63,7 +63,7 @@ async fn run_server(listener: UnixListener) -> Result<()> {
6363

6464
// Accept one connection and handle it
6565
if let Ok((stream, _)) = listener.accept().await {
66-
let transport = ndjson_rpc_fdpass::UnixSocketTransport::new(stream);
66+
let transport = jsonrpc_fdpass::UnixSocketTransport::new(stream)?;
6767
let (mut sender, mut receiver) = transport.split();
6868

6969
// Handle messages from this connection
@@ -85,8 +85,7 @@ async fn run_client(socket_path: PathBuf) -> Result<()> {
8585
let mut client = Client::connect(&socket_path).await?;
8686

8787
// Create a temporary file to send to the server
88-
let mut temp_file =
89-
tempfile::NamedTempFile::new().map_err(|e| ndjson_rpc_fdpass::Error::Io(e))?;
88+
let mut temp_file = tempfile::NamedTempFile::new().map_err(jsonrpc_fdpass::Error::Io)?;
9089

9190
write!(temp_file, "Hello from client file!").unwrap();
9291
temp_file.flush().unwrap();

src/client.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ pub struct Client {
1515
impl Client {
1616
pub async fn connect<P: AsRef<Path>>(path: P) -> Result<Self> {
1717
let stream = UnixStream::connect(path).await?;
18-
let transport = UnixSocketTransport::new(stream);
18+
let transport = UnixSocketTransport::new(stream)?;
1919
let (sender, _receiver) = transport.split();
2020

2121
Ok(Self { sender, next_id: 1 })

src/lib.rs

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
1-
//! # NDJSON JSON-RPC 2.0 with File Descriptor Passing
1+
//! # JSON-RPC 2.0 with Unix File Descriptor Passing
22
//!
3-
//! This crate provides an implementation of the NDJSON JSON-RPC 2.0 with File Descriptor Passing
4-
//! specification. It enables reliable inter-process communication (IPC) over Unix domain sockets
5-
//! with the ability to pass file descriptors alongside JSON-RPC messages.
3+
//! This crate provides an implementation of JSON-RPC 2.0 with file descriptor passing over Unix
4+
//! domain sockets. It enables reliable inter-process communication (IPC) with the ability to
5+
//! pass file descriptors alongside JSON-RPC messages.
66
//!
77
//! ## Features
88
//!
99
//! - **JSON-RPC 2.0 compliance**: Full support for requests, responses, and notifications
1010
//! - **File descriptor passing**: Pass file descriptors using Unix socket ancillary data
11-
//! - **NDJSON framing**: Newline-delimited JSON for reliable message boundaries
11+
//! - **Streaming JSON parsing**: Self-delimiting JSON messages without newline requirements
1212
//! - **Async support**: Built on tokio for high-performance async I/O
1313
//! - **Type-safe**: Rust's type system ensures correct message handling
1414
//!
@@ -17,7 +17,7 @@
1717
//! ### Server Example
1818
//!
1919
//! ```rust,no_run
20-
//! use ndjson_rpc_fdpass::{Server, Result};
20+
//! use jsonrpc_fdpass::{Server, Result};
2121
//! use std::fs::File;
2222
//! use serde_json::Value;
2323
//!
@@ -33,7 +33,7 @@
3333
//! file.read_to_string(&mut contents).unwrap();
3434
//! Ok((Some(Value::String(contents)), Vec::new()))
3535
//! } else {
36-
//! Err(ndjson_rpc_fdpass::Error::InvalidMessage("No FD provided".into()))
36+
//! Err(jsonrpc_fdpass::Error::InvalidMessage("No FD provided".into()))
3737
//! }
3838
//! });
3939
//!
@@ -44,7 +44,7 @@
4444
//! ### Client Example
4545
//!
4646
//! ```rust,no_run
47-
//! use ndjson_rpc_fdpass::{Client, Result};
47+
//! use jsonrpc_fdpass::{Client, Result};
4848
//! use std::fs::File;
4949
//! use std::os::unix::io::OwnedFd;
5050
//! use serde_json::json;
@@ -71,13 +71,15 @@
7171
//!
7272
//! ## Protocol Details
7373
//!
74-
//! This implementation follows the NDJSON JSON-RPC with File Descriptor Passing specification:
74+
//! This implementation is a minimal extension to JSON-RPC 2.0 that adds file descriptor
75+
//! passing over Unix domain sockets:
7576
//!
7677
//! - Uses Unix domain sockets (SOCK_STREAM)
77-
//! - Messages are framed using newline-delimited JSON (NDJSON)
78-
//! - File descriptors are passed using ancillary data via sendmsg(2)/recvmsg(2)
79-
//! - Each sendmsg() call contains exactly one complete NDJSON message
80-
//! - File descriptors are represented in JSON using placeholder objects
78+
//! - Standard JSON-RPC 2.0 message format with no additional framing requirements
79+
//! - JSON objects are self-delimiting; no newline or length-prefix framing is required
80+
//! - File descriptors are passed as ancillary data via sendmsg(2)/recvmsg(2)
81+
//! - Each sendmsg() call contains exactly one complete JSON-RPC message
82+
//! - File descriptors are represented in JSON using placeholder objects (see below)
8183
//!
8284
//! ### File Descriptor Placeholders
8385
//!

src/message.rs

Lines changed: 60 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,41 @@ use jsonrpsee::types::error::ErrorObject as JsonRpcError;
33
use serde::{Deserialize, Serialize};
44
use std::os::unix::io::OwnedFd;
55

6+
/// The JSON key used to identify file descriptor placeholders.
7+
pub const FD_PLACEHOLDER_KEY: &str = "__jsonrpc_fd__";
8+
/// The JSON key for the file descriptor index within a placeholder.
9+
pub const FD_INDEX_KEY: &str = "index";
10+
/// The JSON-RPC protocol version.
11+
pub const JSONRPC_VERSION: &str = "2.0";
12+
13+
/// Count file descriptor placeholders in a JSON value.
14+
pub fn count_fd_placeholders(value: &serde_json::Value) -> usize {
15+
fn count_inner(value: &serde_json::Value, count: &mut usize) {
16+
match value {
17+
serde_json::Value::Object(map) => {
18+
if let (Some(serde_json::Value::Bool(true)), Some(_)) =
19+
(map.get(FD_PLACEHOLDER_KEY), map.get(FD_INDEX_KEY))
20+
{
21+
*count += 1;
22+
} else {
23+
for v in map.values() {
24+
count_inner(v, count);
25+
}
26+
}
27+
}
28+
serde_json::Value::Array(arr) => {
29+
for v in arr {
30+
count_inner(v, count);
31+
}
32+
}
33+
_ => {}
34+
}
35+
}
36+
let mut count = 0;
37+
count_inner(value, &mut count);
38+
count
39+
}
40+
641
#[derive(Debug, Clone, Serialize, Deserialize)]
742
pub struct FileDescriptorPlaceholder {
843
#[serde(rename = "__jsonrpc_fd__")]
@@ -55,7 +90,7 @@ pub enum JsonRpcMessage {
5590
impl JsonRpcRequest {
5691
pub fn new(method: String, params: Option<serde_json::Value>, id: serde_json::Value) -> Self {
5792
Self {
58-
jsonrpc: "2.0".to_string(),
93+
jsonrpc: JSONRPC_VERSION.to_string(),
5994
method,
6095
params,
6196
id,
@@ -66,7 +101,7 @@ impl JsonRpcRequest {
66101
impl JsonRpcResponse {
67102
pub fn success(result: serde_json::Value, id: serde_json::Value) -> Self {
68103
Self {
69-
jsonrpc: "2.0".to_string(),
104+
jsonrpc: JSONRPC_VERSION.to_string(),
70105
result: Some(result),
71106
error: None,
72107
id,
@@ -75,7 +110,7 @@ impl JsonRpcResponse {
75110

76111
pub fn error(error: JsonRpcError<'static>, id: serde_json::Value) -> Self {
77112
Self {
78-
jsonrpc: "2.0".to_string(),
113+
jsonrpc: JSONRPC_VERSION.to_string(),
79114
result: None,
80115
error: Some(error),
81116
id,
@@ -86,7 +121,7 @@ impl JsonRpcResponse {
86121
impl JsonRpcNotification {
87122
pub fn new(method: String, params: Option<serde_json::Value>) -> Self {
88123
Self {
89-
jsonrpc: "2.0".to_string(),
124+
jsonrpc: JSONRPC_VERSION.to_string(),
90125
method,
91126
params,
92127
}
@@ -137,18 +172,30 @@ impl MessageWithFds {
137172
}
138173

139174
pub fn serialize_with_placeholders(&self) -> Result<String> {
175+
self.serialize_with_placeholders_impl(false)
176+
}
177+
178+
pub fn serialize_with_placeholders_pretty(&self) -> Result<String> {
179+
self.serialize_with_placeholders_impl(true)
180+
}
181+
182+
fn serialize_with_placeholders_impl(&self, pretty: bool) -> Result<String> {
140183
let mut message_json = self.message.to_json_value()?;
141184
self.insert_placeholders(&mut message_json)?;
142185

143-
let json_str = serde_json::to_string(&message_json)?;
144-
Ok(format!("{}\n", json_str))
186+
let json_str = if pretty {
187+
serde_json::to_string_pretty(&message_json)?
188+
} else {
189+
serde_json::to_string(&message_json)?
190+
};
191+
Ok(json_str)
145192
}
146193

147194
fn insert_placeholders(&self, value: &mut serde_json::Value) -> Result<()> {
148195
let fd_count = self.file_descriptors.len();
149196
let mut placeholder_indices = Vec::new();
150197

151-
self.collect_placeholder_indices(value, &mut placeholder_indices);
198+
Self::collect_placeholder_indices(value, &mut placeholder_indices);
152199

153200
if placeholder_indices.len() != fd_count {
154201
return Err(Error::MismatchedCount {
@@ -166,26 +213,26 @@ impl MessageWithFds {
166213
Ok(())
167214
}
168215

169-
fn collect_placeholder_indices(&self, value: &serde_json::Value, indices: &mut Vec<usize>) {
216+
fn collect_placeholder_indices(value: &serde_json::Value, indices: &mut Vec<usize>) {
170217
match value {
171218
serde_json::Value::Object(map) => {
172219
if let (
173220
Some(serde_json::Value::Bool(true)),
174221
Some(serde_json::Value::Number(index)),
175-
) = (map.get("__jsonrpc_fd__"), map.get("index"))
222+
) = (map.get(FD_PLACEHOLDER_KEY), map.get(FD_INDEX_KEY))
176223
{
177224
if let Some(index) = index.as_u64() {
178225
indices.push(index as usize);
179226
}
180227
} else {
181228
for v in map.values() {
182-
self.collect_placeholder_indices(v, indices);
229+
Self::collect_placeholder_indices(v, indices);
183230
}
184231
}
185232
}
186233
serde_json::Value::Array(arr) => {
187234
for v in arr {
188-
self.collect_placeholder_indices(v, indices);
235+
Self::collect_placeholder_indices(v, indices);
189236
}
190237
}
191238
_ => {}
@@ -195,8 +242,7 @@ impl MessageWithFds {
195242
pub fn from_json_with_fds(json_str: &str, fds: Vec<OwnedFd>) -> Result<Self> {
196243
let message_json: serde_json::Value = serde_json::from_str(json_str)?;
197244

198-
let mut placeholder_count = 0;
199-
Self::count_placeholders(&message_json, &mut placeholder_count);
245+
let placeholder_count = count_fd_placeholders(&message_json);
200246

201247
if placeholder_count != fds.len() {
202248
return Err(Error::MismatchedCount {
@@ -215,28 +261,6 @@ impl MessageWithFds {
215261
Ok(Self::new(message, fds))
216262
}
217263

218-
fn count_placeholders(value: &serde_json::Value, count: &mut usize) {
219-
match value {
220-
serde_json::Value::Object(map) => {
221-
if let (Some(serde_json::Value::Bool(true)), Some(_)) =
222-
(map.get("__jsonrpc_fd__"), map.get("index"))
223-
{
224-
*count += 1;
225-
} else {
226-
for v in map.values() {
227-
Self::count_placeholders(v, count);
228-
}
229-
}
230-
}
231-
serde_json::Value::Array(arr) => {
232-
for v in arr {
233-
Self::count_placeholders(v, count);
234-
}
235-
}
236-
_ => {}
237-
}
238-
}
239-
240264
fn validate_placeholder_indices(
241265
value: &serde_json::Value,
242266
expected_count: usize,
@@ -260,7 +284,7 @@ impl MessageWithFds {
260284
if let (
261285
Some(serde_json::Value::Bool(true)),
262286
Some(serde_json::Value::Number(index)),
263-
) = (map.get("__jsonrpc_fd__"), map.get("index"))
287+
) = (map.get(FD_PLACEHOLDER_KEY), map.get(FD_INDEX_KEY))
264288
{
265289
if let Some(index) = index.as_u64() {
266290
indices.push(index as usize);

src/server.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ impl Server {
6868
}
6969

7070
async fn handle_connection(&self, stream: UnixStream) -> Result<()> {
71-
let transport = UnixSocketTransport::new(stream);
71+
let transport = UnixSocketTransport::new(stream)?;
7272
let (mut sender, mut receiver) = transport.split();
7373

7474
debug!("New connection established");

0 commit comments

Comments
 (0)