Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 8 additions & 77 deletions crates/storage/opendal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,8 @@ cfg_if! {
}
}

cfg_if! {
if #[cfg(feature = "opendal-oss")] {
mod oss;
use opendal::services::OssConfig;
use oss::*;
}
}
// When `opendal-oss` is enabled, `oss://` URLs are routed through the S3
// backend (OSS is S3-API-compatible). No native OSS module is needed.

cfg_if! {
if #[cfg(feature = "opendal-s3")] {
Expand Down Expand Up @@ -144,9 +139,12 @@ impl StorageFactory for OpenDalStorageFactory {
OpenDalStorageFactory::Gcs => Ok(Arc::new(OpenDalStorage::Gcs {
config: gcs_config_parse(config.props().clone())?.into(),
})),
#[cfg(feature = "opendal-oss")]
OpenDalStorageFactory::Oss => Ok(Arc::new(OpenDalStorage::Oss {
config: oss_config_parse(config.props().clone())?.into(),
// OSS is S3-API-compatible; route through S3 so `s3.*` props
// work for OSS-backed tables (mirrors pyiceberg/Java S3FileIO).
#[cfg(all(feature = "opendal-oss", feature = "opendal-s3"))]
OpenDalStorageFactory::Oss => Ok(Arc::new(OpenDalStorage::S3 {
config: s3_config_parse(config.props().clone())?.into(),
customized_credential_load: None,
})),
#[cfg(feature = "opendal-azdls")]
OpenDalStorageFactory::Azdls => Ok(Arc::new(OpenDalStorage::Azdls {
Expand Down Expand Up @@ -201,12 +199,6 @@ pub enum OpenDalStorage {
/// GCS configuration.
config: Arc<GcsConfig>,
},
/// OSS storage variant.
#[cfg(feature = "opendal-oss")]
Oss {
/// OSS configuration.
config: Arc<OssConfig>,
},
/// Azure Data Lake Storage variant.
///
/// Accepts paths of the form
Expand Down Expand Up @@ -296,19 +288,6 @@ impl OpenDalStorage {
));
}
}
#[cfg(feature = "opendal-oss")]
OpenDalStorage::Oss { config } => {
let op = oss_config_build(config, path)?;
let prefix = format!("oss://{}/", op.info().name());
if path.starts_with(&prefix) {
(op, &path[prefix.len()..])
} else {
return Err(Error::new(
ErrorKind::DataInvalid,
format!("Invalid oss url: {path}, should start with {prefix}"),
));
}
}
#[cfg(feature = "opendal-azdls")]
OpenDalStorage::Azdls { config } => azdls_create_operator(path, config)?,
#[cfg(all(
Expand Down Expand Up @@ -382,25 +361,6 @@ impl OpenDalStorage {
))
}
}
#[cfg(feature = "opendal-oss")]
OpenDalStorage::Oss { .. } => {
let url = url::Url::parse(path)?;
let bucket = url.host_str().ok_or_else(|| {
Error::new(
ErrorKind::DataInvalid,
format!("Invalid oss url: {path}, missing bucket"),
)
})?;
let prefix = format!("oss://{}/", bucket);
if path.starts_with(&prefix) {
Ok(&path[prefix.len()..])
} else {
Err(Error::new(
ErrorKind::DataInvalid,
format!("Invalid oss url: {path}, should start with {prefix}"),
))
}
}
#[cfg(feature = "opendal-azdls")]
OpenDalStorage::Azdls { config } => {
let azure_path = path.parse::<AzureStoragePath>()?;
Expand Down Expand Up @@ -664,35 +624,6 @@ mod tests {
);
}

#[cfg(feature = "opendal-oss")]
#[test]
fn test_relativize_path_oss() {
let storage = OpenDalStorage::Oss {
config: Arc::new(OssConfig::default()),
};

assert_eq!(
storage
.relativize_path("oss://my-bucket/path/to/file.parquet")
.unwrap(),
"path/to/file.parquet"
);
}

#[cfg(feature = "opendal-oss")]
#[test]
fn test_relativize_path_oss_invalid_scheme() {
let storage = OpenDalStorage::Oss {
config: Arc::new(OssConfig::default()),
};

assert!(
storage
.relativize_path("s3://my-bucket/path/to/file.parquet")
.is_err()
);
}

#[cfg(feature = "opendal-azdls")]
#[test]
fn test_relativize_path_azdls() {
Expand Down
57 changes: 0 additions & 57 deletions crates/storage/opendal/src/oss.rs

This file was deleted.

55 changes: 52 additions & 3 deletions crates/storage/opendal/src/resolving.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,14 @@ fn build_storage_for_scheme(
config: Arc::new(config),
})
}
#[cfg(feature = "opendal-oss")]
// OSS is S3-API-compatible; route through S3 so `s3.*` props
// work for OSS-backed tables (mirrors pyiceberg/Java S3FileIO).
#[cfg(all(feature = "opendal-oss", feature = "opendal-s3"))]
"oss" => {
let config = crate::oss::oss_config_parse(props.clone())?;
Ok(OpenDalStorage::Oss {
let config = crate::s3::s3_config_parse(props.clone())?;
Ok(OpenDalStorage::S3 {
config: Arc::new(config),
customized_credential_load: customized_credential_load.clone(),
})
}
#[cfg(feature = "opendal-azdls")]
Expand Down Expand Up @@ -362,4 +365,50 @@ mod tests {
"abfss and abfs should share one instance"
);
}

#[cfg(all(feature = "opendal-oss", feature = "opendal-s3"))]
#[test]
fn oss_scheme_routes_through_s3_backend_when_available() {
use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT};

let mut props = HashMap::new();
props.insert(S3_ACCESS_KEY_ID.to_string(), "AKIA_TEST".to_string());
props.insert(
S3_ENDPOINT.to_string(),
"https://oss-cn-shanghai.aliyuncs.com".to_string(),
);

let storage = build_storage_for_scheme("oss", &props, &None).unwrap();
assert!(
matches!(storage, OpenDalStorage::S3 { .. }),
"OSS should route through S3 backend when opendal-s3 is enabled"
);
}

#[cfg(all(feature = "opendal-oss", feature = "opendal-s3"))]
#[test]
fn oss_resolve_uses_s3_backend_and_caches() {
use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT};

let mut props = HashMap::new();
props.insert(S3_ACCESS_KEY_ID.to_string(), "AKIA_TEST".to_string());
props.insert(
S3_ENDPOINT.to_string(),
"https://oss-cn-shanghai.aliyuncs.com".to_string(),
);

let resolving = OpenDalResolvingStorage {
props,
storages: RwLock::new(HashMap::new()),
#[cfg(feature = "opendal-s3")]
customized_credential_load: None,
};

let storage = resolving.resolve("oss://my-bucket/path/to/file").unwrap();
assert!(matches!(storage.as_ref(), OpenDalStorage::S3 { .. }));

// Second call should hit the cache.
let cached = resolving.resolve("oss://my-bucket/other").unwrap();
assert!(Arc::ptr_eq(&storage, &cached));
}
}
Loading