Skip to content

Commit 677ec22

Browse files
Load payments in parallel using std::thread::scope
This significantly improves startup time for network-backed stores like VSS, where sequential reads incur per-key network latency.
1 parent bbefa73 commit 677ec22

File tree

1 file changed

+134
-23
lines changed

1 file changed

+134
-23
lines changed

src/io/utils.rs

Lines changed: 134 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -213,35 +213,61 @@ where
213213
}
214214

215215
/// Read previously persisted payments information from the store.
216-
pub(crate) fn read_payments<L: Deref>(
216+
pub(crate) fn read_payments<L: Deref + Clone + Send>(
217217
kv_store: Arc<DynStore>, logger: L,
218218
) -> Result<Vec<PaymentDetails>, std::io::Error>
219219
where
220220
L::Target: LdkLogger,
221221
{
222-
let mut res = Vec::new();
223-
224-
for stored_key in KVStoreSync::list(
222+
let keys = KVStoreSync::list(
225223
&*kv_store,
226224
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
227225
PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
228-
)? {
229-
let mut reader = Cursor::new(KVStoreSync::read(
230-
&*kv_store,
231-
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
232-
PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
233-
&stored_key,
234-
)?);
235-
let payment = PaymentDetails::read(&mut reader).map_err(|e| {
236-
log_error!(logger, "Failed to deserialize PaymentDetails: {}", e);
237-
std::io::Error::new(
238-
std::io::ErrorKind::InvalidData,
239-
"Failed to deserialize PaymentDetails",
240-
)
241-
})?;
242-
res.push(payment);
243-
}
244-
Ok(res)
226+
)?;
227+
228+
// Read all payments in parallel using scoped threads.
229+
// This significantly improves performance for network-backed stores like VSS,
230+
// where sequential reads would incur per-key network latency.
231+
let results: Vec<Result<PaymentDetails, std::io::Error>> = std::thread::scope(|s| {
232+
let handles: Vec<_> = keys
233+
.iter()
234+
.map(|key| {
235+
let kv_store = Arc::clone(&kv_store);
236+
let logger = logger.clone();
237+
s.spawn(move || {
238+
let mut reader = Cursor::new(KVStoreSync::read(
239+
&*kv_store,
240+
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
241+
PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
242+
key,
243+
)?);
244+
PaymentDetails::read(&mut reader).map_err(|e| {
245+
log_error!(logger, "Failed to deserialize PaymentDetails: {}", e);
246+
std::io::Error::new(
247+
std::io::ErrorKind::InvalidData,
248+
"Failed to deserialize PaymentDetails",
249+
)
250+
})
251+
})
252+
})
253+
.collect();
254+
255+
handles
256+
.into_iter()
257+
.map(|h| {
258+
h.join()
259+
.map_err(|_| {
260+
std::io::Error::new(
261+
std::io::ErrorKind::Other,
262+
"Thread panicked while reading payment",
263+
)
264+
})
265+
.and_then(|r| r)
266+
})
267+
.collect()
268+
});
269+
270+
results.into_iter().collect()
245271
}
246272

247273
/// Read `OutputSweeper` state from the store.
@@ -578,8 +604,22 @@ pub(crate) fn read_bdk_wallet_change_set(
578604

579605
#[cfg(test)]
580606
mod tests {
581-
use super::read_or_generate_seed_file;
582-
use super::test_utils::random_storage_path;
607+
use std::sync::Arc;
608+
609+
use lightning::ln::channelmanager::PaymentId;
610+
use lightning::util::persist::KVStoreSync;
611+
use lightning::util::ser::Writeable;
612+
use lightning::util::test_utils::TestLogger;
613+
use lightning_types::payment::PaymentHash;
614+
615+
use super::test_utils::{random_storage_path, InMemoryStore};
616+
use super::{
617+
read_or_generate_seed_file, read_payments, PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
618+
PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
619+
};
620+
use crate::hex_utils;
621+
use crate::payment::store::{PaymentDetails, PaymentDirection, PaymentKind, PaymentStatus};
622+
use crate::types::{DynStore, DynStoreWrapper};
583623

584624
#[test]
585625
fn generated_seed_is_readable() {
@@ -589,4 +629,75 @@ mod tests {
589629
let read_seed_bytes = read_or_generate_seed_file(&rand_path.to_str().unwrap()).unwrap();
590630
assert_eq!(expected_seed_bytes, read_seed_bytes);
591631
}
632+
633+
#[test]
634+
fn read_payments_parallel_returns_all_payments() {
635+
let store: Arc<DynStore> = Arc::new(DynStoreWrapper(InMemoryStore::new()));
636+
let logger = Arc::new(TestLogger::new());
637+
638+
// Create and persist multiple payments
639+
let num_payments = 50;
640+
let mut expected_payments = Vec::with_capacity(num_payments);
641+
642+
for i in 0..num_payments {
643+
let mut id_bytes = [0u8; 32];
644+
id_bytes[0] = i as u8;
645+
id_bytes[1] = (i >> 8) as u8;
646+
let payment_id = PaymentId(id_bytes);
647+
648+
let payment = PaymentDetails::new(
649+
payment_id,
650+
PaymentKind::Spontaneous { hash: PaymentHash(id_bytes), preimage: None },
651+
Some(1000 * (i as u64 + 1)),
652+
None,
653+
PaymentDirection::Outbound,
654+
PaymentStatus::Succeeded,
655+
);
656+
657+
// Write payment to store
658+
let key = hex_utils::to_string(&payment_id.0);
659+
KVStoreSync::write(
660+
&*store,
661+
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
662+
PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
663+
&key,
664+
payment.encode(),
665+
)
666+
.unwrap();
667+
668+
expected_payments.push(payment);
669+
}
670+
671+
// Read all payments back using the parallel implementation
672+
let read_back = read_payments(Arc::clone(&store), Arc::clone(&logger)).unwrap();
673+
674+
// Verify count matches
675+
assert_eq!(
676+
read_back.len(),
677+
num_payments,
678+
"Expected {} payments but got {}",
679+
num_payments,
680+
read_back.len()
681+
);
682+
683+
// Verify all expected payments are present (order may differ due to parallelism)
684+
for expected in &expected_payments {
685+
assert!(
686+
read_back
687+
.iter()
688+
.any(|p| p.id == expected.id && p.amount_msat == expected.amount_msat),
689+
"Payment {:?} not found in read results",
690+
expected.id
691+
);
692+
}
693+
}
694+
695+
#[test]
696+
fn read_payments_empty_store_returns_empty_vec() {
697+
let store: Arc<DynStore> = Arc::new(DynStoreWrapper(InMemoryStore::new()));
698+
let logger = Arc::new(TestLogger::new());
699+
700+
let payments = read_payments(Arc::clone(&store), Arc::clone(&logger)).unwrap();
701+
assert!(payments.is_empty(), "Expected empty vec for empty store");
702+
}
592703
}

0 commit comments

Comments
 (0)