diff --git a/examples/pom.xml b/examples/pom.xml
index 63d37278..31cca1f4 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -381,6 +381,28 @@
+
+ on-success-sink
+ package
+
+ dockerBuild
+
+
+
+ amazoncorretto:11
+
+
+
+ io.numaproj.numaflow.examples.sink.onsuccess.OnSuccess
+
+
+
+
+ numaflow-java-examples/on-success-sink:${docker.tag}
+
+
+
+
diff --git a/examples/src/main/java/io/numaproj/numaflow/examples/sink/onsuccess/OnSuccess.java b/examples/src/main/java/io/numaproj/numaflow/examples/sink/onsuccess/OnSuccess.java
new file mode 100644
index 00000000..b24342d4
--- /dev/null
+++ b/examples/src/main/java/io/numaproj/numaflow/examples/sink/onsuccess/OnSuccess.java
@@ -0,0 +1,75 @@
+package io.numaproj.numaflow.examples.sink.onsuccess;
+
+import io.numaproj.numaflow.examples.sink.simple.SimpleSink;
+import io.numaproj.numaflow.sinker.Datum;
+import io.numaproj.numaflow.sinker.DatumIterator;
+import io.numaproj.numaflow.sinker.Message;
+import io.numaproj.numaflow.sinker.Response;
+import io.numaproj.numaflow.sinker.ResponseList;
+import io.numaproj.numaflow.sinker.Server;
+import io.numaproj.numaflow.sinker.Sinker;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Random;
+
+@Slf4j
+public class OnSuccess extends Sinker {
+ public static void main(String[] args) throws Exception {
+ Server server = new Server(new OnSuccess());
+
+ // Start the server
+ server.start();
+
+ // wait for the server to shut down
+ server.awaitTermination();
+ }
+
+ @Override
+ public ResponseList processMessages(DatumIterator datumIterator) {
+ ResponseList.ResponseListBuilder responseListBuilder = ResponseList.newBuilder();
+ while (true) {
+ Datum datum = null;
+ try {
+ datum = datumIterator.next();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ continue;
+ }
+ // null means the iterator is closed, so we break the loop
+ if (datum == null) {
+ break;
+ }
+ try {
+ String msg = new String(datum.getValue());
+ log.info("Received message: {}, id: {}, headers - {}", msg, datum.getId(), datum.getHeaders());
+ if (writeToPrimarySink()) {
+ log.info("Writing to onSuccess sink: {}", datum.getId());
+ responseListBuilder.addResponse(Response.responseOnSuccess(datum.getId(),
+ Message.builder()
+ .value(String.format("Successfully wrote message with ID: %s",
+ datum.getId()).getBytes())
+ .build()));
+ } else {
+ log.info("Writing to fallback sink: {}", datum.getId());
+ responseListBuilder.addResponse(Response.responseFallback(datum.getId()));
+ }
+ } catch (Exception e) {
+ log.warn("Error while writing to any sink: ", e);
+ responseListBuilder.addResponse(Response.responseFailure(
+ datum.getId(),
+ e.getMessage()));
+ }
+ }
+ return responseListBuilder.build();
+ }
+
+ /**
+ * Example method to simulate write failures/success to primary sink.
+ * Based on whether this returns true/false, we write to fallback sink / onSuccess sink
+ * @return true if simulated write to primary sink is successful, false otherwise
+ */
+ public boolean writeToPrimarySink() {
+ Random random = new Random();
+ return random.nextBoolean();
+ }
+}
diff --git a/examples/src/test/java/io/numaproj/numaflow/examples/sink/onsuccess/OnSuccessTest.java b/examples/src/test/java/io/numaproj/numaflow/examples/sink/onsuccess/OnSuccessTest.java
new file mode 100644
index 00000000..3dba7007
--- /dev/null
+++ b/examples/src/test/java/io/numaproj/numaflow/examples/sink/onsuccess/OnSuccessTest.java
@@ -0,0 +1,33 @@
+package io.numaproj.numaflow.examples.sink.onsuccess;
+
+import io.numaproj.numaflow.examples.sink.simple.SimpleSink;
+import io.numaproj.numaflow.sinker.Response;
+import io.numaproj.numaflow.sinker.ResponseList;
+import io.numaproj.numaflow.sinker.SinkerTestKit;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class OnSuccessTest {
+ @Test
+ public void testOnSuccessSink() {
+ int datumCount = 10;
+ OnSuccess onSuccessSink = new OnSuccess();
+ // Create a test datum iterator with 10 messages
+ SinkerTestKit.TestListIterator testListIterator = new SinkerTestKit.TestListIterator();
+ for (int i = 0; i < datumCount; i++) {
+ testListIterator.addDatum(
+ SinkerTestKit.TestDatum
+ .builder()
+ .id("id-" + i)
+ .value(("value-" + i).getBytes())
+ .build());
+ }
+ ResponseList responseList = onSuccessSink.processMessages(testListIterator);
+ Assertions.assertEquals(datumCount, responseList.getResponses().size());
+ for (Response response : responseList.getResponses()) {
+ Assertions.assertEquals(false, response.getSuccess());
+ }
+ // we can add the logic to verify if the messages were
+ // successfully written to the sink(could be a file, database, etc.)
+ }
+}
diff --git a/pom.xml b/pom.xml
index 2a0b3039..db18e1c1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -310,6 +310,7 @@
io/numaproj/numaflow/source/v1/*
io/numaproj/numaflow/serving/v1/*
io/numaproj/numaflow/accumulator/v1/*
+ common/*
**/*TestKit*
diff --git a/src/main/java/io/numaproj/numaflow/sinker/Constants.java b/src/main/java/io/numaproj/numaflow/sinker/Constants.java
index c7d8ae7c..cb5a64ac 100644
--- a/src/main/java/io/numaproj/numaflow/sinker/Constants.java
+++ b/src/main/java/io/numaproj/numaflow/sinker/Constants.java
@@ -3,14 +3,18 @@
class Constants {
public static final String DEFAULT_SOCKET_PATH = "/var/run/numaflow/sink.sock";
public static final String DEFAULT_FB_SINK_SOCKET_PATH = "/var/run/numaflow/fb-sink.sock";
+ public static final String DEFAULT_ON_SUCCESS_SINK_SOCKET_PATH = "/var/run/numaflow/ons-sink.sock";
public static final String DEFAULT_SERVER_INFO_FILE_PATH = "/var/run/numaflow/sinker-server-info";
public static final String DEFAULT_FB_SERVER_INFO_FILE_PATH =
"/var/run/numaflow/fb-sinker-server-info";
+ public static final String DEFAULT_ON_SUCCESS_SERVER_INFO_FILE_PATH =
+ "/var/run/numaflow/ons-sinker-server-info";
public static final int DEFAULT_MESSAGE_SIZE = 1024 * 1024 * 64;
public static final int DEFAULT_PORT = 50051;
public static final String DEFAULT_HOST = "localhost";
public static final String ENV_UD_CONTAINER_TYPE = "NUMAFLOW_UD_CONTAINER_TYPE";
public static final String UD_CONTAINER_FALLBACK_SINK = "fb-udsink";
+ public static final String UD_CONTAINER_ON_SUCCESS_SINK = "ons-udsink";
// Private constructor to prevent instantiation
private Constants() {
diff --git a/src/main/java/io/numaproj/numaflow/sinker/GRPCConfig.java b/src/main/java/io/numaproj/numaflow/sinker/GRPCConfig.java
index 46f6e248..5834ff42 100644
--- a/src/main/java/io/numaproj/numaflow/sinker/GRPCConfig.java
+++ b/src/main/java/io/numaproj/numaflow/sinker/GRPCConfig.java
@@ -36,6 +36,9 @@ static GRPCConfig defaultGrpcConfig() {
if (Constants.UD_CONTAINER_FALLBACK_SINK.equals(containerType)) {
socketPath = Constants.DEFAULT_FB_SINK_SOCKET_PATH;
infoFilePath = Constants.DEFAULT_FB_SERVER_INFO_FILE_PATH;
+ } else if (Constants.UD_CONTAINER_ON_SUCCESS_SINK.equals(containerType)) {
+ socketPath = Constants.DEFAULT_ON_SUCCESS_SINK_SOCKET_PATH;
+ infoFilePath = Constants.DEFAULT_ON_SUCCESS_SERVER_INFO_FILE_PATH;
}
return GRPCConfig.newBuilder()
.infoFilePath(infoFilePath)
diff --git a/src/main/java/io/numaproj/numaflow/sinker/KeyValueGroup.java b/src/main/java/io/numaproj/numaflow/sinker/KeyValueGroup.java
new file mode 100644
index 00000000..abc0db08
--- /dev/null
+++ b/src/main/java/io/numaproj/numaflow/sinker/KeyValueGroup.java
@@ -0,0 +1,16 @@
+package io.numaproj.numaflow.sinker;
+
+import lombok.Builder;
+import lombok.Getter;
+
+import java.util.HashMap;
+
+/**
+ * KeyValueGroup is a map of key-value pairs for a given group.
+ * Used as part of {@link io.numaproj.numaflow.sinker.Message}.
+ */
+@Getter
+@Builder
+public class KeyValueGroup {
+ private final HashMap keyValue;
+}
diff --git a/src/main/java/io/numaproj/numaflow/sinker/Message.java b/src/main/java/io/numaproj/numaflow/sinker/Message.java
new file mode 100644
index 00000000..d8ede1cf
--- /dev/null
+++ b/src/main/java/io/numaproj/numaflow/sinker/Message.java
@@ -0,0 +1,20 @@
+package io.numaproj.numaflow.sinker;
+
+import lombok.Builder;
+import lombok.Getter;
+
+import java.util.HashMap;
+
+/**
+ * Message contains information that needs to be sent to the OnSuccess sink.
+ * The message can be different from the original message that was sent to primary sink.
+ */
+@Getter
+@Builder
+public class Message {
+ private final byte[] value;
+ private final String key;
+ private final HashMap userMetadata;
+}
+
+
diff --git a/src/main/java/io/numaproj/numaflow/sinker/Response.java b/src/main/java/io/numaproj/numaflow/sinker/Response.java
index 5d9537ce..b0c2b217 100644
--- a/src/main/java/io/numaproj/numaflow/sinker/Response.java
+++ b/src/main/java/io/numaproj/numaflow/sinker/Response.java
@@ -1,9 +1,16 @@
package io.numaproj.numaflow.sinker;
+import com.google.protobuf.ByteString;
+import common.MetadataOuterClass;
+import io.numaproj.numaflow.sink.v1.SinkOuterClass.SinkResponse;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Getter;
+import java.util.Collections;
+import java.util.Map;
+import java.util.stream.Collectors;
+
/**
* Response is used to send response from the user defined sinker. It contains the id of the
* message, success status, an optional error message and a fallback status. Various static factory
@@ -16,15 +23,21 @@ public class Response {
private final Boolean success;
private final String err;
private final Boolean fallback;
+ private final Boolean serve;
+ private final byte[] serveResponse;
+ private final Boolean onSuccess;
+ // FIXME: Should this be Message object from this package? That would allow parity with other SDKs (specially Go)
+ // Currently done this way to prevent conversion in buildResult method.
+ private final SinkResponse.Result.Message onSuccessMessage;
- /**
+ /**
* Static method to create response for successful message processing.
*
* @param id id of the message
* @return Response object with success status
*/
public static Response responseOK(String id) {
- return new Response(id, true, null, false);
+ return new Response(id, true, null, false, false, null, false, null);
}
/**
@@ -35,7 +48,7 @@ public static Response responseOK(String id) {
* @return Response object with failure status and error message
*/
public static Response responseFailure(String id, String errMsg) {
- return new Response(id, false, errMsg, false);
+ return new Response(id, false, errMsg, false, false, null, false, null);
}
/**
@@ -46,6 +59,89 @@ public static Response responseFailure(String id, String errMsg) {
* @return Response object with fallback status
*/
public static Response responseFallback(String id) {
- return new Response(id, false, null, true);
+ return new Response(id, false, null, true, false, null, false, null);
+ }
+
+ /**
+ * Static method to create response for serve message which is raw bytes.
+ * This indicates that the message should be sent to the serving store.
+ * Allows creation of serve message from raw bytes.
+ *
+ * @param id id of the message
+ * @param serveResponse Response object to be sent to the serving store
+ * @return Response object with serve status and serve response
+ */
+ public static Response responseServe(String id, byte[] serveResponse) {
+ return new Response(id, false, null, false, true, serveResponse, false, null);
+ }
+
+ /**
+ * Static method to create response for onSuccess message. Allows creation of onSuccess message
+ * from protobuf Message object.
+ *
+ * @param id id of the message
+ * @param onSuccessMessage OnSuccessMessage object to be sent to the onSuccess sink
+ * @return Response object with onSuccess status and onSuccess message
+ */
+ public static Response responseOnSuccess(String id, SinkResponse.Result.Message onSuccessMessage) {
+ return new Response(id, false, null, false, false, null, true, onSuccessMessage);
+ }
+
+ /**
+ * Overloaded static method to create response for onSuccess message. Allows creation of onSuccess message
+ * from OnSuccessMessage object.
+ *
+ * @param id id of the message
+ * @param onSuccessMessage OnSuccessMessage object to be sent to the onSuccess sink. Can be null
+ * if original message needs to be written to onSuccess sink
+ * @return Response object with onSuccess status and onSuccess message
+ */
+ public static Response responseOnSuccess(String id, Message onSuccessMessage) {
+ if (onSuccessMessage == null) {
+ return new Response(id, false, null, false, false, null, true, null);
+ } else {
+
+ Map pbUserMetadata = MetadataOuterClass.Metadata
+ .getDefaultInstance()
+ .getUserMetadataMap();
+
+ if (onSuccessMessage.getUserMetadata() != null) {
+ pbUserMetadata =
+ onSuccessMessage.getUserMetadata()
+ .entrySet()
+ .stream()
+ .filter(e -> e.getKey() != null && e.getValue() != null)
+ .collect(Collectors.toMap(
+ Map.Entry::getKey,
+ e -> MetadataOuterClass.KeyValueGroup.newBuilder()
+ .putAllKeyValue(e.getValue().getKeyValue() == null
+ ? Collections.emptyMap()
+ : e.getValue()
+ .getKeyValue()
+ .entrySet()
+ .stream()
+ .filter(kv -> kv.getKey() != null
+ && kv.getValue() != null)
+ .collect(Collectors.toMap(
+ Map.Entry::getKey,
+ kv -> ByteString.copyFrom(kv.getValue())
+ ))
+ )
+ .build()
+ ));
+ }
+
+ MetadataOuterClass.Metadata pbMetadata = MetadataOuterClass.Metadata.newBuilder()
+ .putAllUserMetadata(pbUserMetadata)
+ .build();
+
+ SinkResponse.Result.Message pbOnSuccessMessage = SinkResponse.Result.Message.newBuilder()
+ .addKeys(onSuccessMessage.getKey() == null ? "" : onSuccessMessage.getKey())
+ .setValue(onSuccessMessage.getValue() == null ? ByteString.EMPTY : ByteString.copyFrom(onSuccessMessage.getValue()))
+ .setMetadata(pbMetadata)
+ .build();
+
+ return new Response(id, false, null, false, false, null, true, pbOnSuccessMessage);
+ }
}
}
diff --git a/src/main/java/io/numaproj/numaflow/sinker/Service.java b/src/main/java/io/numaproj/numaflow/sinker/Service.java
index e4cea580..5458a3f9 100644
--- a/src/main/java/io/numaproj/numaflow/sinker/Service.java
+++ b/src/main/java/io/numaproj/numaflow/sinker/Service.java
@@ -1,6 +1,7 @@
package io.numaproj.numaflow.sinker;
import com.google.protobuf.Any;
+import com.google.protobuf.ByteString;
import com.google.protobuf.Empty;
import com.google.rpc.Code;
import com.google.rpc.DebugInfo;
@@ -131,13 +132,43 @@ public void onCompleted() {
}
private SinkOuterClass.SinkResponse.Result buildResult(Response response) {
- SinkOuterClass.Status status = response.getFallback() ? SinkOuterClass.Status.FALLBACK
- : response.getSuccess() ? SinkOuterClass.Status.SUCCESS : SinkOuterClass.Status.FAILURE;
- return SinkOuterClass.SinkResponse.Result.newBuilder()
- .setId(response.getId() == null ? "" : response.getId())
- .setErrMsg(response.getErr() == null ? "" : response.getErr())
- .setStatus(status)
- .build();
+ if (response.getFallback()) {
+ return SinkOuterClass.SinkResponse.Result.newBuilder()
+ .setId(response.getId() == null ? "" : response.getId())
+ .setStatus(SinkOuterClass.Status.FALLBACK)
+ .build();
+ } else if (response.getSuccess()) {
+ return SinkOuterClass.SinkResponse.Result.newBuilder()
+ .setId(response.getId() == null ? "" : response.getId())
+ .setStatus(SinkOuterClass.Status.SUCCESS)
+ .build();
+ } else if (response.getServe()) {
+ // FIXME: Return error when serve response is not set?
+ return SinkOuterClass.SinkResponse.Result.newBuilder()
+ .setId(response.getId() == null ? "" : response.getId())
+ .setStatus(SinkOuterClass.Status.SERVE)
+ .setServeResponse(response.getServeResponse() == null ? null : ByteString.copyFrom(
+ response.getServeResponse()))
+ .build();
+ } else if (response.getOnSuccess()) {
+ // FIXME: Cannot set null for onSuccessMsg, so setting default instance
+ // Problematic because numaflow-core implementation regards null to be the original message
+ // TODO: Change numaflow-core implementation to also accept empty `value` field
+ return SinkOuterClass.SinkResponse.Result.newBuilder()
+ .setId(response.getId() == null ? "" : response.getId())
+ .setStatus(SinkOuterClass.Status.ON_SUCCESS)
+ .setOnSuccessMsg(response.getOnSuccessMessage() == null
+ ? SinkOuterClass.SinkResponse.Result.Message.getDefaultInstance()
+ : response.getOnSuccessMessage())
+ .build();
+ } else {
+ // FIXME: Return error when error message is not set?
+ return SinkOuterClass.SinkResponse.Result.newBuilder()
+ .setId(response.getId() == null ? "" : response.getId())
+ .setStatus(SinkOuterClass.Status.FAILURE)
+ .setErrMsg(response.getErr() == null ? "" : response.getErr())
+ .build();
+ }
}
/**
diff --git a/src/main/proto/common/metadata.proto b/src/main/proto/common/metadata.proto
new file mode 100644
index 00000000..bbf4b3ab
--- /dev/null
+++ b/src/main/proto/common/metadata.proto
@@ -0,0 +1,37 @@
+/*
+Copyright 2022 The Numaproj Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+syntax = "proto3";
+option go_package = "github.com/numaproj/numaflow-go/pkg/apis/proto/common";
+
+package common;
+
+// Metadata is the metadata of the message
+message Metadata {
+ // PreviousVertex is the name of the previous vertex
+ string previous_vertex = 1;
+ // SystemMetadata is the system metadata of the message
+ // Key of the map is the group name
+ map sys_metadata = 2;
+ // UserMetadata is the user metadata of the message
+ // Key of the map is the group name
+ map user_metadata = 3;
+}
+
+// KeyValueGroup is a group of key-value pairs for a given group.
+message KeyValueGroup {
+ map key_value = 1;
+}
diff --git a/src/main/proto/sink/v1/sink.proto b/src/main/proto/sink/v1/sink.proto
index b9ea5ae7..b1484e4a 100644
--- a/src/main/proto/sink/v1/sink.proto
+++ b/src/main/proto/sink/v1/sink.proto
@@ -4,6 +4,7 @@ option java_package = "io.numaproj.numaflow.sink.v1";
import "google/protobuf/timestamp.proto";
import "google/protobuf/empty.proto";
+import "common/metadata.proto";
package sink.v1;
@@ -65,6 +66,8 @@ enum Status {
SUCCESS = 0;
FAILURE = 1;
FALLBACK = 2;
+ SERVE = 3;
+ ON_SUCCESS = 4;
}
/**
@@ -72,12 +75,20 @@ enum Status {
*/
message SinkResponse {
message Result {
+ message Message {
+ bytes value = 1;
+ repeated string keys = 2;
+ common.Metadata metadata = 3;
+ }
// id is the ID of the message, can be used to uniquely identify the message.
string id = 1;
// status denotes the status of persisting to sink. It can be SUCCESS, FAILURE, or FALLBACK.
Status status = 2;
// err_msg is the error message, set it if success is set to false.
string err_msg = 3;
+ optional bytes serve_response = 4;
+ // on_success_msg is the message to be sent to on_success sink.
+ optional Message on_success_msg = 5;
}
repeated Result results = 1;
optional Handshake handshake = 2;
diff --git a/src/test/java/io/numaproj/numaflow/mapper/HandlerDatumTest.java b/src/test/java/io/numaproj/numaflow/mapper/HandlerDatumTest.java
new file mode 100644
index 00000000..9b15bcce
--- /dev/null
+++ b/src/test/java/io/numaproj/numaflow/mapper/HandlerDatumTest.java
@@ -0,0 +1,22 @@
+package io.numaproj.numaflow.mapper;
+
+import org.junit.Test;
+
+import java.time.Instant;
+import java.util.HashMap;
+
+import static org.junit.Assert.assertEquals;
+
+public class HandlerDatumTest {
+ @Test
+ public void testHandlerDatum() {
+ Instant watermark = Instant.now();
+ Instant eventTime = Instant.now();
+ HashMap headers = new HashMap<>();
+ headers.put("header1", "value1");
+ HandlerDatum datum = new HandlerDatum("asdf".getBytes(), watermark, eventTime, headers);
+ assertEquals(watermark, datum.getWatermark());
+ assertEquals(eventTime, datum.getEventTime());
+ assertEquals(headers, datum.getHeaders());
+ }
+}
diff --git a/src/test/java/io/numaproj/numaflow/mapper/MessageListTest.java b/src/test/java/io/numaproj/numaflow/mapper/MessageListTest.java
new file mode 100644
index 00000000..9e7521d6
--- /dev/null
+++ b/src/test/java/io/numaproj/numaflow/mapper/MessageListTest.java
@@ -0,0 +1,25 @@
+package io.numaproj.numaflow.mapper;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+
+import static org.junit.Assert.assertArrayEquals;
+
+public class MessageListTest {
+ @Test
+ public void testMessageList() {
+ Message defaultMessage = new Message("asdf".getBytes());
+ ArrayList messageList = new ArrayList<>();
+ messageList.add(defaultMessage);
+
+ MessageList messageList1 = new MessageList.MessageListBuilder()
+ .addMessages(messageList)
+ .addMessage(defaultMessage)
+ .build();
+
+ messageList.add(defaultMessage);
+
+ assertArrayEquals(messageList1.getMessages().toArray(), messageList.toArray());
+ }
+}
diff --git a/src/test/java/io/numaproj/numaflow/mapper/MessageTest.java b/src/test/java/io/numaproj/numaflow/mapper/MessageTest.java
new file mode 100644
index 00000000..343bf394
--- /dev/null
+++ b/src/test/java/io/numaproj/numaflow/mapper/MessageTest.java
@@ -0,0 +1,22 @@
+package io.numaproj.numaflow.mapper;
+
+import org.junit.Test;
+import static org.junit.Assert.assertArrayEquals;
+
+public class MessageTest {
+ @Test
+ public void testMessage() {
+ Message message1 = new Message("asdf".getBytes());
+ assertArrayEquals("asdf".getBytes(), message1.getValue());
+ Message message2 = new Message("asdf".getBytes(), new String[]{"key1"});
+ assertArrayEquals("asdf".getBytes(), message2.getValue());
+ assertArrayEquals(new String[]{"key1"}, message2.getKeys());
+ Message message3 = new Message(null, null, null);
+ assertArrayEquals(null, message3.getValue());
+ Message message4 = Message.toDrop();
+ assertArrayEquals(new byte[0], message4.getValue());
+ assertArrayEquals(null, message4.getKeys());
+ String[] drop_tags = {"U+005C__DROP__"};
+ assertArrayEquals(drop_tags, message4.getTags());
+ }
+}
diff --git a/src/test/java/io/numaproj/numaflow/sinker/HandlerDatumTest.java b/src/test/java/io/numaproj/numaflow/sinker/HandlerDatumTest.java
new file mode 100644
index 00000000..2097b42f
--- /dev/null
+++ b/src/test/java/io/numaproj/numaflow/sinker/HandlerDatumTest.java
@@ -0,0 +1,56 @@
+package io.numaproj.numaflow.sinker;
+
+import org.junit.Test;
+
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+public class HandlerDatumTest {
+
+ @Test
+ public void testGetKeys() {
+ String[] keys = {"key1", "key2"};
+ HandlerDatum datum = new HandlerDatum(keys, null, null, null, null, null);
+ assertArrayEquals(keys, datum.getKeys());
+ }
+
+ @Test
+ public void testGetValue() {
+ byte[] value = {1, 2, 3};
+ HandlerDatum datum = new HandlerDatum(null, value, null, null, null, null);
+ assertArrayEquals(value, datum.getValue());
+ }
+
+ @Test
+ public void testGetWatermark() {
+ Instant watermark = Instant.now();
+ HandlerDatum datum = new HandlerDatum(null, null, watermark, null, null, null);
+ assertEquals(watermark, datum.getWatermark());
+ }
+
+ @Test
+ public void testGetEventTime() {
+ Instant eventTime = Instant.now();
+ HandlerDatum datum = new HandlerDatum(null, null, null, eventTime, null, null);
+ assertEquals(eventTime, datum.getEventTime());
+ }
+
+ @Test
+ public void testGetId() {
+ String id = "test-id";
+ HandlerDatum datum = new HandlerDatum(null, null, null, null, id, null);
+ assertEquals(id, datum.getId());
+ }
+
+ @Test
+ public void testGetHeaders() {
+ Map headers = new HashMap<>();
+ headers.put("header1", "value1");
+ HandlerDatum datum = new HandlerDatum(null, null, null, null, null, headers);
+ assertEquals(headers, datum.getHeaders());
+ }
+}
diff --git a/src/test/java/io/numaproj/numaflow/sinker/ResponseTest.java b/src/test/java/io/numaproj/numaflow/sinker/ResponseTest.java
new file mode 100644
index 00000000..841cfb85
--- /dev/null
+++ b/src/test/java/io/numaproj/numaflow/sinker/ResponseTest.java
@@ -0,0 +1,65 @@
+package io.numaproj.numaflow.sinker;
+
+import com.google.protobuf.ByteString;
+import common.MetadataOuterClass;
+import io.numaproj.numaflow.sink.v1.SinkOuterClass;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static java.util.Map.entry;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class ResponseTest {
+ @Test
+ public void test_addResponse() {
+ String defaultId = "id";
+ Response response1 = Response.responseFallback(defaultId);
+ assertEquals(defaultId, response1.getId());
+ Response response2 = Response.responseOK(defaultId);
+ assertEquals(defaultId, response2.getId());
+ Response response3 = Response.responseServe(defaultId, "serve".getBytes());
+ assertEquals(defaultId, response3.getId());
+ Response response4 = Response.responseFailure(defaultId, "failure");
+ assertEquals(defaultId, response4.getId());
+
+ HashMap userMetadata = new HashMap<>();
+ userMetadata.put("group1", KeyValueGroup.builder().build());
+ HashMap kvg1 = new HashMap<>(Map.ofEntries(
+ entry("key1", "val1".getBytes())
+ ));
+ kvg1.put("key2", null);
+
+ userMetadata.put("group2", KeyValueGroup.builder().keyValue(kvg1).build());
+ userMetadata.put("group3", null);
+ Message onSuccessMessage1 = new Message("onSuccessValue".getBytes(), null, userMetadata);
+
+ Response response5 = Response.responseOnSuccess(defaultId, onSuccessMessage1);
+ assertEquals(defaultId, response5.getId());
+ assertEquals("", response5.getOnSuccessMessage().getKeys(0));
+
+ Message onSuccessMessage2 = new Message("onSuccessValue".getBytes(), null, null);
+ Response response6 = Response.responseOnSuccess(defaultId, onSuccessMessage2);
+ assertEquals(defaultId, response6.getId());
+ assertEquals(MetadataOuterClass.Metadata.newBuilder()
+ .putAllUserMetadata(MetadataOuterClass.Metadata
+ .getDefaultInstance()
+ .getUserMetadataMap()).build(),
+ response6.getOnSuccessMessage().getMetadata());
+
+ Message onSuccessMessage3 = new Message(null, "key", null);
+ Response response7 = Response.responseOnSuccess(defaultId, onSuccessMessage3);
+ assertEquals(defaultId, response7.getId());
+ assertEquals(ByteString.copyFrom("".getBytes()), response7.getOnSuccessMessage().getValue());
+ assertEquals("key", response7.getOnSuccessMessage().getKeys(0));
+
+ Response response8 = Response.responseOnSuccess(defaultId, (Message) null);
+ assertEquals(defaultId, response8.getId());
+ assertNull(response8.getOnSuccessMessage());
+
+ Response response9 = Response.responseOnSuccess(defaultId, ( SinkOuterClass.SinkResponse.Result.Message) null);
+ assertNull(response9.getOnSuccessMessage());
+ }
+}
diff --git a/src/test/java/io/numaproj/numaflow/sinker/ServerTest.java b/src/test/java/io/numaproj/numaflow/sinker/ServerTest.java
index 5666f5f9..32de9fce 100644
--- a/src/test/java/io/numaproj/numaflow/sinker/ServerTest.java
+++ b/src/test/java/io/numaproj/numaflow/sinker/ServerTest.java
@@ -1,6 +1,7 @@
package io.numaproj.numaflow.sinker;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import com.google.protobuf.ByteString;
@@ -53,6 +54,20 @@ public void tearDown() throws Exception {
server.stop();
}
+ @Test
+ public void testServer() {
+ // Test for coverage for different constructors for Server
+ Sinker sinker = new TestSinkFn();
+ Server server = new Server(sinker);
+ try {
+ server.start();
+ server.stop();
+ } catch (Exception e) {
+ // Interrupted exceptions are let go
+ assertFalse(e instanceof RuntimeException);
+ }
+ }
+
@Test
public void sinkerSuccess() {
int batchSize = 6;
@@ -75,10 +90,16 @@ public void sinkerSuccess() {
for (int i = 1; i <= batchSize * numBatches; i++) {
String[] keys;
- if (i < batchSize * numBatches) {
+ if (i % 2 == 0) {
keys = new String[] {"valid-key"};
+ } else if (i % 3 == 0) {
+ keys = new String[] {"fallback-key"};
+ } else if (i % 5 == 0) {
+ keys = new String[] {"onsuccess-key"};
+ } else if (i % 7 == 0) {
+ keys = new String[] {"serve-key"};
} else {
- keys = new String[] {"invalid-key"};
+ keys = new String[] {"invalid-key"};
}
SinkOuterClass.SinkRequest.Request request =
@@ -122,10 +143,19 @@ public void sinkerSuccess() {
result -> {
if (result.getStatus() == SinkOuterClass.Status.FAILURE) {
assertEquals("error message", result.getErrMsg());
- return;
+ } else if (result.getStatus() == SinkOuterClass.Status.FALLBACK) {
+ assertEquals(result.getId(), expectedId);
+ assertEquals(SinkOuterClass.Status.FALLBACK, result.getStatus());
+ } else if (result.getStatus() == SinkOuterClass.Status.ON_SUCCESS) {
+ assertEquals(result.getId(), expectedId);
+ assertEquals(SinkOuterClass.Status.ON_SUCCESS, result.getStatus());
+ } else if (result.getStatus() == SinkOuterClass.Status.SERVE) {
+ assertEquals(result.getId(), expectedId);
+ assertEquals(SinkOuterClass.Status.SERVE, result.getStatus());
+ } else {
+ assertEquals(result.getId(), expectedId);
+ assertEquals(SinkOuterClass.Status.SUCCESS, result.getStatus());
}
- assertEquals(result.getId(), expectedId);
- assertEquals(SinkOuterClass.Status.SUCCESS, result.getStatus());
});
}
}
@@ -147,12 +177,22 @@ public ResponseList processMessages(DatumIterator datumIterator) {
if (datum == null) {
break;
}
+
if (Arrays.equals(datum.getKeys(), new String[] {"invalid-key"})) {
- builder.addResponse(
- Response.responseFailure(datum.getId() + processedIdSuffix, "error message"));
- continue;
+ builder.addResponse(
+ Response.responseFailure(datum.getId() + processedIdSuffix, "error message"));
+ } else if (Arrays.equals(datum.getKeys(), new String[] {"fallback-key"})) {
+ builder.addResponse(
+ Response.responseFallback(datum.getId() + processedIdSuffix));
+ } else if (Arrays.equals(datum.getKeys(), new String[] {"onsuccess-key"})) {
+ builder.addResponse(
+ Response.responseOnSuccess(datum.getId() + processedIdSuffix, (Message) null));
+ } else if (Arrays.equals(datum.getKeys(), new String[] {"serve-key"})) {
+ builder.addResponse(
+ Response.responseServe(datum.getId() + processedIdSuffix, "serve message".getBytes()));
+ } else {
+ builder.addResponse(Response.responseOK(datum.getId() + processedIdSuffix));
}
- builder.addResponse(Response.responseOK(datum.getId() + processedIdSuffix));
}
return builder.build();