diff --git a/examples/kafka-ex/README.md b/examples/kafka-ex/README.md
new file mode 100644
index 00000000..1313756d
--- /dev/null
+++ b/examples/kafka-ex/README.md
@@ -0,0 +1,58 @@
+# Kafka Example
+
+## Summary
+This is an example of how to use the CloudEvents JavaScript SDK with Kafka in Node.js.
+
+
+## Description
+A simple CLI application that sends user input as a CloudEvent message through a Kafka producer to a topic. The CloudEvent message is then handled and deserialized correctly by a consumer within a consumer group subscribed to the same topic.
+
+## Dependencies
+- Node.js (>18)
+- Kafka running locally or remotely
+
+## Local Kafka Setup with Docker
+
+#### Option 1: Run Zookeeper and Kafka Docker images sequentially with these commands
+
+```bash
+docker run -d \
+  --name zookeeper \
+  -e ZOOKEEPER_CLIENT_PORT=2181 \
+  -e ZOOKEEPER_TICK_TIME=2000 \
+  confluentinc/cp-zookeeper:7.3.2
+
+```
+```bash
+docker run -d \
+  --name kafka \
+  -p 9092:9092 \
+  -e KAFKA_BROKER_ID=1 \
+  -e KAFKA_ZOOKEEPER_CONNECT=localhost:2181 \
+  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
+  -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
+  -e KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0 \
+  --link zookeeper:zookeeper \
+  confluentinc/cp-kafka:7.3.2
+
+```
+
+#### Option 2: Run both images using the docker compose file
+
+```bash
+ cd ${directory of the docker compose file}
+
+ docker compose up -d
+```
+
+## Then, run the producer (CLI) and consumer
+
+#### To Start the Producer
+```bash
+npm run start:producer
+```
+
+#### To Start the Consumer
+```bash
+npm run start:consumer ${groupId}
+```
diff --git a/examples/kafka-ex/docker-compose.yml b/examples/kafka-ex/docker-compose.yml
new file mode 100644
index 00000000..8cd19562
--- /dev/null
+++ b/examples/kafka-ex/docker-compose.yml
@@ -0,0 +1,23 @@
+---
+version: '3'
+services:
+  zookeeper:
+    image: confluentinc/cp-zookeeper:7.3.2
+    container_name: zookeeper
+    environment:
+      ZOOKEEPER_CLIENT_PORT: 2181
+      ZOOKEEPER_TICK_TIME: 2000
+
+  kafka:
+    image: confluentinc/cp-kafka:7.3.2
+    container_name: kafka
+    depends_on:
+      - zookeeper
+    ports:
+      - "9092:9092"
+    environment:
+      KAFKA_BROKER_ID: 1
+      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
\ No newline at end of file
diff --git a/examples/kafka-ex/package.json b/examples/kafka-ex/package.json
new file mode 100644
index 00000000..b4b837c9
--- /dev/null
+++ b/examples/kafka-ex/package.json
@@ -0,0 +1,24 @@
+{
+  "name": "kafka-ex",
+  "version": "1.0.0",
+  "description": "kafka example using CloudEvents",
+  "repository": "https://github.com/cloudevents/sdk-javascript.git",
+  "scripts": {
+    "build": "tsc",
+    "start:producer": "ts-node src/producer.ts",
+    "start:consumer": "ts-node src/consumer.ts"
+  },
+  "keywords": [],
+  "author": "",
+  "license": "ISC",
+  "type": "commonjs",
+  "dependencies": {
+    "cloudevents": "^10.0.0",
+    "kafkajs": "^2.2.4",
+    "ts-node": "^10.9.2"
+  },
+  "devDependencies": {
+    "@types/node": "^22.13.2",
+    "typescript": "^5.7.3"
+  }
+}
diff --git a/examples/kafka-ex/src/admin.ts b/examples/kafka-ex/src/admin.ts
new file mode 100644
index 00000000..73788dea
--- /dev/null
+++ b/examples/kafka-ex/src/admin.ts
@@ -0,0 +1,16 @@
+/* eslint-disable */
+import kafka from "./client";
+
+(async () => {
+  const admin = kafka.admin();
+  await admin.connect();
+  await admin.createTopics({
+    topics: [
+      {
+        topic: "events.cloudevents.test",
+        numPartitions: 2,
+      },
+    ],
+  });
+  await admin.disconnect();
+})();
diff --git a/examples/kafka-ex/src/client.ts b/examples/kafka-ex/src/client.ts
new file mode 100644
index 00000000..1ac9db5a
--- /dev/null
+++ b/examples/kafka-ex/src/client.ts
@@ -0,0 +1,9 @@
+/* eslint-disable */
+import { Kafka } from "kafkajs";
+
+const kafka = new Kafka({
+  clientId: "kafka-ex-client-id",
+  brokers: ["localhost:9092"],
+});
+
+export default kafka;
\ No newline at end of file
diff --git a/examples/kafka-ex/src/consumer.ts b/examples/kafka-ex/src/consumer.ts
new file mode 100644
index 00000000..043b31d6
--- /dev/null
+++ b/examples/kafka-ex/src/consumer.ts
@@ -0,0 +1,47 @@
+/* eslint-disable */
+
+import { Headers, Kafka, Message } from "cloudevents";
+import kafka from "./client";
+
+const groupId = process.argv[2] ?? "kafka-ex-consumer-group";
+
+(async () => {
+  const consumer = kafka.consumer({ groupId });
+  await consumer.connect();
+
+  await consumer.subscribe({ topic: "events.cloudevents.test" });
+
+  await consumer.run({
+    eachMessage: async ({ topic, partition, message }) => {
+      console.log("Raw Kafka message:", {
+        topic,
+        partition,
+        offset: message.offset,
+        headers: message.headers,
+        value: message.value?.toString(),
+      });
+
+      try {
+        const newHeaders: Headers = {};
+        Object.keys(message.headers as Headers).forEach((key) => {
+          // this is needed here because the headers are buffer values
+          // when it gets to the consumer and the buffer headers are not valid for the
+          // toEvent api from cloudevents, so this converts each key value to a string
+          // as expected by the toEvent api
+          newHeaders[key] = message!.headers![key]?.toString() ?? "";
+        });
+
+        message.headers = newHeaders;
+        const messageValue = Kafka.toEvent(
+          message as unknown as Message
+        );
+
+        console.log("Deserialized CloudEvent:", messageValue);
+        // message is automatically acknowledged when the callback is finished
+      } catch (error) {
+        console.error("Error deserializing CloudEvent:", error);
+        console.log("Raw message value:", message.value?.toString());
+      }
+    },
+  });
+})();
diff --git a/examples/kafka-ex/src/producer.ts b/examples/kafka-ex/src/producer.ts
new file mode 100644
index 00000000..e7ae18ba
--- /dev/null
+++ b/examples/kafka-ex/src/producer.ts
@@ -0,0 +1,41 @@
+/* eslint-disable */
+
+import { CloudEvent, Kafka } from "cloudevents";
+import readline from "readline";
+import kafka from "./client";
+
+const rl = readline.createInterface({
+  input: process.stdin,
+  output: process.stdout,
+});
+
+(async () => {
+  const producer = kafka.producer();
+  await producer.connect();
+
+  rl.setPrompt("Enter message > ");
+  rl.prompt();
+  rl.on("line", async (line) => {
+    const event = new CloudEvent({
+      source: "cloudevents-producer",
+      type: "events.cloudevents.test",
+      datacontenttype: "text/plain",
+      partitionkey: "1",
+      data: line,
+    });
+
+    const message = Kafka.structured(event);
+
+    console.log("Sending CloudEvent:", message);
+
+    await producer.send({
+      topic: "events.cloudevents.test",
+      messages: [message],
+    });
+    rl.prompt();
+  });
+
+  rl.on("close", async () => {
+    await producer.disconnect();
+  });
+})();
diff --git a/examples/kafka-ex/tsconfig.json b/examples/kafka-ex/tsconfig.json
new file mode 100644
index 00000000..9065c97c
--- /dev/null
+++ b/examples/kafka-ex/tsconfig.json
@@ -0,0 +1,17 @@
+{
+  "compilerOptions": {
+    "target": "ES2020",
+    "module": "commonjs",
+    "allowJs": true,
+    "checkJs": false,
+    "strict": true,
+    "noImplicitAny": true,
+    "moduleResolution": "node",
+    "esModuleInterop": true
+  },
+  "include": [
+    "src/**/*.ts",
+    "src/**/*.js"
+  ],
+  "exclude": ["node_modules"]
+}