From d849e4d24a063be2d2d602d50a258349f8af9342 Mon Sep 17 00:00:00 2001 From: Li Jiajia Date: Wed, 15 Apr 2026 20:47:10 +0800 Subject: [PATCH 1/2] feat(storage/opendal): route oss:// URLs through the S3 backend --- crates/storage/opendal/src/lib.rs | 9 +++- crates/storage/opendal/src/oss.rs | 14 +++-- crates/storage/opendal/src/resolving.rs | 71 ++++++++++++++++++++++++- 3 files changed, 88 insertions(+), 6 deletions(-) diff --git a/crates/storage/opendal/src/lib.rs b/crates/storage/opendal/src/lib.rs index 65deaa5f44..283b920034 100644 --- a/crates/storage/opendal/src/lib.rs +++ b/crates/storage/opendal/src/lib.rs @@ -144,7 +144,14 @@ impl StorageFactory for OpenDalStorageFactory { OpenDalStorageFactory::Gcs => Ok(Arc::new(OpenDalStorage::Gcs { config: gcs_config_parse(config.props().clone())?.into(), })), - #[cfg(feature = "opendal-oss")] + // OSS is S3-API-compatible; route through S3 so `s3.*` props + // work for OSS-backed tables (mirrors pyiceberg/Java S3FileIO). + #[cfg(all(feature = "opendal-oss", feature = "opendal-s3"))] + OpenDalStorageFactory::Oss => Ok(Arc::new(OpenDalStorage::S3 { + config: s3_config_parse(config.props().clone())?.into(), + customized_credential_load: None, + })), + #[cfg(all(feature = "opendal-oss", not(feature = "opendal-s3")))] OpenDalStorageFactory::Oss => Ok(Arc::new(OpenDalStorage::Oss { config: oss_config_parse(config.props().clone())?.into(), })), diff --git a/crates/storage/opendal/src/oss.rs b/crates/storage/opendal/src/oss.rs index add8b7a0f7..778c0bd5fb 100644 --- a/crates/storage/opendal/src/oss.rs +++ b/crates/storage/opendal/src/oss.rs @@ -15,9 +15,6 @@ // specific language governing permissions and limitations // under the License. 
-use std::collections::HashMap; - -use iceberg::io::{OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET, OSS_ENDPOINT}; use iceberg::{Error, ErrorKind, Result}; use opendal::services::OssConfig; use opendal::{Configurator, Operator}; @@ -26,7 +23,16 @@ use url::Url; use crate::utils::from_opendal_error; /// Parse iceberg props to oss config. -pub(crate) fn oss_config_parse(mut m: HashMap) -> Result { +/// +/// Only reachable when the native OSS service is used (i.e. `opendal-s3` +/// is disabled); when `opendal-s3` is enabled, OSS is routed through the +/// S3 code path and this function is dead code. +#[cfg(not(feature = "opendal-s3"))] +pub(crate) fn oss_config_parse( + mut m: std::collections::HashMap, +) -> Result { + use iceberg::io::{OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET, OSS_ENDPOINT}; + let mut cfg: OssConfig = OssConfig::default(); if let Some(endpoint) = m.remove(OSS_ENDPOINT) { cfg.endpoint = Some(endpoint); diff --git a/crates/storage/opendal/src/resolving.rs b/crates/storage/opendal/src/resolving.rs index 621495519a..3da4983773 100644 --- a/crates/storage/opendal/src/resolving.rs +++ b/crates/storage/opendal/src/resolving.rs @@ -100,7 +100,19 @@ fn build_storage_for_scheme( config: Arc::new(config), }) } - #[cfg(feature = "opendal-oss")] + // OSS is S3-API-compatible; route through S3 so `s3.*` props + // work for OSS-backed tables (mirrors pyiceberg/Java S3FileIO). + #[cfg(all(feature = "opendal-oss", feature = "opendal-s3"))] + "oss" => { + let config = crate::s3::s3_config_parse(props.clone())?; + Ok(OpenDalStorage::S3 { + config: Arc::new(config), + customized_credential_load: customized_credential_load.clone(), + }) + } + // Fallback: builds without `opendal-s3` but with `opendal-oss` + // still use the native OSS service (which consumes `oss.*` keys). 
+ #[cfg(all(feature = "opendal-oss", not(feature = "opendal-s3")))] "oss" => { let config = crate::oss::oss_config_parse(props.clone())?; Ok(OpenDalStorage::Oss { @@ -362,4 +374,61 @@ mod tests { "abfss and abfs should share one instance" ); } + + #[cfg(all(feature = "opendal-oss", feature = "opendal-s3"))] + #[test] + fn oss_scheme_routes_through_s3_backend_when_available() { + use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT}; + + let mut props = HashMap::new(); + props.insert(S3_ACCESS_KEY_ID.to_string(), "AKIA_TEST".to_string()); + props.insert( + S3_ENDPOINT.to_string(), + "https://oss-cn-shanghai.aliyuncs.com".to_string(), + ); + + let storage = build_storage_for_scheme("oss", &props, &None).unwrap(); + assert!( + matches!(storage, OpenDalStorage::S3 { .. }), + "OSS should route through S3 backend when opendal-s3 is enabled" + ); + } + + #[cfg(all(feature = "opendal-oss", feature = "opendal-s3"))] + #[test] + fn oss_resolve_uses_s3_backend_and_caches() { + use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT}; + + let mut props = HashMap::new(); + props.insert(S3_ACCESS_KEY_ID.to_string(), "AKIA_TEST".to_string()); + props.insert( + S3_ENDPOINT.to_string(), + "https://oss-cn-shanghai.aliyuncs.com".to_string(), + ); + + let resolving = OpenDalResolvingStorage { + props, + storages: RwLock::new(HashMap::new()), + #[cfg(feature = "opendal-s3")] + customized_credential_load: None, + }; + + let storage = resolving.resolve("oss://my-bucket/path/to/file").unwrap(); + assert!(matches!(storage.as_ref(), OpenDalStorage::S3 { .. })); + + // Second call should hit the cache. + let cached = resolving.resolve("oss://my-bucket/other").unwrap(); + assert!(Arc::ptr_eq(&storage, &cached)); + } + + #[cfg(all(feature = "opendal-oss", not(feature = "opendal-s3")))] + #[test] + fn oss_scheme_falls_back_to_native_oss_backend() { + let props = HashMap::new(); + let storage = build_storage_for_scheme("oss", &props).unwrap(); + assert!( + matches!(storage, OpenDalStorage::Oss { .. 
}), + "Without opendal-s3, OSS should use the native OSS backend" + ); + } } From b73b4463eb0b1a8e48201f5accf5c06b162c092a Mon Sep 17 00:00:00 2001 From: Li Jiajia Date: Sun, 17 May 2026 17:47:26 +0800 Subject: [PATCH 2/2] refactor(storage/opendal): remove OSS native backend, route through S3 only --- crates/storage/opendal/src/lib.rs | 80 +------------------------ crates/storage/opendal/src/oss.rs | 63 ------------------- crates/storage/opendal/src/resolving.rs | 20 ------- 3 files changed, 2 insertions(+), 161 deletions(-) delete mode 100644 crates/storage/opendal/src/oss.rs diff --git a/crates/storage/opendal/src/lib.rs b/crates/storage/opendal/src/lib.rs index 283b920034..de69a2bd25 100644 --- a/crates/storage/opendal/src/lib.rs +++ b/crates/storage/opendal/src/lib.rs @@ -73,13 +73,8 @@ cfg_if! { } } -cfg_if! { - if #[cfg(feature = "opendal-oss")] { - mod oss; - use opendal::services::OssConfig; - use oss::*; - } -} +// When both `opendal-oss` and `opendal-s3` are enabled, `oss://` URLs are routed +// through the S3 backend (OSS is S3-API-compatible). No native OSS module is needed. cfg_if! { if #[cfg(feature = "opendal-s3")] { @@ -151,10 +146,6 @@ impl StorageFactory for OpenDalStorageFactory { config: s3_config_parse(config.props().clone())?.into(), customized_credential_load: None, })), - #[cfg(all(feature = "opendal-oss", not(feature = "opendal-s3")))] - OpenDalStorageFactory::Oss => Ok(Arc::new(OpenDalStorage::Oss { - config: oss_config_parse(config.props().clone())?.into(), - })), #[cfg(feature = "opendal-azdls")] OpenDalStorageFactory::Azdls => Ok(Arc::new(OpenDalStorage::Azdls { config: azdls_config_parse(config.props().clone())?.into(), @@ -208,12 +199,6 @@ pub enum OpenDalStorage { /// GCS configuration. config: Arc, }, - /// OSS storage variant. - #[cfg(feature = "opendal-oss")] - Oss { - /// OSS configuration. - config: Arc, - }, /// Azure Data Lake Storage variant. 
/// /// Accepts paths of the form @@ -303,19 +288,6 @@ impl OpenDalStorage { )); } } - #[cfg(feature = "opendal-oss")] - OpenDalStorage::Oss { config } => { - let op = oss_config_build(config, path)?; - let prefix = format!("oss://{}/", op.info().name()); - if path.starts_with(&prefix) { - (op, &path[prefix.len()..]) - } else { - return Err(Error::new( - ErrorKind::DataInvalid, - format!("Invalid oss url: {path}, should start with {prefix}"), - )); - } - } #[cfg(feature = "opendal-azdls")] OpenDalStorage::Azdls { config } => azdls_create_operator(path, config)?, #[cfg(all( @@ -389,25 +361,6 @@ impl OpenDalStorage { )) } } - #[cfg(feature = "opendal-oss")] - OpenDalStorage::Oss { .. } => { - let url = url::Url::parse(path)?; - let bucket = url.host_str().ok_or_else(|| { - Error::new( - ErrorKind::DataInvalid, - format!("Invalid oss url: {path}, missing bucket"), - ) - })?; - let prefix = format!("oss://{}/", bucket); - if path.starts_with(&prefix) { - Ok(&path[prefix.len()..]) - } else { - Err(Error::new( - ErrorKind::DataInvalid, - format!("Invalid oss url: {path}, should start with {prefix}"), - )) - } - } #[cfg(feature = "opendal-azdls")] OpenDalStorage::Azdls { config } => { let azure_path = path.parse::()?; @@ -671,35 +624,6 @@ mod tests { ); } - #[cfg(feature = "opendal-oss")] - #[test] - fn test_relativize_path_oss() { - let storage = OpenDalStorage::Oss { - config: Arc::new(OssConfig::default()), - }; - - assert_eq!( - storage - .relativize_path("oss://my-bucket/path/to/file.parquet") - .unwrap(), - "path/to/file.parquet" - ); - } - - #[cfg(feature = "opendal-oss")] - #[test] - fn test_relativize_path_oss_invalid_scheme() { - let storage = OpenDalStorage::Oss { - config: Arc::new(OssConfig::default()), - }; - - assert!( - storage - .relativize_path("s3://my-bucket/path/to/file.parquet") - .is_err() - ); - } - #[cfg(feature = "opendal-azdls")] #[test] fn test_relativize_path_azdls() { diff --git a/crates/storage/opendal/src/oss.rs 
b/crates/storage/opendal/src/oss.rs deleted file mode 100644 index 778c0bd5fb..0000000000 --- a/crates/storage/opendal/src/oss.rs +++ /dev/null @@ -1,63 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use iceberg::{Error, ErrorKind, Result}; -use opendal::services::OssConfig; -use opendal::{Configurator, Operator}; -use url::Url; - -use crate::utils::from_opendal_error; - -/// Parse iceberg props to oss config. -/// -/// Only reachable when the native OSS service is used (i.e. `opendal-s3` -/// is disabled); when `opendal-s3` is enabled, OSS is routed through the -/// S3 code path and this function is dead code. 
-#[cfg(not(feature = "opendal-s3"))] -pub(crate) fn oss_config_parse( - mut m: std::collections::HashMap, -) -> Result { - use iceberg::io::{OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET, OSS_ENDPOINT}; - - let mut cfg: OssConfig = OssConfig::default(); - if let Some(endpoint) = m.remove(OSS_ENDPOINT) { - cfg.endpoint = Some(endpoint); - }; - if let Some(access_key_id) = m.remove(OSS_ACCESS_KEY_ID) { - cfg.access_key_id = Some(access_key_id); - }; - if let Some(access_key_secret) = m.remove(OSS_ACCESS_KEY_SECRET) { - cfg.access_key_secret = Some(access_key_secret); - }; - - Ok(cfg) -} - -/// Build new opendal operator from give path. -pub(crate) fn oss_config_build(cfg: &OssConfig, path: &str) -> Result { - let url = Url::parse(path)?; - let bucket = url.host_str().ok_or_else(|| { - Error::new( - ErrorKind::DataInvalid, - format!("Invalid oss url: {path}, missing bucket"), - ) - })?; - - let builder = cfg.clone().into_builder().bucket(bucket); - - Ok(Operator::new(builder).map_err(from_opendal_error)?.finish()) -} diff --git a/crates/storage/opendal/src/resolving.rs b/crates/storage/opendal/src/resolving.rs index 3da4983773..04487eece0 100644 --- a/crates/storage/opendal/src/resolving.rs +++ b/crates/storage/opendal/src/resolving.rs @@ -110,15 +110,6 @@ fn build_storage_for_scheme( customized_credential_load: customized_credential_load.clone(), }) } - // Fallback: builds without `opendal-s3` but with `opendal-oss` - // still use the native OSS service (which consumes `oss.*` keys). 
- #[cfg(all(feature = "opendal-oss", not(feature = "opendal-s3")))] - "oss" => { - let config = crate::oss::oss_config_parse(props.clone())?; - Ok(OpenDalStorage::Oss { - config: Arc::new(config), - }) - } #[cfg(feature = "opendal-azdls")] "azdls" => { let config = crate::azdls::azdls_config_parse(props.clone())?; @@ -420,15 +411,4 @@ mod tests { let cached = resolving.resolve("oss://my-bucket/other").unwrap(); assert!(Arc::ptr_eq(&storage, &cached)); } - - #[cfg(all(feature = "opendal-oss", not(feature = "opendal-s3")))] - #[test] - fn oss_scheme_falls_back_to_native_oss_backend() { - let props = HashMap::new(); - let storage = build_storage_for_scheme("oss", &props).unwrap(); - assert!( - matches!(storage, OpenDalStorage::Oss { .. }), - "Without opendal-s3, OSS should use the native OSS backend" - ); - } }