Skip to content

feat: Added a batch completed callback to the data client mutations batcher#1308

Open
gkevinzheng wants to merge 1 commit intov3_stagingfrom
batcher-callback
Open

feat: Added a batch completed callback to the data client mutations batcher#1308
gkevinzheng wants to merge 1 commit intov3_stagingfrom
batcher-callback

Conversation

@gkevinzheng
Copy link
Contributor

Changes made:

  • Refactored logic from Table.mutate_rows from producing a list of Status protos from a MutationsExceptionGroup
  • Added private keyword argument for a batch completion callback in the MutationsBatcher.
  • Added unit tests/system tests.

@gkevinzheng gkevinzheng requested a review from a team as a code owner March 6, 2026 18:55
@product-auto-label product-auto-label bot added size: l Pull request size is large. api: bigtable Issues related to the googleapis/python-bigtable API. labels Mar 6, 2026
@gemini-code-assist
Copy link
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request enhances the Bigtable data client's mutations batcher by providing a mechanism for users to register a callback function. This function will be executed upon the completion of a mutation batch, receiving a detailed list of status_pb2.Status objects that indicate the outcome of each individual mutation within that batch. The change centralizes exception processing for mutations and ensures robust handling across different client implementations.

Highlights

  • Batch Completed Callback: Introduced a private _batch_completed_callback keyword argument to the MutationsBatcher constructor, allowing a user-defined function to be invoked when a batch of mutations completes.
  • Error Handling Refactor: Refactored the logic for converting MutationsExceptionGroup into a list of status_pb2.Status objects from Table.mutate_rows into a new shared helper function _populate_statuses_from_mutations_exception_group.
  • Comprehensive Testing: Added new unit and system tests to validate the functionality of the batch completed callback and the refactored error handling logic for both asynchronous and synchronous batchers.
Changelog
  • google/cloud/bigtable/data/_async/client.py
    • Imported Callable and status_pb2 for type hinting.
    • Added _batch_completed_callback parameter to the mutations_batcher factory function.
    • Passed the new callback parameter to the MutationsBatcherAsync constructor.
  • google/cloud/bigtable/data/_async/mutations_batcher.py
    • Imported Callable, Optional, code_pb2, status_pb2, and _populate_statuses_from_mutations_exception_group.
    • Added _batch_completed_callback parameter to the __init__ method.
    • Stored the provided callback in _user_batch_completed_callback.
    • Initialized a list of status_pb2.Status objects before executing _execute_mutate_rows.
    • Called _populate_statuses_from_mutations_exception_group to populate statuses from MutationsExceptionGroup.
    • Invoked _user_batch_completed_callback with the list of statuses in the finally block of _execute_mutate_rows.
  • google/cloud/bigtable/data/_helpers.py
    • Imported MutationsExceptionGroup, code_pb2, and status_pb2.
    • Added _populate_statuses_from_mutations_exception_group function to process MutationsExceptionGroup and update a list of status_pb2.Status objects.
    • Added _get_status helper function to convert exceptions into status_pb2.Status objects.
  • google/cloud/bigtable/data/_sync_autogen/client.py
    • Imported Callable and status_pb2 for type hinting.
    • Added _batch_completed_callback parameter to the mutations_batcher factory function.
    • Passed the new callback parameter to the MutationsBatcher constructor.
  • google/cloud/bigtable/data/_sync_autogen/mutations_batcher.py
    • Imported Callable, Optional, code_pb2, status_pb2, and _populate_statuses_from_mutations_exception_group.
    • Added _batch_completed_callback parameter to the __init__ method.
    • Stored the provided callback in _user_batch_completed_callback.
    • Initialized a list of status_pb2.Status objects before executing _execute_mutate_rows.
    • Called _populate_statuses_from_mutations_exception_group to populate statuses from MutationsExceptionGroup.
    • Invoked _user_batch_completed_callback with the list of statuses in the finally block of _execute_mutate_rows.
  • google/cloud/bigtable/table.py
    • Removed unused imports GoogleAPICallError and RetryExceptionGroup.
    • Imported _populate_statuses_from_mutations_exception_group.
    • Replaced the inline logic for processing MutationsExceptionGroup with a call to the new _populate_statuses_from_mutations_exception_group helper function.
    • Removed the static method _get_status as its functionality was moved to _helpers.py.
  • tests/system/data/test_system_async.py
    • Added a new system test test_mutations_batcher_completed_callback to verify the asynchronous batcher's callback functionality.
  • tests/system/data/test_system_autogen.py
    • Added a new system test test_mutations_batcher_completed_callback to verify the synchronous batcher's callback functionality.
  • tests/unit/data/_async/test_mutations_batcher.py
    • Updated the expected argument count for batcher signature consistency checks.
    • Added unit tests for _execute_mutate_rows_batch_completed_callback to ensure the callback is invoked on success.
    • Added unit tests for _execute_mutate_rows_batch_completed_callback_errors to verify callback behavior with errors.
  • tests/unit/data/_sync_autogen/test_mutations_batcher.py
    • Updated the expected argument count for batcher signature consistency checks.
    • Added unit tests for test__execute_mutate_rows_batch_completed_callback to ensure the callback is invoked on success.
    • Added unit tests for test__execute_mutate_rows_batch_completed_callback_errors to verify callback behavior with errors.
  • tests/unit/data/test__helpers.py
    • Imported bt_exceptions, code_pb2, and status_pb2.
    • Added TestPopulateStatusesFromMutationExceptionGroup with parameterized tests to thoroughly validate the _populate_statuses_from_mutations_exception_group helper function under various exception scenarios.
Activity
  • The pull request introduces a new feature to the Bigtable data client.
  • Existing error handling logic in Table.mutate_rows has been refactored into a reusable helper function.
  • New unit and system tests have been added to ensure the correctness of the new callback functionality and the refactored code.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a valuable feature by adding a batch completion callback to the MutationsBatcher. While the implementation is generally clean and the refactoring of exception-to-status conversion logic into _populate_statuses_from_mutations_exception_group improves modularity, a critical security vulnerability has been identified. The _populate_statuses_from_mutations_exception_group function in google/cloud/bigtable/data/_helpers.py performs unvalidated list index access using error.index, which can lead to application crashes (Denial of Service) if the index is out of bounds or None. It is recommended to add bounds and type checking for error.index before using it to access the statuses list. Additionally, a minor issue was noted in the new unit tests where assertions are written as ineffective statements.

Comment on lines +253 to +258
for error in exc_group.exceptions:
cause = error.__cause__
if isinstance(cause, RetryExceptionGroup):
statuses[error.index] = _get_status(cause.exceptions[-1])
else:
statuses[error.index] = _get_status(cause)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security-medium medium

The helper function _populate_statuses_from_mutations_exception_group uses error.index to access the statuses list without validating that the index is an integer or within the bounds of the list. Since error.index is derived from server responses (via FailedMutationEntryError), a malicious or malfunctioning server could provide an out-of-bounds index, causing an IndexError. Additionally, some parts of the library (e.g., in MutationsBatcher) explicitly set error.index to None, which would cause a TypeError if the helper is called on such an exception. This can lead to a Denial of Service (crash) of the client application.

Suggested change
for error in exc_group.exceptions:
cause = error.__cause__
if isinstance(cause, RetryExceptionGroup):
statuses[error.index] = _get_status(cause.exceptions[-1])
else:
statuses[error.index] = _get_status(cause)
for error in exc_group.exceptions:
if isinstance(error.index, int) and 0 <= error.index < len(statuses):
cause = error.__cause__
if isinstance(cause, RetryExceptionGroup):
statuses[error.index] = _get_status(cause.exceptions[-1])
else:
statuses[error.index] = _get_status(cause)

Comment on lines +999 to +1000
kwargs["operation_timeout"] == 17
kwargs["attempt_timeout"] == 13
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

These lines are intended to be assertions, but they are currently ineffective statements. They should be updated to use assert to correctly validate the timeout values.

Suggested change
kwargs["operation_timeout"] == 17
kwargs["attempt_timeout"] == 13
assert kwargs["operation_timeout"] == 17
assert kwargs["attempt_timeout"] == 13

Comment on lines +878 to +879
kwargs["operation_timeout"] == 17
kwargs["attempt_timeout"] == 13
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

These lines appear to be intended as assertions but are currently written as ineffective statements. To ensure the timeout values are correctly validated in the test, they should be converted to assert statements.

Suggested change
kwargs["operation_timeout"] == 17
kwargs["attempt_timeout"] == 13
assert kwargs["operation_timeout"] == 17
assert kwargs["attempt_timeout"] == 13

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

api: bigtable Issues related to the googleapis/python-bigtable API. size: l Pull request size is large.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant