diff --git a/Cargo.lock b/Cargo.lock index 2f28372ae09..575195e537f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2978,9 +2978,9 @@ dependencies = [ [[package]] name = "diesel" -version = "2.3.2" +version = "2.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8496eeb328dce26ee9d9b73275d396d9bddb433fa30106cf6056dd8c3c2764c" +checksum = "9940fb8467a0a06312218ed384185cb8536aa10d8ec017d0ce7fad2c1bd882d5" dependencies = [ "bigdecimal 0.3.1", "bitflags 2.11.1", @@ -2999,16 +2999,15 @@ dependencies = [ [[package]] name = "diesel-async" -version = "0.8.0" +version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b95864e58597509106f1fddfe0600de7e589e1fddddd87f54eee0a49fd111bbc" +checksum = "9c20ddcc6737cecdaef3dfecb2796bdfe3002456521189d30be8e4c5a1bc821d" dependencies = [ "deadpool 0.13.0", "diesel", "futures-core", "futures-util", "pin-project-lite", - "scoped-futures", "tokio", "tokio-postgres", ] @@ -7300,15 +7299,6 @@ dependencies = [ "parking_lot", ] -[[package]] -name = "scoped-futures" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b24aae2d0636530f359e9d5ef0c04669d11c5e756699b27a6a6d845d8329091" -dependencies = [ - "pin-project-lite", -] - [[package]] name = "scopeguard" version = "1.2.0" diff --git a/Cargo.toml b/Cargo.toml index 46e7a059ef0..2e418a4f98e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -61,7 +61,7 @@ diesel = { version = "2.2.7", features = [ "chrono", "i-implement-a-third-party-backend-and-opt-into-breaking-changes", ] } -diesel-async = { version = "0.8.0", features = ["deadpool", "async-connection-wrapper", "tokio", "postgres"] } +diesel-async = { version = "0.9.0", features = ["deadpool", "async-connection-wrapper", "tokio", "postgres"] } diesel-derive-enum = { version = "2.1.0", features = ["postgres"] } diesel-dynamic-schema = { version = "0.2.3", features = ["postgres"] } diesel_derives = "2.3.8" diff --git a/node/src/manager/commands/chain.rs b/node/src/manager/commands/chain.rs index 2f344bdeb7b..69185d3d8d5 100644 --- a/node/src/manager/commands/chain.rs +++ b/node/src/manager/commands/chain.rs @@ -25,7 +25,6 @@ use graph_chain_ethereum::chain::BlockFinality; use graph_store_postgres::BlockStore; use graph_store_postgres::ChainStore; use graph_store_postgres::PoolCoordinator; -use graph_store_postgres::ScopedFutureExt; use graph_store_postgres::Shard; use graph_store_postgres::add_chain; use graph_store_postgres::find_chain; @@ -260,34 +259,35 @@ pub async fn change_block_cache_shard( let new_name = format!("{}-old", &chain_name); let ident = chain_store.chain_identifier().await?; - conn.transaction::<(), StoreError, _>(|conn| { - async { - let shard = Shard::new(shard.to_string())?; + conn.transaction::<(), StoreError, _>(async |conn| { + let shard = Shard::new(shard.to_string())?; - let chain = BlockStore::allocate_chain(conn, &chain_name, &shard, &ident).await?; + let chain = BlockStore::allocate_chain(conn, &chain_name, &shard, &ident).await?; - store.add_chain_store(&chain, true).await?; + store.add_chain_store(&chain, true).await?; - // Drop the foreign key constraint on deployment_schemas - sql_query( - "alter table deployment_schemas drop constraint deployment_schemas_network_fkey;", - ) - .execute(conn).await?; + // Drop the foreign key constraint on deployment_schemas + sql_query( + "alter table deployment_schemas drop constraint deployment_schemas_network_fkey;", + ) + .execute(conn) + .await?; - // Update the current chain name to chain-old - update_chain_name(conn, &chain_name, &new_name).await?; + // Update the current chain name to chain-old + update_chain_name(conn, &chain_name, &new_name).await?; - // Create a new chain with the name in the destination shard - let _ = add_chain(conn, &chain_name, &shard, ident).await?; + // Create a new chain with the name in the destination shard + let _ = add_chain(conn, &chain_name, &shard, ident).await?; - // Re-add the foreign key constraint - sql_query( - "alter table deployment_schemas add constraint deployment_schemas_network_fkey foreign key (network) references chains(name);", - ) - .execute(conn).await?; - Ok(()) - }.scope_boxed() - }).await?; + // Re-add the foreign key constraint + sql_query( + "alter table deployment_schemas add constraint deployment_schemas_network_fkey foreign key (network) references chains(name);", + ) + .execute(conn) + .await?; + Ok(()) + }) + .await?; chain_store.update_name(&new_name).await?; diff --git a/store/postgres/src/block_store.rs b/store/postgres/src/block_store.rs index 0ae4b05597a..683553d7c22 100644 --- a/store/postgres/src/block_store.rs +++ b/store/postgres/src/block_store.rs @@ -5,7 +5,7 @@ use graph::{components::store::BLOCK_CACHE_SIZE, parking_lot::RwLock}; use anyhow::anyhow; use async_trait::async_trait; use diesel::{ExpressionMethods as _, QueryDsl, sql_query}; -use diesel_async::{RunQueryDsl, scoped_futures::ScopedFutureExt}; +use diesel_async::RunQueryDsl; use graph::{ blockchain::ChainIdentifier, components::store::{BlockStore as BlockStoreTrait, QueryPermit}, @@ -253,9 +253,7 @@ impl BlockStore { const CHAIN_HEAD_CACHE_TTL: Duration = Duration::from_secs(2); let mirror = PrimaryMirror::new(&pools); - let existing_chains = mirror - .read_async(|conn| primary::load_chains(conn).scope_boxed()) - .await?; + let existing_chains = mirror.read_async(primary::load_chains).await?; let chain_head_cache = TimedCache::new(CHAIN_HEAD_CACHE_TTL); let chains = shards.clone(); @@ -437,18 +435,15 @@ impl BlockStore { let chain = chain.to_string(); let this = self.cheap_clone(); self.mirror - .read_async(|conn| { - async { - match primary::find_chain(conn, &chain).await? { - Some(chain) => { - let chain_store = this.add_chain_store(&chain, false).await?; - Ok(Some(chain_store)) - } - None => Ok(None), + .read_async( + async |conn| match primary::find_chain(conn, &chain).await? { + Some(chain) => { + let chain_store = this.add_chain_store(&chain, false).await?; + Ok(Some(chain_store)) } - } - .scope_boxed() - }) + None => Ok(None), + }, + ) .await } diff --git a/store/postgres/src/chain_store.rs b/store/postgres/src/chain_store.rs index 3fe39462a98..3160afebfab 100644 --- a/store/postgres/src/chain_store.rs +++ b/store/postgres/src/chain_store.rs @@ -2,8 +2,7 @@ use anyhow::anyhow; use async_trait::async_trait; use diesel::sql_types::Text; use diesel::{ExpressionMethods, OptionalExtension, QueryDsl, insert_into, update}; -use diesel_async::AsyncConnection; -use diesel_async::{RunQueryDsl, scoped_futures::ScopedFutureExt}; +use diesel_async::RunQueryDsl; use graph::components::store::ChainHeadStore; use graph::data::store::ethereum::call; @@ -2420,24 +2419,21 @@ impl ChainStore { use public::ethereum_networks::dsl::*; let mut conn = self.pool.get_permitted().await?; - conn.transaction(|conn| { - async move { - insert_into(ethereum_networks) - .values(( - name.eq(&self.chain), - namespace.eq(&self.storage), - head_block_hash.eq::>(None), - head_block_number.eq::>(None), - net_version.eq(&ident.net_version), - genesis_block_hash.eq(ident.genesis_block_hash.hash_hex()), - )) - .on_conflict(name) - .do_nothing() - .execute(conn) - .await?; - self.storage.create(conn).await - } - .scope_boxed() + conn.transaction(async move |conn| { + insert_into(ethereum_networks) + .values(( + name.eq(&self.chain), + namespace.eq(&self.storage), + head_block_hash.eq::>(None), + head_block_number.eq::>(None), + net_version.eq(&ident.net_version), + genesis_block_hash.eq(ident.genesis_block_hash.hash_hex()), + )) + .on_conflict(name) + .do_nothing() + .execute(conn) + .await?; + self.storage.create(conn).await }) .await?; @@ -2447,15 +2443,12 @@ impl ChainStore { pub async fn update_name(&self, name: &str) -> Result<(), Error> { use public::ethereum_networks as n; let mut conn = self.pool.get_permitted().await?; - conn.transaction(|conn| { - async { - update(n::table.filter(n::name.eq(&self.chain))) - .set(n::name.eq(name)) - .execute(conn) - .await?; - Ok(()) - } - .scope_boxed() + conn.transaction(async |conn| { + update(n::table.filter(n::name.eq(&self.chain))) + .set(n::name.eq(name)) + .execute(conn) + .await?; + Ok(()) }) .await } @@ -2465,16 +2458,12 @@ impl ChainStore { use public::ethereum_networks as n; let mut conn = self.pool.get_permitted().await?; - conn.transaction(|conn| { - async { - self.storage.drop_storage(conn, &self.chain).await?; - - delete(n::table.filter(n::name.eq(&self.chain))) - .execute(conn) - .await?; - Ok(()) - } - .scope_boxed() + conn.transaction(async |conn| { + self.storage.drop_storage(conn, &self.chain).await?; + delete(n::table.filter(n::name.eq(&self.chain))) + .execute(conn) + .await?; + Ok(()) }) .await } @@ -2694,8 +2683,8 @@ impl ChainStore { let hash = ptr.hash_hex(); let number = ptr.number as i64; - conn.transaction::<(Option, Option<(String, i64)>), StoreError, _>(|conn| { - async move { + conn.transaction::<(Option, Option<(String, i64)>), StoreError, _>( + async move |conn| { update(n::table.filter(n::name.eq(&self.chain))) .set(( n::head_block_hash.eq(&hash), @@ -2704,9 +2693,8 @@ impl ChainStore { .execute(conn) .await?; Ok((None, Some((hash, number)))) - } - .scope_boxed() - }) + }, + ) .await } @@ -2823,24 +2811,21 @@ impl ChainHeadStore for ChainStore { self.chain_head_update_sender.send(&hash, number).await?; let mut conn = self.pool.get_permitted().await?; - conn.transaction(|conn| { - async { - self.storage - .upsert_block(conn, &self.chain, block.as_ref(), true) - .await?; + conn.transaction(async |conn| { + self.storage + .upsert_block(conn, &self.chain, block.as_ref(), true) + .await?; - update(n::table.filter(n::name.eq(&self.chain))) - .set(( - n::head_block_hash.eq(&hash), - n::head_block_number.eq(number), - n::head_block_cursor.eq(cursor), - )) - .execute(conn) - .await?; + update(n::table.filter(n::name.eq(&self.chain))) + .set(( + n::head_block_hash.eq(&hash), + n::head_block_number.eq(number), + n::head_block_cursor.eq(cursor), + )) + .execute(conn) + .await?; - Ok::<(), StoreError>(()) - } - .scope_boxed() + Ok::<(), StoreError>(()) }) .await?; Ok(()) @@ -2866,10 +2851,10 @@ impl ChainStoreTrait for ChainStore { } let mut conn = self.pool.get_permitted().await?; - conn.transaction(|conn| { + conn.transaction(async |conn| { self.storage .upsert_block(conn, &self.chain, block.as_ref(), true) - .scope_boxed() + .await }) .await .map_err(Error::from) @@ -3658,22 +3643,19 @@ impl EthereumCallCache for ChainStore { let id = contract_call_id(req, &block); let conn = &mut self.pool.get_permitted().await?; let return_value = conn - .transaction::<_, Error, _>(|conn| { - async { - if let Some((return_value, update_accessed_at)) = - self.storage.get_call_and_access(conn, id.as_ref()).await? - { - if update_accessed_at { - self.storage - .update_accessed_at(conn, req.address.as_ref()) - .await?; - } - Ok(Some(return_value)) - } else { - Ok(None) + .transaction::<_, Error, _>(async |conn| { + if let Some((return_value, update_accessed_at)) = + self.storage.get_call_and_access(conn, id.as_ref()).await? + { + if update_accessed_at { + self.storage + .update_accessed_at(conn, req.address.as_ref()) + .await?; } + Ok(Some(return_value)) + } else { + Ok(None) } - .scope_boxed() }) .await?; Ok(return_value.map(|return_value| { @@ -3699,10 +3681,8 @@ impl EthereumCallCache for ChainStore { let conn = &mut self.pool.get_permitted().await?; let rows = conn - .transaction::<_, Error, _>(|conn| { - self.storage - .get_calls_and_access(conn, &id_refs) - .scope_boxed() + .transaction::<_, Error, _>(async |conn| { + self.storage.get_calls_and_access(conn, &id_refs).await }) .await?; @@ -3732,8 +3712,8 @@ impl EthereumCallCache for ChainStore { async fn get_calls_in_block(&self, block: BlockPtr) -> Result, Error> { let conn = &mut self.pool.get_permitted().await?; - conn.transaction::<_, Error, _>(|conn| { - self.storage.get_calls_in_block(conn, block).scope_boxed() + conn.transaction::<_, Error, _>(async |conn| { + self.storage.get_calls_in_block(conn, block).await }) .await } @@ -3765,7 +3745,7 @@ impl EthereumCallCache for ChainStore { let id = contract_call_id(&call, &block); let conn = &mut self.pool.get_permitted().await?; - conn.transaction::<_, anyhow::Error, _>(|conn| { + conn.transaction::<_, anyhow::Error, _>(async |conn| { self.storage .set_call( conn, @@ -3774,7 +3754,7 @@ impl EthereumCallCache for ChainStore { block.number, &return_value, ) - .scope_boxed() + .await }) .await?; Ok(()) diff --git a/store/postgres/src/copy.rs b/store/postgres/src/copy.rs index 6a5372464ff..b258093869d 100644 --- a/store/postgres/src/copy.rs +++ b/store/postgres/src/copy.rs @@ -26,25 +26,19 @@ use std::{ use diesel::{ ExpressionMethods, OptionalExtension, QueryDsl, dsl::sql, insert_into, select, update, }; -use diesel_async::{ - AsyncConnection, - scoped_futures::{ScopedBoxFuture, ScopedFutureExt}, -}; -use diesel_async::{RunQueryDsl, SimpleAsyncConnection}; +use diesel_async::{AsyncConnection, RunQueryDsl, SimpleAsyncConnection}; use crate::{ AsyncPgConnection, ConnectionPool, advisory_lock, catalog, deployment, dynds::DataSourcesTable, + pool::TxnCallback, primary::{DeploymentId, Primary, Site}, relational::{Layout, Table, index::IndexList}, relational_queries as rq, vid_batcher::{VidBatcher, VidRange}, }; use graph::{ - futures03::{ - FutureExt as _, - future::{BoxFuture, select_all}, - }, + futures03::{FutureExt as _, future::select_all}, internal_error, prelude::{ BlockNumber, BlockPtr, CheapClone, ENV_VARS, Logger, StoreError, info, lazy_static, o, warn, @@ -820,14 +814,11 @@ impl CopyTableWorker { } match conn - .transaction(|conn| { - async { - if let Some(timeout) = BATCH_STATEMENT_TIMEOUT.as_ref() { - conn.batch_execute(timeout).await?; - } - self.table.copy_batch(conn).await + .transaction(async |conn| { + if let Some(timeout) = BATCH_STATEMENT_TIMEOUT.as_ref() { + conn.batch_execute(timeout).await?; } - .scope_boxed() + self.table.copy_batch(conn).await }) .await { @@ -866,7 +857,7 @@ impl CopyTableWorker { // that is hard to predict. This mechanism ensures // that if our estimation is wrong, the consequences // aren't too severe. - conn.transaction(|conn| self.table.set_batch_size(conn, 1).scope_boxed()) + conn.transaction(async |conn| self.table.set_batch_size(conn, 1).await) .await?; } }; @@ -1033,18 +1024,14 @@ impl Connection { fn transaction<'a, 'conn, R, F>( &'conn mut self, callback: F, - ) -> Result>, StoreError> + ) -> Result> + Send + 'conn, StoreError> where - F: for<'r> FnOnce( - &'r mut AsyncPgConnection, - ) -> ScopedBoxFuture<'a, 'r, Result> - + Send - + 'a, + F: TxnCallback + 'a, R: Send + 'a, 'a: 'conn, { let conn = self.get_conn()?; - Ok(conn.transaction(|conn| callback(conn).scope_boxed())) + Ok(conn.transaction(callback)) } /// Copy private data sources if the source uses a schema version that @@ -1054,19 +1041,19 @@ impl Connection { let src_manifest_idx_and_name = self.src_manifest_idx_and_name.cheap_clone(); let dst_manifest_idx_and_name = self.dst_manifest_idx_and_name.cheap_clone(); if state.src.site.schema_version.private_data_sources() { - self.transaction(|conn| { - async { - DataSourcesTable::new(state.src.site.namespace.clone()) - .copy_to( - conn, - &DataSourcesTable::new(state.dst.site.namespace.clone()), - state.target_block.number, - &src_manifest_idx_and_name, - &dst_manifest_idx_and_name, - ) - .await - } - .scope_boxed() + let src_ns = state.src.site.namespace.clone(); + let dst_ns = state.dst.site.namespace.clone(); + let target_block = state.target_block.number; + self.transaction(async move |conn| { + DataSourcesTable::new(src_ns) + .copy_to( + conn, + &DataSourcesTable::new(dst_ns), + target_block, + &src_manifest_idx_and_name, + &dst_manifest_idx_and_name, + ) + .await })? .await?; } @@ -1165,8 +1152,8 @@ impl Connection { let target_block = self.target_block.clone(); let primary = self.primary.cheap_clone(); let mut state = self - .transaction(|conn| { - CopyState::new(conn, primary, src, dst, target_block).scope_boxed() + .transaction(async move |conn| { + CopyState::new(conn, primary, src, dst, target_block).await })? .await?; @@ -1284,7 +1271,7 @@ impl Connection { self.copy_private_data_sources(&state).await?; - self.transaction(|conn| state.finished(conn).scope_boxed())? + self.transaction(async |conn| state.finished(conn).await)? .await?; progress.finished(); diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs index 2ba59381c0a..5e6fdcf4747 100644 --- a/store/postgres/src/deployment_store.rs +++ b/store/postgres/src/deployment_store.rs @@ -1,6 +1,5 @@ use detail::DeploymentDetail; use diesel::sql_query; -use diesel_async::scoped_futures::ScopedFutureExt; use diesel_async::{AsyncConnection as _, RunQueryDsl, SimpleAsyncConnection}; use tokio::task::JoinHandle; @@ -185,53 +184,49 @@ impl DeploymentStore { on_sync: OnSync, ) -> Result<(), StoreError> { let mut conn = self.pool.get_permitted().await?; - conn.transaction::<_, StoreError, _>(|conn| { - async { - let exists = deployment::exists(conn, &site).await?; - - // Create (or update) the metadata. Update only happens in tests - let entities_with_causality_region = - deployment.manifest.entities_with_causality_region.clone(); - - // If `GRAPH_HISTORY_BLOCKS_OVERRIDE` is set, override the history_blocks - // setting with the value of the environment variable. - let deployment = if let Some(history_blocks_global_override) = - ENV_VARS.history_blocks_override - { + conn.transaction::<_, StoreError, _>(async |conn| { + let exists = deployment::exists(conn, &site).await?; + + // Create (or update) the metadata. Update only happens in tests + let entities_with_causality_region = + deployment.manifest.entities_with_causality_region.clone(); + + // If `GRAPH_HISTORY_BLOCKS_OVERRIDE` is set, override the history_blocks + // setting with the value of the environment variable. + let deployment = + if let Some(history_blocks_global_override) = ENV_VARS.history_blocks_override { deployment.with_history_blocks_override(history_blocks_global_override) } else { deployment }; - if replace || !exists { - deployment::create_deployment(conn, &site, deployment, exists, replace).await?; - }; - - // Create the schema for the subgraph data - if !exists { - let query = format!("create schema {}", &site.namespace); - conn.batch_execute(&query).await?; + if replace || !exists { + deployment::create_deployment(conn, &site, deployment, exists, replace).await?; + }; - let _ = Layout::create_relational_schema( - conn, - site.clone(), - schema, - entities_with_causality_region.into_iter().collect(), - ) - .await?; + // Create the schema for the subgraph data + if !exists { + let query = format!("create schema {}", &site.namespace); + conn.batch_execute(&query).await?; + + let _ = Layout::create_relational_schema( + conn, + site.clone(), + schema, + entities_with_causality_region.into_iter().collect(), + ) + .await?; - // Create data sources table - if site.schema_version.private_data_sources() { - conn.batch_execute(&DataSourcesTable::new(site.namespace.clone()).as_ddl()) - .await?; - } + // Create data sources table + if site.schema_version.private_data_sources() { + conn.batch_execute(&DataSourcesTable::new(site.namespace.clone()).as_ddl()) + .await?; } + } - deployment::set_on_sync(conn, &site, on_sync).await?; + deployment::set_on_sync(conn, &site, on_sync).await?; - Ok(()) - } - .scope_boxed() + Ok(()) }) .await } @@ -253,15 +248,12 @@ impl DeploymentStore { // is not reversible pub(crate) async fn drop_deployment(&self, site: &Site) -> Result<(), StoreError> { let mut conn = self.pool.get_permitted().await?; - conn.transaction(|conn| { - async { - crate::deployment::drop_schema(conn, &site.namespace).await?; - if !site.schema_version.private_data_sources() { - crate::dynds::shared::drop(conn, &site.deployment).await?; - } - crate::deployment::drop_metadata(conn, site).await + conn.transaction(async |conn| { + crate::deployment::drop_schema(conn, &site.namespace).await?; + if !site.schema_version.private_data_sources() { + crate::dynds::shared::drop(conn, &site.deployment).await?; } - .scope_boxed() + crate::deployment::drop_metadata(conn, site).await }) .await } @@ -293,16 +285,13 @@ impl DeploymentStore { let query = diesel::sql_query(query); let results = conn - .transaction(|conn| { - async { - if let Some(ref timeout_sql) = *STATEMENT_TIMEOUT { - conn.batch_execute(timeout_sql).await?; - } - - // Execute the provided SQL query - query.load::(conn).await + .transaction(async |conn| { + if let Some(ref timeout_sql) = *STATEMENT_TIMEOUT { + conn.batch_execute(timeout_sql).await?; } - .scope_boxed() + + // Execute the provided SQL query + query.load::(conn).await }) .await .map_err(|e| QueryExecutionError::SqlError(e.to_string()))?; @@ -538,7 +527,7 @@ impl DeploymentStore { block_ptr: BlockPtr, ) -> Result<(), StoreError> { let mut conn = self.pool.get_permitted().await?; - conn.transaction(|conn| deployment::set_synced(conn, id, block_ptr).scope_boxed()) + conn.transaction(async |conn| deployment::set_synced(conn, id, block_ptr).await) .await } @@ -646,17 +635,14 @@ impl DeploymentStore { .map(|table| vec![table]) .unwrap_or_else(|| layout.tables.values().map(Arc::as_ref).collect()); - conn.transaction(|conn| { - async { - for table in tables { - let (columns, _) = resolve_column_names_and_index_exprs(table, &columns)?; + conn.transaction(async |conn| { + for table in tables { + let (columns, _) = resolve_column_names_and_index_exprs(table, &columns)?; - catalog::set_stats_target(conn, &site.namespace, &table.name, &columns, target) - .await?; - } - Ok(()) + catalog::set_stats_target(conn, &site.namespace, &table.name, &columns, target) + .await?; } - .scope_boxed() + Ok(()) }) .await } @@ -855,9 +841,8 @@ impl DeploymentStore { return Ok(reporter); } - conn.transaction(|conn| { - deployment::set_earliest_block(conn, site.as_ref(), req.earliest_block) - .scope_boxed() + conn.transaction(async |conn| { + deployment::set_earliest_block(conn, site.as_ref(), req.earliest_block).await }) .await?; @@ -909,21 +894,18 @@ impl DeploymentStore { conn.build_transaction() .repeatable_read() .read_only() - .run(|conn| { - async move { - let entity_count = crate::detail::entity_count(conn, &site).await?; - layout - .dump( - conn, - index_list, - dir, - &site.network, - entity_count, - &mut *reporter, - ) - .await - } - .scope_boxed() + .run(async move |conn| { + let entity_count = crate::detail::entity_count(conn, &site).await?; + layout + .dump( + conn, + index_list, + dir, + &site.network, + entity_count, + &mut *reporter, + ) + .await }) .await } @@ -1166,68 +1148,65 @@ impl DeploymentStore { }; let (layout, earliest_block) = deployment::with_lock(&mut conn, &site, async |conn| { - conn.transaction(|conn| { - async { - // Make the changes - let layout = self.layout(conn, site.clone()).await?; - - let section = stopwatch.start_section("apply_entity_modifications"); - let count = self - .apply_entity_modifications( - conn, - logger, - layout.as_ref(), - batch.groups(), - stopwatch, - ) - .await?; - section.end(); + conn.transaction(async |conn| { + // Make the changes + let layout = self.layout(conn, site.clone()).await?; + + let section = stopwatch.start_section("apply_entity_modifications"); + let count = self + .apply_entity_modifications( + conn, + logger, + layout.as_ref(), + batch.groups(), + stopwatch, + ) + .await?; + section.end(); - layout.rollup(conn, last_rollup, &batch.block_times).await?; + layout.rollup(conn, last_rollup, &batch.block_times).await?; - dynds::insert(conn, &site, &batch.data_sources, manifest_idx_and_name).await?; + dynds::insert(conn, &site, &batch.data_sources, manifest_idx_and_name).await?; - dynds::update_offchain_status(conn, &site, &batch.offchain_to_remove).await?; + dynds::update_offchain_status(conn, &site, &batch.offchain_to_remove).await?; - if !batch.deterministic_errors.is_empty() { - deployment::insert_subgraph_errors( - &self.logger, + if !batch.deterministic_errors.is_empty() { + deployment::insert_subgraph_errors( + &self.logger, + conn, + &site.deployment, + &batch.deterministic_errors, + batch.block_ptr.number, + ) + .await?; + + if batch.is_non_fatal_errors_active { + debug!( + logger, + "Updating non-fatal errors for subgraph"; + "subgraph" => site.deployment.to_string(), + "block" => batch.block_ptr.number, + ); + deployment::update_non_fatal_errors( conn, &site.deployment, - &batch.deterministic_errors, - batch.block_ptr.number, + deployment::SubgraphHealth::Unhealthy, + Some(&batch.deterministic_errors), ) .await?; - - if batch.is_non_fatal_errors_active { - debug!( - logger, - "Updating non-fatal errors for subgraph"; - "subgraph" => site.deployment.to_string(), - "block" => batch.block_ptr.number, - ); - deployment::update_non_fatal_errors( - conn, - &site.deployment, - deployment::SubgraphHealth::Unhealthy, - Some(&batch.deterministic_errors), - ) - .await?; - } } + } - let earliest_block = deployment::transact_block( - conn, - &site, - &batch.block_ptr, - &batch.firehose_cursor, - count, - ) - .await?; + let earliest_block = deployment::transact_block( + conn, + &site, + &batch.block_ptr, + &batch.firehose_cursor, + count, + ) + .await?; - Ok((layout, earliest_block)) - } - .scope_boxed() + Ok((layout, earliest_block)) }) .await }) @@ -1357,35 +1336,31 @@ impl DeploymentStore { ) -> Result<(), StoreError> { let logger = self.logger.cheap_clone(); deployment::with_lock(conn, &site, async |conn| { - conn.transaction(|conn| { - async { - // The revert functions want the number of the first block that we need to get rid of - let block = block_ptr_to.number + 1; + conn.transaction(async |conn| { + // The revert functions want the number of the first block that we need to get rid of + let block = block_ptr_to.number + 1; - deployment::revert_block_ptr(conn, &site, block_ptr_to, firehose_cursor) - .await?; + deployment::revert_block_ptr(conn, &site, block_ptr_to, firehose_cursor).await?; - // Revert the data - let layout = self.layout(conn, site.clone()).await?; + // Revert the data + let layout = self.layout(conn, site.clone()).await?; - if truncate { - layout.truncate_tables(conn).await?; - deployment::clear_entity_count(conn, site.as_ref()).await?; - } else { - let count = layout.revert_block(conn, block).await?; - deployment::update_entity_count(conn, site.as_ref(), count).await?; - } + if truncate { + layout.truncate_tables(conn).await?; + deployment::clear_entity_count(conn, site.as_ref()).await?; + } else { + let count = layout.revert_block(conn, block).await?; + deployment::update_entity_count(conn, site.as_ref(), count).await?; + } - // Revert the meta data changes that correspond to this subgraph. - // Only certain meta data changes need to be reverted, most - // importantly creation of dynamic data sources. We ensure in the - // rest of the code that we only record history for those meta data - // changes that might need to be reverted - Layout::revert_metadata(&logger, conn, &site, block).await?; + // Revert the meta data changes that correspond to this subgraph. + // Only certain meta data changes need to be reverted, most + // importantly creation of dynamic data sources. We ensure in the + // rest of the code that we only record history for those meta data + // changes that might need to be reverted + Layout::revert_metadata(&logger, conn, &site, block).await?; - Ok(()) - } - .scope_boxed() + Ok(()) }) .await }) @@ -1509,7 +1484,7 @@ impl DeploymentStore { error: SubgraphError, ) -> Result<(), StoreError> { let mut conn = self.pool.get_permitted().await?; - conn.transaction(|conn| deployment::fail(conn, &id, &error).scope_boxed()) + conn.transaction(async |conn| deployment::fail(conn, &id, &error).await) .await } @@ -1615,85 +1590,81 @@ impl DeploymentStore { } let mut conn = self.pool.get_permitted().await?; - conn.transaction::<(), StoreError, _>(|conn| { - async { - // Copy shared dynamic data sources and adjust their ID; if - // the subgraph uses private data sources, that is done by - // `copy::Connection::copy_data` since it requires access to - // the source schema which in sharded setups is only - // available while that function runs - let start = Instant::now(); - let count = - dynds::shared::copy(conn, &src.site, &dst.site, block.number).await?; - info!(logger, "Copied {} dynamic data sources", count; - "time_ms" => start.elapsed().as_millis()); - - // Copy errors across - let start = Instant::now(); - let count = deployment::copy_errors(conn, &src.site, &dst.site, &block).await?; - info!(logger, "Copied {} existing errors", count; - "time_ms" => start.elapsed().as_millis()); - - catalog::copy_account_like(conn, &src.site, &dst.site).await?; - - // Analyze all tables for this deployment - info!(logger, "Analyzing all {} tables", dst.tables.len()); - for entity_name in dst.tables.keys() { - self.analyze_with_conn(site.cheap_clone(), entity_name.as_str(), conn) - .await?; - } + conn.transaction::<(), StoreError, _>(async |conn| { + // Copy shared dynamic data sources and adjust their ID; if + // the subgraph uses private data sources, that is done by + // `copy::Connection::copy_data` since it requires access to + // the source schema which in sharded setups is only + // available while that function runs + let start = Instant::now(); + let count = dynds::shared::copy(conn, &src.site, &dst.site, block.number).await?; + info!(logger, "Copied {} dynamic data sources", count; + "time_ms" => start.elapsed().as_millis()); + + // Copy errors across + let start = Instant::now(); + let count = deployment::copy_errors(conn, &src.site, &dst.site, &block).await?; + info!(logger, "Copied {} existing errors", count; + "time_ms" => start.elapsed().as_millis()); + + catalog::copy_account_like(conn, &src.site, &dst.site).await?; + + // Analyze all tables for this deployment + info!(logger, "Analyzing all {} tables", dst.tables.len()); + for entity_name in dst.tables.keys() { + self.analyze_with_conn(site.cheap_clone(), entity_name.as_str(), conn) + .await?; + } - // CopyEntityBatchQuery now reverts entity versions - // during copying, making this rewind redundant for new - // copies. We keep it for backward compatibility: a copy - // that was started before this change and is resumed - // after upgrading will have already-copied rows that - // weren't reverted during copy. For data that was - // already reverted during copy, this is a no-op. This - // code can be removed once a release with this change - // has been out for a while and we are sure that there - // are no more copies in progress that started before - // the change - // - // `revert_block` gets rid of everything including the - // block passed to it. We want to preserve `block` and - // therefore revert `block+1` - let start = Instant::now(); - let block_to_revert: BlockNumber = block - .number - .checked_add(1) - .expect("block numbers fit into an i32"); - info!(logger, "Rewinding to block {}", block.number); - let count = dst.revert_block(conn, block_to_revert).await?; - deployment::update_entity_count(conn, &dst.site, count).await?; - - info!(logger, "Rewound subgraph to block {}", block.number; - "time_ms" => start.elapsed().as_millis()); - - deployment::set_history_blocks( - conn, - &dst.site, - src_deployment.manifest.history_blocks, - ) - .await?; + // CopyEntityBatchQuery now reverts entity versions + // during copying, making this rewind redundant for new + // copies. We keep it for backward compatibility: a copy + // that was started before this change and is resumed + // after upgrading will have already-copied rows that + // weren't reverted during copy. For data that was + // already reverted during copy, this is a no-op. This + // code can be removed once a release with this change + // has been out for a while and we are sure that there + // are no more copies in progress that started before + // the change + // + // `revert_block` gets rid of everything including the + // block passed to it. We want to preserve `block` and + // therefore revert `block+1` + let start = Instant::now(); + let block_to_revert: BlockNumber = block + .number + .checked_add(1) + .expect("block numbers fit into an i32"); + info!(logger, "Rewinding to block {}", block.number); + let count = dst.revert_block(conn, block_to_revert).await?; + deployment::update_entity_count(conn, &dst.site, count).await?; + + info!(logger, "Rewound subgraph to block {}", block.number; + "time_ms" => start.elapsed().as_millis()); + + deployment::set_history_blocks( + conn, + &dst.site, + src_deployment.manifest.history_blocks, + ) + .await?; - // The `earliest_block` for `src` might have changed while - // we did the copy if `src` was pruned while we copied; - // adjusting it very late in the copy process ensures that - // we truly do have all the data starting at - // `earliest_block` and do not inadvertently expose data - // that might be incomplete because a prune on the source - // removed data just before we copied it - deployment::copy_earliest_block(conn, &src.site, &dst.site).await?; - - // Set the block ptr to the graft point to signal that we successfully - // performed the graft - crate::deployment::forward_block_ptr(conn, &dst.site, &block).await?; - info!(logger, "Subgraph successfully initialized"; - "time_ms" => start.elapsed().as_millis()); - Ok(()) - } - .scope_boxed() + // The `earliest_block` for `src` might have changed while + // we did the copy if `src` was pruned while we copied; + // adjusting it very late in the copy process ensures that + // we truly do have all the data starting at + // `earliest_block` and do not inadvertently expose data + // that might be incomplete because a prune on the source + // removed data just before we copied it + deployment::copy_earliest_block(conn, &src.site, &dst.site).await?; + + // Set the block ptr to the graft point to signal that we successfully + // performed the graft + crate::deployment::forward_block_ptr(conn, &dst.site, &block).await?; + info!(logger, "Subgraph successfully initialized"; + "time_ms" => start.elapsed().as_millis()); + Ok(()) }) .await?; } @@ -1707,8 +1678,8 @@ impl DeploymentStore { // deployed subgraphs so that we respect the 'startBlock' setting // the first time the subgraph is started let mut conn = self.pool.get_permitted().await?; - conn.transaction(|conn| { - crate::deployment::initialize_block_ptr(conn, &dst.site).scope_boxed() + conn.transaction(async |conn| { + crate::deployment::initialize_block_ptr(conn, &dst.site).await }) .await?; Ok(()) @@ -1754,8 +1725,7 @@ impl DeploymentStore { let mut conn = self.pool.get_permitted().await?; let deployment_id = &site.deployment; - conn.transaction(|conn| { - async { + conn.transaction(async |conn| { // We'll only unfail subgraphs that had fatal errors let subgraph_error = match ErrorDetail::fatal(conn, deployment_id).await? { Some(fatal_error) => fatal_error, @@ -1829,7 +1799,7 @@ impl DeploymentStore { Ok(UnfailOutcome::Noop) } - } }.scope_boxed() + } }).await } @@ -1851,7 +1821,7 @@ impl DeploymentStore { let mut conn = self.pool.get_permitted().await?; let deployment_id = &site.deployment; - conn.transaction(|conn| async { + conn.transaction(async |conn| { // We'll only unfail subgraphs that had fatal errors let subgraph_error = match ErrorDetail::fatal(conn, deployment_id).await? { Some(fatal_error) => fatal_error, @@ -1907,7 +1877,7 @@ impl DeploymentStore { Ok(UnfailOutcome::BehindErrorBlock) } } - }.scope_boxed()).await + }).await } #[cfg(debug_assertions)] diff --git a/store/postgres/src/lib.rs b/store/postgres/src/lib.rs index c1d1c2f6708..32b6c495b2e 100644 --- a/store/postgres/src/lib.rs +++ b/store/postgres/src/lib.rs @@ -62,9 +62,7 @@ pub use self::chain_store::{ChainStore, ChainStoreMetrics, Storage}; pub use self::detail::DeploymentDetail; pub use self::jobs::register as register_jobs; pub use self::notification_listener::NotificationSender; -pub use self::pool::{ - AsyncPgConnection, ConnectionPool, ForeignServer, PoolCoordinator, PoolRole, ScopedFutureExt, -}; +pub use self::pool::{AsyncPgConnection, ConnectionPool, ForeignServer, PoolCoordinator, PoolRole}; pub use self::primary::{RestoreMode, UnusedDeployment, db_version}; pub use self::store::Store; pub use self::store_events::SubscriptionManager; diff --git a/store/postgres/src/pool/mod.rs b/store/postgres/src/pool/mod.rs index e71740f9d6a..c479c2263a2 100644 --- a/store/postgres/src/pool/mod.rs +++ b/store/postgres/src/pool/mod.rs @@ -19,9 +19,9 @@ use graph::slog::warn; use graph::util::timed_rw_lock::TimedMutex; use tokio::sync::OwnedSemaphorePermit; -use diesel_async::scoped_futures::ScopedBoxFuture; use std::collections::HashMap; use std::fmt::{self}; +use std::future::Future; use std::ops::{Deref, DerefMut}; use std::sync::Arc; use std::time::Duration; @@ -31,12 +31,66 @@ use crate::pool::manager::{ConnectionManager, WaitMeter}; use crate::primary::{self, Mirror, Namespace}; use crate::{PRIMARY_SHARD, Shard}; +/// A helper trait that names the future type returned by an async closure. +/// This is the local equivalent of `diesel_async::transaction_manager::AsyncFunc`, +/// which is not a public API. Adding the `Fut: Send` bound on this trait in +/// function signatures allows the compiler to prove that the futures produced +/// by async closures are `Send`, preventing "Send not general enough" errors +/// that arise when using plain `for<'r> F: AsyncFnOnce(...)` bounds. +pub trait AsyncFunc: + AsyncFnOnce(T) -> R + FnOnce(T) -> >::Fut +{ + type Fut: Future; +} + +impl AsyncFunc for F +where + F: AsyncFnOnce(T) -> R + FnOnce(T) -> Fut, + Fut: Future, +{ + type Fut = Fut; +} + +pub trait AsyncMutFunc: + AsyncFnMut(T) -> R + FnMut(T) -> >::Fut +{ + type Fut: Future; +} + +impl AsyncMutFunc for F +where + F: AsyncFnMut(T) -> R + FnMut(T) -> Fut, + Fut: Future, +{ + type Fut = Fut; +} + +/// Shorthand for an async closure that may be used as a transaction +/// callback in our own `fn transaction` implementations in a few places. +pub(crate) trait TxnCallback: + for<'r> AsyncFunc<&'r mut C, Result, Fut: Send> + Send +{ +} + +impl TxnCallback for F where + F: for<'r> AsyncFunc<&'r mut C, Result, Fut: Send> + Send +{ +} + +pub(crate) trait TxnMutCallback: + for<'r> AsyncMutFunc<&'r mut C, Result, Fut: Send> + Send +{ +} + +impl TxnMutCallback for F where + F: for<'r> AsyncMutFunc<&'r mut C, Result, Fut: Send> + Send +{ +} + mod coordinator; mod foreign_server; mod manager; -pub use diesel_async::scoped_futures::ScopedFutureExt; - pub use coordinator::PoolCoordinator; pub use foreign_server::ForeignServer; use manager::StateTracker; @@ -71,12 +125,27 @@ impl PermittedConnection { /// /// This is analogous to `diesel_async::pg::TransactionBuilder` but /// works with the pool-wrapped connection type. The closure receives - /// `&mut PermittedConnection`, keeping the full wrapper type available - /// so callers can pass it to functions that expect `&mut AsyncPgConnection` - /// (the pool alias, not the raw diesel type). - pub fn build_transaction(&mut self) -> TransactionBuilder<'_> { + /// `&mut AsyncPgConnection`, the inner connection, so it can be passed + /// directly to functions that take `&mut AsyncPgConnection`. + pub(crate) fn build_transaction(&mut self) -> TransactionBuilder<'_> { TransactionBuilder::new(self) } + + /// Run `callback` inside a transaction on this connection. Commits on + /// `Ok`, rolls back on `Err`. The callback receives `&mut PermittedConnection`; + /// pass it to functions needing `&mut AsyncPgConnection` via deref coercion. + pub(crate) fn transaction<'a, 'conn, R, E, F>( + &'conn mut self, + callback: F, + ) -> impl Future> + Send + 'conn + where + F: TxnCallback + 'a, + E: From + Send + 'a, + R: Send + 'a, + 'a: 'conn, + { + self.conn.transaction(callback) + } } /// Builder for a PostgreSQL transaction with configurable isolation level @@ -91,7 +160,7 @@ impl PermittedConnection { /// `PoolTransactionManager>`. /// Neither matches `AnsiTransactionManager`. #[must_use = "Transaction builder does nothing unless you call `run` on it"] -pub struct TransactionBuilder<'a> { +pub(crate) struct TransactionBuilder<'a> { conn: &'a mut PermittedConnection, isolation_level: Option<&'static str>, read_only: bool, @@ -107,35 +176,24 @@ impl<'a> TransactionBuilder<'a> { } /// Set the transaction isolation level to `REPEATABLE READ`. - pub fn repeatable_read(mut self) -> Self { + pub(crate) fn repeatable_read(mut self) -> Self { self.isolation_level = Some("REPEATABLE READ"); self } - /// Set the transaction isolation level to `SERIALIZABLE`. - pub fn serializable(mut self) -> Self { - self.isolation_level = Some("SERIALIZABLE"); - self - } - /// Make the transaction `READ ONLY`. - pub fn read_only(mut self) -> Self { + pub(crate) fn read_only(mut self) -> Self { self.read_only = true; self } /// Execute `f` inside the configured transaction. Commits on `Ok`, - /// rolls back on `Err`. - /// - /// The closure must return a `ScopedBoxFuture` (use `.scope_boxed()` - /// from `ScopedFutureExt`). - pub async fn run<'b, T, E, F>(self, f: F) -> Result + /// rolls back on `Err`. The callback receives `&mut PermittedConnection`; + /// pass it to functions needing `&mut AsyncPgConnection` via deref coercion. + pub(crate) async fn run(self, f: F) -> Result where - F: for<'r> FnOnce(&'r mut PermittedConnection) -> ScopedBoxFuture<'b, 'r, Result> - + Send - + 'a, - T: 'b, - E: From + 'b, + F: TxnCallback, + E: From, { let mut sql = String::from("BEGIN TRANSACTION"); if let Some(level) = self.isolation_level { @@ -864,19 +922,16 @@ impl PoolInner { let mut conn = self.get_for_setup().await?; conn.batch_execute("create extension if not exists postgres_fdw") .await?; - conn.transaction(|conn| { - async { - let current_servers: Vec = crate::catalog::current_servers(conn).await?; - for server in servers.iter().filter(|server| server.shard != self.shard) { - if current_servers.contains(&server.name) { - server.update(conn).await?; - } else { - server.create(conn).await?; - } + conn.transaction(async |conn| { + let current_servers: Vec = crate::catalog::current_servers(conn).await?; + for server in servers.iter().filter(|server| server.shard != self.shard) { + if current_servers.contains(&server.name) { + server.update(conn).await?; + } else { + server.create(conn).await?; } - Ok(()) } - .scope_boxed() + Ok(()) }) .await } @@ -922,13 +977,10 @@ impl PoolInner { info!(&self.logger, "Dropping cross-shard views"); let mut conn = self.get_for_setup().await?; - conn.transaction(|conn| { - async { - let query = format!("drop schema if exists {} cascade", CROSS_SHARD_NSP); - conn.batch_execute(&query).await?; - Ok(()) - } - .scope_boxed() + conn.transaction(async move |conn| { + let query = format!("drop schema if exists {} cascade", CROSS_SHARD_NSP); + conn.batch_execute(&query).await?; + Ok(()) }) .await } @@ -967,28 +1019,25 @@ impl PoolInner { } info!(&self.logger, "Creating cross-shard views"); - conn.transaction(|conn| { - async { - let query = format!("create schema {}", CROSS_SHARD_NSP); - conn.batch_execute(&query).await?; - for (src_nsp, src_tables) in SHARDED_TABLES { - // Pairs of (shard, nsp) for all servers - let nsps = shard_nsp_pairs(&self.shard, src_nsp, servers); - for src_table in src_tables { - let create_view = catalog::create_cross_shard_view( - conn, - src_nsp, - src_table, - CROSS_SHARD_NSP, - &nsps, - ) - .await?; - conn.batch_execute(&create_view).await?; - } + conn.transaction(async |conn| { + let query = format!("create schema {}", CROSS_SHARD_NSP); + conn.batch_execute(&query).await?; + for (src_nsp, src_tables) in SHARDED_TABLES { + // Pairs of (shard, nsp) for all servers + let nsps = shard_nsp_pairs(&self.shard, src_nsp, servers); + for src_table in src_tables { + let create_view = catalog::create_cross_shard_view( + conn, + src_nsp, + src_table, + CROSS_SHARD_NSP, + &nsps, + ) + .await?; + conn.batch_execute(&create_view).await?; } - Ok(()) } - .scope_boxed() + Ok(()) }) .await } @@ -1000,7 +1049,7 @@ impl PoolInner { return Ok(()); } let mut conn = self.get().await?; - conn.transaction(|conn| primary::Mirror::refresh_tables(conn).scope_boxed()) + conn.transaction(async |conn| primary::Mirror::refresh_tables(conn).await) .await } @@ -1011,7 +1060,7 @@ impl PoolInner { if server.shard == *PRIMARY_SHARD { info!(&self.logger, "Mapping primary"); let mut conn = self.get_for_setup().await?; - conn.transaction(|conn| ForeignServer::map_primary(conn, &self.shard).scope_boxed()) + conn.transaction(async |conn| ForeignServer::map_primary(conn, &self.shard).await) .await?; } if server.shard != self.shard { @@ -1021,7 +1070,7 @@ impl PoolInner { server.shard.as_str() ); let mut conn = self.get_for_setup().await?; - conn.transaction(|conn| server.map_metadata(conn).scope_boxed()) + conn.transaction(async |conn| server.map_metadata(conn).await) .await?; } Ok(()) diff --git a/store/postgres/src/primary.rs b/store/postgres/src/primary.rs index 2e4e61b9af0..6e6ffd11086 100644 --- a/store/postgres/src/primary.rs +++ b/store/postgres/src/primary.rs @@ -5,7 +5,7 @@ use crate::{ AsyncPgConnection, ConnectionPool, ForeignServer, NotificationSender, block_range::UNVERSIONED_RANGE, detail::DeploymentDetail, - pool::{PRIMARY_PUBLIC, PermittedConnection}, + pool::{PRIMARY_PUBLIC, PermittedConnection, TxnCallback, TxnMutCallback}, subgraph_store::{PRIMARY_SHARD, Shard, unused}, }; use diesel::dsl::{delete, insert_into, sql, update}; @@ -21,10 +21,7 @@ use diesel::{ serialize::{Output, ToSql}, sql_types::{Array, BigInt, Bool, Integer, Text}, }; -use diesel_async::{ - RunQueryDsl, SimpleAsyncConnection as _, TransactionManager, - scoped_futures::{ScopedBoxFuture, ScopedFutureExt}, -}; +use diesel_async::{RunQueryDsl, SimpleAsyncConnection as _, TransactionManager}; use graph::{ components::store::DeploymentLocator, data::{ @@ -32,7 +29,6 @@ use graph::{ subgraph::{DeploymentFeatures, status}, }, derive::CheapClone, - futures03::{FutureExt, future::BoxFuture}, internal_error, prelude::{ AssignmentChange, DeploymentHash, NodeId, StoreError, SubgraphName, @@ -816,38 +812,29 @@ impl Connection { /// returns `Err(E)`, the transaction is rolled back and `Err(E)` is /// returned. If committing or rolling back the transaction fails, /// return an error - pub(crate) fn transaction<'a, 'conn, R, F>( - &'conn mut self, - callback: F, - ) -> BoxFuture<'conn, Result> + pub(crate) async fn transaction(&mut self, callback: F) -> Result where - F: for<'r> FnOnce(&'r mut Self) -> ScopedBoxFuture<'a, 'r, Result> - + Send - + 'a, - R: Send + 'a, - 'a: 'conn, + F: TxnCallback, + R: Send, { type TM = ::TransactionManager; - async move { - TM::begin_transaction(&mut *self.conn).await?; - match callback(self).await { - Ok(value) => { - TM::commit_transaction(&mut *self.conn).await?; - Ok(value) - } - Err(user_error) => match TM::rollback_transaction(&mut *self.conn).await { - Ok(()) => Err(user_error), - Err(diesel::result::Error::BrokenTransactionManager) => { - // In this case we are probably more interested by the - // original error, which likely caused this - Err(user_error) - } - Err(rollback_error) => Err(rollback_error.into()), - }, + TM::begin_transaction(&mut *self.conn).await?; + match callback(self).await { + Ok(value) => { + TM::commit_transaction(&mut *self.conn).await?; + Ok(value) } + Err(user_error) => match TM::rollback_transaction(&mut *self.conn).await { + Ok(()) => Err(user_error), + Err(diesel::result::Error::BrokenTransactionManager) => { + // In this case we are probably more interested by the + // original error, which likely caused this + Err(user_error) + } + Err(rollback_error) => Err(rollback_error.into()), + }, } - .boxed() } /// Signal any copy process that might be copying into one of these @@ -1539,39 +1526,36 @@ impl Connection { use subgraph_version as v; use unused_deployments as u; - self.transaction(|pconn| { - async { - let conn = &mut pconn.conn; + self.transaction(async |pconn| { + let conn = &mut pconn.conn; - delete(ds::table.filter(ds::id.eq(site.id))) + delete(ds::table.filter(ds::id.eq(site.id))) + .execute(conn) + .await?; + + // If there is no site for this deployment any more, we can get + // rid of versions pointing to it + let exists = select(exists( + ds::table.filter(ds::subgraph.eq(site.deployment.as_str())), + )) + .get_result::(conn) + .await?; + if !exists { + delete(v::table.filter(v::deployment.eq(site.deployment.as_str()))) .execute(conn) .await?; - // If there is no site for this deployment any more, we can get - // rid of versions pointing to it - let exists = select(exists( - ds::table.filter(ds::subgraph.eq(site.deployment.as_str())), - )) - .get_result::(conn) - .await?; - if !exists { - delete(v::table.filter(v::deployment.eq(site.deployment.as_str()))) - .execute(conn) - .await?; - - // Remove the entry in `subgraph_features` - delete(f::table.filter(f::id.eq(site.deployment.as_str()))) - .execute(conn) - .await?; - } - - update(u::table.filter(u::id.eq(site.id))) - .set(u::removed_at.eq(sql("now()"))) + // Remove the entry in `subgraph_features` + delete(f::table.filter(f::id.eq(site.deployment.as_str()))) .execute(conn) .await?; - Ok(()) } - .scope_boxed() + + update(u::table.filter(u::id.eq(site.id))) + .set(u::removed_at.eq(sql("now()"))) + .execute(conn) + .await?; + Ok(()) }) .await } @@ -2164,33 +2148,24 @@ impl Mirror { /// must only access tables that are mirrored through `refresh_tables` /// /// The function `callback` must not do any blocking work itself - pub(crate) fn read_async<'a, 's, R, F>( - &'s self, - callback: F, - ) -> BoxFuture<'s, Result> + pub(crate) async fn read_async(&self, mut callback: F) -> Result where - F: for<'r> Fn(&'r mut AsyncPgConnection) -> ScopedBoxFuture<'a, 'r, Result> - + Send - + 'a, - R: Send + 'a, - 'a: 's, + F: TxnMutCallback + Send, + R: Send, { - async move { - for pool in self.pools.as_ref() { - let mut conn = match pool.get_permitted().await { - Ok(conn) => conn, - Err(StoreError::DatabaseUnavailable) => continue, - Err(e) => return Err(e), - }; - match callback(&mut conn).await { - Ok(v) => return Ok(v), - Err(StoreError::DatabaseUnavailable) => continue, - Err(e) => return Err(e), - } + for pool in self.pools.as_ref() { + let mut conn = match pool.get_permitted().await { + Ok(conn) => conn, + Err(StoreError::DatabaseUnavailable) => continue, + Err(e) => return Err(e), + }; + match callback(&mut conn).await { + Ok(v) => return Ok(v), + Err(StoreError::DatabaseUnavailable) => continue, + Err(e) => return Err(e), } - Err(StoreError::DatabaseUnavailable) } - .boxed() + Err(StoreError::DatabaseUnavailable) } /// Refresh the contents of mirrored tables from the primary (through @@ -2283,17 +2258,17 @@ impl Mirror { } pub async fn assignments(&self, node: &NodeId) -> Result, StoreError> { - self.read_async(|conn| queries::assignments(conn, node).scope_boxed()) + self.read_async(async |conn| queries::assignments(conn, node).await) .await } pub async fn active_assignments(&self, node: &NodeId) -> Result, StoreError> { - self.read_async(|conn| queries::active_assignments(conn, node).scope_boxed()) + self.read_async(async |conn| queries::active_assignments(conn, node).await) .await } pub async fn assigned_node(&self, site: &Site) -> Result, StoreError> { - self.read_async(|conn| queries::assigned_node(conn, site).scope_boxed()) + self.read_async(async |conn| queries::assigned_node(conn, site).await) .await } @@ -2305,7 +2280,7 @@ impl Mirror { &self, site: Arc, ) -> Result, StoreError> { - self.read_async(|conn| queries::assignment_status(conn, &site).scope_boxed()) + self.read_async(async |conn| queries::assignment_status(conn, &site).await) .await } @@ -2313,12 +2288,12 @@ impl Mirror { &self, subgraph: &DeploymentHash, ) -> Result, StoreError> { - self.read_async(|conn| queries::find_active_site(conn, subgraph).scope_boxed()) + self.read_async(async |conn| queries::find_active_site(conn, subgraph).await) .await } pub async fn find_site_by_ref(&self, id: DeploymentId) -> Result, StoreError> { - self.read_async(|conn| queries::find_site_by_ref(conn, id).scope_boxed()) + self.read_async(async |conn| queries::find_site_by_ref(conn, id).await) .await } @@ -2326,17 +2301,17 @@ impl Mirror { &self, name: &SubgraphName, ) -> Result { - self.read_async(|conn| queries::current_deployment_for_subgraph(conn, name).scope_boxed()) + self.read_async(async |conn| queries::current_deployment_for_subgraph(conn, name).await) .await } pub async fn deployments_for_subgraph(&self, name: &str) -> Result, StoreError> { - self.read_async(|conn| queries::deployments_for_subgraph(conn, name).scope_boxed()) + self.read_async(async |conn| queries::deployments_for_subgraph(conn, name).await) .await } pub async fn subgraph_exists(&self, name: &SubgraphName) -> Result { - self.read_async(|conn| queries::subgraph_exists(conn, name).scope_boxed()) + self.read_async(async |conn| queries::subgraph_exists(conn, name).await) .await } @@ -2345,7 +2320,7 @@ impl Mirror { name: &str, use_current: bool, ) -> Result, StoreError> { - self.read_async(|conn| queries::subgraph_version(conn, name, use_current).scope_boxed()) + self.read_async(async |conn| queries::subgraph_version(conn, name, use_current).await) .await } @@ -2356,14 +2331,14 @@ impl Mirror { ids: &[String], only_active: bool, ) -> Result, StoreError> { - self.read_async(|conn| queries::find_sites(conn, ids, only_active).scope_boxed()) + self.read_async(async |conn| queries::find_sites(conn, ids, only_active).await) .await } /// Find sites by their subgraph deployment ids. If `ids` is empty, /// return no sites pub async fn find_sites_by_id(&self, ids: &[DeploymentId]) -> Result, StoreError> { - self.read_async(|conn| queries::find_sites_by_id(conn, ids).scope_boxed()) + self.read_async(async |conn| queries::find_sites_by_id(conn, ids).await) .await } @@ -2371,7 +2346,7 @@ impl Mirror { &self, infos: &[status::Info], ) -> Result, StoreError> { - self.read_async(|conn| queries::fill_assignments(conn, infos).scope_boxed()) + self.read_async(async |conn| queries::fill_assignments(conn, infos).await) .await } @@ -2379,7 +2354,7 @@ impl Mirror { &self, version: &str, ) -> Result, StoreError> { - self.read_async(|conn| queries::version_info(conn, version).scope_boxed()) + self.read_async(async |conn| queries::version_info(conn, version).await) .await } @@ -2387,7 +2362,7 @@ impl Mirror { &self, subgraph_id: &str, ) -> Result<(Option, Option), StoreError> { - self.read_async(|conn| queries::versions_for_subgraph_id(conn, subgraph_id).scope_boxed()) + self.read_async(async |conn| queries::versions_for_subgraph_id(conn, subgraph_id).await) .await } @@ -2396,8 +2371,8 @@ impl Mirror { &self, deployment_hash: &str, ) -> Result, StoreError> { - self.read_async(|conn| { - queries::subgraphs_by_deployment_hash(conn, deployment_hash).scope_boxed() + self.read_async(async |conn| { + queries::subgraphs_by_deployment_hash(conn, deployment_hash).await }) .await } @@ -2407,7 +2382,7 @@ impl Mirror { subgraph: &DeploymentHash, shard: &Shard, ) -> Result, StoreError> { - self.read_async(|conn| queries::find_site_in_shard(conn, subgraph, shard).scope_boxed()) + self.read_async(async |conn| queries::find_site_in_shard(conn, subgraph, shard).await) .await } } diff --git a/store/postgres/src/relational.rs b/store/postgres/src/relational.rs index eca8a3d4a16..dcec0578a67 100644 --- a/store/postgres/src/relational.rs +++ b/store/postgres/src/relational.rs @@ -27,7 +27,6 @@ use diesel::pg::Pg; use diesel::serialize::{Output, ToSql}; use diesel::sql_types::Text; use diesel::{OptionalExtension, QueryDsl, QueryResult, debug_query, sql_query}; -use diesel_async::scoped_futures::ScopedFutureExt; use diesel_async::{AsyncConnection, RunQueryDsl, SimpleAsyncConnection}; use graph::blockchain::BlockTime; @@ -906,14 +905,11 @@ impl Layout { let start = Instant::now(); let values = conn - .transaction(|conn| { - async { - if let Some(ref timeout_sql) = *STATEMENT_TIMEOUT { - conn.batch_execute(timeout_sql).await?; - } - query.load::(conn).await + .transaction(async |conn| { + if let Some(ref timeout_sql) = *STATEMENT_TIMEOUT { + conn.batch_execute(timeout_sql).await?; } - .scope_boxed() + query.load::(conn).await }) .await .map_err(|e| { diff --git a/store/postgres/src/relational/prune.rs b/store/postgres/src/relational/prune.rs index 0fef840d8b0..2bfb7e6c9c9 100644 --- a/store/postgres/src/relational/prune.rs +++ b/store/postgres/src/relational/prune.rs @@ -1,13 +1,10 @@ -use std::{fmt::Write, sync::Arc}; +use std::{fmt::Write, future::Future, pin::Pin, sync::Arc}; use diesel::{ sql_query, sql_types::{BigInt, Integer}, }; -use diesel_async::{ - AsyncConnection, RunQueryDsl, SimpleAsyncConnection, - scoped_futures::{ScopedBoxFuture, ScopedFutureExt}, -}; +use diesel_async::{AsyncConnection, RunQueryDsl, SimpleAsyncConnection}; use graph::{ components::store::{PrunePhase, PruneReporter, PruneRequest, PruningStrategy, VersionStats}, prelude::{BLOCK_NUMBER_MAX, BlockNumber, CancelableError, CheapClone, StoreError}, @@ -91,10 +88,14 @@ impl TablePair { let mut batcher = VidBatcher::load(conn, &self.src_nsp, &self.src, range).await?; tracker.start_copy_final(conn, &self.src, range).await?; + let src_name = self.src.qualified_name.clone(); + let dst_name = self.dst.qualified_name.clone(); while !batcher.finished() { let rows = batch_with_timeout(conn, &mut batcher, |conn, start, end| { let column_list = column_list.cheap_clone(); - async move { + let src_name = src_name.clone(); + let dst_name = dst_name.clone(); + Box::pin(async move { // Page through all rows in `src` in batches of `batch_size` // and copy the ones that are visible to queries at block // heights between `earliest_block` and `final_block`, but @@ -103,7 +104,7 @@ impl TablePair { // The conditions on `block_range` are expressed redundantly // to make more indexes useable sql_query(format!( - "/* controller=prune,phase=final,start_vid={start},batch_size={batch_size} */ \ + "/* controller=prune,phase=final,start_vid={start},batch_size={batch_size} */ \ insert into {dst}({column_list}) \ select {column_list} from {src} \ where lower(block_range) <= $2 \ @@ -112,10 +113,10 @@ impl TablePair { and block_range && int4range($1, $2, '[]') \ and vid >= $3 and vid <= $4 \ order by vid", - src = self.src.qualified_name, - dst = self.dst.qualified_name, - batch_size = end - start + 1, - )) + src = src_name, + dst = dst_name, + batch_size = end - start + 1, + )) .bind::(earliest_block) .bind::(final_block) .bind::(start) @@ -123,8 +124,7 @@ impl TablePair { .execute(conn) .await .map_err(StoreError::from) - } - .scope_boxed() + }) }) .await?; let rows = rows.unwrap_or(0); @@ -159,16 +159,20 @@ impl TablePair { let mut batcher = VidBatcher::load(conn, &self.src.nsp, &self.src, range).await?; tracker.start_copy_nonfinal(conn, &self.src, range).await?; + let src_name = self.src.qualified_name.clone(); + let dst_name = self.dst.qualified_name.clone(); while !batcher.finished() { let rows = batch_with_timeout(conn, &mut batcher, |conn, start, end| { let column_list = column_list.cheap_clone(); - async move { - // Page through all the rows in `src` in batches of - // `batch_size` that are visible to queries at block heights - // starting right after `final_block`. The conditions on - // `block_range` are expressed redundantly to make more - // indexes useable - sql_query(format!( + let src_name = src_name.clone(); + let dst_name = dst_name.clone(); + Box::pin(async move { + // Page through all the rows in `src` in batches of + // `batch_size` that are visible to queries at block heights + // starting right after `final_block`. The conditions on + // `block_range` are expressed redundantly to make more + // indexes useable + sql_query(format!( "/* controller=prune,phase=nonfinal,start_vid={start},batch_size={batch_size} */ \ insert into {dst}({column_list}) \ select {column_list} from {src} \ @@ -176,8 +180,8 @@ impl TablePair { and block_range && int4range($1, null) \ and vid >= $2 and vid <= $3 \ order by vid", - dst = self.dst.qualified_name, - src = self.src.qualified_name, + dst = dst_name, + src = src_name, batch_size = end - start + 1, )) .bind::(final_block) @@ -186,7 +190,8 @@ impl TablePair { .execute(conn) .await .map_err(StoreError::from) - }.scope_boxed()}).await?; + }) + }).await?; let rows = rows.unwrap_or(0); tracker @@ -233,8 +238,10 @@ impl TablePair { writeln!(query, "drop table {src_qname};")?; writeln!(query, "alter table {dst_qname} set schema {src_nsp}")?; - conn.transaction(|conn| conn.batch_execute(&query).scope_boxed()) - .await?; + conn.transaction(async move |conn| { + conn.batch_execute(&query).await.map_err(StoreError::from) + }) + .await?; Ok(()) } @@ -462,24 +469,27 @@ impl Layout { VidBatcher::load(conn, &self.site.namespace, table, range).await?; tracker.start_delete(conn, table, range, &batcher).await?; + let qname = table.qualified_name.clone(); + let earliest_block = req.earliest_block; while !batcher.finished() { let rows = batch_with_timeout(conn, &mut batcher, |conn, start, end| { - async move { - sql_query(format!( - "/* controller=prune,phase=delete,start_vid={start},batch_size={batch_size} */ \ - delete from {qname} \ - where coalesce(upper(block_range), 2147483647) <= $1 \ - and vid >= $2 and vid <= $3", - qname = table.qualified_name, - batch_size = end - start + 1 - )) - .bind::(req.earliest_block) - .bind::(start) - .bind::(end) - .execute(conn) - .await - .map_err(StoreError::from) - }.scope_boxed()}).await?; + let qname = qname.clone(); + Box::pin(async move { + sql_query(format!( + "/* controller=prune,phase=delete,start_vid={start},batch_size={batch_size} */ \ + delete from {qname} \ + where coalesce(upper(block_range), 2147483647) <= $1 \ + and vid >= $2 and vid <= $3", + batch_size = end - start + 1 + )) + .bind::(earliest_block) + .bind::(start) + .bind::(end) + .execute(conn) + .await + .map_err(StoreError::from) + }) + }).await?; let rows = rows.unwrap_or(0); tracker @@ -512,6 +522,14 @@ impl Layout { } } +/// The type of per-batch query closure expected by `batch_with_timeout`. +/// Callers wrap their async body in `Box::pin(async move { ... })` so that +/// the future is type-erased and explicitly `Send`. This sidesteps the +/// limitation that async closures don't implement `Fn`/`FnMut`, which +/// would prevent us from calling the closure again on the timeout-retry +/// path. +type BatchQuery<'r, R> = Pin> + Send + 'r>>; + /// Perform a step with the `batcher`. If that step takes longer than /// `BATCH_STATEMENT_TIMEOUT`, kill the query and reset the batch size of /// the batcher to 1 and perform a step with that size which we assume takes @@ -519,49 +537,48 @@ impl Layout { /// /// Doing this serves as a safeguard against very bad batch size estimations /// so that batches never take longer than `BATCH_SIZE_TIMEOUT` -async fn batch_with_timeout<'a, 'conn, R, F>( - conn: &'conn mut AsyncPgConnection, +async fn batch_with_timeout( + conn: &mut AsyncPgConnection, batcher: &mut VidBatcher, - query: F, + mut query: F, ) -> Result, StoreError> where R: Send, - F: for<'r> Fn( - &'r mut AsyncPgConnection, - i64, - i64, - ) -> ScopedBoxFuture<'a, 'r, Result> - + Sync, - 'a: 'conn, + F: for<'r> FnMut(&'r mut AsyncPgConnection, i64, i64) -> BatchQuery<'r, R> + Send, { - let res = batcher - .step(async |start, end| { - conn.transaction(|conn| { - async { - if let Some(timeout) = BATCH_STATEMENT_TIMEOUT.as_ref() { + async fn run_attempt( + conn: &mut AsyncPgConnection, + batcher: &mut VidBatcher, + query: &mut F, + with_timeout: bool, + ) -> Result, StoreError> + where + R: Send, + F: for<'r> FnMut(&'r mut AsyncPgConnection, i64, i64) -> BatchQuery<'r, R> + Send, + { + batcher + .step(async |start, end| { + conn.transaction(async |conn| { + if with_timeout && let Some(timeout) = BATCH_STATEMENT_TIMEOUT.as_ref() { conn.batch_execute(timeout).await?; } query(conn, start, end).await - } - .scope_boxed() + }) + .await }) .await - }) - .await - .map(|(_, res)| res); + .map(|(_, res)| res) + } - if !matches!(res, Err(StoreError::StatementTimeout)) { + let res = run_attempt(conn, batcher, &mut query, true).await; + let Err(StoreError::StatementTimeout) = res else { return res; - } + }; + // Timeout fired. Shrink to one row and retry without the timeout — at + // this size we expect the query to complete. batcher.set_batch_size(1); - batcher - .step(async |start, end| { - conn.transaction(|conn| query(conn, start, end).scope_boxed()) - .await - }) - .await - .map(|(_, res)| res) + run_attempt(conn, batcher, &mut query, false).await } mod status { @@ -578,8 +595,7 @@ mod status { sql_types::Text, table, update, }; - use diesel_async::RunQueryDsl as _; - use diesel_async::{AsyncConnection, scoped_futures::ScopedFutureExt}; + use diesel_async::{AsyncConnection, RunQueryDsl as _}; use graph::{ components::store::{PruneRequest, PruningStrategy, StoreResult}, env::ENV_VARS, @@ -782,40 +798,50 @@ mod status { use prune_state as ps; use prune_table_state as pts; - conn.transaction(|conn| { - async move { - insert_into(ps::table) + let site_id = self.layout.site.id; + let run = self.run; + let first_block = req.first_block; + let final_block = req.final_block; + let latest_block = req.latest_block; + let history_blocks = req.history_blocks; + let table_entries: Vec<(String, &'static str)> = prunable_tables + .iter() + .map(|(table, strat)| { + let strat = match strat { + PruningStrategy::Rebuild => "r", + PruningStrategy::Delete => "d", + }; + (table.name.to_string(), strat) + }) + .collect(); + + conn.transaction(async move |conn| { + insert_into(ps::table) + .values(( + ps::id.eq(site_id), + ps::run.eq(run), + ps::first_block.eq(first_block), + ps::final_block.eq(final_block), + ps::latest_block.eq(latest_block), + ps::history_blocks.eq(history_blocks), + ps::started_at.eq(diesel::dsl::now), + )) + .execute(conn) + .await?; + + for (table_name, strat) in &table_entries { + insert_into(pts::table) .values(( - ps::id.eq(self.layout.site.id), - ps::run.eq(self.run), - ps::first_block.eq(req.first_block), - ps::final_block.eq(req.final_block), - ps::latest_block.eq(req.latest_block), - ps::history_blocks.eq(req.history_blocks), - ps::started_at.eq(diesel::dsl::now), + pts::id.eq(site_id), + pts::run.eq(run), + pts::table_name.eq(table_name.as_str()), + pts::strategy.eq(*strat), + pts::phase.eq(Phase::Queued), )) .execute(conn) .await?; - - for (table, strat) in prunable_tables { - let strat = match strat { - PruningStrategy::Rebuild => "r", - PruningStrategy::Delete => "d", - }; - insert_into(pts::table) - .values(( - pts::id.eq(self.layout.site.id), - pts::run.eq(self.run), - pts::table_name.eq(table.name.as_str()), - pts::strategy.eq(strat), - pts::phase.eq(Phase::Queued), - )) - .execute(conn) - .await?; - } - Ok(()) } - .scope_boxed() + Ok(()) }) .await } diff --git a/store/postgres/src/subgraph_store.rs b/store/postgres/src/subgraph_store.rs index a04852b4d3d..8c268562aa7 100644 --- a/store/postgres/src/subgraph_store.rs +++ b/store/postgres/src/subgraph_store.rs @@ -5,7 +5,6 @@ use diesel::{ serialize::{Output, ToSql}, sql_types::{self, Text}, }; -use diesel_async::scoped_futures::ScopedFutureExt; use std::fmt; use std::{ collections::{BTreeMap, HashMap}, @@ -423,26 +422,23 @@ impl SubgraphStore { // FIXME: This simultaneously holds a `primary_conn` and a shard connection, which can // potentially deadlock. let mut pconn = self.primary_conn().await?; + let subgraph_store = self.cheap_clone(); + let site_clone = site.cheap_clone(); pconn - .transaction(|pconn| { - let subgraph_store = self.cheap_clone(); - let site = site.cheap_clone(); - async move { - let exists_and_synced = async move |id: &DeploymentHash| { - let (store, _) = subgraph_store.store(id).await?; - store.deployment_exists_and_synced(id).await - }; - - // Create subgraph, subgraph version, and assignment - let changes = pconn - .create_subgraph_version(name, &site, node_id, mode, exists_and_synced) - .await?; + .transaction(async move |pconn| { + let exists_and_synced = async move |id: &DeploymentHash| { + let (store, _) = subgraph_store.store(id).await?; + store.deployment_exists_and_synced(id).await + }; - let event = StoreEvent::new(changes); - pconn.send_store_event(&self.sender, &event).await?; - Ok(()) - } - .scope_boxed() + // Create subgraph, subgraph version, and assignment + let changes = pconn + .create_subgraph_version(name, &site_clone, node_id, mode, exists_and_synced) + .await?; + + let event = StoreEvent::new(changes); + pconn.send_store_event(&self.sender, &event).await?; + Ok(()) }) .await?; Ok(site.as_ref().into()) @@ -915,18 +911,17 @@ impl Inner { .await?; let mut pconn = self.primary_conn().await?; + let sender = self.sender.clone(); + let dst_clone = dst.cheap_clone(); pconn - .transaction(|pconn| { - async { - // Create subgraph, subgraph version, and assignment. We use the - // existence of an assignment as a signal that we already set up - // the copy - let changes = pconn.assign_subgraph(dst.as_ref(), &node).await?; - let event = StoreEvent::new(changes); - pconn.send_store_event(&self.sender, &event).await?; - Ok(()) - } - .scope_boxed() + .transaction(async move |pconn| { + // Create subgraph, subgraph version, and assignment. We use the + // existence of an assignment as a signal that we already set up + // the copy + let changes = pconn.assign_subgraph(dst_clone.as_ref(), &node).await?; + let event = StoreEvent::new(changes); + pconn.send_store_event(&sender, &event).await?; + Ok(()) }) .await?; Ok(dst.as_ref().into()) @@ -1649,7 +1644,7 @@ impl SubgraphStoreTrait for SubgraphStore { async fn create_subgraph(&self, name: SubgraphName) -> Result { let mut pconn = self.primary_conn().await?; pconn - .transaction(|pconn| pconn.create_subgraph(&name).scope_boxed()) + .transaction(async move |pconn| pconn.create_subgraph(&name).await) .await } @@ -1659,7 +1654,7 @@ impl SubgraphStoreTrait for SubgraphStore { ) -> Result<(), StoreError> { let mut pconn = self.primary_conn().await?; pconn - .transaction(|pconn| pconn.create_subgraph_features(features).scope_boxed()) + .transaction(async move |pconn| pconn.create_subgraph_features(features).await) .await } @@ -1672,15 +1667,13 @@ impl SubgraphStoreTrait for SubgraphStore { .collect::>(); let mut pconn = self.primary_conn().await?; + let sender = self.sender.clone(); pconn - .transaction(|pconn| { - async { - let changes = pconn.remove_subgraph(name).await?; - pconn - .send_store_event(&self.sender, &StoreEvent::new(changes)) - .await - } - .scope_boxed() + .transaction(async move |pconn| { + let changes = pconn.remove_subgraph(name).await?; + pconn + .send_store_event(&sender, &StoreEvent::new(changes)) + .await }) .await?; Ok(deployments) @@ -1693,15 +1686,13 @@ impl SubgraphStoreTrait for SubgraphStore { ) -> Result<(), StoreError> { let site = self.find_site(deployment.id.into()).await?; let mut pconn = self.primary_conn().await?; + let sender = self.sender.clone(); pconn - .transaction(|pconn| { - async { - let changes = pconn.reassign_subgraph(site.as_ref(), node_id).await?; - pconn - .send_store_event(&self.sender, &StoreEvent::new(changes)) - .await - } - .scope_boxed() + .transaction(async move |pconn| { + let changes = pconn.reassign_subgraph(site.as_ref(), node_id).await?; + pconn + .send_store_event(&sender, &StoreEvent::new(changes)) + .await }) .await } @@ -1709,15 +1700,13 @@ impl SubgraphStoreTrait for SubgraphStore { async fn unassign_subgraph(&self, deployment: &DeploymentLocator) -> Result<(), StoreError> { let site = self.find_site(deployment.id.into()).await?; let mut pconn = self.primary_conn().await?; + let sender = self.sender.clone(); pconn - .transaction(|pconn| { - async { - let changes = pconn.unassign_subgraph(site.as_ref()).await?; - pconn - .send_store_event(&self.sender, &StoreEvent::new(changes)) - .await - } - .scope_boxed() + .transaction(async move |pconn| { + let changes = pconn.unassign_subgraph(site.as_ref()).await?; + pconn + .send_store_event(&sender, &StoreEvent::new(changes)) + .await }) .await } @@ -1725,15 +1714,13 @@ impl SubgraphStoreTrait for SubgraphStore { async fn pause_subgraph(&self, deployment: &DeploymentLocator) -> Result<(), StoreError> { let site = self.find_site(deployment.id.into()).await?; let mut pconn = self.primary_conn().await?; + let sender = self.sender.clone(); pconn - .transaction(|pconn| { - async { - let changes = pconn.pause_subgraph(site.as_ref()).await?; - pconn - .send_store_event(&self.sender, &StoreEvent::new(changes)) - .await - } - .scope_boxed() + .transaction(async move |pconn| { + let changes = pconn.pause_subgraph(site.as_ref()).await?; + pconn + .send_store_event(&sender, &StoreEvent::new(changes)) + .await }) .await } @@ -1741,15 +1728,13 @@ impl SubgraphStoreTrait for SubgraphStore { async fn resume_subgraph(&self, deployment: &DeploymentLocator) -> Result<(), StoreError> { let site = self.find_site(deployment.id.into()).await?; let mut pconn = self.primary_conn().await?; + let sender = self.sender.clone(); pconn - .transaction(|pconn| { - async { - let changes = pconn.resume_subgraph(site.as_ref()).await?; - pconn - .send_store_event(&self.sender, &StoreEvent::new(changes)) - .await - } - .scope_boxed() + .transaction(async move |pconn| { + let changes = pconn.resume_subgraph(site.as_ref()).await?; + pconn + .send_store_event(&sender, &StoreEvent::new(changes)) + .await }) .await } diff --git a/store/postgres/src/writable.rs b/store/postgres/src/writable.rs index 38732c15c58..6c6e0f320b4 100644 --- a/store/postgres/src/writable.rs +++ b/store/postgres/src/writable.rs @@ -8,7 +8,6 @@ use std::time::Instant; use std::{collections::BTreeMap, sync::Arc}; use async_trait::async_trait; -use diesel_async::scoped_futures::ScopedFutureExt; use graph::blockchain::BlockTime; use graph::blockchain::block_stream::{EntitySourceOperation, FirehoseCursor}; use graph::components::store::{Batch, DeploymentCursorTracker, DerivedEntityQuery, ReadStore}; @@ -374,14 +373,11 @@ impl SyncStore { let mut pconn = self.store.primary_conn().await?; let sender = self.store.notification_sender(); pconn - .transaction(|pconn| { - async { - let changes = pconn.unassign_subgraph(site).await?; - pconn - .send_store_event(&sender, &StoreEvent::new(changes)) - .await - } - .scope_boxed() + .transaction(async move |pconn| { + let changes = pconn.unassign_subgraph(site).await?; + pconn + .send_store_event(&sender, &StoreEvent::new(changes)) + .await }) .await }) @@ -393,14 +389,11 @@ impl SyncStore { let mut pconn = self.store.primary_conn().await?; let sender = self.store.notification_sender(); pconn - .transaction(|pconn| { - async { - let changes = pconn.pause_subgraph(site).await?; - pconn - .send_store_event(&sender, &StoreEvent::new(changes)) - .await - } - .scope_boxed() + .transaction(async move |pconn| { + let changes = pconn.pause_subgraph(site).await?; + pconn + .send_store_event(&sender, &StoreEvent::new(changes)) + .await }) .await }) @@ -450,13 +443,11 @@ impl SyncStore { // store so that we do not hold two database connections which // might come from the same pool and could therefore deadlock let mut pconn = self.store.primary_conn().await?; + let deployment = self.site.deployment.clone(); pconn - .transaction(|pconn| { - async { - let changes = pconn.promote_deployment(&self.site.deployment).await?; - Ok(StoreEvent::new(changes)) - } - .scope_boxed() + .transaction(async move |pconn| { + let changes = pconn.promote_deployment(&deployment).await?; + Ok(StoreEvent::new(changes)) }) .await? }; @@ -485,7 +476,7 @@ impl SyncStore { let mut pconn = self.store.primary_conn().await?; let sender = self.store.notification_sender(); pconn - .transaction(|pconn| pconn.send_store_event(&sender, &event).scope_boxed()) + .transaction(async move |pconn| pconn.send_store_event(&sender, &event).await) .await }) .await