Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,13 @@ pub struct Config {
pub subgraph_service: Address,
/// x402 payment support configuration
pub x402: Option<X402Config>,
/// Maximum response size from indexers in bytes (default: 50MB)
#[serde(default = "default_max_indexer_response_size")]
pub max_indexer_response_size: usize,
}

fn default_max_indexer_response_size() -> usize {
50 * 1024 * 1024 // 50 MB
}

/// API keys configuration.
Expand Down
45 changes: 40 additions & 5 deletions src/indexer_client.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use http::{StatusCode, header::CONTENT_TYPE};
use http::{
StatusCode,
header::{CONTENT_LENGTH, CONTENT_TYPE},
};
use reqwest::header::AUTHORIZATION;
use serde::{Deserialize, Serialize};
use thegraph_core::{
Expand Down Expand Up @@ -31,6 +34,9 @@ pub struct IndexerResponse {
#[derive(Clone)]
pub struct IndexerClient {
pub client: reqwest::Client,
/// Maximum allowed response size from indexers.
/// Prevents memory exhaustion from malicious or buggy indexers.
pub max_response_size: usize,
}

pub enum IndexerAuth<'a> {
Expand Down Expand Up @@ -79,17 +85,31 @@ impl IndexerClient {
tracing::debug!(indexed_block = indexed_block.unwrap_or("null"));
let indexed_block = indexed_block.and_then(parse_graph_indexed_header);

// Fast-path rejection: check Content-Length header before reading body
let max_response_size = self.max_response_size;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This let binding seems unnecessary

if let Some(content_length) = response.headers().get(CONTENT_LENGTH)
&& let Ok(len) = content_length.to_str().unwrap_or("0").parse::<usize>()
&& len > max_response_size
{
return Err(BadResponse(format!(
"response too large: {len} bytes (max {max_response_size})"
)));
}

// Read body with size limit to prevent memory exhaustion
let body = read_body_limited(response, max_response_size)
.await
.map_err(BadResponse)?;

#[derive(Debug, Deserialize)]
pub struct IndexerResponsePayload {
#[serde(rename = "graphQLResponse")]
pub graphql_response: Option<String>,
pub attestation: Option<Attestation>,
pub error: Option<String>,
}
let payload = response
.json::<IndexerResponsePayload>()
.await
.map_err(|err| BadResponse(err.to_string()))?;
let payload: IndexerResponsePayload =
serde_json::from_slice(&body).map_err(|err| BadResponse(err.to_string()))?;
if let Some(err) = payload.error {
return Err(BadResponse(err));
}
Expand Down Expand Up @@ -208,6 +228,21 @@ fn check_block_error(err: &str) -> Result<(), MissingBlockError> {
})
}

/// Read response body with size limit, streaming to avoid unbounded memory allocation.
async fn read_body_limited(
mut response: reqwest::Response,
max_size: usize,
) -> Result<Vec<u8>, String> {
let mut body = Vec::new();
while let Some(chunk) = response.chunk().await.map_err(|e| e.to_string())? {
if body.len() + chunk.len() > max_size {
return Err(format!("response exceeds {max_size} byte limit"));
}
body.extend_from_slice(&chunk);
}
Ok(body)
}

#[cfg(test)]
mod tests {
use crate::errors::MissingBlockError;
Expand Down
1 change: 1 addition & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ async fn main() {

let indexer_client = IndexerClient {
client: http_client.clone(),
max_response_size: conf.max_indexer_response_size,
};
let network_subgraph_client = SubgraphClient {
client: indexer_client.clone(),
Expand Down
Loading