202 changes: 182 additions & 20 deletions bindings/python/example/example.py
@@ -87,6 +87,19 @@ async def main():
except Exception as e:
print(f"Failed to get table info: {e}")

# Demo: List offsets
print("\n--- Testing list_offsets() ---")
try:
# Query latest offsets using an OffsetType constant (recommended for type safety)
offsets = await admin.list_offsets(
table_path,
bucket_ids=[0],
offset_type=fluss.OffsetType.LATEST
)
print(f"Latest offsets for table (before writes): {offsets}")
except Exception as e:
print(f"Failed to list offsets: {e}")

# Get the table instance
table = await conn.get_table(table_path)
print(f"Got table: {table}")
@@ -96,7 +109,7 @@ async def main():
print(f"Created append writer: {append_writer}")

try:
# Test 1: Write PyArrow Table
# Demo: Write PyArrow Table
print("\n--- Testing PyArrow Table write ---")
pa_table = pa.Table.from_arrays(
[
@@ -139,7 +152,7 @@ async def main():
append_writer.write_arrow(pa_table)
print("Successfully wrote PyArrow Table")

# Test 2: Write PyArrow RecordBatch
# Demo: Write PyArrow RecordBatch
print("\n--- Testing PyArrow RecordBatch write ---")
pa_record_batch = pa.RecordBatch.from_arrays(
[
@@ -202,7 +215,7 @@ async def main():
)
print("Successfully appended row (list with Date, Time, Timestamp, Decimal)")

# Test 4: Write Pandas DataFrame
# Demo: Write Pandas DataFrame
print("\n--- Testing Pandas DataFrame write ---")
df = pd.DataFrame(
{
@@ -232,6 +245,19 @@ async def main():
append_writer.flush()
print("Successfully flushed data")

# Demo: Check offsets after writes
print("\n--- Checking offsets after writes ---")
try:
# Query with a plain string (alternative API - both strings and OffsetType constants are accepted)
offsets = await admin.list_offsets(
table_path,
bucket_ids=[0],
offset_type="latest" # Can also use "earliest" or "timestamp"
)
print(f"Latest offsets after writing 7 records: {offsets}")
except Exception as e:
print(f"Failed to list offsets: {e}")

except Exception as e:
print(f"Error during writing: {e}")

@@ -242,10 +268,13 @@ async def main():
batch_scanner = await table.new_scan().create_batch_scanner()
print(f"Created batch scanner: {batch_scanner}")

# Subscribe to scan from earliest to latest
# start_timestamp=None (earliest), end_timestamp=None (latest)
batch_scanner.subscribe(None, None)
# Subscribe to buckets (required before to_arrow/to_pandas)
# Use subscribe_buckets to subscribe to all buckets from EARLIEST_OFFSET
num_buckets = (await admin.get_table(table_path)).num_buckets
batch_scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)})
print(f"Subscribed to {num_buckets} buckets from EARLIEST_OFFSET")

# Read all data using to_arrow()
print("Scanning results using to_arrow():")

# Try to get as PyArrow Table
@@ -255,13 +284,13 @@ async def main():
except Exception as e:
print(f"Could not convert to PyArrow: {e}")

# Let's subscribe from the beginning again.
# Reset subscription
batch_scanner.subscribe(None, None)
# Create a new batch scanner for to_pandas() test
batch_scanner2 = await table.new_scan().create_batch_scanner()
batch_scanner2.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)})
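# (Assumption) A batch scanner appears to be exhausted once drained by
# to_arrow(), hence the fresh scanner here instead of re-subscribing.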

# Try to get as Pandas DataFrame
try:
df_result = batch_scanner.to_pandas()
df_result = batch_scanner2.to_pandas()
print(f"\nAs Pandas DataFrame:\n{df_result}")
except Exception as e:
print(f"Could not convert to Pandas: {e}")
@@ -273,32 +302,34 @@ async def main():

# Test poll_arrow() method for incremental reading as Arrow Table
print("\n--- Testing poll_arrow() method ---")
# Reset subscription to start from the beginning
batch_scanner.subscribe(None, None)
batch_scanner3 = await table.new_scan().create_batch_scanner()
batch_scanner3.subscribe(bucket_id=0, start_offset=fluss.EARLIEST_OFFSET)
print(f"Subscribed to bucket 0 at EARLIEST_OFFSET ({fluss.EARLIEST_OFFSET})")

# Poll with a timeout of 5000ms (5 seconds)
# Note: poll_arrow() returns an empty table (not an error) on timeout
try:
poll_result = batch_scanner.poll_arrow(5000)
poll_result = batch_scanner3.poll_arrow(5000)
print(f"Number of rows: {poll_result.num_rows}")

if poll_result.num_rows > 0:
poll_df = poll_result.to_pandas()
print(f"Polled data:\n{poll_df}")
else:
print("Empty result (no records available)")
# Empty table still has schema
# Empty table still has schema, so column types are available even with no rows
print(f"Schema: {poll_result.schema}")

except Exception as e:
print(f"Error during poll_arrow: {e}")

# Test poll_batches() method for batches with metadata
print("\n--- Testing poll_batches() method ---")
batch_scanner.subscribe(None, None)
batch_scanner4 = await table.new_scan().create_batch_scanner()
batch_scanner4.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)})

try:
batches = batch_scanner.poll_batches(5000)
batches = batch_scanner4.poll_batches(5000)
print(f"Number of batches: {len(batches)}")

for i, batch in enumerate(batches):
@@ -319,7 +350,7 @@ async def main():
record_scanner = await table.new_scan().create_log_scanner()
print(f"Created record scanner: {record_scanner}")

record_scanner.subscribe(None, None)
record_scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)})

# Poll returns List[ScanRecord] with per-record metadata
print("\n--- Testing poll() method (record-by-record) ---")
@@ -539,10 +570,13 @@ async def main():
# Demo: Column projection using builder pattern
print("\n--- Testing Column Projection ---")
try:
# Get bucket count for subscriptions
num_buckets = (await admin.get_table(table_path)).num_buckets

# Project specific columns by index (using batch scanner for to_pandas)
print("\n1. Projection by index [0, 1] (id, name):")
scanner_index = await table.new_scan().project([0, 1]).create_batch_scanner()
scanner_index.subscribe(None, None)
scanner_index.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)})
df_projected = scanner_index.to_pandas()
print(df_projected.head())
print(
@@ -554,22 +588,150 @@ async def main():
scanner_names = await table.new_scan() \
.project_by_name(["name", "score"]) \
.create_batch_scanner()
scanner_names.subscribe(None, None)
scanner_names.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)})
df_named = scanner_names.to_pandas()
print(df_named.head())
print(f" Projected {df_named.shape[1]} columns: {list(df_named.columns)}")

# Test empty result schema with projection
print("\n3. Testing empty result schema with projection:")
scanner_proj = await table.new_scan().project([0, 2]).create_batch_scanner()
scanner_proj.subscribe(None, None)
scanner_proj.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)})
# Quick poll that may return empty
result = scanner_proj.poll_arrow(100)
print(f" Schema columns: {result.schema.names}")

except Exception as e:
print(f"Error during projection: {e}")

# Demo: Drop tables
print("\n--- Testing drop_table() ---")
try:
# Drop the log table
await admin.drop_table(table_path, ignore_if_not_exists=True)
print(f"Successfully dropped table: {table_path}")
# Drop the PK table
await admin.drop_table(pk_table_path, ignore_if_not_exists=True)
print(f"Successfully dropped table: {pk_table_path}")
except Exception as e:
print(f"Failed to drop table: {e}")

# =====================================================
# Demo: Partitioned Table with list_partition_offsets
# =====================================================
print("\n" + "=" * 60)
print("--- Testing Partitioned Table ---")
print("=" * 60)

# Create a partitioned log table
partitioned_fields = [
pa.field("id", pa.int32()),
pa.field("region", pa.string()), # partition key
pa.field("value", pa.int64()),
]
partitioned_schema = pa.schema(partitioned_fields)
fluss_partitioned_schema = fluss.Schema(partitioned_schema)

partitioned_table_descriptor = fluss.TableDescriptor(
fluss_partitioned_schema,
partition_keys=["region"], # Partition by region
bucket_count=1,
)
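# Note that the partition key ("region") is a regular column in the Arrow
# schema above; TableDescriptor references it by name via partition_keys.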

partitioned_table_path = fluss.TablePath("fluss", "partitioned_log_table_py")

try:
# Drop if exists first
await admin.drop_table(partitioned_table_path, ignore_if_not_exists=True)
print(f"Dropped existing table: {partitioned_table_path}")

# Create the partitioned table
await admin.create_table(partitioned_table_path, partitioned_table_descriptor, False)
print(f"Created partitioned table: {partitioned_table_path}")

# Create partitions for US and EU regions
print("\n--- Creating partitions ---")
await admin.create_partition(partitioned_table_path, {"region": "US"}, ignore_if_exists=True)
print("Created partition: region=US")
await admin.create_partition(partitioned_table_path, {"region": "EU"}, ignore_if_exists=True)
print("Created partition: region=EU")

# List partitions
print("\n--- Listing partitions ---")
partition_infos = await admin.list_partition_infos(partitioned_table_path)
for p in partition_infos:
print(f" {p}") # PartitionInfo(partition_id=..., partition_name='region=...')

# Get the table and write some data
partitioned_table = await conn.get_table(partitioned_table_path)
partitioned_writer = await partitioned_table.new_append_writer()

# Append data to US partition
await partitioned_writer.append({"id": 1, "region": "US", "value": 100})
await partitioned_writer.append({"id": 2, "region": "US", "value": 200})
# Append data to EU partition
await partitioned_writer.append({"id": 3, "region": "EU", "value": 300})
await partitioned_writer.append({"id": 4, "region": "EU", "value": 400})
partitioned_writer.flush()
print("\nWrote 4 records (2 to US, 2 to EU)")

# Demo: list_partition_offsets
print("\n--- Testing list_partition_offsets ---")

# Query offsets for US partition
# Note: partition_name is just the value (e.g., "US"), not "region=US"
us_offsets = await admin.list_partition_offsets(
partitioned_table_path,
partition_name="US",
bucket_ids=[0],
offset_type="latest"
)
print(f"US partition latest offsets: {us_offsets}")

# Query offsets for EU partition
eu_offsets = await admin.list_partition_offsets(
partitioned_table_path,
partition_name="EU",
bucket_ids=[0],
offset_type="latest"
)
print(f"EU partition latest offsets: {eu_offsets}")

# Demo: subscribe_partition for reading partitioned data
print("\n--- Testing subscribe_partition + to_arrow() ---")
partitioned_scanner = await partitioned_table.new_scan().create_batch_scanner()

# Subscribe to each partition using partition_id
for p in partition_infos:
partitioned_scanner.subscribe_partition(
partition_id=p.partition_id,
bucket_id=0,
start_offset=fluss.EARLIEST_OFFSET
)
print(f"Subscribed to partition {p.partition_name} (id={p.partition_id})")

# to_arrow() now also works for partitioned tables
partitioned_arrow = partitioned_scanner.to_arrow()
print(f"\nto_arrow() returned {partitioned_arrow.num_rows} records from partitioned table:")
print(partitioned_arrow.to_pandas())

# Demo: to_pandas() also works for partitioned tables
print("\n--- Testing to_pandas() on partitioned table ---")
partitioned_scanner2 = await partitioned_table.new_scan().create_batch_scanner()
for p in partition_infos:
partitioned_scanner2.subscribe_partition(p.partition_id, 0, fluss.EARLIEST_OFFSET)
partitioned_df = partitioned_scanner2.to_pandas()
print(f"to_pandas() returned {len(partitioned_df)} records:")
print(partitioned_df)

# Cleanup
await admin.drop_table(partitioned_table_path, ignore_if_not_exists=True)
print(f"\nDropped partitioned table: {partitioned_table_path}")

except Exception as e:
print(f"Error with partitioned table: {e}")
traceback.print_exc()

# Close connection
conn.close()
print("\nConnection closed")