Skip to content

Commit 416a4f3

Browse files
aloivaPranava Vedagnya Gaddamhallvictoria
authored
feat: additional configurations for Kafka Trigger and Output (#306)
* add kafka configs and tests * change defaults * fix tests * add pydocs for kafka_trigger and update pydocs for kafka_output * fix code quality unit test --------- Co-authored-by: Pranava Vedagnya Gaddam <prgaddam@microsoft.com> Co-authored-by: hallvictoria <59299039+hallvictoria@users.noreply.github.com>
1 parent e3c883f commit 416a4f3

3 files changed

Lines changed: 181 additions & 25 deletions

File tree

azure/functions/decorators/function_app.py

Lines changed: 56 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@
3030
from azure.functions.decorators.http import HttpTrigger, HttpOutput, \
3131
HttpMethod
3232
from azure.functions.decorators.kafka import KafkaTrigger, KafkaOutput, \
33-
BrokerAuthenticationMode, BrokerProtocol, OAuthBearerMethod
33+
BrokerAuthenticationMode, BrokerProtocol, OAuthBearerMethod, \
34+
KafkaMessageKeyType
3435
from azure.functions.decorators.queue import QueueTrigger, QueueOutput
3536
from azure.functions.decorators.servicebus import ServiceBusQueueTrigger, \
3637
ServiceBusQueueOutput, ServiceBusTopicTrigger, \
@@ -1244,12 +1245,19 @@ def kafka_trigger(self,
12441245
event_hub_connection_string: Optional[str] = None,
12451246
consumer_group: Optional[str] = None,
12461247
avro_schema: Optional[str] = None,
1248+
key_avro_schema: Optional[str] = None,
1249+
key_data_type: Optional[
1250+
Union[KafkaMessageKeyType, str]] = KafkaMessageKeyType.STRING,
12471251
username: Optional[str] = None,
12481252
password: Optional[str] = None,
12491253
ssl_key_location: Optional[str] = None,
12501254
ssl_ca_location: Optional[str] = None,
12511255
ssl_certificate_location: Optional[str] = None,
12521256
ssl_key_password: Optional[str] = None,
1257+
ssl_certificate_pem: Optional[str] = None,
1258+
ssl_key_pem: Optional[str] = None,
1259+
ssl_ca_pem: Optional[str] = None,
1260+
ssl_certificate_and_key_pem: Optional[str] = None,
12531261
schema_registry_url: Optional[str] = None,
12541262
schema_registry_username: Optional[str] = None,
12551263
schema_registry_password: Optional[str] = None,
@@ -1287,6 +1295,10 @@ def kafka_trigger(self,
12871295
Azure Event Hubs).
12881296
:param consumer_group: Kafka consumer group used by the trigger.
12891297
:param avro_schema: Used only if a generic Avro record should be generated.
1298+
:param key_avro_schema: Avro schema for the message key. Used only if a
1299+
generic Avro record should be generated for the key.
1300+
:param key_data_type: Data type of the message key. Valid values: Int, Long,
1301+
String, Binary. Default is String. Ignored if key_avro_schema is set.
12901302
:param username: SASL username for use with the PLAIN or SASL-SCRAM mechanisms.
12911303
Equivalent to 'sasl.username' in librdkafka. Default is empty string.
12921304
:param password: SASL password for use with the PLAIN or SASL-SCRAM mechanisms.
@@ -1297,8 +1309,16 @@ def kafka_trigger(self,
12971309
certificate. Equivalent to 'ssl.ca.location' in librdkafka.
12981310
:param ssl_certificate_location: Path to the client's certificate.
12991311
Equivalent to 'ssl.certificate.location' in librdkafka.
1300-
:param ssl_key_password: Password for the clients certificate.
1312+
:param ssl_key_password: Password for the client's certificate.
13011313
Equivalent to 'ssl.key.password' in librdkafka.
1314+
:param ssl_certificate_pem: Client certificate in PEM format.
1315+
Equivalent to 'ssl.certificate.pem' in librdkafka.
1316+
:param ssl_key_pem: Client private key in PEM format.
1317+
Equivalent to 'ssl.key.pem' in librdkafka.
1318+
:param ssl_ca_pem: CA certificate for verifying the broker's
1319+
certificate in PEM format. Equivalent to 'ssl.ca.pem' in librdkafka.
1320+
:param ssl_certificate_and_key_pem: Client certificate concatenated
1321+
with key in PEM format. Can also support KeyVault references.
13021322
:param schema_registry_url: URL of the Avro Schema Registry.
13031323
:param schema_registry_username: Username for the Schema Registry.
13041324
:param schema_registry_password: Password for the Schema Registry.
@@ -1319,6 +1339,7 @@ def kafka_trigger(self,
13191339
ScramSha256, ScramSha512. Default: Plain. Equivalent to 'sasl.mechanism'.
13201340
:param protocol: Security protocol used to communicate with brokers.
13211341
Default: plaintext. Equivalent to 'security.protocol'.
1342+
:param cardinality: Set to "many" to enable batching. Default is "One".
13221343
:param lag_threshold: Max number of unprocessed messages per worker instance.
13231344
Used in scaling logic to estimate needed worker instances. Default is 1000.
13241345
:param data_type: Defines how Functions runtime should treat the parameter value.
@@ -1338,12 +1359,19 @@ def decorator():
13381359
event_hub_connection_string=event_hub_connection_string, # noqa: E501
13391360
consumer_group=consumer_group,
13401361
avro_schema=avro_schema,
1362+
key_avro_schema=key_avro_schema,
1363+
key_data_type=parse_singular_param_to_enum(
1364+
key_data_type, KafkaMessageKeyType),
13411365
username=username,
13421366
password=password,
13431367
ssl_key_location=ssl_key_location,
13441368
ssl_ca_location=ssl_ca_location,
13451369
ssl_certificate_location=ssl_certificate_location,
13461370
ssl_key_password=ssl_key_password,
1371+
ssl_certificate_pem=ssl_certificate_pem,
1372+
ssl_key_pem=ssl_key_pem,
1373+
ssl_ca_pem=ssl_ca_pem,
1374+
ssl_certificate_and_key_pem=ssl_certificate_and_key_pem,
13471375
schema_registry_url=schema_registry_url,
13481376
schema_registry_username=schema_registry_username,
13491377
schema_registry_password=schema_registry_password,
@@ -2646,12 +2674,19 @@ def kafka_output(self,
26462674
topic: str,
26472675
broker_list: str,
26482676
avro_schema: Optional[str] = None,
2677+
key_avro_schema: Optional[str] = None,
2678+
key_data_type: Optional[
2679+
Union[KafkaMessageKeyType, str]] = KafkaMessageKeyType.STRING,
26492680
username: Optional[str] = None,
26502681
password: Optional[str] = None,
26512682
ssl_key_location: Optional[str] = None,
26522683
ssl_ca_location: Optional[str] = None,
26532684
ssl_certificate_location: Optional[str] = None,
26542685
ssl_key_password: Optional[str] = None,
2686+
ssl_certificate_pem: Optional[str] = None,
2687+
ssl_key_pem: Optional[str] = None,
2688+
ssl_ca_pem: Optional[str] = None,
2689+
ssl_certificate_and_key_pem: Optional[str] = None,
26552690
schema_registry_url: Optional[str] = None,
26562691
schema_registry_username: Optional[str] = None,
26572692
schema_registry_password: Optional[str] = None,
@@ -2688,6 +2723,10 @@ def kafka_output(self,
26882723
:param topic: The Kafka topic to which messages are published.
26892724
:param broker_list: The list of Kafka brokers to which the producer connects.
26902725
:param avro_schema: Optional. Avro schema to generate a generic record.
2726+
:param key_avro_schema: Avro schema for the message key. Used only if a
2727+
generic Avro record should be generated for the key.
2728+
:param key_data_type: Data type of the message key. Valid values: Int, Long,
2729+
String, Binary. Default is String. Ignored if key_avro_schema is set.
26912730
:param username: SASL username for use with the PLAIN and SASL-SCRAM
26922731
mechanisms. Equivalent to `'sasl.username'` in librdkafka.
26932732
:param password: SASL password for use with the PLAIN and SASL-SCRAM
@@ -2700,6 +2739,14 @@ def kafka_output(self,
27002739
Equivalent to `'ssl.certificate.location'` in librdkafka.
27012740
:param ssl_key_password: Password for the client's SSL key.
27022741
Equivalent to `'ssl.key.password'` in librdkafka.
2742+
:param ssl_certificate_pem: Client certificate in PEM format.
2743+
Equivalent to 'ssl.certificate.pem' in librdkafka.
2744+
:param ssl_key_pem: Client private key in PEM format.
2745+
Equivalent to 'ssl.key.pem' in librdkafka.
2746+
:param ssl_ca_pem: CA certificate for verifying the broker's
2747+
certificate in PEM format. Equivalent to 'ssl.ca.pem' in librdkafka.
2748+
:param ssl_certificate_and_key_pem: Client certificate concatenated
2749+
with key in PEM format. Can also support KeyVault references.
27032750
:param schema_registry_url: URL of the Avro Schema Registry.
27042751
:param schema_registry_username: Username for accessing the Schema Registry.
27052752
:param schema_registry_password: Password for accessing the Schema Registry.
@@ -2753,12 +2800,19 @@ def decorator():
27532800
topic=topic,
27542801
broker_list=broker_list,
27552802
avro_schema=avro_schema,
2803+
key_avro_schema=key_avro_schema,
2804+
key_data_type=parse_singular_param_to_enum(
2805+
key_data_type, KafkaMessageKeyType),
27562806
username=username,
27572807
password=password,
27582808
ssl_key_location=ssl_key_location,
27592809
ssl_ca_location=ssl_ca_location,
27602810
ssl_certificate_location=ssl_certificate_location,
27612811
ssl_key_password=ssl_key_password,
2812+
ssl_certificate_pem=ssl_certificate_pem,
2813+
ssl_key_pem=ssl_key_pem,
2814+
ssl_ca_pem=ssl_ca_pem,
2815+
ssl_certificate_and_key_pem=ssl_certificate_and_key_pem,
27622816
schema_registry_url=schema_registry_url,
27632817
schema_registry_username=schema_registry_username,
27642818
schema_registry_password=schema_registry_password,

azure/functions/decorators/kafka.py

Lines changed: 52 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,13 @@ class OAuthBearerMethod(StringifyEnum):
2929
OIDC = 1
3030

3131

32+
class KafkaMessageKeyType(StringifyEnum):
33+
INT = 0
34+
LONG = 1
35+
STRING = 2
36+
BINARY = 3
37+
38+
3239
class KafkaOutput(OutputBinding):
3340
@staticmethod
3441
def get_binding_name() -> str:
@@ -39,15 +46,21 @@ def __init__(self,
3946
topic: str,
4047
broker_list: str,
4148
avro_schema: Optional[str],
42-
username: Optional[str],
43-
password: Optional[str],
44-
ssl_key_location: Optional[str],
45-
ssl_ca_location: Optional[str],
46-
ssl_certificate_location: Optional[str],
47-
ssl_key_password: Optional[str],
48-
schema_registry_url: Optional[str],
49-
schema_registry_username: Optional[str],
50-
schema_registry_password: Optional[str],
49+
key_avro_schema: Optional[str] = None,
50+
key_data_type: Optional[KafkaMessageKeyType] = KafkaMessageKeyType.STRING,
51+
username: Optional[str] = None,
52+
password: Optional[str] = None,
53+
ssl_key_location: Optional[str] = None,
54+
ssl_ca_location: Optional[str] = None,
55+
ssl_certificate_location: Optional[str] = None,
56+
ssl_key_password: Optional[str] = None,
57+
ssl_certificate_pem: Optional[str] = None,
58+
ssl_key_pem: Optional[str] = None,
59+
ssl_ca_pem: Optional[str] = None,
60+
ssl_certificate_and_key_pem: Optional[str] = None,
61+
schema_registry_url: Optional[str] = None,
62+
schema_registry_username: Optional[str] = None,
63+
schema_registry_password: Optional[str] = None,
5164
o_auth_bearer_method: Optional[OAuthBearerMethod] = None,
5265
o_auth_bearer_client_id: Optional[str] = None,
5366
o_auth_bearer_client_secret: Optional[str] = None,
@@ -68,12 +81,18 @@ def __init__(self,
6881
self.topic = topic
6982
self.broker_list = broker_list
7083
self.avro_schema = avro_schema
84+
self.key_avro_schema = key_avro_schema
85+
self.key_data_type = key_data_type
7186
self.username = username
7287
self.password = password
7388
self.ssl_key_location = ssl_key_location
7489
self.ssl_ca_location = ssl_ca_location
7590
self.ssl_certificate_location = ssl_certificate_location
7691
self.ssl_key_password = ssl_key_password
92+
self.ssl_certificate_pem = ssl_certificate_pem
93+
self.ssl_key_pem = ssl_key_pem
94+
self.ssl_ca_pem = ssl_ca_pem
95+
self.ssl_certificate_and_key_pem = ssl_certificate_and_key_pem
7796
self.schema_registry_url = schema_registry_url
7897
self.schema_registry_username = schema_registry_username
7998
self.schema_registry_password = schema_registry_password
@@ -104,18 +123,24 @@ def __init__(self,
104123
name: str,
105124
topic: str,
106125
broker_list: str,
107-
event_hub_connection_string: Optional[str],
108-
consumer_group: Optional[str],
109-
avro_schema: Optional[str],
110-
username: Optional[str],
111-
password: Optional[str],
112-
ssl_key_location: Optional[str],
113-
ssl_ca_location: Optional[str],
114-
ssl_certificate_location: Optional[str],
115-
ssl_key_password: Optional[str],
116-
schema_registry_url: Optional[str],
117-
schema_registry_username: Optional[str],
118-
schema_registry_password: Optional[str],
126+
event_hub_connection_string: Optional[str] = None,
127+
consumer_group: Optional[str] = None,
128+
avro_schema: Optional[str] = None,
129+
key_avro_schema: Optional[str] = None,
130+
key_data_type: Optional[KafkaMessageKeyType] = KafkaMessageKeyType.STRING,
131+
username: Optional[str] = None,
132+
password: Optional[str] = None,
133+
ssl_key_location: Optional[str] = None,
134+
ssl_ca_location: Optional[str] = None,
135+
ssl_certificate_location: Optional[str] = None,
136+
ssl_key_password: Optional[str] = None,
137+
ssl_certificate_pem: Optional[str] = None,
138+
ssl_key_pem: Optional[str] = None,
139+
ssl_ca_pem: Optional[str] = None,
140+
ssl_certificate_and_key_pem: Optional[str] = None,
141+
schema_registry_url: Optional[str] = None,
142+
schema_registry_username: Optional[str] = None,
143+
schema_registry_password: Optional[str] = None,
119144
o_auth_bearer_method: Optional[OAuthBearerMethod] = None,
120145
o_auth_bearer_client_id: Optional[str] = None,
121146
o_auth_bearer_client_secret: Optional[str] = None,
@@ -133,12 +158,18 @@ def __init__(self,
133158
self.event_hub_connection_string = event_hub_connection_string
134159
self.consumer_group = consumer_group
135160
self.avro_schema = avro_schema
161+
self.key_avro_schema = key_avro_schema
162+
self.key_data_type = key_data_type
136163
self.username = username
137164
self.password = password
138165
self.ssl_key_location = ssl_key_location
139166
self.ssl_ca_location = ssl_ca_location
140167
self.ssl_certificate_location = ssl_certificate_location
141168
self.ssl_key_password = ssl_key_password
169+
self.ssl_certificate_pem = ssl_certificate_pem
170+
self.ssl_key_pem = ssl_key_pem
171+
self.ssl_ca_pem = ssl_ca_pem
172+
self.ssl_certificate_and_key_pem = ssl_certificate_and_key_pem
142173
self.schema_registry_url = schema_registry_url
143174
self.schema_registry_username = schema_registry_username
144175
self.schema_registry_password = schema_registry_password

0 commit comments

Comments
 (0)