|
18 | 18 | use crate::TOKIO_RUNTIME; |
19 | 19 | use crate::*; |
20 | 20 | use arrow::array::RecordBatch; |
21 | | -use arrow_pyarrow::FromPyArrow; |
| 21 | +use arrow_pyarrow::{FromPyArrow, ToPyArrow}; |
22 | 22 | use fluss::client::EARLIEST_OFFSET; |
| 23 | +use fluss::record::to_arrow_schema; |
23 | 24 | use fluss::rpc::message::OffsetSpec; |
24 | 25 | use pyo3_async_runtimes::tokio::future_into_py; |
25 | 26 | use std::sync::Arc; |
@@ -321,6 +322,59 @@ impl LogScanner { |
321 | 322 | Ok(df) |
322 | 323 | } |
323 | 324 |
|
| 325 | + /// Poll for new records with the specified timeout |
| 326 | + /// |
| 327 | + /// Args: |
| 328 | + /// timeout_ms: Timeout in milliseconds to wait for records |
| 329 | + /// |
| 330 | + /// Returns: |
| 331 | + /// PyArrow Table containing the polled records |
| 332 | + /// |
| 333 | + /// Note: |
| 334 | + /// - Returns an empty table (with correct schema) if no records are available |
| 335 | + /// - When timeout expires, returns an empty table (NOT an error) |
| 336 | + fn poll(&self, py: Python, timeout_ms: i64) -> PyResult<Py<PyAny>> { |
| 337 | + use std::time::Duration; |
| 338 | + |
| 339 | + if timeout_ms < 0 { |
| 340 | + return Err(FlussError::new_err(format!( |
| 341 | + "timeout_ms must be non-negative, got: {timeout_ms}" |
| 342 | + ))); |
| 343 | + } |
| 344 | + |
| 345 | + let timeout = Duration::from_millis(timeout_ms as u64); |
| 346 | + let scan_records = py |
| 347 | + .detach(|| TOKIO_RUNTIME.block_on(async { self.inner.poll(timeout).await })) |
| 348 | + .map_err(|e| FlussError::new_err(e.to_string()))?; |
| 349 | + |
| 350 | + // Convert records to Arrow batches per bucket |
| 351 | + let mut arrow_batches = Vec::new(); |
| 352 | + for (_bucket, records) in scan_records.into_records_by_buckets() { |
| 353 | + let mut batches = Utils::convert_scan_records_to_arrow(records); |
| 354 | + arrow_batches.append(&mut batches); |
| 355 | + } |
| 356 | + if arrow_batches.is_empty() { |
| 357 | + return self.create_empty_table(py); |
| 358 | + } |
| 359 | + |
| 360 | + Utils::combine_batches_to_table(py, arrow_batches) |
| 361 | + } |
| 362 | + |
| 363 | + /// Create an empty PyArrow table with the correct schema |
| 364 | + fn create_empty_table(&self, py: Python) -> PyResult<Py<PyAny>> { |
| 365 | + let arrow_schema = to_arrow_schema(self.table_info.get_row_type()); |
| 366 | + let py_schema = arrow_schema |
| 367 | + .to_pyarrow(py) |
| 368 | + .map_err(|e| FlussError::new_err(format!("Failed to convert schema: {e}")))?; |
| 369 | + |
| 370 | + let pyarrow = py.import("pyarrow")?; |
| 371 | + let empty_table = pyarrow |
| 372 | + .getattr("Table")? |
| 373 | + .call_method1("from_batches", (vec![] as Vec<Py<PyAny>>, py_schema))?; |
| 374 | + |
| 375 | + Ok(empty_table.into()) |
| 376 | + } |
| 377 | + |
324 | 378 | fn __repr__(&self) -> String { |
325 | 379 | format!("LogScanner(table={})", self.table_info.table_path) |
326 | 380 | } |
|
0 commit comments