diff --git a/bindings/python/example/example.py b/bindings/python/example/example.py index 9cb8f433..8735038a 100644 --- a/bindings/python/example/example.py +++ b/bindings/python/example/example.py @@ -87,6 +87,19 @@ async def main(): except Exception as e: print(f"Failed to get table info: {e}") + # Demo: List offsets + print("\n--- Testing list_offsets() ---") + try: + # Query latest offsets using OffsetType constant (recommended for type safety) + offsets = await admin.list_offsets( + table_path, + bucket_ids=[0], + offset_type=fluss.OffsetType.LATEST + ) + print(f"Latest offsets for table (before writes): {offsets}") + except Exception as e: + print(f"Failed to list offsets: {e}") + # Get the table instance table = await conn.get_table(table_path) print(f"Got table: {table}") @@ -96,7 +109,7 @@ async def main(): print(f"Created append writer: {append_writer}") try: - # Test 1: Write PyArrow Table + # Demo: Write PyArrow Table print("\n--- Testing PyArrow Table write ---") pa_table = pa.Table.from_arrays( [ @@ -139,7 +152,7 @@ async def main(): append_writer.write_arrow(pa_table) print("Successfully wrote PyArrow Table") - # Test 2: Write PyArrow RecordBatch + # Demo: Write PyArrow RecordBatch print("\n--- Testing PyArrow RecordBatch write ---") pa_record_batch = pa.RecordBatch.from_arrays( [ @@ -202,7 +215,7 @@ async def main(): ) print("Successfully appended row (list with Date, Time, Timestamp, Decimal)") - # Test 4: Write Pandas DataFrame + # Demo: Write Pandas DataFrame print("\n--- Testing Pandas DataFrame write ---") df = pd.DataFrame( { @@ -232,6 +245,19 @@ async def main(): append_writer.flush() print("Successfully flushed data") + # Demo: Check offsets after writes + print("\n--- Checking offsets after writes ---") + try: + # Query with string constant (alternative API - both strings and constants are supported) + offsets = await admin.list_offsets( + table_path, + bucket_ids=[0], + offset_type="latest" # Can also use "earliest" or "timestamp" + ) + print(f"Latest offsets after writing 7 records: {offsets}") + except Exception as e: + print(f"Failed to list offsets: {e}") + except Exception as e: print(f"Error during writing: {e}") @@ -242,10 +268,13 @@ async def main(): batch_scanner = await table.new_scan().create_batch_scanner() print(f"Created batch scanner: {batch_scanner}") - # Subscribe to scan from earliest to latest - # start_timestamp=None (earliest), end_timestamp=None (latest) - batch_scanner.subscribe(None, None) + # Subscribe to buckets (required before to_arrow/to_pandas) + # Use subscribe_buckets to subscribe all buckets from EARLIEST_OFFSET + num_buckets = (await admin.get_table(table_path)).num_buckets + batch_scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) + print(f"Subscribed to {num_buckets} buckets from EARLIEST_OFFSET") + # Read all data using to_arrow() print("Scanning results using to_arrow():") # Try to get as PyArrow Table @@ -255,13 +284,13 @@ async def main(): except Exception as e: print(f"Could not convert to PyArrow: {e}") - # Let's subscribe from the beginning again. - # Reset subscription - batch_scanner.subscribe(None, None) + # Create a new batch scanner for to_pandas() test + batch_scanner2 = await table.new_scan().create_batch_scanner() + batch_scanner2.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) # Try to get as Pandas DataFrame try: - df_result = batch_scanner.to_pandas() + df_result = batch_scanner2.to_pandas() print(f"\nAs Pandas DataFrame:\n{df_result}") except Exception as e: print(f"Could not convert to Pandas: {e}") @@ -273,13 +302,14 @@ async def main(): # Test poll_arrow() method for incremental reading as Arrow Table print("\n--- Testing poll_arrow() method ---") - # Reset subscription to start from the beginning - batch_scanner.subscribe(None, None) + batch_scanner3 = await table.new_scan().create_batch_scanner() + batch_scanner3.subscribe(bucket_id=0, start_offset=fluss.EARLIEST_OFFSET) + print(f"Subscribed to bucket 0 at EARLIEST_OFFSET ({fluss.EARLIEST_OFFSET})") # Poll with a timeout of 5000ms (5 seconds) # Note: poll_arrow() returns an empty table (not an error) on timeout try: - poll_result = batch_scanner.poll_arrow(5000) + poll_result = batch_scanner3.poll_arrow(5000) print(f"Number of rows: {poll_result.num_rows}") if poll_result.num_rows > 0: @@ -287,7 +317,7 @@ async def main(): print(f"Polled data:\n{poll_df}") else: print("Empty result (no records available)") - # Empty table still has schema + # Empty table still has schema - this is useful! print(f"Schema: {poll_result.schema}") except Exception as e: @@ -295,10 +325,11 @@ async def main(): # Test poll_batches() method for batches with metadata print("\n--- Testing poll_batches() method ---") - batch_scanner.subscribe(None, None) + batch_scanner4 = await table.new_scan().create_batch_scanner() + batch_scanner4.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) try: - batches = batch_scanner.poll_batches(5000) + batches = batch_scanner4.poll_batches(5000) print(f"Number of batches: {len(batches)}") for i, batch in enumerate(batches): @@ -319,7 +350,7 @@ async def main(): record_scanner = await table.new_scan().create_log_scanner() print(f"Created record scanner: {record_scanner}") - record_scanner.subscribe(None, None) + record_scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) # Poll returns List[ScanRecord] with per-record metadata print("\n--- Testing poll() method (record-by-record) ---") @@ -539,10 +570,13 @@ async def main(): # Demo: Column projection using builder pattern print("\n--- Testing Column Projection ---") try: + # Get bucket count for subscriptions + num_buckets = (await admin.get_table(table_path)).num_buckets + # Project specific columns by index (using batch scanner for to_pandas) print("\n1. Projection by index [0, 1] (id, name):") scanner_index = await table.new_scan().project([0, 1]).create_batch_scanner() - scanner_index.subscribe(None, None) + scanner_index.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) df_projected = scanner_index.to_pandas() print(df_projected.head()) print( @@ -554,7 +588,7 @@ async def main(): scanner_names = await table.new_scan() \ .project_by_name(["name", "score"]) \ .create_batch_scanner() - scanner_names.subscribe(None, None) + scanner_names.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) df_named = scanner_names.to_pandas() print(df_named.head()) print(f" Projected {df_named.shape[1]} columns: {list(df_named.columns)}") @@ -562,7 +596,7 @@ async def main(): # Test empty result schema with projection print("\n3. Testing empty result schema with projection:") scanner_proj = await table.new_scan().project([0, 2]).create_batch_scanner() - scanner_proj.subscribe(None, None) + scanner_proj.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) # Quick poll that may return empty result = scanner_proj.poll_arrow(100) print(f" Schema columns: {result.schema.names}") @@ -570,6 +604,134 @@ async def main(): except Exception as e: print(f"Error during projection: {e}") + # Demo: Drop tables + print("\n--- Testing drop_table() ---") + try: + # Drop the log table + await admin.drop_table(table_path, ignore_if_not_exists=True) + print(f"Successfully dropped table: {table_path}") + # Drop the PK table + await admin.drop_table(pk_table_path, ignore_if_not_exists=True) + print(f"Successfully dropped table: {pk_table_path}") + except Exception as e: + print(f"Failed to drop table: {e}") + + # ===================================================== + # Demo: Partitioned Table with list_partition_offsets + # ===================================================== + print("\n" + "=" * 60) + print("--- Testing Partitioned Table ---") + print("=" * 60) + + # Create a partitioned log table + partitioned_fields = [ + pa.field("id", pa.int32()), + pa.field("region", pa.string()), # partition key + pa.field("value", pa.int64()), + ] + partitioned_schema = pa.schema(partitioned_fields) + fluss_partitioned_schema = fluss.Schema(partitioned_schema) + + partitioned_table_descriptor = fluss.TableDescriptor( + fluss_partitioned_schema, + partition_keys=["region"], # Partition by region + bucket_count=1, + ) + + partitioned_table_path = fluss.TablePath("fluss", "partitioned_log_table_py") + + try: + # Drop if exists first + await admin.drop_table(partitioned_table_path, ignore_if_not_exists=True) + print(f"Dropped existing table: {partitioned_table_path}") + + # Create the partitioned table + await admin.create_table(partitioned_table_path, partitioned_table_descriptor, False) + print(f"Created partitioned table: {partitioned_table_path}") + + # Create partitions for US and EU regions + print("\n--- Creating partitions ---") + await admin.create_partition(partitioned_table_path, {"region": "US"}, ignore_if_exists=True) + print("Created partition: region=US") + await admin.create_partition(partitioned_table_path, {"region": "EU"}, ignore_if_exists=True) + print("Created partition: region=EU") + + # List partitions + print("\n--- Listing partitions ---") + partition_infos = await admin.list_partition_infos(partitioned_table_path) + for p in partition_infos: + print(f" {p}") # PartitionInfo(partition_id=..., partition_name='region=...') + + # Get the table and write some data + partitioned_table = await conn.get_table(partitioned_table_path) + partitioned_writer = await partitioned_table.new_append_writer() + + # Append data to US partition + await partitioned_writer.append({"id": 1, "region": "US", "value": 100}) + await partitioned_writer.append({"id": 2, "region": "US", "value": 200}) + # Append data to EU partition + await partitioned_writer.append({"id": 3, "region": "EU", "value": 300}) + await partitioned_writer.append({"id": 4, "region": "EU", "value": 400}) + partitioned_writer.flush() + print("\nWrote 4 records (2 to US, 2 to EU)") + + # Demo: list_partition_offsets + print("\n--- Testing list_partition_offsets ---") + + # Query offsets for US partition + # Note: partition_name is just the value (e.g., "US"), not "region=US" + us_offsets = await admin.list_partition_offsets( + partitioned_table_path, + partition_name="US", + bucket_ids=[0], + offset_type="latest" + ) + print(f"US partition latest offsets: {us_offsets}") + + # Query offsets for EU partition + eu_offsets = await admin.list_partition_offsets( + partitioned_table_path, + partition_name="EU", + bucket_ids=[0], + offset_type="latest" + ) + print(f"EU partition latest offsets: {eu_offsets}") + + # Demo: subscribe_partition for reading partitioned data + print("\n--- Testing subscribe_partition + to_arrow() ---") + partitioned_scanner = await partitioned_table.new_scan().create_batch_scanner() + + # Subscribe to each partition using partition_id + for p in partition_infos: + partitioned_scanner.subscribe_partition( + partition_id=p.partition_id, + bucket_id=0, + start_offset=fluss.EARLIEST_OFFSET + ) + print(f"Subscribed to partition {p.partition_name} (id={p.partition_id})") + + # Use to_arrow() - now works for partitioned tables! + partitioned_arrow = partitioned_scanner.to_arrow() + print(f"\nto_arrow() returned {partitioned_arrow.num_rows} records from partitioned table:") + print(partitioned_arrow.to_pandas()) + + # Demo: to_pandas() also works for partitioned tables + print("\n--- Testing to_pandas() on partitioned table ---") + partitioned_scanner2 = await partitioned_table.new_scan().create_batch_scanner() + for p in partition_infos: + partitioned_scanner2.subscribe_partition(p.partition_id, 0, fluss.EARLIEST_OFFSET) + partitioned_df = partitioned_scanner2.to_pandas() + print(f"to_pandas() returned {len(partitioned_df)} records:") + print(partitioned_df) + + # Cleanup + await admin.drop_table(partitioned_table_path, ignore_if_not_exists=True) + print(f"\nDropped partitioned table: {partitioned_table_path}") + + except Exception as e: + print(f"Error with partitioned table: {e}") + traceback.print_exc() + # Close connection conn.close() print("\nConnection closed") diff --git a/bindings/python/fluss/__init__.pyi b/bindings/python/fluss/__init__.pyi index 40d18f6c..a2bbaac4 100644 --- a/bindings/python/fluss/__init__.pyi +++ b/bindings/python/fluss/__init__.pyi @@ -129,6 +129,78 @@ class FlussAdmin: ) -> None: ... async def get_table(self, table_path: TablePath) -> TableInfo: ... async def get_latest_lake_snapshot(self, table_path: TablePath) -> LakeSnapshot: ... + async def drop_table( + self, + table_path: TablePath, + ignore_if_not_exists: bool = False, + ) -> None: ... + async def list_offsets( + self, + table_path: TablePath, + bucket_ids: List[int], + offset_type: str, + timestamp: Optional[int] = None, + ) -> Dict[int, int]: + """List offsets for the specified buckets. + + Args: + table_path: Path to the table + bucket_ids: List of bucket IDs to query + offset_type: "earliest", "latest", or "timestamp" + timestamp: Required when offset_type is "timestamp" + + Returns: + Dict mapping bucket_id -> offset + """ + ... + async def list_partition_offsets( + self, + table_path: TablePath, + partition_name: str, + bucket_ids: List[int], + offset_type: str, + timestamp: Optional[int] = None, + ) -> Dict[int, int]: + """List offsets for buckets in a specific partition. + + Args: + table_path: Path to the table + partition_name: Partition value (e.g., "US" not "region=US") + bucket_ids: List of bucket IDs to query + offset_type: "earliest", "latest", or "timestamp" + timestamp: Required when offset_type is "timestamp" + + Returns: + Dict mapping bucket_id -> offset + """ + ... + async def create_partition( + self, + table_path: TablePath, + partition_spec: Dict[str, str], + ignore_if_exists: bool = False, + ) -> None: + """Create a partition for a partitioned table. + + Args: + table_path: Path to the table + partition_spec: Dict mapping partition column name to value (e.g., {"region": "US"}) + ignore_if_exists: If True, don't raise error if partition already exists + """ + ... + async def list_partition_infos( + self, + table_path: TablePath, + ) -> List["PartitionInfo"]: + """List all partitions for a partitioned table. + + Args: + table_path: Path to the table + + Returns: + List of PartitionInfo objects + """ + ... def __repr__(self) -> str: ... class TableScan: @@ -322,14 +394,30 @@ class LogScanner: scanner = await table.new_scan().project([0, 1]).create_log_scanner() """ - def subscribe( - self, start_timestamp: Optional[int], end_timestamp: Optional[int] + def subscribe(self, bucket_id: int, start_offset: int) -> None: + """Subscribe to a single bucket at a specific offset (non-partitioned tables). + + Args: + bucket_id: The bucket ID to subscribe to + start_offset: The offset to start reading from (use EARLIEST_OFFSET for beginning) + """ + ... + def subscribe_buckets(self, bucket_offsets: Dict[int, int]) -> None: + """Subscribe to multiple buckets at specified offsets (non-partitioned tables). + + Args: + bucket_offsets: Dict mapping bucket_id -> start_offset + """ + ... + def subscribe_partition( + self, partition_id: int, bucket_id: int, start_offset: int ) -> None: - """Subscribe to log data with timestamp range. + """Subscribe to a bucket within a specific partition (partitioned tables only). Args: - start_timestamp: Not yet supported, must be None. - end_timestamp: Not yet supported, must be None. + partition_id: The partition ID (from PartitionInfo.partition_id) + bucket_id: The bucket ID within the partition + start_offset: The offset to start reading from (use EARLIEST_OFFSET for beginning) """ ... def poll(self, timeout_ms: int) -> List[ScanRecord]: @@ -384,12 +472,18 @@ class LogScanner: """Convert all data to Pandas DataFrame. Requires a batch-based scanner (created with new_scan().create_batch_scanner()). + Reads from currently subscribed buckets until reaching their latest offsets. + + You must call subscribe(), subscribe_buckets(), or subscribe_partition() first. """ ... def to_arrow(self) -> pa.Table: """Convert all data to Arrow Table. Requires a batch-based scanner (created with new_scan().create_batch_scanner()). + Reads from currently subscribed buckets until reaching their latest offsets. + + You must call subscribe(), subscribe_buckets(), or subscribe_partition() first. """ ... def __repr__(self) -> str: ... @@ -493,4 +587,27 @@ class TableDistribution: def bucket_keys(self) -> List[str]: ... def bucket_count(self) -> Optional[int]: ... +class PartitionInfo: + """Information about a partition.""" + + @property + def partition_id(self) -> int: + """Get the partition ID (globally unique in the cluster).""" + ... + @property + def partition_name(self) -> str: + """Get the partition name.""" + ... + def __repr__(self) -> str: ... + +class OffsetType: + """Offset type constants for list_offsets().""" + + EARLIEST: str + LATEST: str + TIMESTAMP: str + +# Constant for earliest offset (-2) +EARLIEST_OFFSET: int + __version__: str diff --git a/bindings/python/src/admin.rs b/bindings/python/src/admin.rs index fa189eb8..d28c9c06 100644 --- a/bindings/python/src/admin.rs +++ b/bindings/python/src/admin.rs @@ -16,6 +16,7 @@ // under the License. use crate::*; +use fcore::rpc::message::OffsetSpec; use pyo3_async_runtimes::tokio::future_into_py; use std::sync::Arc; @@ -25,6 +26,37 @@ pub struct FlussAdmin { __admin: Arc, } +/// Parse offset_type string into OffsetSpec +fn parse_offset_spec(offset_type: &str, timestamp: Option) -> PyResult { + match offset_type { + s if s.eq_ignore_ascii_case("earliest") => Ok(OffsetSpec::Earliest), + s if s.eq_ignore_ascii_case("latest") => Ok(OffsetSpec::Latest), + s if s.eq_ignore_ascii_case("timestamp") => { + let ts = timestamp.ok_or_else(|| { + FlussError::new_err("timestamp must be provided when offset_type='timestamp'") + })?; + Ok(OffsetSpec::Timestamp(ts)) + } + _ => Err(FlussError::new_err(format!( + "Invalid offset_type: '{}'. Must be 'earliest', 'latest', or 'timestamp'", + offset_type + ))), + } +} + +/// Validate bucket IDs are non-negative +fn validate_bucket_ids(bucket_ids: &[i32]) -> PyResult<()> { + for &bucket_id in bucket_ids { + if bucket_id < 0 { + return Err(FlussError::new_err(format!( + "Invalid bucket_id: {}. Bucket IDs must be non-negative", + bucket_id + ))); + } + } + Ok(()) +} + #[pymethods] impl FlussAdmin { /// Create a table with the given schema @@ -38,7 +70,7 @@ impl FlussAdmin { ) -> PyResult> { let ignore = ignore_if_exists.unwrap_or(false); - let core_table_path = table_path.to_core().clone(); + let core_table_path = table_path.to_core(); let core_descriptor = table_descriptor.to_core().clone(); let admin = self.__admin.clone(); @@ -58,7 +90,7 @@ impl FlussAdmin { py: Python<'py>, table_path: &TablePath, ) -> PyResult> { - let core_table_path = table_path.to_core().clone(); + let core_table_path = table_path.to_core(); let admin = self.__admin.clone(); future_into_py(py, async move { @@ -80,7 +112,7 @@ impl FlussAdmin { py: Python<'py>, table_path: &TablePath, ) -> PyResult> { - let core_table_path = table_path.to_core().clone(); + let core_table_path = table_path.to_core(); let admin = self.__admin.clone(); future_into_py(py, async move { @@ -96,6 +128,183 @@ impl FlussAdmin { }) } + /// Drop a table + #[pyo3(signature = (table_path, ignore_if_not_exists=false))] + pub fn drop_table<'py>( + &self, + py: Python<'py>, + table_path: &TablePath, + ignore_if_not_exists: bool, + ) -> PyResult> { + let core_table_path = table_path.to_core(); + let admin = self.__admin.clone(); + + future_into_py(py, async move { + admin + .drop_table(&core_table_path, ignore_if_not_exists) + .await + .map_err(|e| FlussError::new_err(format!("Failed to drop table: {e}")))?; + + Python::attach(|py| Ok(py.None())) + }) + } + + /// List offsets for buckets (non-partitioned tables only). + /// + /// Args: + /// table_path: Path to the table + /// bucket_ids: List of bucket IDs to query + /// offset_type: Type of offset to retrieve: + /// - "earliest" or OffsetType.EARLIEST: Start of the log + /// - "latest" or OffsetType.LATEST: End of the log + /// - "timestamp" or OffsetType.TIMESTAMP: Offset at given timestamp (requires timestamp arg) + /// timestamp: Required when offset_type is "timestamp", ignored otherwise + /// + /// Returns: + /// dict[int, int]: Mapping of bucket_id -> offset + #[pyo3(signature = (table_path, bucket_ids, offset_type, timestamp=None))] + pub fn list_offsets<'py>( + &self, + py: Python<'py>, + table_path: &TablePath, + bucket_ids: Vec, + offset_type: &str, + timestamp: Option, + ) -> PyResult> { + validate_bucket_ids(&bucket_ids)?; + let offset_spec = parse_offset_spec(offset_type, timestamp)?; + + let core_table_path = table_path.to_core(); + let admin = self.__admin.clone(); + + future_into_py(py, async move { + let offsets = admin + .list_offsets(&core_table_path, &bucket_ids, offset_spec) + .await + .map_err(|e| FlussError::new_err(format!("Failed to list offsets: {e}")))?; + + Python::attach(|py| { + let dict = pyo3::types::PyDict::new(py); + for (bucket_id, offset) in offsets { + dict.set_item(bucket_id, offset)?; + } + Ok(dict.unbind()) + }) + }) + } + + /// List offsets for buckets in a specific partition of a partitioned table. + /// + /// Args: + /// table_path: Path to the table + /// partition_name: Partition value (e.g., "US" not "region=US") + /// bucket_ids: List of bucket IDs to query + /// offset_type: Type of offset to retrieve: + /// - "earliest" or OffsetType.EARLIEST: Start of the log + /// - "latest" or OffsetType.LATEST: End of the log + /// - "timestamp" or OffsetType.TIMESTAMP: Offset at given timestamp (requires timestamp arg) + /// timestamp: Required when offset_type is "timestamp", ignored otherwise + /// + /// Returns: + /// dict[int, int]: Mapping of bucket_id -> offset + #[pyo3(signature = (table_path, partition_name, bucket_ids, offset_type, timestamp=None))] + pub fn list_partition_offsets<'py>( + &self, + py: Python<'py>, + table_path: &TablePath, + partition_name: &str, + bucket_ids: Vec, + offset_type: &str, + timestamp: Option, + ) -> PyResult> { + validate_bucket_ids(&bucket_ids)?; + let offset_spec = parse_offset_spec(offset_type, timestamp)?; + + let core_table_path = table_path.to_core(); + let admin = self.__admin.clone(); + let partition_name = partition_name.to_string(); + + future_into_py(py, async move { + let offsets = admin + .list_partition_offsets(&core_table_path, &partition_name, &bucket_ids, offset_spec) + .await + .map_err(|e| { + FlussError::new_err(format!("Failed to list partition offsets: {e}")) + })?; + + Python::attach(|py| { + let dict = pyo3::types::PyDict::new(py); + for (bucket_id, offset) in offsets { + dict.set_item(bucket_id, offset)?; + } + Ok(dict.unbind()) + }) + }) + } + + /// Create a partition for a partitioned table. + /// + /// Args: + /// table_path: Path to the table + /// partition_spec: Dict mapping partition column name to value (e.g., {"region": "US"}) + /// ignore_if_exists: If True, don't raise error if partition already exists + /// + /// Returns: + /// None + #[pyo3(signature = (table_path, partition_spec, ignore_if_exists=false))] + pub fn create_partition<'py>( + &self, + py: Python<'py>, + table_path: &TablePath, + partition_spec: std::collections::HashMap, + ignore_if_exists: bool, + ) -> PyResult> { + let core_table_path = table_path.to_core(); + let admin = self.__admin.clone(); + let core_partition_spec = fcore::metadata::PartitionSpec::new(partition_spec); + + future_into_py(py, async move { + admin + .create_partition(&core_table_path, &core_partition_spec, ignore_if_exists) + .await + .map_err(|e| FlussError::new_err(format!("Failed to create partition: {e}")))?; + + Python::attach(|py| Ok(py.None())) + }) + } + + /// List all partitions for a partitioned table. + /// + /// Args: + /// table_path: Path to the table + /// + /// Returns: + /// List[PartitionInfo]: List of partition info objects + pub fn list_partition_infos<'py>( + &self, + py: Python<'py>, + table_path: &TablePath, + ) -> PyResult> { + let core_table_path = table_path.to_core(); + let admin = self.__admin.clone(); + + future_into_py(py, async move { + let partition_infos = admin + .list_partition_infos(&core_table_path) + .await + .map_err(|e| FlussError::new_err(format!("Failed to list partitions: {e}")))?; + + Python::attach(|py| { + let py_list = pyo3::types::PyList::empty(py); + for info in partition_infos { + let py_info = PartitionInfo::from_core(info); + py_list.append(Py::new(py, py_info)?)?; + } + Ok(py_list.unbind()) + }) + }) + } + fn __repr__(&self) -> String { "FlussAdmin()".to_string() } @@ -109,3 +318,41 @@ impl FlussAdmin { } } } + +/// Information about a partition +#[pyclass] +pub struct PartitionInfo { + partition_id: i64, + partition_name: String, +} + +#[pymethods] +impl PartitionInfo { + /// Get the partition ID (globally unique in the cluster) + #[getter] + fn partition_id(&self) -> i64 { + self.partition_id + } + + /// Get the partition name (e.g., "US" for a table partitioned by region) + #[getter] + fn partition_name(&self) -> &str { + &self.partition_name + } + + fn __repr__(&self) -> String { + format!( + "PartitionInfo(partition_id={}, partition_name='{}')", + self.partition_id, self.partition_name + ) + } +} + +impl PartitionInfo { + pub fn from_core(info: fcore::metadata::PartitionInfo) -> Self { + Self { + partition_id: info.get_partition_id(), + partition_name: info.get_partition_name(), + } + } +} diff --git a/bindings/python/src/lib.rs b/bindings/python/src/lib.rs index ce063aba..ae7f6c50 100644 --- a/bindings/python/src/lib.rs +++ b/bindings/python/src/lib.rs @@ -48,6 +48,23 @@ static TOKIO_RUNTIME: LazyLock = LazyLock::new(|| { .expect("Failed to create Tokio runtime") }); +/// Offset type constants for list_offsets() +#[pyclass] +#[derive(Clone)] +pub struct OffsetType; + +#[pymethods] +impl OffsetType { + #[classattr] + const EARLIEST: &'static str = "earliest"; + + #[classattr] + const LATEST: &'static str = "latest"; + + #[classattr] + const TIMESTAMP: &'static str = "timestamp"; +} + #[pymodule] fn _fluss(m: &Bound<'_, PyModule>) -> PyResult<()> { // Register all classes @@ -69,6 +86,11 @@ fn _fluss(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + + // Register constants + m.add("EARLIEST_OFFSET", fcore::client::EARLIEST_OFFSET)?; // Register exception types m.add_class::()?; diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs index 30c7ce0b..c285f25c 100644 --- a/bindings/python/src/table.rs +++ b/bindings/python/src/table.rs @@ -20,7 +20,6 @@ use crate::*; use arrow::array::RecordBatch as ArrowRecordBatch; use arrow_pyarrow::{FromPyArrow, ToPyArrow}; use arrow_schema::SchemaRef; -use fluss::client::EARLIEST_OFFSET; use fluss::record::to_arrow_schema; use fluss::rpc::message::OffsetSpec; use pyo3::types::IntoPyDict; @@ -1573,155 +1572,95 @@ pub struct LogScanner { projected_schema: SchemaRef, /// The projected row type to use for record-based scanning projected_row_type: fcore::metadata::RowType, - #[allow(dead_code)] - start_timestamp: Option, - #[allow(dead_code)] - end_timestamp: Option, + /// Cache for partition_id -> partition_name mapping (avoids repeated list_partition_infos calls) + partition_name_cache: std::sync::RwLock>>, } #[pymethods] impl LogScanner { - /// Subscribe to log data with timestamp range - fn subscribe( - &mut self, - _start_timestamp: Option, - _end_timestamp: Option, - ) -> PyResult<()> { - if _start_timestamp.is_some() { - return Err(FlussError::new_err( - "Specifying start_timestamp is not yet supported. Please use None.".to_string(), - )); - } - if _end_timestamp.is_some() { - return Err(FlussError::new_err( - "Specifying end_timestamp is not yet supported. Please use None.".to_string(), - )); - } - - let num_buckets = self.table_info.get_num_buckets(); - for bucket_id in 0..num_buckets { - let start_offset = EARLIEST_OFFSET; - - // Subscribe to the appropriate scanner - if let Some(ref inner) = self.inner { - TOKIO_RUNTIME.block_on(async { + /// Subscribe to a single bucket at a specific offset (non-partitioned tables). + /// + /// Args: + /// bucket_id: The bucket ID to subscribe to + /// start_offset: The offset to start reading from (use EARLIEST_OFFSET for beginning) + fn subscribe(&self, py: Python, bucket_id: i32, start_offset: i64) -> PyResult<()> { + py.detach(|| { + TOKIO_RUNTIME.block_on(async { + if let Some(ref inner) = self.inner { inner .subscribe(bucket_id, start_offset) .await - .map_err(|e| FlussError::new_err(e.to_string())) - })?; - } else if let Some(ref inner_batch) = self.inner_batch { - TOKIO_RUNTIME.block_on(async { + .map_err(|e| FlussError::new_err(format!("Failed to subscribe: {e}"))) + } else if let Some(ref inner_batch) = self.inner_batch { inner_batch .subscribe(bucket_id, start_offset) .await - .map_err(|e| FlussError::new_err(e.to_string())) - })?; - } else { - return Err(FlussError::new_err("No scanner available")); - } - } - - Ok(()) + .map_err(|e| FlussError::new_err(format!("Failed to subscribe: {e}"))) + } else { + Err(FlussError::new_err("No scanner available")) + } + }) + }) } - /// Convert all data to Arrow Table + /// Subscribe to multiple buckets at specified offsets (non-partitioned tables). /// - /// Note: Requires a batch-based scanner (created with new_scan().create_batch_scanner()). - fn to_arrow(&self, py: Python) -> PyResult> { - let inner_batch = self.inner_batch.as_ref().ok_or_else(|| { - FlussError::new_err( - "Batch-based scanner not available. Use new_scan().create_batch_scanner() to create a scanner \ - that supports to_arrow().", - ) - })?; - - let mut all_batches = Vec::new(); - - let num_buckets = self.table_info.get_num_buckets(); - let bucket_ids: Vec = (0..num_buckets).collect(); - - // todo: after supporting list_offsets with timestamp, we can use start_timestamp and end_timestamp here - let mut stopping_offsets: HashMap = py - .detach(|| { - TOKIO_RUNTIME.block_on(async { - self.admin - .list_offsets( - &self.table_info.table_path, - bucket_ids.as_slice(), - OffsetSpec::Latest, - ) + /// Args: + /// bucket_offsets: A dict mapping bucket_id -> start_offset + fn subscribe_buckets(&self, py: Python, bucket_offsets: HashMap) -> PyResult<()> { + py.detach(|| { + TOKIO_RUNTIME.block_on(async { + if let Some(ref inner) = self.inner { + inner + .subscribe_buckets(&bucket_offsets) .await - }) - }) - .map_err(|e| FlussError::new_err(e.to_string()))?; - - // Filter out buckets with no records to read (stop_at <= 0) - stopping_offsets.retain(|_, &mut v| v > 0); - - while !stopping_offsets.is_empty() { - let scan_batches = py - .detach(|| { - TOKIO_RUNTIME - .block_on(async { inner_batch.poll(Duration::from_millis(500)).await }) - }) - .map_err(|e| FlussError::new_err(e.to_string()))?; - - if scan_batches.is_empty() { - continue; - } - - for scan_batch in scan_batches { - let bucket_id = scan_batch.bucket().bucket_id(); - - // Check if this bucket is still being tracked; if not, ignore the batch - let Some(&stop_at) = stopping_offsets.get(&bucket_id) else { - continue; - }; - - let base_offset = scan_batch.base_offset(); - let last_offset = scan_batch.last_offset(); - - // If the batch starts at or after the stop_at offset, the bucket is exhausted - if base_offset >= stop_at { - stopping_offsets.remove(&bucket_id); - continue; - } - - let batch = if last_offset >= stop_at { - // This batch contains the target offset; slice it to keep only records - // where offset < stop_at. - let num_to_keep = (stop_at - base_offset) as usize; - let b = scan_batch.into_batch(); - - // Safety check: ensure we don't attempt to slice more rows than the batch contains - let limit = num_to_keep.min(b.num_rows()); - b.slice(0, limit) + .map_err(|e| FlussError::new_err(format!("Failed to subscribe batch: {e}"))) + } else if let Some(ref inner_batch) = self.inner_batch { + inner_batch + .subscribe_buckets(&bucket_offsets) + .await + .map_err(|e| FlussError::new_err(format!("Failed to subscribe batch: {e}"))) } else { - // The entire batch is within the desired range (all offsets < stop_at) - scan_batch.into_batch() - }; - - all_batches.push(Arc::new(batch)); - - // If the batch's last offset reached or passed the inclusive limit (stop_at - 1), - // we are done with this bucket. - if last_offset >= stop_at - 1 { - stopping_offsets.remove(&bucket_id); + Err(FlussError::new_err("No scanner available")) } - } - } - - Utils::combine_batches_to_table(py, all_batches) + }) + }) } - /// Convert all data to Pandas DataFrame - fn to_pandas(&self, py: Python) -> PyResult> { - let arrow_table = self.to_arrow(py)?; - - // Convert Arrow Table to Pandas DataFrame using pyarrow - let df = arrow_table.call_method0(py, "to_pandas")?; - Ok(df) + /// Subscribe to a bucket within a specific partition (partitioned tables only). + /// + /// Args: + /// partition_id: The partition ID (from PartitionInfo.partition_id) + /// bucket_id: The bucket ID within the partition + /// start_offset: The offset to start reading from (use EARLIEST_OFFSET for beginning) + fn subscribe_partition( + &self, + py: Python, + partition_id: i64, + bucket_id: i32, + start_offset: i64, + ) -> PyResult<()> { + py.detach(|| { + TOKIO_RUNTIME.block_on(async { + if let Some(ref inner) = self.inner { + inner + .subscribe_partition(partition_id, bucket_id, start_offset) + .await + .map_err(|e| { + FlussError::new_err(format!("Failed to subscribe partition: {e}")) + }) + } else if let Some(ref inner_batch) = self.inner_batch { + inner_batch + .subscribe_partition(partition_id, bucket_id, start_offset) + .await + .map_err(|e| { + FlussError::new_err(format!("Failed to subscribe partition: {e}")) + }) + } else { + Err(FlussError::new_err("No scanner available")) + } + }) + }) } /// Poll for individual records with metadata. @@ -1873,6 +1812,54 @@ impl LogScanner { Ok(empty_table.into()) } + /// Convert all data to Arrow Table. + /// + /// Reads from currently subscribed buckets until reaching their latest offsets. + /// Works for both partitioned and non-partitioned tables. + /// + /// You must call subscribe(), subscribe_buckets(), or subscribe_partition() first. + /// + /// Returns: + /// PyArrow Table containing all data from subscribed buckets + fn to_arrow(&self, py: Python) -> PyResult> { + // 1. Get subscribed buckets from scanner (requires batch scanner for get_subscribed_buckets) + let inner_batch = self.inner_batch.as_ref().ok_or_else(|| { + FlussError::new_err( + "Batch-based scanner not available. Use new_scan().create_batch_scanner() to create a scanner \ + that supports to_arrow().", + ) + })?; + let subscribed = inner_batch.get_subscribed_buckets(); + if subscribed.is_empty() { + return Err(FlussError::new_err( + "No buckets subscribed. Call subscribe(), subscribe_buckets(), or subscribe_partition() first.", + )); + } + + // 2. Query latest offsets for all subscribed buckets + let stopping_offsets = self.query_latest_offsets(py, &subscribed)?; + + // 3. Poll until all buckets reach their stopping offsets + self.poll_until_offsets(py, stopping_offsets) + } + + /// Convert all data to Pandas DataFrame. + /// + /// Reads from currently subscribed buckets until reaching their latest offsets. + /// Works for both partitioned and non-partitioned tables. + /// + /// You must call subscribe(), subscribe_buckets(), or subscribe_partition() first. + /// + /// Returns: + /// Pandas DataFrame containing all data from subscribed buckets + fn to_pandas(&self, py: Python) -> PyResult> { + let arrow_table = self.to_arrow(py)?; + + // Convert Arrow Table to Pandas DataFrame using pyarrow + let df = arrow_table.call_method0(py, "to_pandas")?; + Ok(df) + } + fn __repr__(&self) -> String { format!("LogScanner(table={})", self.table_info.table_path) } @@ -1894,8 +1881,7 @@ impl LogScanner { table_info, projected_schema, projected_row_type, - start_timestamp: None, - end_timestamp: None, + partition_name_cache: std::sync::RwLock::new(None), } } @@ -1914,9 +1900,215 @@ impl LogScanner { table_info, projected_schema, projected_row_type, - start_timestamp: None, - end_timestamp: None, + partition_name_cache: std::sync::RwLock::new(None), + } + } + + /// Get partition_id -> partition_name mapping, using cache if available + fn get_partition_name_map( + &self, + py: Python, + table_path: &fcore::metadata::TablePath, + ) -> PyResult> { + // Check cache first (read lock) + { + let cache = self.partition_name_cache.read().unwrap(); + if let Some(map) = cache.as_ref() { + return Ok(map.clone()); + } + } + + // Fetch partition infos (releases GIL during async call) + let partition_infos: Vec = py + .detach(|| { + TOKIO_RUNTIME.block_on(async { self.admin.list_partition_infos(table_path).await }) + }) + .map_err(|e| FlussError::new_err(format!("Failed to list partition infos: {e}")))?; + + // Build and cache the mapping + let map: HashMap = partition_infos + .into_iter() + .map(|info| (info.get_partition_id(), info.get_partition_name())) + .collect(); + + // Store in cache (write lock) + { + let mut cache = self.partition_name_cache.write().unwrap(); + *cache = Some(map.clone()); + } + + Ok(map) + } + + /// Query latest offsets for subscribed buckets (handles both partitioned and non-partitioned) + fn query_latest_offsets( + &self, + py: Python, + subscribed: &[(fcore::metadata::TableBucket, i64)], + ) -> PyResult> { + let inner_batch = self.inner_batch.as_ref().ok_or_else(|| { + FlussError::new_err("Batch-based scanner required for this operation") + })?; + let is_partitioned = inner_batch.is_partitioned(); + let table_path = &self.table_info.table_path; + + if !is_partitioned { + // Non-partitioned: simple case - just query all bucket IDs + let bucket_ids: Vec = subscribed.iter().map(|(tb, _)| tb.bucket_id()).collect(); + + let offsets: HashMap = py + .detach(|| { + TOKIO_RUNTIME.block_on(async { + self.admin + .list_offsets(table_path, &bucket_ids, OffsetSpec::Latest) + .await + }) + }) + .map_err(|e| FlussError::new_err(format!("Failed to list offsets: {e}")))?; + + // Convert to TableBucket-keyed map + let table_id = self.table_info.table_id; + Ok(offsets + .into_iter() + .filter(|(_, offset)| *offset > 0) + .map(|(bucket_id, offset)| { + ( + fcore::metadata::TableBucket::new(table_id, bucket_id), + offset, + ) + }) + .collect()) + } else { + // Partitioned: need to query per partition + self.query_partitioned_offsets(py, subscribed) + } + } + + /// Query offsets for partitioned table subscriptions + fn query_partitioned_offsets( + &self, + py: Python, + subscribed: &[(fcore::metadata::TableBucket, i64)], + ) -> PyResult> { + let table_path = &self.table_info.table_path; + + // Get partition_id -> partition_name mapping (cached) + let partition_id_to_name = self.get_partition_name_map(py, table_path)?; + + // Group subscribed buckets by partition_id + let mut by_partition: HashMap> = HashMap::new(); + for (tb, _) in subscribed { + if let Some(partition_id) = tb.partition_id() { + by_partition + .entry(partition_id) + .or_default() + .push(tb.bucket_id()); + } + } + + // Query offsets for each partition + let mut result: HashMap = HashMap::new(); + let table_id = self.table_info.table_id; + + for (partition_id, bucket_ids) in by_partition { + let partition_name = partition_id_to_name.get(&partition_id).ok_or_else(|| { + FlussError::new_err(format!("Unknown partition_id: {partition_id}")) + })?; + + let offsets: HashMap = py + .detach(|| { + TOKIO_RUNTIME.block_on(async { + self.admin + .list_partition_offsets( + table_path, + partition_name, + &bucket_ids, + OffsetSpec::Latest, + ) + .await + }) + }) + .map_err(|e| { + FlussError::new_err(format!( + "Failed to list offsets for partition {partition_name}: {e}" + )) + })?; + + for (bucket_id, offset) in offsets { + if offset > 0 { + let tb = fcore::metadata::TableBucket::new_with_partition( + table_id, + Some(partition_id), + bucket_id, + ); + result.insert(tb, offset); + } + } } + + Ok(result) + } + + /// Poll until all buckets reach their stopping offsets + fn poll_until_offsets( + &self, + py: Python, + mut stopping_offsets: HashMap, + ) -> PyResult> { + let inner_batch = self.inner_batch.as_ref().ok_or_else(|| { + FlussError::new_err("Batch-based scanner required for this operation") + })?; + let mut all_batches = Vec::new(); + + while !stopping_offsets.is_empty() { + let scan_batches = py + .detach(|| { + TOKIO_RUNTIME + .block_on(async { inner_batch.poll(Duration::from_millis(500)).await }) + }) + .map_err(|e| FlussError::new_err(format!("Failed to poll: {e}")))?; + + if scan_batches.is_empty() { + continue; + } + + for scan_batch in scan_batches { + let table_bucket = scan_batch.bucket().clone(); + + // Check if this bucket is still being tracked + let Some(&stop_at) = stopping_offsets.get(&table_bucket) else { + continue; + }; + + let base_offset = scan_batch.base_offset(); + let last_offset = scan_batch.last_offset(); + + // If the batch starts at or after the stop_at offset, the bucket is exhausted + if base_offset >= stop_at { + stopping_offsets.remove(&table_bucket); + continue; + } + + let batch = if last_offset >= stop_at { + // Slice batch to keep only records where offset < stop_at + let num_to_keep = (stop_at - base_offset) as usize; + let b = scan_batch.into_batch(); + let limit = num_to_keep.min(b.num_rows()); + b.slice(0, limit) + } else { + scan_batch.into_batch() + }; + + all_batches.push(Arc::new(batch)); + + // Check if we're done with this bucket + if last_offset >= stop_at - 1 { + stopping_offsets.remove(&table_bucket); + } + } + } + + Utils::combine_batches_to_table(py, all_batches) } } diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs index ef68fb4d..d50f19e0 100644 --- a/crates/fluss/src/client/table/scanner.rs +++ b/crates/fluss/src/client/table/scanner.rs @@ -514,6 +514,16 @@ impl RecordBatchLogScanner { .subscribe_partition(partition_id, bucket, offset) .await } + + /// Returns whether the table is partitioned + pub fn is_partitioned(&self) -> bool { + self.inner.is_partitioned_table + } + + /// Returns all subscribed buckets with their current offsets + pub fn get_subscribed_buckets(&self) -> Vec<(TableBucket, i64)> { + self.inner.log_scanner_status.get_all_subscriptions() + } } struct LogFetcher { @@ -1512,6 +1522,16 @@ impl LogScannerStatus { result } + /// Returns all subscribed buckets with their current offsets + pub fn get_all_subscriptions(&self) -> Vec<(TableBucket, i64)> { + let map = self.bucket_status_map.read(); + let mut result = Vec::new(); + map.for_each(|bucket, status| { + result.push((bucket.clone(), status.offset())); + }); + result + } + /// Helper to get bucket status fn get_status(&self, table_bucket: &TableBucket) -> Option> { let map = self.bucket_status_map.read();