diff --git a/examples/src/main/java/io/numaproj/numaflow/examples/sink/onsuccess/OnSuccess.java b/examples/src/main/java/io/numaproj/numaflow/examples/sink/onsuccess/OnSuccess.java index b24342d4..0d9b8026 100644 --- a/examples/src/main/java/io/numaproj/numaflow/examples/sink/onsuccess/OnSuccess.java +++ b/examples/src/main/java/io/numaproj/numaflow/examples/sink/onsuccess/OnSuccess.java @@ -44,11 +44,16 @@ public ResponseList processMessages(DatumIterator datumIterator) { log.info("Received message: {}, id: {}, headers - {}", msg, datum.getId(), datum.getHeaders()); if (writeToPrimarySink()) { log.info("Writing to onSuccess sink: {}", datum.getId()); + // Build the onSuccess message using builder for changing values, keys or userMetadata responseListBuilder.addResponse(Response.responseOnSuccess(datum.getId(), Message.builder() .value(String.format("Successfully wrote message with ID: %s", datum.getId()).getBytes()) + .userMetadata(datum.getUserMetadata()) + .keys(datum.getKeys()) .build())); + // Or send the same values as datum using: + // responseListBuilder.addResponse(Response.responseOnSuccess(datum)); } else { log.info("Writing to fallback sink: {}", datum.getId()); responseListBuilder.addResponse(Response.responseFallback(datum.getId())); diff --git a/src/main/java/io/numaproj/numaflow/mapper/Datum.java b/src/main/java/io/numaproj/numaflow/mapper/Datum.java index 0ef1a55f..f02ff937 100644 --- a/src/main/java/io/numaproj/numaflow/mapper/Datum.java +++ b/src/main/java/io/numaproj/numaflow/mapper/Datum.java @@ -1,6 +1,9 @@ package io.numaproj.numaflow.mapper; +import io.numaproj.numaflow.shared.SystemMetadata; +import io.numaproj.numaflow.shared.UserMetadata; + import java.time.Instant; import java.util.Map; @@ -36,4 +39,19 @@ public interface Datum { * @return returns the headers in the form of key value pair */ Map getHeaders(); + + /** + * method to get the metadata information added by the user. + * It can be appended to and passed downstream. + * + * @return returns the UserMetadata object + */ + UserMetadata getUserMetadata(); + + /** + * method to get the read-only system metadata information + * + * @return returns the SystemMetadata object + */ + SystemMetadata getSystemMetadata(); } diff --git a/src/main/java/io/numaproj/numaflow/mapper/HandlerDatum.java b/src/main/java/io/numaproj/numaflow/mapper/HandlerDatum.java index 17945596..d6228b86 100644 --- a/src/main/java/io/numaproj/numaflow/mapper/HandlerDatum.java +++ b/src/main/java/io/numaproj/numaflow/mapper/HandlerDatum.java @@ -1,6 +1,8 @@ package io.numaproj.numaflow.mapper; +import io.numaproj.numaflow.shared.SystemMetadata; +import io.numaproj.numaflow.shared.UserMetadata; import lombok.AllArgsConstructor; import java.time.Instant; @@ -13,7 +15,8 @@ class HandlerDatum implements Datum { private Instant watermark; private Instant eventTime; private Map headers; - + private UserMetadata userMetadata; + private SystemMetadata systemMetadata; @Override public Instant getWatermark() { @@ -35,4 +38,13 @@ public Map getHeaders() { return this.headers; } + @Override + public UserMetadata getUserMetadata() { + return this.userMetadata; + } + + @Override + public SystemMetadata getSystemMetadata() { + return this.systemMetadata; + } } diff --git a/src/main/java/io/numaproj/numaflow/mapper/MapperActor.java b/src/main/java/io/numaproj/numaflow/mapper/MapperActor.java index a6a56078..a517e816 100644 --- a/src/main/java/io/numaproj/numaflow/mapper/MapperActor.java +++ b/src/main/java/io/numaproj/numaflow/mapper/MapperActor.java @@ -4,12 +4,15 @@ import akka.actor.Props; import akka.japi.pf.ReceiveBuilder; import com.google.protobuf.ByteString; +import common.MetadataOuterClass; import io.numaproj.numaflow.map.v1.MapOuterClass; -import io.numaproj.numaflow.sourcetransformer.v1.Sourcetransformer; +import io.numaproj.numaflow.shared.SystemMetadata; +import io.numaproj.numaflow.shared.UserMetadata; import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; /** * Mapper actor that processes the map request. It invokes the mapper to process the request and @@ -48,7 +51,9 @@ private void processRequest(MapOuterClass.MapRequest mapRequest) { Instant.ofEpochSecond( mapRequest.getRequest().getEventTime().getSeconds(), mapRequest.getRequest().getEventTime().getNanos()), - mapRequest.getRequest().getHeadersMap() + mapRequest.getRequest().getHeadersMap(), + new UserMetadata(mapRequest.getRequest().getMetadata()), + new SystemMetadata(mapRequest.getRequest().getMetadata()) ); String[] keys = mapRequest.getRequest().getKeysList().toArray(new String[0]); try { @@ -89,6 +94,8 @@ private MapOuterClass.MapResponse buildResponse(MessageList messageList, String == null ? new ArrayList<>() : Arrays.asList(message.getKeys())) .addAllTags(message.getTags() == null ? new ArrayList<>() : Arrays.asList(message.getTags())) + .setMetadata(message.getUserMetadata() + == null ? MetadataOuterClass.Metadata.getDefaultInstance() : message.getUserMetadata().toProto()) .build()); }); return responseBuilder.setId(ID).build(); diff --git a/src/main/java/io/numaproj/numaflow/mapper/MapperTestKit.java b/src/main/java/io/numaproj/numaflow/mapper/MapperTestKit.java index 9567568f..71d2326f 100644 --- a/src/main/java/io/numaproj/numaflow/mapper/MapperTestKit.java +++ b/src/main/java/io/numaproj/numaflow/mapper/MapperTestKit.java @@ -7,6 +7,8 @@ import io.grpc.stub.StreamObserver; import io.numaproj.numaflow.map.v1.MapGrpc; import io.numaproj.numaflow.map.v1.MapOuterClass; +import io.numaproj.numaflow.shared.SystemMetadata; +import io.numaproj.numaflow.shared.UserMetadata; import lombok.Builder; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -233,5 +235,7 @@ public static class TestDatum implements Datum { private final Instant eventTime; private final Instant watermark; private final Map headers; + private final UserMetadata userMetadata; + private final SystemMetadata systemMetadata; } } diff --git a/src/main/java/io/numaproj/numaflow/mapper/Message.java b/src/main/java/io/numaproj/numaflow/mapper/Message.java index 57f939f0..518a6f62 100644 --- a/src/main/java/io/numaproj/numaflow/mapper/Message.java +++ b/src/main/java/io/numaproj/numaflow/mapper/Message.java @@ -1,7 +1,12 @@ package io.numaproj.numaflow.mapper; +import io.numaproj.numaflow.shared.UserMetadata; import lombok.Getter; +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; + /** Message is used to wrap the data returned by Mapper. */ @Getter public class Message { @@ -9,19 +14,23 @@ public class Message { private final String[] keys; private final byte[] value; private final String[] tags; + private final UserMetadata userMetadata; /** - * used to create Message with value, keys and tags(used for conditional forwarding) + * used to create Message with value, keys, tags(used for conditional forwarding) and userMetadata * * @param value message value * @param keys message keys * @param tags message tags which will be used for conditional forwarding + * @param userMetadata user metadata, this is used to pass user defined metadata to the next vertex */ - public Message(byte[] value, String[] keys, String[] tags) { + public Message(byte[] value, String[] keys, String[] tags, UserMetadata userMetadata) { // defensive copy - once the Message is created, the caller should not be able to modify it. this.keys = keys == null ? null : keys.clone(); this.value = value == null ? null : value.clone(); this.tags = tags == null ? null : tags.clone(); + // Copy the data using copy constructor to prevent mutation + this.userMetadata = userMetadata == null ? null : new UserMetadata(userMetadata); } /** @@ -30,7 +39,7 @@ public Message(byte[] value, String[] keys, String[] tags) { * @param value message value */ public Message(byte[] value) { - this(value, null, null); + this(value, null, null, null); } /** @@ -40,7 +49,18 @@ public Message(byte[] value) { * @param keys message keys */ public Message(byte[] value, String[] keys) { - this(value, keys, null); + this(value, keys, null, null); + } + + /** + * used to create Message with value, keys and tags(used for conditional forwarding) + * + * @param value message value + * @param keys message keys + * @param tags message tags which will be used for conditional forwarding + */ + public Message(byte[] value, String[] keys, String[] tags) { + this(value, keys, tags, null); } /** @@ -49,6 +69,6 @@ public Message(byte[] value, String[] keys) { * @return returns the Message which will be dropped */ public static Message toDrop() { - return new Message(new byte[0], null, DROP_TAGS); + return new Message(new byte[0], null, DROP_TAGS, null); } } diff --git a/src/main/java/io/numaproj/numaflow/shared/SystemMetadata.java b/src/main/java/io/numaproj/numaflow/shared/SystemMetadata.java new file mode 100644 index 00000000..f82749d6 --- /dev/null +++ b/src/main/java/io/numaproj/numaflow/shared/SystemMetadata.java @@ -0,0 +1,91 @@ +package io.numaproj.numaflow.shared; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import common.MetadataOuterClass; +import lombok.Getter; + +/** + * SystemMetadata is mapping of group name to key-value pairs + * SystemMetadata wraps system-generated metadata groups per message. + * It is read-only to UDFs + */ +public class SystemMetadata { + private final Map> data; + + /** + * Default constructor + */ + public SystemMetadata() { + this.data = new HashMap<>(); + } + + /** + * Constructor from MetadataOuterClass.Metadata + * + * @param metadata is an instance of MetadataOuterClass.Metadata which contains system metadata + */ + public SystemMetadata(MetadataOuterClass.Metadata metadata) { + if (metadata == null || metadata.getSysMetadataMap().isEmpty()) { + this.data = new HashMap<>(); + return; + } + this.data = metadata.getSysMetadataMap() + .entrySet().stream() + // No null checks here as protobuf contract ensures that the data has no null values + .collect(Collectors.toMap( + Map.Entry::getKey, + e -> new HashMap<>(e.getValue() + .getKeyValueMap() + .entrySet() + .stream() + .collect(Collectors.toMap( + Map.Entry::getKey, + e1 -> e1.getValue().toByteArray() + )) + ) + )); + } + + /** + * Get the list of all groups present in the user metadata + * + * @return list of group names + */ + public List getGroups() { + return new ArrayList<>(this.data.keySet()); + } + + /** + * Get a list of key names within a given group + * + * @param group is the name of the group from which to get the key names + * @return a list of key names within the group + */ + public List getKeys(String group) { + if (!this.data.containsKey(group)) { + return new ArrayList<>(); + } + return new ArrayList<>(this.data.get(group).keySet()); + } + + /** + * Get the value of a key in a group + * + * @param group Name of the group which contains the key holding required value + * @param key Name of the key in the group for which value is required + * @return Value of the key in the group or null if the group/key is not present + */ + public byte[] getValue(String group, String key) { + Map groupData = this.data.get(group); + if (groupData == null) { + return null; + } + byte[] value = groupData.get(key); + return value == null ? null : value.clone(); + } +} diff --git a/src/main/java/io/numaproj/numaflow/shared/UserMetadata.java b/src/main/java/io/numaproj/numaflow/shared/UserMetadata.java new file mode 100644 index 00000000..3a397dfb --- /dev/null +++ b/src/main/java/io/numaproj/numaflow/shared/UserMetadata.java @@ -0,0 +1,217 @@ +package io.numaproj.numaflow.shared; + +import common.MetadataOuterClass; +import com.google.protobuf.ByteString; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * UserMetadata is mapping of group name to key-value pairs + * UserMetadata wraps user generated metadata groups per message. + * It can be appended to and passed on to the downstream. + * Note: All the null checks have been added within the constructors to ensure + * that the data and its entries are always valid. + */ +public class UserMetadata { + private final Map> data; + + /** + * Default constructor for UserMetadata. + * Initializes an empty HashMap data. + */ + public UserMetadata() { + this.data = new HashMap<>(); + } + + /** + * Constructor from MetadataOuterClass.Metadata. + * Initializes an empty HashMap data if the metadata passed is null or empty. + * For each entry in {@code metadata.getUserMetadataMap()}, it creates a new HashMap and copies the key-value pairs + * from the entry to the new HashMap. It also filters out any null values. + * + * @param metadata is an instance of MetadataOuterClass.Metadata which contains user metadata + */ + public UserMetadata(MetadataOuterClass.Metadata metadata) { + if (metadata == null || metadata.getUserMetadataMap().isEmpty()) { + this.data = new HashMap<>(); + return; + } + // Copy the data to prevent mutation + this.data = metadata.getUserMetadataMap() + .entrySet() + .stream() + // No null checks here as protobuf contract ensures that the data has no null values + .collect(Collectors.toMap( + Map.Entry::getKey, + e -> e.getValue() + .getKeyValueMap().entrySet().stream() + .collect(Collectors.toMap( + Map.Entry::getKey, + e1 -> e1.getValue().toByteArray() + )) + ) + ); + } + + /** + * Copy constructor for UserMetadata. + * Returns a UserMetadata with empty HashMap data if the userMetadata passed is null or + * {@code userMetadata.data} is null. + * For each entry in {@code userMetadata.data}, it creates a new HashMap and copies the key-value pairs + * from the entry to the new HashMap. It also filters out any null values. + * + * @param userMetadata the user metadata to copy + */ + public UserMetadata(UserMetadata userMetadata) { + if (userMetadata == null || userMetadata.data == null) { + this.data = new HashMap<>(); + return; + } + + // Deep copy the data to prevent mutation + this.data = userMetadata.data.entrySet().stream() + .filter(e -> e.getValue() != null) + .collect(Collectors.toMap( + Map.Entry::getKey, + e -> e.getValue().entrySet() + .stream() + .filter(e1 -> e1.getValue() != null) + .collect(Collectors.toMap( + Map.Entry::getKey, + e1 -> e1.getValue().clone() + )) + )); + } + + /** + * Convert the user metadata to a map that can be used to create MetadataOuterClass.Metadata + * For each entry in {@code data}, it creates a new MetadataOuterClass.KeyValueGroup and copies the key-value pairs + * from the entry to the new MetadataOuterClass.KeyValueGroup. + * It also filters out any null values. + * + * @return MetadataOuterClass.Metadata + */ + public MetadataOuterClass.Metadata toProto() { + Map result = new HashMap<>(); + this.data.forEach((group, kvMap) -> { + MetadataOuterClass.KeyValueGroup.Builder builder = MetadataOuterClass.KeyValueGroup.newBuilder(); + // No null checks required as the constructor and add methods ensures that the data is valid + kvMap.forEach((key, value) -> builder.putKeyValue(key, ByteString.copyFrom(value))); + + result.put(group, builder.build()); + }); + + return MetadataOuterClass.Metadata + .newBuilder() + .putAllUserMetadata(result) + .build(); + } + + /** + * Get the list of all groups present in the user metadata + * + * @return list of group names + */ + public List getGroups() { + return new ArrayList<>(this.data.keySet()); + } + + /** + * Delete a group from the user metadata + * + * @param group is the name of the group to delete + */ + public void deleteGroup(String group) { + this.data.remove(group); + } + + /** + * Get a list of key names within a given group + * + * @param group is the name of the group from which to get the key names + * @return a list of key names within the group + */ + public List getKeys(String group) { + if (!this.data.containsKey(group)) { + return new ArrayList<>(); + } + return new ArrayList<>(this.data.get(group).keySet()); + } + + /** + * Delete a key from a group + * + * @param group Name of the group containing the key + * @param key Name of the key to delete + */ + public void deleteKey(String group, String key) { + if (!this.data.containsKey(group)) { + return; + } + this.data.get(group).remove(key); + } + + /** + * Add a key value pair to a group + * Note: If the value is null, the key/value pair will not be added to the group + * + * @param group Name of the group to which key value pairs are to be added + * @param key Name of the key in group to which the value is to be added + * @param value Value to be added to the key + */ + public void addKV(String group, String key, byte[] value) { + // null values are not added + if (group != null && key != null && value != null) { + this.data + .computeIfAbsent(group, k -> new HashMap<>()) + .put(key, value.clone()); + } + } + + /** + * Add multiple key value pairs to a group + * Note: If the value is null, the key/value pair will not be added to the group + * + * @param group Name of the group to which key value pairs are to be added + * @param kv Map of key value pairs to be added to the group + */ + public void addKVs(String group, Map kv) { + if (group == null || kv == null) { + return; + } + Map groupMap = this.data.computeIfAbsent(group, k -> new HashMap<>()); + kv.forEach((key, value) -> { + if (key != null && value != null) { + groupMap.put(key, value.clone()); + } + }); + } + + /** + * Get the value of a key in a group + * + * @param group Name of the group which contains the key holding required value + * @param key Name of the key in the group for which value is required + * @return Value of the key in the group or null if the group/key is not present + */ + public byte[] getValue(String group, String key) { + Map groupData = this.data.get(group); + if (groupData == null) { + return null; + } + byte[] value = groupData.get(key); + // null value should not be present but check added for safety + return value == null ? null : value.clone(); + } + + /** + * Clear all the user metadata + */ + public void clear() { + this.data.clear(); + } +} diff --git a/src/main/java/io/numaproj/numaflow/sinker/Datum.java b/src/main/java/io/numaproj/numaflow/sinker/Datum.java index 68cc91a4..7b5b1200 100644 --- a/src/main/java/io/numaproj/numaflow/sinker/Datum.java +++ b/src/main/java/io/numaproj/numaflow/sinker/Datum.java @@ -1,5 +1,8 @@ package io.numaproj.numaflow.sinker; +import io.numaproj.numaflow.shared.SystemMetadata; +import io.numaproj.numaflow.shared.UserMetadata; + import java.time.Instant; import java.util.Map; @@ -48,4 +51,18 @@ public interface Datum { * @return returns the headers in the form of key value pair */ Map getHeaders(); + + /** + * method to get user defined metadata information of the payload + * + * @return user metadata + */ + UserMetadata getUserMetadata(); + + /** + * method to get system metadata information of the payload + * + * @return system metadata + */ + SystemMetadata getSystemMetadata(); } diff --git a/src/main/java/io/numaproj/numaflow/sinker/HandlerDatum.java b/src/main/java/io/numaproj/numaflow/sinker/HandlerDatum.java index 60e9a2b2..c5fbcbaf 100644 --- a/src/main/java/io/numaproj/numaflow/sinker/HandlerDatum.java +++ b/src/main/java/io/numaproj/numaflow/sinker/HandlerDatum.java @@ -1,5 +1,7 @@ package io.numaproj.numaflow.sinker; +import io.numaproj.numaflow.shared.SystemMetadata; +import io.numaproj.numaflow.shared.UserMetadata; import lombok.AllArgsConstructor; import java.time.Instant; @@ -9,13 +11,15 @@ class HandlerDatum implements Datum { // EOF_DATUM is used to indicate the end of the stream. - static final HandlerDatum EOF_DATUM = new HandlerDatum(null, null, null, null, null, null); + static final HandlerDatum EOF_DATUM = new HandlerDatum(null, null, null, null, null, null, null, null); private String[] keys; private byte[] value; private Instant watermark; private Instant eventTime; private String id; private Map headers; + private UserMetadata userMetadata; + private SystemMetadata systemMetadata; @Override public String[] getKeys() { @@ -46,4 +50,14 @@ public String getId() { public Map getHeaders() { return this.headers; } + + @Override + public UserMetadata getUserMetadata() { + return this.userMetadata; + } + + @Override + public SystemMetadata getSystemMetadata() { + return this.systemMetadata; + } } diff --git a/src/main/java/io/numaproj/numaflow/sinker/KeyValueGroup.java b/src/main/java/io/numaproj/numaflow/sinker/KeyValueGroup.java deleted file mode 100644 index abc0db08..00000000 --- a/src/main/java/io/numaproj/numaflow/sinker/KeyValueGroup.java +++ /dev/null @@ -1,16 +0,0 @@ -package io.numaproj.numaflow.sinker; - -import lombok.Builder; -import lombok.Getter; - -import java.util.HashMap; - -/** - * KeyValueGroup is a map of key-value pairs for a given group. - * Used as part of {@link io.numaproj.numaflow.sinker.Message}. - */ -@Getter -@Builder -public class KeyValueGroup { - private final HashMap keyValue; -} diff --git a/src/main/java/io/numaproj/numaflow/sinker/Message.java b/src/main/java/io/numaproj/numaflow/sinker/Message.java index d8ede1cf..f803b16c 100644 --- a/src/main/java/io/numaproj/numaflow/sinker/Message.java +++ b/src/main/java/io/numaproj/numaflow/sinker/Message.java @@ -1,9 +1,14 @@ package io.numaproj.numaflow.sinker; +import com.google.protobuf.ByteString; +import common.MetadataOuterClass; +import io.numaproj.numaflow.shared.UserMetadata; +import io.numaproj.numaflow.sink.v1.SinkOuterClass; import lombok.Builder; import lombok.Getter; -import java.util.HashMap; +import java.util.ArrayList; +import java.util.Arrays; /** * Message contains information that needs to be sent to the OnSuccess sink. @@ -13,8 +18,52 @@ @Builder public class Message { private final byte[] value; - private final String key; - private final HashMap userMetadata; + private final String[] keys; + /** + * userMetadata is the user defined metadata that is added to the onSuccess message + * This is using the common {@link UserMetadata} class to allow reusing the user metadata stored in Datum + */ + private final UserMetadata userMetadata; + + /** + * Static method to create an onSuccess message from a sinker Datum object. + * + * @param datum object used to create the onSuccess message. + * The created onSuccess message will have the same value, keys and userMetadata as the original datum + * @return onSuccess message + */ + public static Message fromDatum(Datum datum) { + if (datum == null) { + return Message.builder().build(); + } + return Message.builder() + .value(datum.getValue().clone()) + .keys(datum.getKeys().clone()) + .userMetadata(new UserMetadata(datum.getUserMetadata())) + .build(); + } + + /** + * Static method to convert a Message object to a SinkOuterClass.SinkResponse.Result.Message object. + * If the message is null, returns the default instance of SinkOuterClass.SinkResponse.Result.Message. + * + * @param message The message object to convert into the relevant proto object + * @return The converted proto object + */ + public static SinkOuterClass.SinkResponse.Result.Message toProto(Message message) { + if (message == null) { + return SinkOuterClass.SinkResponse.Result.Message.getDefaultInstance(); + } + return SinkOuterClass.SinkResponse.Result.Message.newBuilder() + .addAllKeys(message.getKeys() + == null ? new ArrayList<>() : Arrays.asList(message.getKeys())) + .setValue(message.getValue() + == null ? ByteString.EMPTY : ByteString.copyFrom(message.getValue())) + .setMetadata(message.getUserMetadata() + == null ? MetadataOuterClass.Metadata.getDefaultInstance() + : message.getUserMetadata().toProto()) + .build(); + } } diff --git a/src/main/java/io/numaproj/numaflow/sinker/Response.java b/src/main/java/io/numaproj/numaflow/sinker/Response.java index b0c2b217..96123005 100644 --- a/src/main/java/io/numaproj/numaflow/sinker/Response.java +++ b/src/main/java/io/numaproj/numaflow/sinker/Response.java @@ -1,16 +1,9 @@ package io.numaproj.numaflow.sinker; -import com.google.protobuf.ByteString; -import common.MetadataOuterClass; -import io.numaproj.numaflow.sink.v1.SinkOuterClass.SinkResponse; import lombok.AccessLevel; import lombok.AllArgsConstructor; import lombok.Getter; -import java.util.Collections; -import java.util.Map; -import java.util.stream.Collectors; - /** * Response is used to send response from the user defined sinker. It contains the id of the * message, success status, an optional error message and a fallback status. Various static factory @@ -26,9 +19,7 @@ public class Response { private final Boolean serve; private final byte[] serveResponse; private final Boolean onSuccess; - // FIXME: Should this be Message object from this package? That would allow parity with other SDKs (specially Go) - // Currently done this way to prevent conversion in buildResult method. - private final SinkResponse.Result.Message onSuccessMessage; + private final Message onSuccessMessage; /** * Static method to create response for successful message processing. @@ -76,15 +67,17 @@ public static Response responseServe(String id, byte[] serveResponse) { } /** - * Static method to create response for onSuccess message. Allows creation of onSuccess message - * from protobuf Message object. + * Static method to create response for onSuccess message using the Datum object. + * NOTE: response id is null if datum is null * - * @param id id of the message - * @param onSuccessMessage OnSuccessMessage object to be sent to the onSuccess sink + * @param datum Datum object using which onSuccess message is created. Can be the original datum * @return Response object with onSuccess status and onSuccess message */ - public static Response responseOnSuccess(String id, SinkResponse.Result.Message onSuccessMessage) { - return new Response(id, false, null, false, false, null, true, onSuccessMessage); + public static Response responseOnSuccess(Datum datum) { + if (datum == null) { + return new Response(null, false, null, false, false, null, true, null); + } + return responseOnSuccess(datum.getId(), Message.fromDatum(datum)); } /** @@ -97,51 +90,6 @@ public static Response responseOnSuccess(String id, SinkResponse.Result.Message * @return Response object with onSuccess status and onSuccess message */ public static Response responseOnSuccess(String id, Message onSuccessMessage) { - if (onSuccessMessage == null) { - return new Response(id, false, null, false, false, null, true, null); - } else { - - Map pbUserMetadata = MetadataOuterClass.Metadata - .getDefaultInstance() - .getUserMetadataMap(); - - if (onSuccessMessage.getUserMetadata() != null) { - pbUserMetadata = - onSuccessMessage.getUserMetadata() - .entrySet() - .stream() - .filter(e -> e.getKey() != null && e.getValue() != null) - .collect(Collectors.toMap( - Map.Entry::getKey, - e -> MetadataOuterClass.KeyValueGroup.newBuilder() - .putAllKeyValue(e.getValue().getKeyValue() == null - ? Collections.emptyMap() - : e.getValue() - .getKeyValue() - .entrySet() - .stream() - .filter(kv -> kv.getKey() != null - && kv.getValue() != null) - .collect(Collectors.toMap( - Map.Entry::getKey, - kv -> ByteString.copyFrom(kv.getValue()) - )) - ) - .build() - )); - } - - MetadataOuterClass.Metadata pbMetadata = MetadataOuterClass.Metadata.newBuilder() - .putAllUserMetadata(pbUserMetadata) - .build(); - - SinkResponse.Result.Message pbOnSuccessMessage = SinkResponse.Result.Message.newBuilder() - .addKeys(onSuccessMessage.getKey() == null ? "" : onSuccessMessage.getKey()) - .setValue(onSuccessMessage.getValue() == null ? ByteString.EMPTY : ByteString.copyFrom(onSuccessMessage.getValue())) - .setMetadata(pbMetadata) - .build(); - - return new Response(id, false, null, false, false, null, true, pbOnSuccessMessage); - } + return new Response(id, false, null, false, false, null, true, onSuccessMessage); } } diff --git a/src/main/java/io/numaproj/numaflow/sinker/Service.java b/src/main/java/io/numaproj/numaflow/sinker/Service.java index 5458a3f9..3c51df58 100644 --- a/src/main/java/io/numaproj/numaflow/sinker/Service.java +++ b/src/main/java/io/numaproj/numaflow/sinker/Service.java @@ -9,6 +9,8 @@ import io.grpc.protobuf.StatusProto; import io.grpc.stub.StreamObserver; import io.numaproj.numaflow.shared.ExceptionUtils; +import io.numaproj.numaflow.shared.SystemMetadata; +import io.numaproj.numaflow.shared.UserMetadata; import io.numaproj.numaflow.sink.v1.SinkGrpc; import io.numaproj.numaflow.sink.v1.SinkOuterClass; import java.time.Instant; @@ -157,9 +159,7 @@ private SinkOuterClass.SinkResponse.Result buildResult(Response response) { return SinkOuterClass.SinkResponse.Result.newBuilder() .setId(response.getId() == null ? "" : response.getId()) .setStatus(SinkOuterClass.Status.ON_SUCCESS) - .setOnSuccessMsg(response.getOnSuccessMessage() == null - ? SinkOuterClass.SinkResponse.Result.Message.getDefaultInstance() - : response.getOnSuccessMessage()) + .setOnSuccessMsg(Message.toProto(response.getOnSuccessMessage())) .build(); } else { // FIXME: Return error when error message is not set? @@ -193,7 +193,10 @@ private HandlerDatum constructHandlerDatum(SinkOuterClass.SinkRequest d) { d.getRequest().getEventTime().getSeconds(), d.getRequest().getEventTime().getNanos()), d.getRequest().getId(), - d.getRequest().getHeadersMap()); + d.getRequest().getHeadersMap(), + new UserMetadata(d.getRequest().getMetadata()), + new SystemMetadata(d.getRequest().getMetadata()) + ); } // shuts down the executor service diff --git a/src/main/java/io/numaproj/numaflow/sinker/SinkerTestKit.java b/src/main/java/io/numaproj/numaflow/sinker/SinkerTestKit.java index efa8a9d4..7d842d01 100644 --- a/src/main/java/io/numaproj/numaflow/sinker/SinkerTestKit.java +++ b/src/main/java/io/numaproj/numaflow/sinker/SinkerTestKit.java @@ -5,6 +5,8 @@ import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.stub.StreamObserver; +import io.numaproj.numaflow.shared.SystemMetadata; +import io.numaproj.numaflow.shared.UserMetadata; import io.numaproj.numaflow.sink.v1.SinkGrpc; import io.numaproj.numaflow.sink.v1.SinkOuterClass; import jdk.jfr.Experimental; @@ -302,5 +304,8 @@ public static class TestDatum implements Datum { private final Instant eventTime; private final Instant watermark; private final Map headers; + private final UserMetadata userMetadata; + private final SystemMetadata systemMetadata; + } } diff --git a/src/main/java/io/numaproj/numaflow/sourcer/Message.java b/src/main/java/io/numaproj/numaflow/sourcer/Message.java index cc8502bf..8507209c 100644 --- a/src/main/java/io/numaproj/numaflow/sourcer/Message.java +++ b/src/main/java/io/numaproj/numaflow/sourcer/Message.java @@ -2,6 +2,8 @@ import java.time.Instant; import java.util.Map; + +import io.numaproj.numaflow.shared.UserMetadata; import lombok.Getter; /** Message is used to wrap the data returned by Sourcer. */ @@ -13,6 +15,7 @@ public class Message { private final Offset offset; private final Instant eventTime; private final Map headers; + private final UserMetadata userMetadata; /** * used to create Message with value, offset and eventTime. @@ -22,7 +25,7 @@ public class Message { * @param eventTime message eventTime */ public Message(byte[] value, Offset offset, Instant eventTime) { - this(value, offset, eventTime, null, null); + this(value, offset, eventTime, null, null, null); } /** @@ -34,7 +37,7 @@ public Message(byte[] value, Offset offset, Instant eventTime) { * @param keys message keys */ public Message(byte[] value, Offset offset, Instant eventTime, String[] keys) { - this(value, offset, eventTime, keys, null); + this(value, offset, eventTime, keys, null, null); } /** @@ -46,7 +49,45 @@ public Message(byte[] value, Offset offset, Instant eventTime, String[] keys) { * @param headers message headers */ public Message(byte[] value, Offset offset, Instant eventTime, Map headers) { - this(value, offset, eventTime, null, headers); + this(value, offset, eventTime, null, headers, null); + } + + /** + * used to create Message with value, offset, eventTime and userMetadata. + * + * @param value message value + * @param offset message offset + * @param eventTime message eventTime + * @param userMetadata message userMetadata + */ + public Message(byte[] value, Offset offset, Instant eventTime, UserMetadata userMetadata) { + this(value, offset, eventTime, null, null, userMetadata); + } + + /** + * used to create Message with value, offset, eventTime, keys and userMetadata. + * + * @param value message value + * @param offset message offset + * @param eventTime message eventTime + * @param keys message keys + * @param userMetadata message userMetadata + */ + public Message(byte[] value, Offset offset, Instant eventTime, String[] keys, UserMetadata userMetadata) { + this(value, offset, eventTime, keys, null, userMetadata); + } + + /** + * used to create Message with value, offset, eventTime, headers and userMetadata. + * + * @param value message value + * @param offset message offset + * @param eventTime message eventTime + * @param headers message headers + * @param userMetadata message userMetadata + */ + public Message(byte[] value, Offset offset, Instant eventTime, Map headers, UserMetadata userMetadata) { + this(value, offset, eventTime, null, headers, userMetadata); } /** @@ -58,8 +99,22 @@ public Message(byte[] value, Offset offset, Instant eventTime, Map headers) { + this(value, offset, eventTime, keys, headers, null); + } + + /** + * used to create Message with value, offset, eventTime, keys, headers and userMetadata. + * + * @param value message value + * @param offset message offset + * @param eventTime message eventTime + * @param keys message keys + * @param headers message headers + * @param userMetadata message userMetadata + */ public Message( - byte[] value, Offset offset, Instant eventTime, String[] keys, Map headers) { + byte[] value, Offset offset, Instant eventTime, String[] keys, Map headers, UserMetadata userMetadata) { // defensive copy - once the Message is created, the caller should not be able to modify it. this.keys = keys == null ? null : keys.clone(); this.value = value == null ? null : value.clone(); @@ -67,5 +122,7 @@ public Message( this.offset = offset == null ? null : new Offset(offset.getValue(), offset.getPartitionId()); // The Instant class in Java is already immutable. this.eventTime = eventTime; + // Copy the data using copy constructor to prevent mutation + this.userMetadata = userMetadata == null ? null : new UserMetadata(userMetadata); } } diff --git a/src/main/java/io/numaproj/numaflow/sourcer/OutputObserverImpl.java b/src/main/java/io/numaproj/numaflow/sourcer/OutputObserverImpl.java index 14a23543..3bc9a7c1 100644 --- a/src/main/java/io/numaproj/numaflow/sourcer/OutputObserverImpl.java +++ b/src/main/java/io/numaproj/numaflow/sourcer/OutputObserverImpl.java @@ -2,6 +2,7 @@ import com.google.protobuf.ByteString; import com.google.protobuf.Timestamp; +import common.MetadataOuterClass; import io.grpc.stub.StreamObserver; import io.numaproj.numaflow.source.v1.SourceOuterClass; import lombok.AllArgsConstructor; @@ -46,6 +47,7 @@ private SourceOuterClass.ReadResponse buildResponse(Message message) { .setPartitionId(message.getOffset().getPartitionId())) .putAllHeaders(message.getHeaders() != null ? message.getHeaders() : new HashMap<>()) + .setMetadata(message.getUserMetadata() == null ? MetadataOuterClass.Metadata.getDefaultInstance() : message.getUserMetadata().toProto()) .build()); return builder.build(); diff --git a/src/main/java/io/numaproj/numaflow/sourcetransformer/Datum.java b/src/main/java/io/numaproj/numaflow/sourcetransformer/Datum.java index 0f2b3953..eb45cdc2 100644 --- a/src/main/java/io/numaproj/numaflow/sourcetransformer/Datum.java +++ b/src/main/java/io/numaproj/numaflow/sourcetransformer/Datum.java @@ -1,6 +1,9 @@ package io.numaproj.numaflow.sourcetransformer; +import io.numaproj.numaflow.shared.SystemMetadata; +import io.numaproj.numaflow.shared.UserMetadata; + import java.time.Instant; import java.util.Map; @@ -36,4 +39,18 @@ public interface Datum { * @return returns the headers in the form of key value pair */ Map getHeaders(); + + /** + * method to get user defined metadata information of the payload + * + * @return user metadata + */ + UserMetadata getUserMetadata(); + + /** + * method to get system metadata information of the payload + * + * @return system metadata + */ + SystemMetadata getSystemMetadata(); } diff --git a/src/main/java/io/numaproj/numaflow/sourcetransformer/HandlerDatum.java b/src/main/java/io/numaproj/numaflow/sourcetransformer/HandlerDatum.java index e3774ed8..d4bbbd2d 100644 --- a/src/main/java/io/numaproj/numaflow/sourcetransformer/HandlerDatum.java +++ b/src/main/java/io/numaproj/numaflow/sourcetransformer/HandlerDatum.java @@ -1,6 +1,8 @@ package io.numaproj.numaflow.sourcetransformer; +import io.numaproj.numaflow.shared.SystemMetadata; +import io.numaproj.numaflow.shared.UserMetadata; import lombok.AllArgsConstructor; import java.time.Instant; @@ -13,6 +15,8 @@ class HandlerDatum implements Datum { private Instant watermark; private Instant eventTime; private Map headers; + private UserMetadata userMetadata; + private SystemMetadata systemMetadata; @Override @@ -34,4 +38,14 @@ public Instant getEventTime() { public Map getHeaders() { return this.headers; } + + @Override + public UserMetadata getUserMetadata() { + return this.userMetadata; + } + + @Override + public SystemMetadata getSystemMetadata() { + return this.systemMetadata; + } } diff --git a/src/main/java/io/numaproj/numaflow/sourcetransformer/Message.java b/src/main/java/io/numaproj/numaflow/sourcetransformer/Message.java index 6aefd0e7..f1c3244d 100644 --- a/src/main/java/io/numaproj/numaflow/sourcetransformer/Message.java +++ b/src/main/java/io/numaproj/numaflow/sourcetransformer/Message.java @@ -1,6 +1,8 @@ package io.numaproj.numaflow.sourcetransformer; import java.time.Instant; + +import io.numaproj.numaflow.shared.UserMetadata; import lombok.Getter; /** Message is used to wrap the data return by SourceTransformer functions. */ @@ -11,22 +13,37 @@ public class Message { private final byte[] value; private final Instant eventTime; private final String[] tags; + private final UserMetadata userMetadata; /** - * used to create Message with value, eventTime, keys and tags(used for conditional forwarding) + * used to create Message with value, eventTime, keys, tags(used for conditional forwarding) and userMetadata * * @param value message value * @param eventTime message eventTime * @param keys message keys * @param tags message tags which will be used for conditional forwarding + * @param userMetadata user metadata */ - public Message(byte[] value, Instant eventTime, String[] keys, String[] tags) { + public Message(byte[] value, Instant eventTime, String[] keys, String[] tags, UserMetadata userMetadata) { // defensive copy - once the Message is created, the caller should not be able to modify it. this.keys = keys == null ? null : keys.clone(); this.value = value == null ? null : value.clone(); this.tags = tags == null ? null : tags.clone(); // The Instant class in Java is already immutable. this.eventTime = eventTime; + this.userMetadata = userMetadata == null ? null : new UserMetadata(userMetadata); + } + + /** + * used to create Message with value, eventTime, keys, tags(used for conditional forwarding) + * + * @param value message value + * @param eventTime message eventTime + * @param keys message keys + * @param tags message tags which will be used for conditional forwarding + */ + public Message(byte[] value, Instant eventTime, String[] keys, String[] tags) { + this(value, eventTime, keys, tags, null); } /** @@ -36,7 +53,7 @@ public Message(byte[] value, Instant eventTime, String[] keys, String[] tags) { * @param eventTime message eventTime */ public Message(byte[] value, Instant eventTime) { - this(value, eventTime, null, null); + this(value, eventTime, null, null, null); } /** @@ -47,7 +64,7 @@ public Message(byte[] value, Instant eventTime) { * @param keys message keys */ public Message(byte[] value, Instant eventTime, String[] keys) { - this(value, eventTime, keys, null); + this(value, eventTime, keys, null, null); } /** @@ -59,6 +76,6 @@ public Message(byte[] value, Instant eventTime, String[] keys) { * @return returns the Message which will be dropped */ public static Message toDrop(Instant eventTime) { - return new Message(new byte[0], eventTime, null, DROP_TAGS); + return new Message(new byte[0], eventTime, null, DROP_TAGS, null); } } diff --git a/src/main/java/io/numaproj/numaflow/sourcetransformer/SourceTransformerTestKit.java b/src/main/java/io/numaproj/numaflow/sourcetransformer/SourceTransformerTestKit.java index bc600a94..1bbf886c 100644 --- a/src/main/java/io/numaproj/numaflow/sourcetransformer/SourceTransformerTestKit.java +++ b/src/main/java/io/numaproj/numaflow/sourcetransformer/SourceTransformerTestKit.java @@ -5,6 +5,8 @@ import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.stub.StreamObserver; +import io.numaproj.numaflow.shared.SystemMetadata; +import io.numaproj.numaflow.shared.UserMetadata; import io.numaproj.numaflow.sourcetransformer.v1.SourceTransformGrpc; import io.numaproj.numaflow.sourcetransformer.v1.Sourcetransformer; import lombok.Builder; @@ -233,5 +235,7 @@ public static class TestDatum implements Datum { private final Instant eventTime; private final Instant watermark; private final Map headers; + private final UserMetadata userMetadata; + private final SystemMetadata systemMetadata; } } diff --git a/src/main/java/io/numaproj/numaflow/sourcetransformer/TransformerActor.java b/src/main/java/io/numaproj/numaflow/sourcetransformer/TransformerActor.java index 08f67a06..4fd9a7ae 100644 --- a/src/main/java/io/numaproj/numaflow/sourcetransformer/TransformerActor.java +++ b/src/main/java/io/numaproj/numaflow/sourcetransformer/TransformerActor.java @@ -5,6 +5,9 @@ import akka.japi.pf.ReceiveBuilder; import com.google.protobuf.ByteString; import com.google.protobuf.Timestamp; +import common.MetadataOuterClass; +import io.numaproj.numaflow.shared.SystemMetadata; +import io.numaproj.numaflow.shared.UserMetadata; import io.numaproj.numaflow.sourcetransformer.v1.Sourcetransformer; import java.time.Instant; @@ -67,7 +70,9 @@ private void processRequest(Sourcetransformer.SourceTransformRequest transformRe Instant.ofEpochSecond( request.getEventTime().getSeconds(), request.getEventTime().getNanos()), - request.getHeadersMap() + request.getHeadersMap(), + new UserMetadata(request.getMetadata()), + new SystemMetadata(request.getMetadata()) ); String[] keys = request.getKeysList().toArray(new String[0]); try { @@ -117,6 +122,9 @@ private Sourcetransformer.SourceTransformResponse buildResponse( == null ? new ArrayList<>() : Arrays.asList(message.getKeys())) .addAllTags(message.getTags() == null ? new ArrayList<>() : Arrays.asList(message.getTags())) + .setMetadata(message.getUserMetadata() + == null ? MetadataOuterClass.Metadata.getDefaultInstance() + : message.getUserMetadata().toProto()) .build()); }); return responseBuilder.setId(ID).build(); diff --git a/src/main/proto/map/v1/map.proto b/src/main/proto/map/v1/map.proto index 82636e3f..e72acf3e 100644 --- a/src/main/proto/map/v1/map.proto +++ b/src/main/proto/map/v1/map.proto @@ -4,6 +4,7 @@ option java_package = "io.numaproj.numaflow.map.v1"; import "google/protobuf/timestamp.proto"; import "google/protobuf/empty.proto"; +import "common/metadata.proto"; package map.v1; @@ -25,6 +26,7 @@ message MapRequest { google.protobuf.Timestamp event_time = 3; google.protobuf.Timestamp watermark = 4; map headers = 5; + common.Metadata metadata = 6; } Request request = 1; // This ID is used to uniquely identify a map request @@ -56,6 +58,7 @@ message MapResponse { repeated string keys = 1; bytes value = 2; repeated string tags = 3; + common.Metadata metadata = 4; } repeated Result results = 1; // This ID is used to refer the responses to the request it corresponds to. diff --git a/src/main/proto/sink/v1/sink.proto b/src/main/proto/sink/v1/sink.proto index b1484e4a..4fb516b4 100644 --- a/src/main/proto/sink/v1/sink.proto +++ b/src/main/proto/sink/v1/sink.proto @@ -27,6 +27,7 @@ message SinkRequest { google.protobuf.Timestamp watermark = 4; string id = 5; map headers = 6; + common.Metadata metadata = 7; } // Required field indicating the request. Request request = 1; diff --git a/src/main/proto/source/v1/source.proto b/src/main/proto/source/v1/source.proto index a1fbdfeb..2de4add8 100644 --- a/src/main/proto/source/v1/source.proto +++ b/src/main/proto/source/v1/source.proto @@ -4,6 +4,7 @@ option java_package = "io.numaproj.numaflow.source.v1"; import "google/protobuf/timestamp.proto"; import "google/protobuf/empty.proto"; +import "common/metadata.proto"; package source.v1; @@ -82,6 +83,8 @@ message ReadResponse { // Headers are the metadata associated with the datum. // e.g. Kafka and Redis Stream message usually include information about the headers. map headers = 5; + // Metadata is the metadata of the message + common.Metadata metadata = 6; } message Status { // Code to indicate the status of the response. diff --git a/src/main/proto/sourcetransform/v1/sourcetransformer.proto b/src/main/proto/sourcetransform/v1/sourcetransformer.proto index 45b7022f..dda7cc6a 100644 --- a/src/main/proto/sourcetransform/v1/sourcetransformer.proto +++ b/src/main/proto/sourcetransform/v1/sourcetransformer.proto @@ -4,6 +4,7 @@ option java_package = "io.numaproj.numaflow.sourcetransformer.v1"; import "google/protobuf/timestamp.proto"; import "google/protobuf/empty.proto"; +import "common/metadata.proto"; package sourcetransformer.v1; @@ -37,6 +38,7 @@ message SourceTransformRequest { map headers = 5; // This ID is used to uniquely identify a transform request string id = 6; + common.Metadata metadata = 7; } Request request = 1; optional Handshake handshake = 2; @@ -51,6 +53,7 @@ message SourceTransformResponse { bytes value = 2; google.protobuf.Timestamp event_time = 3; repeated string tags = 4; + common.Metadata metadata = 5; } repeated Result results = 1; // This ID is used to refer the responses to the request it corresponds to. diff --git a/src/test/java/io/numaproj/numaflow/mapper/HandlerDatumTest.java b/src/test/java/io/numaproj/numaflow/mapper/HandlerDatumTest.java index 9b15bcce..6ce76eed 100644 --- a/src/test/java/io/numaproj/numaflow/mapper/HandlerDatumTest.java +++ b/src/test/java/io/numaproj/numaflow/mapper/HandlerDatumTest.java @@ -1,5 +1,7 @@ package io.numaproj.numaflow.mapper; +import io.numaproj.numaflow.shared.SystemMetadata; +import io.numaproj.numaflow.shared.UserMetadata; import org.junit.Test; import java.time.Instant; @@ -14,7 +16,7 @@ public void testHandlerDatum() { Instant eventTime = Instant.now(); HashMap headers = new HashMap<>(); headers.put("header1", "value1"); - HandlerDatum datum = new HandlerDatum("asdf".getBytes(), watermark, eventTime, headers); + HandlerDatum datum = new HandlerDatum("asdf".getBytes(), watermark, eventTime, headers, new UserMetadata(), new SystemMetadata()); assertEquals(watermark, datum.getWatermark()); assertEquals(eventTime, datum.getEventTime()); assertEquals(headers, datum.getHeaders()); diff --git a/src/test/java/io/numaproj/numaflow/mapper/MessageTest.java b/src/test/java/io/numaproj/numaflow/mapper/MessageTest.java index 343bf394..c95e065f 100644 --- a/src/test/java/io/numaproj/numaflow/mapper/MessageTest.java +++ b/src/test/java/io/numaproj/numaflow/mapper/MessageTest.java @@ -1,7 +1,9 @@ package io.numaproj.numaflow.mapper; +import io.numaproj.numaflow.shared.UserMetadata; import org.junit.Test; import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertTrue; public class MessageTest { @Test @@ -18,5 +20,10 @@ public void testMessage() { assertArrayEquals(null, message4.getKeys()); String[] drop_tags = {"U+005C__DROP__"}; assertArrayEquals(drop_tags, message4.getTags()); + Message message5 = new Message("asdf".getBytes(), new String[]{"key1"}, new String[]{"tag1"}, new UserMetadata()); + assertArrayEquals("asdf".getBytes(), message5.getValue()); + assertArrayEquals(new String[]{"key1"}, message5.getKeys()); + assertArrayEquals(new String[]{"tag1"}, message5.getTags()); + assertTrue(message5.getUserMetadata().getGroups().isEmpty()); } } diff --git a/src/test/java/io/numaproj/numaflow/mapper/ServerTest.java b/src/test/java/io/numaproj/numaflow/mapper/ServerTest.java index 93a32f8f..46e5300d 100644 --- a/src/test/java/io/numaproj/numaflow/mapper/ServerTest.java +++ b/src/test/java/io/numaproj/numaflow/mapper/ServerTest.java @@ -1,19 +1,23 @@ package io.numaproj.numaflow.mapper; import com.google.protobuf.ByteString; +import common.MetadataOuterClass; import io.grpc.ManagedChannel; import io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; import io.grpc.testing.GrpcCleanupRule; import io.numaproj.numaflow.map.v1.MapGrpc; import io.numaproj.numaflow.map.v1.MapOuterClass; +import io.numaproj.numaflow.shared.UserMetadata; import org.junit.After; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.ExecutionException; import static org.junit.Assert.assertEquals; @@ -64,6 +68,19 @@ public void testMapperSuccess() { .setHandshake(MapOuterClass.Handshake.newBuilder().setSot(true)) .build(); + // Build user metadata and use it to initialize metadata to be added to the message passed to mapper + Map prevVertexUserMetadata = new HashMap<>(); + prevVertexUserMetadata.put("prev-group", + MetadataOuterClass.KeyValueGroup + .newBuilder() + .putKeyValue("prev-key", ByteString.copyFromUtf8("prev-value")) + .build() + ); + MetadataOuterClass.Metadata metadata = MetadataOuterClass.Metadata + .newBuilder() + .putAllUserMetadata(prevVertexUserMetadata) + .build(); + ByteString inValue = ByteString.copyFromUtf8("invalue"); MapOuterClass.MapRequest inDatum = MapOuterClass.MapRequest .newBuilder() @@ -71,6 +88,7 @@ public void testMapperSuccess() { .newBuilder() .setValue(inValue) .addAllKeys(List.of("test-map-key")) + .setMetadata(metadata) .build()).build(); String[] expectedKeys = new String[]{"test-map-key" + PROCESSED_KEY_SUFFIX}; @@ -106,6 +124,27 @@ public void testMapperSuccess() { assertEquals(Arrays.asList(expectedKeys), response.getResults(0).getKeysList()); assertEquals(Arrays.asList(expectedTags), response.getResults(0).getTagsList()); assertEquals(1, response.getResultsCount()); + // User metadata should be added to the response. + // It should have both the previous metadata and the metadata added by the mapper. + assertEquals(2, response.getResults(0).getMetadata().getUserMetadataMap().size()); + assertEquals("prev-value", + response.getResults(0) + .getMetadata() + .getUserMetadataMap() + .get("prev-group") + .getKeyValueMap() + .get("prev-key") + .toStringUtf8() + ); + assertEquals("udf-value", + response.getResults(0) + .getMetadata() + .getUserMetadataMap() + .get("udf-group") + .getKeyValueMap() + .get("udf-key") + .toStringUtf8() + ); } requestStreamObserver.onCompleted(); @@ -148,13 +187,18 @@ public MessageList processMessage(String[] keys, Datum datum) { .stream(keys) .map(c -> c + PROCESSED_KEY_SUFFIX) .toArray(String[]::new); + + UserMetadata userMetadata = new UserMetadata(datum.getUserMetadata()); + userMetadata.addKV("udf-group", "udf-key", "udf-value".getBytes()); + return MessageList .newBuilder() .addMessage(new Message( (new String(datum.getValue()) + PROCESSED_VALUE_SUFFIX).getBytes(), updatedKeys, - new String[]{"test-tag"})) + new String[]{"test-tag"}, + userMetadata)) .build(); } } diff --git a/src/test/java/io/numaproj/numaflow/shared/SystemMetadataTest.java b/src/test/java/io/numaproj/numaflow/shared/SystemMetadataTest.java new file mode 100644 index 00000000..100b7c44 --- /dev/null +++ b/src/test/java/io/numaproj/numaflow/shared/SystemMetadataTest.java @@ -0,0 +1,159 @@ +package io.numaproj.numaflow.shared; + +import com.google.protobuf.ByteString; +import common.MetadataOuterClass; +import org.junit.Test; + +import java.util.List; + +import static org.junit.Assert.*; + +public class SystemMetadataTest { + + @Test + public void testDefaultConstructor() { + SystemMetadata metadata = new SystemMetadata(); + assertNotNull(metadata.getGroups()); + assertTrue(metadata.getGroups().isEmpty()); + } + + @Test + public void testProtoConstructor_withValidMetadata() { + MetadataOuterClass.KeyValueGroup kvGroup1 = MetadataOuterClass.KeyValueGroup.newBuilder() + .putKeyValue("key1", ByteString.copyFromUtf8("value1")) + .build(); + + MetadataOuterClass.KeyValueGroup kvGroup2 = MetadataOuterClass.KeyValueGroup.newBuilder() + .putKeyValue("keyA", ByteString.copyFromUtf8("valueA")) + .build(); + + MetadataOuterClass.Metadata protoMetadata = MetadataOuterClass.Metadata.newBuilder() + .putSysMetadata("group1", kvGroup1) + .putSysMetadata("group2", kvGroup2) + .build(); + + SystemMetadata metadata = new SystemMetadata(protoMetadata); + + assertEquals(2, metadata.getGroups().size()); + assertArrayEquals("value1".getBytes(), metadata.getValue("group1" ,"key1")); + assertArrayEquals("valueA".getBytes(), metadata.getValue("group2", "keyA")); + } + + @Test + public void testProtoConstructor_withNullMetadata() { + SystemMetadata metadata = new SystemMetadata((MetadataOuterClass.Metadata) null); + assertNotNull(metadata.getGroups()); + assertTrue(metadata.getGroups().isEmpty()); + } + + @Test + public void testProtoConstructor_withEmptyMetadata() { + MetadataOuterClass.Metadata protoMetadata = MetadataOuterClass.Metadata.newBuilder().build(); + SystemMetadata metadata = new SystemMetadata(protoMetadata); + assertNotNull(metadata.getGroups()); + assertTrue(metadata.getGroups().isEmpty()); + } + + @Test + public void testGetKeys_withMissingGroup() { + SystemMetadata metadata = new SystemMetadata(); + assertNotNull(metadata.getKeys("missing")); + assertTrue(metadata.getKeys("missing").isEmpty()); + } + + @Test + public void testGetKeys_withExistingGroup() { + MetadataOuterClass.KeyValueGroup kvGroup = MetadataOuterClass.KeyValueGroup.newBuilder() + .putKeyValue("key1", ByteString.copyFromUtf8("value1")) + .putKeyValue("key2", ByteString.copyFromUtf8("value2")) + .build(); + + MetadataOuterClass.Metadata protoMetadata = MetadataOuterClass.Metadata.newBuilder() + .putSysMetadata("group1", kvGroup) + .build(); + + SystemMetadata metadata = new SystemMetadata(protoMetadata); + + List keys = metadata.getKeys("group1"); + assertEquals(2, keys.size()); + assertTrue(keys.contains("key1")); + assertTrue(keys.contains("key2")); + } + + @Test + public void testGetValue_withMissingKey() { + MetadataOuterClass.KeyValueGroup kvGroup = MetadataOuterClass.KeyValueGroup.newBuilder() + .putKeyValue("key1", ByteString.copyFromUtf8("value1")) + .build(); + + MetadataOuterClass.Metadata protoMetadata = MetadataOuterClass.Metadata.newBuilder() + .putSysMetadata("group1", kvGroup) + .build(); + + SystemMetadata metadata = new SystemMetadata(protoMetadata); + + assertNull(metadata.getValue("group1", "missingKey")); + } + + @Test + public void testGetValue_returnsClone() { + MetadataOuterClass.KeyValueGroup kvGroup = MetadataOuterClass.KeyValueGroup.newBuilder() + .putKeyValue("key1", ByteString.copyFromUtf8("value1")) + .build(); + + MetadataOuterClass.Metadata protoMetadata = MetadataOuterClass.Metadata.newBuilder() + .putSysMetadata("group1", kvGroup) + .build(); + + SystemMetadata metadata = new SystemMetadata(protoMetadata); + + byte[] got = metadata.getValue("group1", "key1"); + assertArrayEquals("value1".getBytes(), got); + + got[0] = 'X'; + + assertArrayEquals("value1".getBytes(), metadata.getValue("group1", "key1")); + } + + @Test + public void testGetGroups_returnsCopy_notLiveView() { + MetadataOuterClass.KeyValueGroup kvGroup = MetadataOuterClass.KeyValueGroup.newBuilder() + .putKeyValue("key1", ByteString.copyFromUtf8("value1")) + .build(); + + MetadataOuterClass.Metadata protoMetadata = MetadataOuterClass.Metadata.newBuilder() + .putSysMetadata("group1", kvGroup) + .build(); + + SystemMetadata metadata = new SystemMetadata(protoMetadata); + + List groups = metadata.getGroups(); + assertEquals(1, groups.size()); + + groups.clear(); + + assertEquals(1, metadata.getGroups().size()); + assertTrue(metadata.getGroups().contains("group1")); + } + + @Test + public void testGetKeys_returnsCopy_notLiveView() { + MetadataOuterClass.KeyValueGroup kvGroup = MetadataOuterClass.KeyValueGroup.newBuilder() + .putKeyValue("key1", ByteString.copyFromUtf8("value1")) + .build(); + + MetadataOuterClass.Metadata protoMetadata = MetadataOuterClass.Metadata.newBuilder() + .putSysMetadata("group1", kvGroup) + .build(); + + SystemMetadata metadata = new SystemMetadata(protoMetadata); + + List keys = metadata.getKeys("group1"); + assertEquals(1, keys.size()); + + keys.clear(); + + assertEquals(1, metadata.getKeys("group1").size()); + assertTrue(metadata.getKeys("group1").contains("key1")); + } +} diff --git a/src/test/java/io/numaproj/numaflow/shared/UserMetadataTest.java b/src/test/java/io/numaproj/numaflow/shared/UserMetadataTest.java new file mode 100644 index 00000000..7520973f --- /dev/null +++ b/src/test/java/io/numaproj/numaflow/shared/UserMetadataTest.java @@ -0,0 +1,218 @@ +package io.numaproj.numaflow.shared; + +import com.google.protobuf.ByteString; +import common.MetadataOuterClass; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.*; + +public class UserMetadataTest { + + @Test + public void testDefaultConstructor() { + UserMetadata metadata = new UserMetadata(); + assertNotNull(metadata.getGroups()); + assertTrue(metadata.getGroups().isEmpty()); + + assertNotNull(metadata.getKeys("missing")); + assertTrue(metadata.getKeys("missing").isEmpty()); + + assertNull(metadata.getValue("missing", "missing")); + } + + @Test + public void testProtoConstructor_withNullMetadata() { + UserMetadata metadata = new UserMetadata((MetadataOuterClass.Metadata) null); + assertNotNull(metadata.getGroups()); + assertTrue(metadata.getGroups().isEmpty()); + } + + @Test + public void testProtoConstructor_withEmptyMetadata() { + MetadataOuterClass.Metadata protoMetadata = MetadataOuterClass.Metadata.newBuilder().build(); + UserMetadata metadata = new UserMetadata(protoMetadata); + assertNotNull(metadata.getGroups()); + assertTrue(metadata.getGroups().isEmpty()); + } + + @Test + public void testProtoConstructor_withValidMetadata() { + MetadataOuterClass.KeyValueGroup kvGroup1 = MetadataOuterClass.KeyValueGroup.newBuilder() + .putKeyValue("key1", ByteString.copyFromUtf8("value1")) + .build(); + + MetadataOuterClass.KeyValueGroup kvGroup2 = MetadataOuterClass.KeyValueGroup.newBuilder() + .putKeyValue("keyA", ByteString.copyFromUtf8("valueA")) + .build(); + + MetadataOuterClass.Metadata protoMetadata = MetadataOuterClass.Metadata.newBuilder() + .putUserMetadata("group1", kvGroup1) + .putUserMetadata("group2", kvGroup2) + .build(); + + UserMetadata metadata = new UserMetadata(protoMetadata); + + assertEquals(2, metadata.getGroups().size()); + assertArrayEquals("value1".getBytes(), metadata.getValue("group1", "key1")); + assertArrayEquals("valueA".getBytes(), metadata.getValue("group2", "keyA")); + } + + @Test + public void testCopyConstructor_deepCopy() { + UserMetadata original = new UserMetadata(); + original.addKV("g", "k", "value".getBytes()); + + UserMetadata copy = new UserMetadata(original); + + assertArrayEquals("value".getBytes(), copy.getValue("g", "k")); + + byte[] fromCopy = copy.getValue("g", "k"); + fromCopy[0] = 'X'; + + assertArrayEquals("value".getBytes(), original.getValue("g", "k")); + assertArrayEquals("value".getBytes(), copy.getValue("g", "k")); + } + + @Test + public void testCopyConstructor_withNullInput() { + UserMetadata copy = new UserMetadata((UserMetadata) null); + assertNotNull(copy.getGroups()); + assertTrue(copy.getGroups().isEmpty()); + } + + @Test + public void testToProto_roundTrip() { + UserMetadata metadata = new UserMetadata(); + metadata.addKV("g1", "k1", "v1".getBytes()); + metadata.addKV("g1", "k2", "v2".getBytes()); + metadata.addKV("g2", "ka", "va".getBytes()); + + MetadataOuterClass.Metadata proto = metadata.toProto(); + UserMetadata roundTrip = new UserMetadata(proto); + + assertEquals(metadata.getGroups().size(), roundTrip.getGroups().size()); + assertArrayEquals("v1".getBytes(), roundTrip.getValue("g1", "k1")); + assertArrayEquals("v2".getBytes(), roundTrip.getValue("g1", "k2")); + assertArrayEquals("va".getBytes(), roundTrip.getValue("g2", "ka")); + } + + @Test + public void testAddKV_ignoresNulls() { + UserMetadata metadata = new UserMetadata(); + + metadata.addKV(null, "k", "v".getBytes()); + metadata.addKV("g", null, "v".getBytes()); + metadata.addKV("g", "k", null); + + assertTrue(metadata.getGroups().isEmpty()); + } + + @Test + public void testAddKV_defensiveCopy() { + UserMetadata metadata = new UserMetadata(); + + byte[] value = "value".getBytes(); + metadata.addKV("g", "k", value); + + value[0] = 'X'; + + assertArrayEquals("value".getBytes(), metadata.getValue("g", "k")); + } + + @Test + public void testGetValue_returnsClone() { + UserMetadata metadata = new UserMetadata(); + metadata.addKV("g", "k", "value".getBytes()); + + byte[] got = metadata.getValue("g", "k"); + assertArrayEquals("value".getBytes(), got); + + got[0] = 'X'; + + assertArrayEquals("value".getBytes(), metadata.getValue("g", "k")); + } + + @Test + public void testAddKVs_ignoresNullGroupOrMap() { + UserMetadata metadata = new UserMetadata(); + Map kv = new HashMap<>(); + kv.put("k", "v".getBytes()); + + metadata.addKVs(null, kv); + metadata.addKVs("g", null); + + assertTrue(metadata.getGroups().isEmpty()); + } + + @Test + public void testAddKVs_filtersNullValues() { + UserMetadata metadata = new UserMetadata(); + + Map kv = new HashMap<>(); + kv.put("k1", "v1".getBytes()); + kv.put("k2", null); + + metadata.addKVs("g", kv); + + List keys = metadata.getKeys("g"); + assertEquals(1, keys.size()); + assertTrue(keys.contains("k1")); + assertArrayEquals("v1".getBytes(), metadata.getValue("g", "k1")); + assertNull(metadata.getValue("g", "k2")); + } + + @Test + public void testAddKVs_toExistingGroup() { + UserMetadata metadata = new UserMetadata(); + + // First, add some initial key-value pairs to a group + Map initialKv = new HashMap<>(); + initialKv.put("k1", "v1".getBytes()); + metadata.addKVs("g", initialKv); + + // Verify initial state + assertEquals(1, metadata.getKeys("g").size()); + assertArrayEquals("v1".getBytes(), metadata.getValue("g", "k1")); + + // Now add more key-value pairs to the same existing group + Map additionalKv = new HashMap<>(); + additionalKv.put("k2", "v2".getBytes()); + additionalKv.put("k3", "v3".getBytes()); + metadata.addKVs("g", additionalKv); + + // Verify all keys are present in the group + List keys = metadata.getKeys("g"); + assertEquals(3, keys.size()); + assertTrue(keys.contains("k1")); + assertTrue(keys.contains("k2")); + assertTrue(keys.contains("k3")); + + // Verify all values + assertArrayEquals("v1".getBytes(), metadata.getValue("g", "k1")); + assertArrayEquals("v2".getBytes(), metadata.getValue("g", "k2")); + assertArrayEquals("v3".getBytes(), metadata.getValue("g", "k3")); + } + + @Test + public void testDeleteKeyAndDeleteGroupAndClear() { + UserMetadata metadata = new UserMetadata(); + metadata.addKV("g1", "k1", "v1".getBytes()); + metadata.addKV("g2", "k2", "v2".getBytes()); + + metadata.deleteKey("g1", "k1"); + assertNull(metadata.getValue("g1", "k1")); + + metadata.deleteKey("missingGroup", "any"); // no-op + + metadata.deleteGroup("g2"); + assertTrue(metadata.getKeys("g2").isEmpty()); + assertNull(metadata.getValue("g2", "k2")); + + metadata.clear(); + assertTrue(metadata.getGroups().isEmpty()); + } +} diff --git a/src/test/java/io/numaproj/numaflow/sinker/DatumStreamImplTest.java b/src/test/java/io/numaproj/numaflow/sinker/DatumStreamImplTest.java index 9e941b3d..b9981161 100644 --- a/src/test/java/io/numaproj/numaflow/sinker/DatumStreamImplTest.java +++ b/src/test/java/io/numaproj/numaflow/sinker/DatumStreamImplTest.java @@ -1,5 +1,7 @@ package io.numaproj.numaflow.sinker; +import io.numaproj.numaflow.shared.SystemMetadata; +import io.numaproj.numaflow.shared.UserMetadata; import lombok.AllArgsConstructor; import org.junit.Test; @@ -63,6 +65,16 @@ public String getId() { public Map getHeaders() { return null; } + + @Override + public UserMetadata getUserMetadata() { + return null; + } + + @Override + public SystemMetadata getSystemMetadata() { + return null; + } } } diff --git a/src/test/java/io/numaproj/numaflow/sinker/HandlerDatumTest.java b/src/test/java/io/numaproj/numaflow/sinker/HandlerDatumTest.java index 2097b42f..a463819a 100644 --- a/src/test/java/io/numaproj/numaflow/sinker/HandlerDatumTest.java +++ b/src/test/java/io/numaproj/numaflow/sinker/HandlerDatumTest.java @@ -1,5 +1,7 @@ package io.numaproj.numaflow.sinker; +import io.numaproj.numaflow.shared.SystemMetadata; +import io.numaproj.numaflow.shared.UserMetadata; import org.junit.Test; import java.time.Instant; @@ -14,35 +16,35 @@ public class HandlerDatumTest { @Test public void testGetKeys() { String[] keys = {"key1", "key2"}; - HandlerDatum datum = new HandlerDatum(keys, null, null, null, null, null); + HandlerDatum datum = new HandlerDatum(keys, null, null, null, null, null, null, null); assertArrayEquals(keys, datum.getKeys()); } @Test public void testGetValue() { byte[] value = {1, 2, 3}; - HandlerDatum datum = new HandlerDatum(null, value, null, null, null, null); + HandlerDatum datum = new HandlerDatum(null, value, null, null, null, null, null, null); assertArrayEquals(value, datum.getValue()); } @Test public void testGetWatermark() { Instant watermark = Instant.now(); - HandlerDatum datum = new HandlerDatum(null, null, watermark, null, null, null); + HandlerDatum datum = new HandlerDatum(null, null, watermark, null, null, null, null, null); assertEquals(watermark, datum.getWatermark()); } @Test public void testGetEventTime() { Instant eventTime = Instant.now(); - HandlerDatum datum = new HandlerDatum(null, null, null, eventTime, null, null); + HandlerDatum datum = new HandlerDatum(null, null, null, eventTime, null, null, null, null); assertEquals(eventTime, datum.getEventTime()); } @Test public void testGetId() { String id = "test-id"; - HandlerDatum datum = new HandlerDatum(null, null, null, null, id, null); + HandlerDatum datum = new HandlerDatum(null, null, null, null, id, null, null, null); assertEquals(id, datum.getId()); } @@ -50,7 +52,21 @@ public void testGetId() { public void testGetHeaders() { Map headers = new HashMap<>(); headers.put("header1", "value1"); - HandlerDatum datum = new HandlerDatum(null, null, null, null, null, headers); + HandlerDatum datum = new HandlerDatum(null, null, null, null, null, headers, null, null); assertEquals(headers, datum.getHeaders()); } + + @Test + public void testGetUserMetadata() { + UserMetadata userMetadata = new UserMetadata(); + HandlerDatum datum = new HandlerDatum(null, null, null, null, null, null, userMetadata, null); + assertEquals(userMetadata, datum.getUserMetadata()); + } + + @Test + public void testGetSystemMetadata() { + SystemMetadata systemMetadata = new SystemMetadata(); + HandlerDatum datum = new HandlerDatum(null, null, null, null, null, null, null, systemMetadata); + assertEquals(systemMetadata, datum.getSystemMetadata()); + } } diff --git a/src/test/java/io/numaproj/numaflow/sinker/ResponseTest.java b/src/test/java/io/numaproj/numaflow/sinker/ResponseTest.java index 841cfb85..36626cfd 100644 --- a/src/test/java/io/numaproj/numaflow/sinker/ResponseTest.java +++ b/src/test/java/io/numaproj/numaflow/sinker/ResponseTest.java @@ -2,9 +2,12 @@ import com.google.protobuf.ByteString; import common.MetadataOuterClass; +import io.numaproj.numaflow.shared.UserMetadata; import io.numaproj.numaflow.sink.v1.SinkOuterClass; import org.junit.Test; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; import java.util.HashMap; import java.util.Map; @@ -16,50 +19,57 @@ public class ResponseTest { @Test public void test_addResponse() { String defaultId = "id"; + // test fallback response creation Response response1 = Response.responseFallback(defaultId); assertEquals(defaultId, response1.getId()); + // test ok response creation Response response2 = Response.responseOK(defaultId); assertEquals(defaultId, response2.getId()); + // test serve response creation Response response3 = Response.responseServe(defaultId, "serve".getBytes()); assertEquals(defaultId, response3.getId()); + // test failure response creation Response response4 = Response.responseFailure(defaultId, "failure"); assertEquals(defaultId, response4.getId()); - HashMap userMetadata = new HashMap<>(); - userMetadata.put("group1", KeyValueGroup.builder().build()); + // test onSuccess response creation with on success message containing user metadata and no keys HashMap kvg1 = new HashMap<>(Map.ofEntries( entry("key1", "val1".getBytes()) )); kvg1.put("key2", null); - userMetadata.put("group2", KeyValueGroup.builder().keyValue(kvg1).build()); - userMetadata.put("group3", null); - Message onSuccessMessage1 = new Message("onSuccessValue".getBytes(), null, userMetadata); + UserMetadata userMetadata = new UserMetadata(); + userMetadata.addKV("group1", "key2", null); + userMetadata.addKVs("group2", kvg1); + userMetadata.addKVs("group3", null); + Message onSuccessMessage1 = new Message("onSuccessValue".getBytes(), null, new UserMetadata(userMetadata)); Response response5 = Response.responseOnSuccess(defaultId, onSuccessMessage1); assertEquals(defaultId, response5.getId()); - assertEquals("", response5.getOnSuccessMessage().getKeys(0)); + assertNull(response5.getOnSuccessMessage().getKeys()); + assertEquals("onSuccessValue", new String(response5.getOnSuccessMessage().getValue(), StandardCharsets.UTF_8)); + assertEquals(userMetadata.toProto(), response5.getOnSuccessMessage().getUserMetadata().toProto()); + // test onSuccess response creation with on success message containing no user metadata and no keys Message onSuccessMessage2 = new Message("onSuccessValue".getBytes(), null, null); Response response6 = Response.responseOnSuccess(defaultId, onSuccessMessage2); assertEquals(defaultId, response6.getId()); - assertEquals(MetadataOuterClass.Metadata.newBuilder() - .putAllUserMetadata(MetadataOuterClass.Metadata - .getDefaultInstance() - .getUserMetadataMap()).build(), - response6.getOnSuccessMessage().getMetadata()); + assertNull(response6.getOnSuccessMessage().getUserMetadata()); - Message onSuccessMessage3 = new Message(null, "key", null); + // test onSuccess response creation with on success message containing keys but no user metadata or value + Message onSuccessMessage3 = new Message(null, new String[] {"key"}, null); Response response7 = Response.responseOnSuccess(defaultId, onSuccessMessage3); assertEquals(defaultId, response7.getId()); - assertEquals(ByteString.copyFrom("".getBytes()), response7.getOnSuccessMessage().getValue()); - assertEquals("key", response7.getOnSuccessMessage().getKeys(0)); + assertNull(response7.getOnSuccessMessage().getValue()); + assertEquals("key", response7.getOnSuccessMessage().getKeys()[0]); - Response response8 = Response.responseOnSuccess(defaultId, (Message) null); + // test onSuccess response creation with no success message + Response response8 = Response.responseOnSuccess(defaultId, null); assertEquals(defaultId, response8.getId()); assertNull(response8.getOnSuccessMessage()); - Response response9 = Response.responseOnSuccess(defaultId, ( SinkOuterClass.SinkResponse.Result.Message) null); + // + Response response9 = Response.responseOnSuccess(defaultId, null); assertNull(response9.getOnSuccessMessage()); } } diff --git a/src/test/java/io/numaproj/numaflow/sinker/ServerTest.java b/src/test/java/io/numaproj/numaflow/sinker/ServerTest.java index 32de9fce..4f499bae 100644 --- a/src/test/java/io/numaproj/numaflow/sinker/ServerTest.java +++ b/src/test/java/io/numaproj/numaflow/sinker/ServerTest.java @@ -1,19 +1,27 @@ package io.numaproj.numaflow.sinker; +import static java.util.Map.entry; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import com.google.protobuf.ByteString; +import common.MetadataOuterClass; import io.grpc.ManagedChannel; import io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; import io.grpc.stub.StreamObserver; import io.grpc.testing.GrpcCleanupRule; +import io.numaproj.numaflow.shared.UserMetadata; import io.numaproj.numaflow.sink.v1.SinkGrpc; import io.numaproj.numaflow.sink.v1.SinkOuterClass; + +import java.nio.charset.StandardCharsets; import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; + import lombok.extern.slf4j.Slf4j; import org.junit.After; import org.junit.Before; @@ -26,6 +34,9 @@ @RunWith(JUnit4.class) public class ServerTest { private static final String processedIdSuffix = "-id-processed"; + private static final String umdGroup = "group"; + private static final String umdKey = "key"; + private static final String umdValue = "value"; @Rule public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule(); private Server server; private ManagedChannel inProcessChannel; @@ -79,7 +90,7 @@ public void sinkerSuccess() { SinkGrpc.newStub(inProcessChannel).sinkFn(outputStreamObserver); String actualId = "sink_test_id"; - String expectedId = actualId + processedIdSuffix; + String expectedId = actualId + umdValue + processedIdSuffix; // Send a handshake request SinkOuterClass.SinkRequest handshakeRequest = @@ -102,11 +113,19 @@ public void sinkerSuccess() { keys = new String[] {"invalid-key"}; } + // create user metadata to simulate it being passed from upstream + UserMetadata userMetadataObj = new UserMetadata(); + userMetadataObj.addKVs(umdGroup, Map.ofEntries( + entry(umdKey, umdValue.getBytes()) + )); + MetadataOuterClass.Metadata metadata = userMetadataObj.toProto(); + SinkOuterClass.SinkRequest.Request request = SinkOuterClass.SinkRequest.Request.newBuilder() .setValue(ByteString.copyFromUtf8(String.valueOf(i))) .setId(actualId) .addAllKeys(List.of(keys)) + .setMetadata(metadata) .build(); SinkOuterClass.SinkRequest sinkRequest = @@ -178,20 +197,22 @@ public ResponseList processMessages(DatumIterator datumIterator) { break; } + String mdValue = new String(datum.getUserMetadata().getValue(umdGroup, umdKey), StandardCharsets.UTF_8); + if (Arrays.equals(datum.getKeys(), new String[] {"invalid-key"})) { builder.addResponse( - Response.responseFailure(datum.getId() + processedIdSuffix, "error message")); + Response.responseFailure(datum.getId() + mdValue + processedIdSuffix, "error message")); } else if (Arrays.equals(datum.getKeys(), new String[] {"fallback-key"})) { builder.addResponse( - Response.responseFallback(datum.getId() + processedIdSuffix)); + Response.responseFallback(datum.getId() + mdValue + processedIdSuffix)); } else if (Arrays.equals(datum.getKeys(), new String[] {"onsuccess-key"})) { builder.addResponse( - Response.responseOnSuccess(datum.getId() + processedIdSuffix, (Message) null)); + Response.responseOnSuccess(datum.getId() + mdValue + processedIdSuffix, (Message) null)); } else if (Arrays.equals(datum.getKeys(), new String[] {"serve-key"})) { builder.addResponse( - Response.responseServe(datum.getId() + processedIdSuffix, "serve message".getBytes())); + Response.responseServe(datum.getId() + mdValue + processedIdSuffix, "serve message".getBytes())); } else { - builder.addResponse(Response.responseOK(datum.getId() + processedIdSuffix)); + builder.addResponse(Response.responseOK(datum.getId() + mdValue + processedIdSuffix)); } } diff --git a/src/test/java/io/numaproj/numaflow/sourcer/ServerTest.java b/src/test/java/io/numaproj/numaflow/sourcer/ServerTest.java index 14c29b46..6c3f05c2 100644 --- a/src/test/java/io/numaproj/numaflow/sourcer/ServerTest.java +++ b/src/test/java/io/numaproj/numaflow/sourcer/ServerTest.java @@ -1,11 +1,13 @@ package io.numaproj.numaflow.sourcer; import com.google.protobuf.Empty; +import common.MetadataOuterClass; import io.grpc.ManagedChannel; import io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; import io.grpc.stub.StreamObserver; import io.grpc.testing.GrpcCleanupRule; +import io.numaproj.numaflow.shared.UserMetadata; import io.numaproj.numaflow.source.v1.SourceGrpc; import io.numaproj.numaflow.source.v1.SourceOuterClass; import org.junit.After; @@ -16,6 +18,7 @@ import java.nio.ByteBuffer; import java.time.Instant; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -85,6 +88,7 @@ public void TestSourcer() { int count = 0; boolean handshake = false; boolean eot = false; + ArrayList> userMetadataList = new ArrayList<>(); @Override public void onNext(SourceOuterClass.ReadResponse readResponse) { @@ -98,6 +102,7 @@ public void onNext(SourceOuterClass.ReadResponse readResponse) { return; } count++; + userMetadataList.add(readResponse.getResult().getMetadata().getUserMetadataMap()); SourceOuterClass.Offset offset = readResponse.getResult().getOffset(); offsets.add(offset); SourceOuterClass.AckRequest.Request ackRequest = SourceOuterClass.AckRequest @@ -123,6 +128,14 @@ public void onCompleted() { assertEquals(10, count); assertTrue(handshake); assertTrue(eot); + // we should get 10 userMetadata with metadata intact + assertEquals(10, userMetadataList.size()); + assertEquals( + "src-value", + userMetadataList.get(0) + .get("src-group").getKeyValueMap() + .get("src-key").toStringUtf8() + ); } }); @@ -279,11 +292,17 @@ private static class TestSourcer extends Sourcer { public TestSourcer() { Instant eventTime = Instant.ofEpochMilli(1000L); + + // create user metadata + UserMetadata userMetadata = new UserMetadata(); + userMetadata.addKVs("src-group", Map.of("src-key", "src-value".getBytes())); + for (int i = 0; i < 10; i++) { messages.add(new Message( ByteBuffer.allocate(4).putInt(i).array(), new Offset(ByteBuffer.allocate(4).putInt(i).array(), 0), - eventTime + eventTime, + userMetadata )); eventTime = eventTime.plusMillis(1000L); } diff --git a/src/test/java/io/numaproj/numaflow/sourcetransformer/ServerTest.java b/src/test/java/io/numaproj/numaflow/sourcetransformer/ServerTest.java index 18645482..07066b80 100644 --- a/src/test/java/io/numaproj/numaflow/sourcetransformer/ServerTest.java +++ b/src/test/java/io/numaproj/numaflow/sourcetransformer/ServerTest.java @@ -1,10 +1,12 @@ package io.numaproj.numaflow.sourcetransformer; import com.google.protobuf.ByteString; +import common.MetadataOuterClass; import io.grpc.ManagedChannel; import io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; import io.grpc.testing.GrpcCleanupRule; +import io.numaproj.numaflow.shared.UserMetadata; import io.numaproj.numaflow.sourcetransformer.v1.SourceTransformGrpc; import io.numaproj.numaflow.sourcetransformer.v1.Sourcetransformer; import org.junit.After; @@ -14,7 +16,9 @@ import java.time.Instant; import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.ExecutionException; import static org.junit.Assert.assertEquals; @@ -69,11 +73,25 @@ public void testSourceTransformerSuccess() { .build()) .build(); + // Build user metadata and use it to initialize metadata to be added to the message passed to mapper + Map prevVertexUserMetadata = new HashMap<>(); + prevVertexUserMetadata.put("prev-group", + MetadataOuterClass.KeyValueGroup + .newBuilder() + .putKeyValue("prev-key", ByteString.copyFromUtf8("prev-value")) + .build() + ); + MetadataOuterClass.Metadata metadata = MetadataOuterClass.Metadata + .newBuilder() + .putAllUserMetadata(prevVertexUserMetadata) + .build(); + ByteString inValue = ByteString.copyFromUtf8("invalue"); Sourcetransformer.SourceTransformRequest.Request inDatum = Sourcetransformer.SourceTransformRequest.Request .newBuilder() .setValue(inValue) .addAllKeys(List.of("test-st-key")) + .setMetadata(metadata) .build(); Sourcetransformer.SourceTransformRequest request = Sourcetransformer.SourceTransformRequest @@ -112,6 +130,27 @@ public void testSourceTransformerSuccess() { assertEquals(expectedValue, response.getResults(0).getValue()); assertEquals(Arrays.asList(expectedKeys), response.getResults(0).getKeysList()); assertEquals(1, response.getResultsCount()); + // User metadata should be added to the response. + // It should have both the previous metadata and the metadata added by the mapper. + assertEquals(2, response.getResults(0).getMetadata().getUserMetadataMap().size()); + assertEquals("prev-value", + response.getResults(0) + .getMetadata() + .getUserMetadataMap() + .get("prev-group") + .getKeyValueMap() + .get("prev-key") + .toStringUtf8() + ); + assertEquals("st-value", + response.getResults(0) + .getMetadata() + .getUserMetadataMap() + .get("st-group") + .getKeyValueMap() + .get("st-key") + .toStringUtf8() + ); } requestStreamObserver.onCompleted(); @@ -157,6 +196,10 @@ public MessageList processMessage(String[] keys, Datum datum) { .stream(keys) .map(c -> c + PROCESSED_KEY_SUFFIX) .toArray(String[]::new); + + UserMetadata userMetadata = new UserMetadata(datum.getUserMetadata()); + userMetadata.addKV("st-group", "st-key", "st-value".getBytes()); + return MessageList .newBuilder() .addMessage(new Message( @@ -164,7 +207,8 @@ public MessageList processMessage(String[] keys, Datum datum) { + PROCESSED_VALUE_SUFFIX).getBytes(), TEST_EVENT_TIME, updatedKeys, - new String[]{"test-tag"})) + new String[]{"test-tag"}, + userMetadata)) .build(); } }