Skip to content

Commit 2711330

Browse files
committed
feat: support conversion of BigQuery schema to Protobuf
1 parent 3928ddc commit 2711330

3 files changed

Lines changed: 723 additions & 1 deletion

File tree

packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
# this code path once we drop support for Python 3.7
2929
import importlib_metadata as metadata
3030

31-
from google.cloud.bigquery_storage_v1 import client, types
31+
from google.cloud.bigquery_storage_v1 import client, schema, types
3232

3333

3434
class BigQueryReadClient(client.BigQueryReadClient):
@@ -140,4 +140,6 @@ def _get_version(dependency_name):
140140
# google.cloud.bigquery_storage_v1.client
141141
"BigQueryReadClient",
142142
"BigQueryWriteClient",
143+
# google.cloud.bigquery_storage_v1.schema
144+
"schema",
143145
)
Lines changed: 260 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,260 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# https://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Utilities for converting BigQuery schemas to Protocol Buffer descriptors.
16+
17+
This module provides functionality to dynamically generate Protocol Buffer
18+
descriptors from BigQuery table schemas, eliminating the need to manually
19+
create and compile .proto files when using the BigQuery Storage Write API.
20+
"""
21+
22+
from typing import Dict, List, Optional, Tuple
23+
24+
from google.cloud.bigquery_storage_v1 import types
25+
from google.protobuf import descriptor_pb2
26+
27+
28+
# Mapping from BigQuery types to Protocol Buffer field types
29+
_BQ_TO_PROTO_TYPE_MAP: Dict[types.TableFieldSchema.Type, int] = {
    # --- Scalars with a direct protobuf counterpart ---
    types.TableFieldSchema.Type.STRING: descriptor_pb2.FieldDescriptorProto.TYPE_STRING,
    types.TableFieldSchema.Type.INT64: descriptor_pb2.FieldDescriptorProto.TYPE_INT64,
    types.TableFieldSchema.Type.BOOL: descriptor_pb2.FieldDescriptorProto.TYPE_BOOL,
    types.TableFieldSchema.Type.BYTES: descriptor_pb2.FieldDescriptorProto.TYPE_BYTES,
    types.TableFieldSchema.Type.DOUBLE: descriptor_pb2.FieldDescriptorProto.TYPE_DOUBLE,
    # --- Date/time values ---
    # DATE: number of days since the epoch.
    types.TableFieldSchema.Type.DATE: descriptor_pb2.FieldDescriptorProto.TYPE_INT32,
    # DATETIME and TIME: formatted strings.
    types.TableFieldSchema.Type.DATETIME: descriptor_pb2.FieldDescriptorProto.TYPE_STRING,
    types.TableFieldSchema.Type.TIME: descriptor_pb2.FieldDescriptorProto.TYPE_STRING,
    # TIMESTAMP: microseconds since the epoch.
    types.TableFieldSchema.Type.TIMESTAMP: descriptor_pb2.FieldDescriptorProto.TYPE_INT64,
    # --- Values carried as text ---
    # NUMERIC / BIGNUMERIC: decimal strings.
    types.TableFieldSchema.Type.NUMERIC: descriptor_pb2.FieldDescriptorProto.TYPE_STRING,
    types.TableFieldSchema.Type.BIGNUMERIC: descriptor_pb2.FieldDescriptorProto.TYPE_STRING,
    # GEOGRAPHY: WKT string.
    types.TableFieldSchema.Type.GEOGRAPHY: descriptor_pb2.FieldDescriptorProto.TYPE_STRING,
    # JSON: serialized string.
    types.TableFieldSchema.Type.JSON: descriptor_pb2.FieldDescriptorProto.TYPE_STRING,
    # INTERVAL: string.
    types.TableFieldSchema.Type.INTERVAL: descriptor_pb2.FieldDescriptorProto.TYPE_STRING,
    # --- Composite values ---
    # STRUCT and RANGE become nested messages; the converter builds their
    # message descriptors separately and only the TYPE_MESSAGE tag lives here.
    types.TableFieldSchema.Type.STRUCT: descriptor_pb2.FieldDescriptorProto.TYPE_MESSAGE,
    types.TableFieldSchema.Type.RANGE: descriptor_pb2.FieldDescriptorProto.TYPE_MESSAGE,
}
56+
57+
58+
def _get_field_label(mode: types.TableFieldSchema.Mode) -> int:
    """Translate a BigQuery column mode into a protobuf field label.

    Args:
        mode: The BigQuery field mode to translate.

    Returns:
        ``LABEL_REQUIRED`` for a REQUIRED column, ``LABEL_REPEATED`` for a
        REPEATED column, and ``LABEL_OPTIONAL`` for any other value.
    """
    bq_mode = types.TableFieldSchema.Mode
    proto_field = descriptor_pb2.FieldDescriptorProto

    if mode == bq_mode.REQUIRED:
        return proto_field.LABEL_REQUIRED
    if mode == bq_mode.REPEATED:
        return proto_field.LABEL_REPEATED
    # Everything else (NULLABLE, MODE_UNSPECIFIED) maps to an optional field.
    return proto_field.LABEL_OPTIONAL
73+
74+
75+
def _create_range_descriptor(
    field_name: str,
    element_type: types.TableFieldSchema.Type,
) -> descriptor_pb2.DescriptorProto:
    """Build the nested message descriptor that backs a RANGE column.

    The message is named ``<field_name>_Range`` and holds two optional
    fields, ``start`` (number 1) and ``end`` (number 2), both typed after the
    range's element type.

    Args:
        field_name: Name of the RANGE column; used as the message name prefix.
        element_type: BigQuery type of the range bounds.

    Returns:
        A DescriptorProto describing the ``start``/``end`` pair.
    """
    proto_field = descriptor_pb2.FieldDescriptorProto

    range_message = descriptor_pb2.DescriptorProto()
    range_message.name = f"{field_name}_Range"

    # Element types missing from the mapping are carried as strings.
    bound_type = _BQ_TO_PROTO_TYPE_MAP.get(element_type, proto_field.TYPE_STRING)

    # Both bounds share the same shape; only the name and tag number differ.
    for tag_number, bound_name in enumerate(("start", "end"), start=1):
        range_message.field.add(
            name=bound_name,
            number=tag_number,
            type=bound_type,
            label=proto_field.LABEL_OPTIONAL,
        )

    return range_message
112+
113+
114+
def _convert_fields_to_proto(
    fields: List[types.TableFieldSchema],
    parent_name: str = "",
) -> Tuple[
    List[descriptor_pb2.FieldDescriptorProto],
    List[descriptor_pb2.DescriptorProto],
]:
    """Convert BigQuery fields to Protocol Buffer field descriptors.

    Fields are numbered 1..N in the order they appear in ``fields``. STRUCT
    columns are converted recursively into a nested message named
    ``<field>_Struct``; RANGE columns get a nested message named
    ``<field>_Range``. In both cases the field's ``type_name`` is set to the
    nested message's simple (unqualified) name.

    Args:
        fields: List of BigQuery table field schemas.
        parent_name: Name of the enclosing message. It is forwarded on
            recursive calls but is not read when naming nested types.

    Returns:
        A tuple of (field_descriptors, nested_descriptors):
            - field_descriptors: List of FieldDescriptorProto objects, one
              per input field, in input order.
            - nested_descriptors: List of DescriptorProto objects for the
              STRUCT/RANGE fields found at this level.

    Raises:
        ValueError: If a RANGE field has no element type, or a field's type
            has no entry in the type mapping.
    """
    # NOTE: the return annotation uses typing.Tuple rather than the builtin
    # ``tuple[...]``; subscripting the builtin fails at definition time on
    # Python versions older than 3.9.
    field_descriptors = []
    nested_descriptors = []

    for field_number, bq_field in enumerate(fields, start=1):
        field_descriptor = descriptor_pb2.FieldDescriptorProto()
        field_descriptor.name = bq_field.name
        field_descriptor.number = field_number

        # Handle STRUCT fields (nested messages)
        if bq_field.type_ == types.TableFieldSchema.Type.STRUCT:
            field_descriptor.type = descriptor_pb2.FieldDescriptorProto.TYPE_MESSAGE

            # Create nested message descriptor
            nested_descriptor = descriptor_pb2.DescriptorProto()
            nested_descriptor.name = f"{bq_field.name}_Struct"

            # Recursively convert nested fields
            nested_fields, deeply_nested = _convert_fields_to_proto(
                bq_field.fields,
                parent_name=nested_descriptor.name,
            )
            nested_descriptor.field.extend(nested_fields)
            nested_descriptor.nested_type.extend(deeply_nested)

            nested_descriptors.append(nested_descriptor)
            field_descriptor.type_name = nested_descriptor.name

        # Handle RANGE fields
        elif bq_field.type_ == types.TableFieldSchema.Type.RANGE:
            field_descriptor.type = descriptor_pb2.FieldDescriptorProto.TYPE_MESSAGE

            # The element type decides the type of the start/end bounds, so
            # a RANGE without one cannot be converted.
            if not bq_field.range_element_type or not bq_field.range_element_type.type_:
                raise ValueError(
                    f"RANGE field '{bq_field.name}' is missing range_element_type. "
                    f"RANGE fields must specify an element type (DATE, DATETIME, or TIMESTAMP)."
                )
            element_type = bq_field.range_element_type.type_

            range_descriptor = _create_range_descriptor(bq_field.name, element_type)
            nested_descriptors.append(range_descriptor)
            field_descriptor.type_name = range_descriptor.name

        # Handle primitive types
        else:
            proto_type = _BQ_TO_PROTO_TYPE_MAP.get(bq_field.type_)
            if proto_type is None:
                raise ValueError(
                    f"Unsupported BigQuery type: {bq_field.type_} for field {bq_field.name}"
                )
            field_descriptor.type = proto_type

        # An unset mode is treated as NULLABLE.
        mode = bq_field.mode or types.TableFieldSchema.Mode.NULLABLE
        field_descriptor.label = _get_field_label(mode)

        field_descriptors.append(field_descriptor)

    return field_descriptors, nested_descriptors
192+
193+
194+
def table_schema_to_proto_descriptor(
    table_schema: types.TableSchema,
    message_name: str = "TableRow",
) -> descriptor_pb2.DescriptorProto:
    """Build a Protocol Buffer DescriptorProto from a BigQuery TableSchema.

    The resulting descriptor lets callers of the BigQuery Storage Write API
    describe their rows without writing and compiling a .proto file. Column
    modes are expressed with proto2-style labels (a REQUIRED column becomes a
    required field), and STRUCT/RANGE columns are emitted as nested message
    types of the root message.

    Args:
        table_schema: The BigQuery table schema to convert.
        message_name: Name given to the root message. Defaults to "TableRow".

    Returns:
        A DescriptorProto suitable for ``ProtoSchema.proto_descriptor``.

    Raises:
        ValueError: If the schema contains an unsupported field type, or a
            RANGE field without an element type.

    Example:
        >>> from google.cloud.bigquery_storage_v1 import schema, types
        >>>
        >>> table_schema = types.TableSchema(fields=[
        ...     types.TableFieldSchema(
        ...         name="id",
        ...         type_=types.TableFieldSchema.Type.INT64,
        ...         mode=types.TableFieldSchema.Mode.REQUIRED
        ...     ),
        ...     types.TableFieldSchema(
        ...         name="name",
        ...         type_=types.TableFieldSchema.Type.STRING
        ...     ),
        ... ])
        >>>
        >>> proto_descriptor = schema.table_schema_to_proto_descriptor(table_schema)
        >>>
        >>> proto_schema = types.ProtoSchema()
        >>> proto_schema.proto_descriptor = proto_descriptor

    Note:
        The BigQuery-to-protobuf type mappings are described at
        https://cloud.google.com/bigquery/docs/write-api#data_type_conversions
    """
    root_message = descriptor_pb2.DescriptorProto()
    root_message.name = message_name

    # One pass over the schema yields both the root-level fields and the
    # message types those fields refer to.
    top_level_fields, top_level_messages = _convert_fields_to_proto(table_schema.fields)

    root_message.field.extend(top_level_fields)
    root_message.nested_type.extend(top_level_messages)

    return root_message
258+
259+
260+
# Public API of this module: only the schema-to-descriptor converter is
# exported; the underscore-prefixed helpers are not listed.
__all__ = ("table_schema_to_proto_descriptor",)

0 commit comments

Comments
 (0)