Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 116 additions & 8 deletions crates/misc/component-async-tests/tests/scenario/backpressure.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,121 @@
use wasmtime::Result;
use component_async_tests::Ctx;
use std::{
env, future,
pin::Pin,
task::{Context, Poll},
time::Duration,
};
use wasmtime::{
Engine, Result, Store,
component::{Linker, ResourceTable},
};
use wasmtime_wasi::WasiCtxBuilder;

use super::util::test_run;
use super::util::{config, make_component, test_run};

// No-op function; we only test this by composing it in `async_backpressure_caller`
#[allow(
dead_code,
reason = "here only to make the `assert_test_exists` macro happy"
)]
pub fn async_backpressure_callee() {}
mod callee {
wasmtime::component::bindgen!({
path: "wit",
world: "backpressure-callee",
exports: { default: async | store },
});
}

#[tokio::test]
pub async fn async_backpressure_callee() -> Result<()> {
let mut config = config();
// As of this writing, miri/pulley/epochs is a problematic combination, so
// we don't test it.
if env::var_os("MIRI_TEST_CWASM_DIR").is_none() {
config.epoch_interruption(true);
}

let engine = Engine::new(&config)?;
let component = make_component(
&engine,
&[test_programs_artifacts::ASYNC_BACKPRESSURE_CALLEE_COMPONENT],
)
.await?;
let mut linker = Linker::new(&engine);
wasmtime_wasi::p2::add_to_linker_async(&mut linker)?;

let mut store = Store::new(
&engine,
Ctx {
wasi: WasiCtxBuilder::new().inherit_stdio().build(),
table: ResourceTable::default(),
continue_: false,
},
);

if env::var_os("MIRI_TEST_CWASM_DIR").is_none() {
store.set_epoch_deadline(1);

std::thread::spawn(move || {
std::thread::sleep(Duration::from_secs(10));
engine.increment_epoch();
});
}

let guest =
callee::BackpressureCallee::instantiate_async(&mut store, &component, &linker).await?;

store
.run_concurrent(async |accessor| {
guest
.local_local_backpressure()
.call_inc_then_later_dec_backpressure(accessor)
.await?;

let func = *guest.local_local_run().func_run().func();

let mut a = Some(Box::pin(guest.local_local_run().call_run(accessor)));
let mut b = Some(Box::pin(guest.local_local_run().call_run(accessor)));
let mut c = Some(Box::pin(guest.local_local_run().call_run(accessor)));

let mut backpressure_is_set = true;
future::poll_fn(move |cx| {
let instance_ready = accessor.poll_ready_for_concurrent_call(func, cx).is_ready();
let a_ready = is_ready(cx, &mut a);
let b_ready = is_ready(cx, &mut b);
let c_ready = is_ready(cx, &mut c);

if backpressure_is_set {
assert!(!instance_ready);
assert!(!a_ready);
assert!(!b_ready);
assert!(!c_ready);

backpressure_is_set = false;

Poll::Pending
} else if instance_ready && a_ready && b_ready && c_ready {
Poll::Ready(())
} else {
Poll::Pending
}
})
.await;

wasmtime::error::Ok(())
})
.await??;

Ok(())
}

fn is_ready(cx: &mut Context, fut: &mut Option<Pin<Box<impl Future>>>) -> bool {
if let Some(v) = fut.as_mut() {
if v.as_mut().poll(cx).is_ready() {
*fut = None;
true
} else {
false
}
} else {
true
}
}

#[tokio::test]
pub async fn async_backpressure_caller() -> Result<()> {
Expand Down
1 change: 1 addition & 0 deletions crates/misc/component-async-tests/wit/test.wit
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ interface backpressure {
set-backpressure: func(enabled: bool);
inc-backpressure: func();
dec-backpressure: func();
inc-then-later-dec-backpressure: async func();
}

interface transmit {
Expand Down
10 changes: 10 additions & 0 deletions crates/test-programs/src/bin/async_backpressure_callee.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,16 @@ impl Backpressure for Component {
fn dec_backpressure() {
wit_bindgen::backpressure_dec();
}
async fn inc_then_later_dec_backpressure() {
wit_bindgen::backpressure_inc();

wit_bindgen::spawn_local(async {
for _ in 0..10 {
wit_bindgen::yield_async().await;
}
wit_bindgen::backpressure_dec();
});
}
}

// Unused function; required since this file is built as a `bin`:
Expand Down
10 changes: 10 additions & 0 deletions crates/test-programs/src/bin/async_cancel_callee.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,16 @@ unsafe extern "C" fn export_dec_backpressure() {
wit_bindgen::backpressure_dec();
}

#[unsafe(export_name = "[async-lift]local:local/backpressure#inc-then-later-dec-backpressure")]
unsafe extern "C" fn export_inc_then_later_dec_backpressure() -> u32 {
todo!()
}

#[unsafe(export_name = "[callback][async-lift]local:local/backpressure#inc-then-later-dec-backpressure")]
unsafe extern "C" fn callback_inc_then_later_dec_backpressure(_: u32, _: u32, _: u32) -> u32 {
todo!()
}

#[unsafe(export_name = "local:local/yield#yield-times")]
unsafe extern "C" fn export_yield_yield_times(times: u64) {
unsafe {
Expand Down
11 changes: 10 additions & 1 deletion crates/wasi-http/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -793,6 +793,12 @@ where
Some((pair, queue))
}
));
let func = match proxy {
#[cfg(feature = "p3")]
Proxy::P3(guest) => *guest.wasi_http_handler().func_handle().func(),
#[cfg(feature = "p2")]
Proxy::P2(guest) => *guest.wasi_http_incoming_handler().func_handle().func(),
};
future::poll_fn(|cx| {
loop {
// First, and crucially first, poll `futures`. This way
Expand Down Expand Up @@ -839,6 +845,8 @@ where
Poll::Ready(None) | Poll::Pending => {}
}

let is_ready = accessor.poll_ready_for_concurrent_call(func, cx).is_ready();

// At this point `futures` is either empty or it's `Pending`
// meaning nothing is ready. Note that `Pending` here
// doesn't necessarily mean all tasks are blocked on I/O.
Expand All @@ -850,6 +858,7 @@ where
// at all or all our tasks really are blocked on I/O.
self.set_available(
may_accept
&& is_ready
&& match dropper
.state
.should_accept_request(futures.len(), reuse_count)
Expand Down Expand Up @@ -905,7 +914,7 @@ where
// if we're not actually capable of accepting any more work,
// then we're completely done and it's time to exit this
// worker.
if !may_accept {
if !(may_accept && is_ready) {
break Poll::Ready(Ok(()));
}

Expand Down
56 changes: 54 additions & 2 deletions crates/wasmtime/src/runtime/component/concurrent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -539,8 +539,8 @@ where
/// otherwise returns `Poll::Pending`. If pending is returned then whenever
/// the last remaining "interesting" task has exited the provided context's
/// waker will be notified. Note that only the waker passed to the last call
/// to `poll_no_interesting_tasks` will be notified, so this is only
/// appropriate to use one-at-a-time.
/// to `poll_no_interesting_tasks` for the store will be notified, so this
/// is only appropriate to use once-at-a-time per store.
///
/// The component model specification, as of this current date, does not
/// have a distinction between "interesting" tasks and not. The current
Expand Down Expand Up @@ -579,6 +579,39 @@ where
}
})
}

/// Poll to see if the component instance corresponding to the specified
/// function is ready to run a concurrent call without queuing it (i.e. does
/// not have backpressure enabled and does not have a sync call in
/// progress).
///
/// Returns `Poll::Ready(())` if the component instance is ready to run a
/// concurrent call, and otherwise returns `Poll::Pending`. If pending is
/// returned then whenever the instance becomes ready for a call the
/// provided context's waker will be notified. Note that only the waker
/// passed to the last call to `poll_ready_for_concurrent_call` for the
/// store will be notified (regardless of whether the same or different
/// `Func` is specified relative to earlier calls), so this is only
/// appropriate to use once-at-a-time per store. Also note that the waker
/// may be notified when _any_ instance becomes callable (i.e. not
/// necessarily the last one polled), so this function must be called again
/// to determine if the instance of interest is ready.
pub fn poll_ready_for_concurrent_call(&self, func: Func, cx: &mut Context<'_>) -> Poll<()> {
self.with(|mut access| {
let store = access.as_context_mut().0;
let (_, _, _, raw_options) = func.abi_info(store);
let instance = func.instance().runtime_instance(raw_options.instance);
let state = store.instance_state(instance).concurrent_state();
if state.backpressure == 0 {
Poll::Ready(())
} else {
store
.concurrent_state_mut_without_forcing_current_thread()
.ready_for_concurrent_call_waker = Some(cx.waker().clone());
Poll::Pending
}
})
}
}

/// Represents a task which may be provided to `Accessor::spawn`,
Expand Down Expand Up @@ -1983,6 +2016,9 @@ impl StoreOpaque {
/// Iterate over `InstanceState::pending`, moving any ready items into the
/// "high priority" work item queue.
///
/// Also, notify `ConcurrentState::ready_for_concurrent_call_waker` if
/// present.
///
/// See `GuestCall::is_ready` for details.
fn partition_pending(&mut self, instance: RuntimeInstance) -> Result<()> {
for (thread, kind) in
Expand All @@ -2000,6 +2036,14 @@ impl StoreOpaque {
}
}

if let Some(waker) = self
.concurrent_state_mut()?
.ready_for_concurrent_call_waker
.take()
{
waker.wake();
}

Ok(())
}

Expand Down Expand Up @@ -5260,6 +5304,12 @@ pub struct ConcurrentState {
///
/// Used in the implementation of `Accessor::poll_no_interesting_tasks`.
interesting_tasks_empty_waker: Option<Waker>,

/// Single waker to notify when a component instance goes from
/// not-concurrently-callable to concurrently-callable.
///
/// Used in the implementation of `Accessor::poll_ready_for_concurrent_call`.
ready_for_concurrent_call_waker: Option<Waker>,
}

impl Default for ConcurrentState {
Expand All @@ -5276,6 +5326,7 @@ impl Default for ConcurrentState {
global_error_context_ref_counts: BTreeMap::new(),
interesting_tasks: 0,
interesting_tasks_empty_waker: None,
ready_for_concurrent_call_waker: None,
}
}
}
Expand Down Expand Up @@ -5374,6 +5425,7 @@ impl ConcurrentState {
global_error_context_ref_counts: _,
interesting_tasks: _,
interesting_tasks_empty_waker: _,
ready_for_concurrent_call_waker: _,
} = self;

for entry in table.get_mut().iter_mut() {
Expand Down
Loading