Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions src/main/java/io/numaproj/numaflow/mapper/Datum.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package io.numaproj.numaflow.mapper;


import io.numaproj.numaflow.shared.SystemMetadata;
import io.numaproj.numaflow.shared.UserMetadata;

import java.time.Instant;
import java.util.Map;

Expand Down Expand Up @@ -36,4 +39,19 @@ public interface Datum {
* @return returns the headers in the form of key value pair
*/
Map<String, String> getHeaders();

/**
* method to get the metadata information added by the user.
* It can be appended to and passed downstream.
*
* @return returns the UserMetadata object
*/
UserMetadata getUserMetadata();

/**
* method to get the read-only system metadata information
*
* @return returns the SystemMetadata object
*/
SystemMetadata getSystemMetadata();
}
14 changes: 13 additions & 1 deletion src/main/java/io/numaproj/numaflow/mapper/HandlerDatum.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package io.numaproj.numaflow.mapper;


import io.numaproj.numaflow.shared.SystemMetadata;
import io.numaproj.numaflow.shared.UserMetadata;
import lombok.AllArgsConstructor;

import java.time.Instant;
Expand All @@ -13,7 +15,8 @@ class HandlerDatum implements Datum {
private Instant watermark;
private Instant eventTime;
private Map<String, String> headers;

private UserMetadata userMetadata;
private SystemMetadata systemMetadata;

@Override
public Instant getWatermark() {
Expand All @@ -35,4 +38,13 @@ public Map<String, String> getHeaders() {
return this.headers;
}

@Override
public UserMetadata getUserMetadata() {
return this.userMetadata;
}

@Override
public SystemMetadata getSystemMetadata() {
return this.systemMetadata;
}
}
11 changes: 9 additions & 2 deletions src/main/java/io/numaproj/numaflow/mapper/MapperActor.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@
import akka.actor.Props;
import akka.japi.pf.ReceiveBuilder;
import com.google.protobuf.ByteString;
import common.MetadataOuterClass;
import io.numaproj.numaflow.map.v1.MapOuterClass;
import io.numaproj.numaflow.sourcetransformer.v1.Sourcetransformer;
import io.numaproj.numaflow.shared.SystemMetadata;
import io.numaproj.numaflow.shared.UserMetadata;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;

/**
* Mapper actor that processes the map request. It invokes the mapper to process the request and
Expand Down Expand Up @@ -48,7 +51,9 @@ private void processRequest(MapOuterClass.MapRequest mapRequest) {
Instant.ofEpochSecond(
mapRequest.getRequest().getEventTime().getSeconds(),
mapRequest.getRequest().getEventTime().getNanos()),
mapRequest.getRequest().getHeadersMap()
mapRequest.getRequest().getHeadersMap(),
new UserMetadata(mapRequest.getRequest().getMetadata()),
new SystemMetadata(mapRequest.getRequest().getMetadata())
);
String[] keys = mapRequest.getRequest().getKeysList().toArray(new String[0]);
try {
Expand Down Expand Up @@ -89,6 +94,8 @@ private MapOuterClass.MapResponse buildResponse(MessageList messageList, String
== null ? new ArrayList<>() : Arrays.asList(message.getKeys()))
.addAllTags(message.getTags()
== null ? new ArrayList<>() : Arrays.asList(message.getTags()))
.setMetadata(message.getUserMetadata()
== null ? MetadataOuterClass.Metadata.getDefaultInstance() : message.getUserMetadata().toProto())
.build());
});
return responseBuilder.setId(ID).build();
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/io/numaproj/numaflow/mapper/MapperTestKit.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import io.grpc.stub.StreamObserver;
import io.numaproj.numaflow.map.v1.MapGrpc;
import io.numaproj.numaflow.map.v1.MapOuterClass;
import io.numaproj.numaflow.shared.SystemMetadata;
import io.numaproj.numaflow.shared.UserMetadata;
import lombok.Builder;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -233,5 +235,7 @@ public static class TestDatum implements Datum {
private final Instant eventTime;
private final Instant watermark;
private final Map<String, String> headers;
private final UserMetadata userMetadata;
private final SystemMetadata systemMetadata;
}
}
30 changes: 25 additions & 5 deletions src/main/java/io/numaproj/numaflow/mapper/Message.java
Original file line number Diff line number Diff line change
@@ -1,27 +1,36 @@
package io.numaproj.numaflow.mapper;

import io.numaproj.numaflow.shared.UserMetadata;
import lombok.Getter;

import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

/** Message is used to wrap the data returned by Mapper. */
@Getter
public class Message {
private static final String[] DROP_TAGS = {"U+005C__DROP__"};
private final String[] keys;
private final byte[] value;
private final String[] tags;
private final UserMetadata userMetadata;

/**
* used to create Message with value, keys and tags(used for conditional forwarding)
* used to create Message with value, keys, tags(used for conditional forwarding) and userMetadata
*
* @param value message value
* @param keys message keys
* @param tags message tags which will be used for conditional forwarding
* @param userMetadata user metadata, this is used to pass user defined metadata to the next vertex
*/
public Message(byte[] value, String[] keys, String[] tags) {
public Message(byte[] value, String[] keys, String[] tags, UserMetadata userMetadata) {
// defensive copy - once the Message is created, the caller should not be able to modify it.
this.keys = keys == null ? null : keys.clone();
this.value = value == null ? null : value.clone();
this.tags = tags == null ? null : tags.clone();
// Copy the data using copy constructor to prevent mutation
this.userMetadata = userMetadata == null ? null : new UserMetadata(userMetadata);
}

/**
Expand All @@ -30,7 +39,7 @@ public Message(byte[] value, String[] keys, String[] tags) {
* @param value message value
*/
public Message(byte[] value) {
this(value, null, null);
this(value, null, null, null);
}

/**
Expand All @@ -40,7 +49,18 @@ public Message(byte[] value) {
* @param keys message keys
*/
public Message(byte[] value, String[] keys) {
this(value, keys, null);
this(value, keys, null, null);
}

/**
* used to create Message with value, keys and tags(used for conditional forwarding)
*
* @param value message value
* @param keys message keys
* @param tags message tags which will be used for conditional forwarding
*/
public Message(byte[] value, String[] keys, String[] tags) {
this(value, keys, tags, null);
}

/**
Expand All @@ -49,6 +69,6 @@ public Message(byte[] value, String[] keys) {
* @return returns the Message which will be dropped
*/
public static Message toDrop() {
return new Message(new byte[0], null, DROP_TAGS);
return new Message(new byte[0], null, DROP_TAGS, null);
}
}
137 changes: 137 additions & 0 deletions src/main/java/io/numaproj/numaflow/shared/SystemMetadata.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package io.numaproj.numaflow.shared;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import common.MetadataOuterClass;
import lombok.Getter;

/**
* SystemMetadata is mapping of group name to key-value pairs
* SystemMetadata wraps system-generated metadata groups per message.
* It is read-only to UDFs
*/
@Getter
public class SystemMetadata {
private final Map<String, Map<String, byte[]>> data;

/**
* Default constructor
*/
public SystemMetadata() {
this.data = new HashMap<>();
}

/**
* An all args constructor that filters out null values and copies the values to prevent mutation.
* If the data is null or empty, it initializes an empty HashMap data.
* For each entry in {@code data}, it creates a new HashMap and copies the key-value pairs
* from the entry to the new HashMap. It also filters out any null values.
* Empty hashmap values are allowed as entries in the data map.
*
* @param data is a map of group name to key-value pairs
*/
public SystemMetadata(Map<String, Map<String, byte[]>> data) {
if (data == null || data.isEmpty()) {
this.data = new HashMap<>();
return;
}
this.data = data.entrySet().stream()
.filter(e -> e.getValue() != null)
.collect(Collectors.toMap(Map.Entry::getKey,
entry -> entry.getValue().entrySet().stream()
.filter(e1 -> e1.getValue() != null)
.collect(Collectors.toMap(
Map.Entry::getKey,
e1-> e1.getValue().clone()
))
));
}

/**
* Constructor from MetadataOuterClass.Metadata
*
* @param metadata is an instance of MetadataOuterClass.Metadata which contains system metadata
*/
public SystemMetadata(MetadataOuterClass.Metadata metadata) {
if (metadata == null || metadata.getSysMetadataMap().isEmpty()) {
this.data = new HashMap<>();
return;
}
this.data = metadata.getSysMetadataMap()
.entrySet().stream()
// No null checks here as protobuf contract ensures that the data has no null values
.collect(Collectors.toMap(
Map.Entry::getKey,
e -> new HashMap<>(e.getValue()
.getKeyValueMap()
.entrySet()
.stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
e1 -> e1.getValue().toByteArray()
))
)
));
}

/**
* Get the data as a map.
* Returns a deep copy of the data to prevent mutation.
*
* @return a deep copy of the data
*/
public Map<String, Map<String, byte[]>> getData() {
// Deep copy the data to prevent mutation
return this.data.entrySet().stream()
// No null checks required as the constructor ensures that the data is valid
.collect(Collectors.toMap(Map.Entry::getKey,
entry -> entry.getValue().entrySet().stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
e1-> e1.getValue().clone()
))
));
}

/**
* Get the list of all groups present in the user metadata
*
* @return list of group names
*/
public List<String> getGroups() {
return new ArrayList<>(this.data.keySet());
}

/**
* Get a list of key names within a given group
*
* @param group is the name of the group from which to get the key names
* @return a list of key names within the group
*/
public List<String> getKeys(String group) {
if (!this.data.containsKey(group)) {
return new ArrayList<>();
}
return new ArrayList<>(this.data.get(group).keySet());
}

/**
* Get the value of a key in a group
*
* @param group Name of the group which contains the key holding required value
* @param key Name of the key in the group for which value is required
* @return Value of the key in the group or null if the group/key is not present
*/
public byte[] getValue(String group, String key) {
Map<String, byte[]> groupData = this.data.get(group);
if (groupData == null) {
return null;
}
byte[] value = groupData.get(key);
return value == null ? null : value.clone();
}
}
Loading
Loading