diff --git a/crates/storage/opendal/src/lib.rs b/crates/storage/opendal/src/lib.rs index 65deaa5f44..de69a2bd25 100644 --- a/crates/storage/opendal/src/lib.rs +++ b/crates/storage/opendal/src/lib.rs @@ -73,13 +73,8 @@ cfg_if! { } } -cfg_if! { - if #[cfg(feature = "opendal-oss")] { - mod oss; - use opendal::services::OssConfig; - use oss::*; - } -} +// When `opendal-oss` is enabled, `oss://` URLs are routed through the S3 +// backend (OSS is S3-API-compatible). No native OSS module is needed. cfg_if! { if #[cfg(feature = "opendal-s3")] { @@ -144,9 +139,12 @@ impl StorageFactory for OpenDalStorageFactory { OpenDalStorageFactory::Gcs => Ok(Arc::new(OpenDalStorage::Gcs { config: gcs_config_parse(config.props().clone())?.into(), })), - #[cfg(feature = "opendal-oss")] - OpenDalStorageFactory::Oss => Ok(Arc::new(OpenDalStorage::Oss { - config: oss_config_parse(config.props().clone())?.into(), + // OSS is S3-API-compatible; route through S3 so `s3.*` props + // work for OSS-backed tables (mirrors pyiceberg/Java S3FileIO). + #[cfg(all(feature = "opendal-oss", feature = "opendal-s3"))] + OpenDalStorageFactory::Oss => Ok(Arc::new(OpenDalStorage::S3 { + config: s3_config_parse(config.props().clone())?.into(), + customized_credential_load: None, })), #[cfg(feature = "opendal-azdls")] OpenDalStorageFactory::Azdls => Ok(Arc::new(OpenDalStorage::Azdls { @@ -201,12 +199,6 @@ pub enum OpenDalStorage { /// GCS configuration. config: Arc, }, - /// OSS storage variant. - #[cfg(feature = "opendal-oss")] - Oss { - /// OSS configuration. - config: Arc, - }, /// Azure Data Lake Storage variant. /// /// Accepts paths of the form @@ -296,19 +288,6 @@ impl OpenDalStorage { )); } } - #[cfg(feature = "opendal-oss")] - OpenDalStorage::Oss { config } => { - let op = oss_config_build(config, path)?; - let prefix = format!("oss://{}/", op.info().name()); - if path.starts_with(&prefix) { - (op, &path[prefix.len()..]) - } else { - return Err(Error::new( - ErrorKind::DataInvalid, - format!("Invalid oss url: {path}, should start with {prefix}"), - )); - } - } #[cfg(feature = "opendal-azdls")] OpenDalStorage::Azdls { config } => azdls_create_operator(path, config)?, #[cfg(all( @@ -382,25 +361,6 @@ impl OpenDalStorage { )) } } - #[cfg(feature = "opendal-oss")] - OpenDalStorage::Oss { .. } => { - let url = url::Url::parse(path)?; - let bucket = url.host_str().ok_or_else(|| { - Error::new( - ErrorKind::DataInvalid, - format!("Invalid oss url: {path}, missing bucket"), - ) - })?; - let prefix = format!("oss://{}/", bucket); - if path.starts_with(&prefix) { - Ok(&path[prefix.len()..]) - } else { - Err(Error::new( - ErrorKind::DataInvalid, - format!("Invalid oss url: {path}, should start with {prefix}"), - )) - } - } #[cfg(feature = "opendal-azdls")] OpenDalStorage::Azdls { config } => { let azure_path = path.parse::()?; @@ -664,35 +624,6 @@ mod tests { ); } - #[cfg(feature = "opendal-oss")] - #[test] - fn test_relativize_path_oss() { - let storage = OpenDalStorage::Oss { - config: Arc::new(OssConfig::default()), - }; - - assert_eq!( - storage - .relativize_path("oss://my-bucket/path/to/file.parquet") - .unwrap(), - "path/to/file.parquet" - ); - } - - #[cfg(feature = "opendal-oss")] - #[test] - fn test_relativize_path_oss_invalid_scheme() { - let storage = OpenDalStorage::Oss { - config: Arc::new(OssConfig::default()), - }; - - assert!( - storage - .relativize_path("s3://my-bucket/path/to/file.parquet") - .is_err() - ); - } - #[cfg(feature = "opendal-azdls")] #[test] fn test_relativize_path_azdls() { diff --git a/crates/storage/opendal/src/oss.rs b/crates/storage/opendal/src/oss.rs deleted file mode 100644 index add8b7a0f7..0000000000 --- a/crates/storage/opendal/src/oss.rs +++ /dev/null @@ -1,57 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::collections::HashMap; - -use iceberg::io::{OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET, OSS_ENDPOINT}; -use iceberg::{Error, ErrorKind, Result}; -use opendal::services::OssConfig; -use opendal::{Configurator, Operator}; -use url::Url; - -use crate::utils::from_opendal_error; - -/// Parse iceberg props to oss config. -pub(crate) fn oss_config_parse(mut m: HashMap) -> Result { - let mut cfg: OssConfig = OssConfig::default(); - if let Some(endpoint) = m.remove(OSS_ENDPOINT) { - cfg.endpoint = Some(endpoint); - }; - if let Some(access_key_id) = m.remove(OSS_ACCESS_KEY_ID) { - cfg.access_key_id = Some(access_key_id); - }; - if let Some(access_key_secret) = m.remove(OSS_ACCESS_KEY_SECRET) { - cfg.access_key_secret = Some(access_key_secret); - }; - - Ok(cfg) -} - -/// Build new opendal operator from give path. -pub(crate) fn oss_config_build(cfg: &OssConfig, path: &str) -> Result { - let url = Url::parse(path)?; - let bucket = url.host_str().ok_or_else(|| { - Error::new( - ErrorKind::DataInvalid, - format!("Invalid oss url: {path}, missing bucket"), - ) - })?; - - let builder = cfg.clone().into_builder().bucket(bucket); - - Ok(Operator::new(builder).map_err(from_opendal_error)?.finish()) -} diff --git a/crates/storage/opendal/src/resolving.rs b/crates/storage/opendal/src/resolving.rs index 621495519a..04487eece0 100644 --- a/crates/storage/opendal/src/resolving.rs +++ b/crates/storage/opendal/src/resolving.rs @@ -100,11 +100,14 @@ fn build_storage_for_scheme( config: Arc::new(config), }) } - #[cfg(feature = "opendal-oss")] + // OSS is S3-API-compatible; route through S3 so `s3.*` props + // work for OSS-backed tables (mirrors pyiceberg/Java S3FileIO). + #[cfg(all(feature = "opendal-oss", feature = "opendal-s3"))] "oss" => { - let config = crate::oss::oss_config_parse(props.clone())?; - Ok(OpenDalStorage::Oss { + let config = crate::s3::s3_config_parse(props.clone())?; + Ok(OpenDalStorage::S3 { config: Arc::new(config), + customized_credential_load: customized_credential_load.clone(), }) } #[cfg(feature = "opendal-azdls")] @@ -362,4 +365,50 @@ mod tests { "abfss and abfs should share one instance" ); } + + #[cfg(all(feature = "opendal-oss", feature = "opendal-s3"))] + #[test] + fn oss_scheme_routes_through_s3_backend_when_available() { + use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT}; + + let mut props = HashMap::new(); + props.insert(S3_ACCESS_KEY_ID.to_string(), "AKIA_TEST".to_string()); + props.insert( + S3_ENDPOINT.to_string(), + "https://oss-cn-shanghai.aliyuncs.com".to_string(), + ); + + let storage = build_storage_for_scheme("oss", &props, &None).unwrap(); + assert!( + matches!(storage, OpenDalStorage::S3 { .. }), + "OSS should route through S3 backend when opendal-s3 is enabled" + ); + } + + #[cfg(all(feature = "opendal-oss", feature = "opendal-s3"))] + #[test] + fn oss_resolve_uses_s3_backend_and_caches() { + use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT}; + + let mut props = HashMap::new(); + props.insert(S3_ACCESS_KEY_ID.to_string(), "AKIA_TEST".to_string()); + props.insert( + S3_ENDPOINT.to_string(), + "https://oss-cn-shanghai.aliyuncs.com".to_string(), + ); + + let resolving = OpenDalResolvingStorage { + props, + storages: RwLock::new(HashMap::new()), + #[cfg(feature = "opendal-s3")] + customized_credential_load: None, + }; + + let storage = resolving.resolve("oss://my-bucket/path/to/file").unwrap(); + assert!(matches!(storage.as_ref(), OpenDalStorage::S3 { .. })); + + // Second call should hit the cache. + let cached = resolving.resolve("oss://my-bucket/other").unwrap(); + assert!(Arc::ptr_eq(&storage, &cached)); + } }