diff --git a/crates/misc/component-async-tests/tests/scenario/backpressure.rs b/crates/misc/component-async-tests/tests/scenario/backpressure.rs index 21f18cdf2723..782706011b31 100644 --- a/crates/misc/component-async-tests/tests/scenario/backpressure.rs +++ b/crates/misc/component-async-tests/tests/scenario/backpressure.rs @@ -1,13 +1,121 @@ -use wasmtime::Result; +use component_async_tests::Ctx; +use std::{ + env, future, + pin::Pin, + task::{Context, Poll}, + time::Duration, +}; +use wasmtime::{ + Engine, Result, Store, + component::{Linker, ResourceTable}, +}; +use wasmtime_wasi::WasiCtxBuilder; -use super::util::test_run; +use super::util::{config, make_component, test_run}; -// No-op function; we only test this by composing it in `async_backpressure_caller` -#[allow( - dead_code, - reason = "here only to make the `assert_test_exists` macro happy" -)] -pub fn async_backpressure_callee() {} +mod callee { + wasmtime::component::bindgen!({ + path: "wit", + world: "backpressure-callee", + exports: { default: async | store }, + }); +} + +#[tokio::test] +pub async fn async_backpressure_callee() -> Result<()> { + let mut config = config(); + // As of this writing, miri/pulley/epochs is a problematic combination, so + // we don't test it. + if env::var_os("MIRI_TEST_CWASM_DIR").is_none() { + config.epoch_interruption(true); + } + + let engine = Engine::new(&config)?; + let component = make_component( + &engine, + &[test_programs_artifacts::ASYNC_BACKPRESSURE_CALLEE_COMPONENT], + ) + .await?; + let mut linker = Linker::new(&engine); + wasmtime_wasi::p2::add_to_linker_async(&mut linker)?; + + let mut store = Store::new( + &engine, + Ctx { + wasi: WasiCtxBuilder::new().inherit_stdio().build(), + table: ResourceTable::default(), + continue_: false, + }, + ); + + if env::var_os("MIRI_TEST_CWASM_DIR").is_none() { + store.set_epoch_deadline(1); + + std::thread::spawn(move || { + std::thread::sleep(Duration::from_secs(10)); + engine.increment_epoch(); + }); + } + + let guest = + callee::BackpressureCallee::instantiate_async(&mut store, &component, &linker).await?; + + store + .run_concurrent(async |accessor| { + guest + .local_local_backpressure() + .call_inc_then_later_dec_backpressure(accessor) + .await?; + + let func = *guest.local_local_run().func_run().func(); + + let mut a = Some(Box::pin(guest.local_local_run().call_run(accessor))); + let mut b = Some(Box::pin(guest.local_local_run().call_run(accessor))); + let mut c = Some(Box::pin(guest.local_local_run().call_run(accessor))); + + let mut backpressure_is_set = true; + future::poll_fn(move |cx| { + let instance_ready = accessor.poll_ready_for_concurrent_call(func, cx).is_ready(); + let a_ready = is_ready(cx, &mut a); + let b_ready = is_ready(cx, &mut b); + let c_ready = is_ready(cx, &mut c); + + if backpressure_is_set { + assert!(!instance_ready); + assert!(!a_ready); + assert!(!b_ready); + assert!(!c_ready); + + backpressure_is_set = false; + + Poll::Pending + } else if instance_ready && a_ready && b_ready && c_ready { + Poll::Ready(()) + } else { + Poll::Pending + } + }) + .await; + + wasmtime::error::Ok(()) + }) + .await??; + + Ok(()) +} + +fn is_ready(cx: &mut Context, fut: &mut Option>>) -> bool { + if let Some(v) = fut.as_mut() { + if v.as_mut().poll(cx).is_ready() { + *fut = None; + true + } else { + false + } + } else { + true + } +} #[tokio::test] pub async fn async_backpressure_caller() -> Result<()> { diff --git a/crates/misc/component-async-tests/wit/test.wit b/crates/misc/component-async-tests/wit/test.wit index 84efb8ec40a0..59eae0d05deb 100644 --- a/crates/misc/component-async-tests/wit/test.wit +++ b/crates/misc/component-async-tests/wit/test.wit @@ -63,6 +63,7 @@ interface backpressure { set-backpressure: func(enabled: bool); inc-backpressure: func(); dec-backpressure: func(); + inc-then-later-dec-backpressure: async func(); } interface transmit { diff --git a/crates/test-programs/src/bin/async_backpressure_callee.rs b/crates/test-programs/src/bin/async_backpressure_callee.rs index 7f89a67d77fa..ec1fef7f0529 100644 --- a/crates/test-programs/src/bin/async_backpressure_callee.rs +++ b/crates/test-programs/src/bin/async_backpressure_callee.rs @@ -32,6 +32,16 @@ impl Backpressure for Component { fn dec_backpressure() { wit_bindgen::backpressure_dec(); } + async fn inc_then_later_dec_backpressure() { + wit_bindgen::backpressure_inc(); + + wit_bindgen::spawn_local(async { + for _ in 0..10 { + wit_bindgen::yield_async().await; + } + wit_bindgen::backpressure_dec(); + }); + } } // Unused function; required since this file is built as a `bin`: diff --git a/crates/test-programs/src/bin/async_cancel_callee.rs b/crates/test-programs/src/bin/async_cancel_callee.rs index 2ee686dec974..b92410843cf5 100644 --- a/crates/test-programs/src/bin/async_cancel_callee.rs +++ b/crates/test-programs/src/bin/async_cancel_callee.rs @@ -97,6 +97,16 @@ unsafe extern "C" fn export_dec_backpressure() { wit_bindgen::backpressure_dec(); } +#[unsafe(export_name = "[async-lift]local:local/backpressure#inc-then-later-dec-backpressure")] +unsafe extern "C" fn export_inc_then_later_dec_backpressure() -> u32 { + todo!() +} + +#[unsafe(export_name = "[callback][async-lift]local:local/backpressure#inc-then-later-dec-backpressure")] +unsafe extern "C" fn callback_inc_then_later_dec_backpressure(_: u32, _: u32, _: u32) -> u32 { + todo!() +} + #[unsafe(export_name = "local:local/yield#yield-times")] unsafe extern "C" fn export_yield_yield_times(times: u64) { unsafe { diff --git a/crates/wasi-http/src/handler.rs b/crates/wasi-http/src/handler.rs index 2dc49ab90766..236bdf8d4c45 100644 --- a/crates/wasi-http/src/handler.rs +++ b/crates/wasi-http/src/handler.rs @@ -793,6 +793,12 @@ where Some((pair, queue)) } )); + let func = match proxy { + #[cfg(feature = "p3")] + Proxy::P3(guest) => *guest.wasi_http_handler().func_handle().func(), + #[cfg(feature = "p2")] + Proxy::P2(guest) => *guest.wasi_http_incoming_handler().func_handle().func(), + }; future::poll_fn(|cx| { loop { // First, and crucially first, poll `futures`. This way @@ -839,6 +845,8 @@ where Poll::Ready(None) | Poll::Pending => {} } + let is_ready = accessor.poll_ready_for_concurrent_call(func, cx).is_ready(); + // At this point `futures` is either empty or it's `Pending` // meaning nothing is ready. Note that `Pending` here // doesn't necessarily mean all tasks are blocked on I/O. @@ -850,6 +858,7 @@ where // at all or all our tasks really are blocked on I/O. self.set_available( may_accept + && is_ready && match dropper .state .should_accept_request(futures.len(), reuse_count) @@ -905,7 +914,7 @@ where // if we're not actually capable of accepting any more work, // then we're completely done and it's time to exit this // worker. - if !may_accept { + if !(may_accept && is_ready) { break Poll::Ready(Ok(())); } diff --git a/crates/wasmtime/src/runtime/component/concurrent.rs b/crates/wasmtime/src/runtime/component/concurrent.rs index c9f27f936a4e..f6b921b6717a 100644 --- a/crates/wasmtime/src/runtime/component/concurrent.rs +++ b/crates/wasmtime/src/runtime/component/concurrent.rs @@ -539,8 +539,8 @@ where /// otherwise returns `Poll::Pending`. If pending is returned then whenever /// the last remaining "interesting" task has exited the provided context's /// waker will be notified. Note that only the waker passed to the last call - /// to `poll_no_interesting_tasks` will be notified, so this is only - /// appropriate to use one-at-a-time. + /// to `poll_no_interesting_tasks` for the store will be notified, so this + /// is only appropriate to use once-at-a-time per store. /// /// The component model specification, as of this current date, does not /// have a distinction between "interesting" tasks and not. The current @@ -579,6 +579,39 @@ where } }) } + + /// Poll to see if the component instance corresponding to the specified + /// function is ready to run a concurrent call without queuing it (i.e. does + /// not have backpressure enabled and does not have a sync call in + /// progress). + /// + /// Returns `Poll::Ready(())` if the component instance is ready to run a + /// concurrent call, and otherwise returns `Poll::Pending`. If pending is + /// returned then whenever the instance becomes ready for a call the + /// provided context's waker will be notified. Note that only the waker + /// passed to the last call to `poll_ready_for_concurrent_call` for the + /// store will be notified (regardless of whether the same or different + /// `Func` is specified relative to earlier calls), so this is only + /// appropriate to use once-at-a-time per store. Also note that the waker + /// may be notified when _any_ instance becomes callable (i.e. not + /// necessarily the last one polled), so this function must be called again + /// to determine if the instance of interest is ready. + pub fn poll_ready_for_concurrent_call(&self, func: Func, cx: &mut Context<'_>) -> Poll<()> { + self.with(|mut access| { + let store = access.as_context_mut().0; + let (_, _, _, raw_options) = func.abi_info(store); + let instance = func.instance().runtime_instance(raw_options.instance); + let state = store.instance_state(instance).concurrent_state(); + if state.backpressure == 0 { + Poll::Ready(()) + } else { + store + .concurrent_state_mut_without_forcing_current_thread() + .ready_for_concurrent_call_waker = Some(cx.waker().clone()); + Poll::Pending + } + }) + } } /// Represents a task which may be provided to `Accessor::spawn`, @@ -1983,6 +2016,9 @@ impl StoreOpaque { /// Iterate over `InstanceState::pending`, moving any ready items into the /// "high priority" work item queue. /// + /// Also, notify `ConcurrentState::ready_for_concurrent_call_waker` if + /// present. + /// /// See `GuestCall::is_ready` for details. fn partition_pending(&mut self, instance: RuntimeInstance) -> Result<()> { for (thread, kind) in @@ -2000,6 +2036,14 @@ impl StoreOpaque { } } + if let Some(waker) = self + .concurrent_state_mut()? + .ready_for_concurrent_call_waker + .take() + { + waker.wake(); + } + Ok(()) } @@ -5260,6 +5304,12 @@ pub struct ConcurrentState { /// /// Used in the implementation of `Accessor::poll_no_interesting_tasks`. interesting_tasks_empty_waker: Option, + + /// Single waker to notify when a component instance goes from + /// not-concurrently-callable to concurrently-callable. + /// + /// Used in the implementation of `Accessor::poll_ready_for_concurrent_call`. + ready_for_concurrent_call_waker: Option, } impl Default for ConcurrentState { @@ -5276,6 +5326,7 @@ impl Default for ConcurrentState { global_error_context_ref_counts: BTreeMap::new(), interesting_tasks: 0, interesting_tasks_empty_waker: None, + ready_for_concurrent_call_waker: None, } } } @@ -5374,6 +5425,7 @@ impl ConcurrentState { global_error_context_ref_counts: _, interesting_tasks: _, interesting_tasks_empty_waker: _, + ready_for_concurrent_call_waker: _, } = self; for entry in table.get_mut().iter_mut() {