diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/io/payloads/JsonPayloadSerializerProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/io/payloads/JsonPayloadSerializerProvider.java index 750cc3418820..d3054e3689a5 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/io/payloads/JsonPayloadSerializerProvider.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/io/payloads/JsonPayloadSerializerProvider.java @@ -27,6 +27,8 @@ import org.apache.beam.sdk.util.RowJson.RowJsonDeserializer; import org.apache.beam.sdk.util.RowJson.RowJsonSerializer; import org.apache.beam.sdk.util.RowJsonUtils; +import org.apache.beam.sdk.values.Row; +import org.checkerframework.checker.nullness.qual.Nullable; @Internal @AutoService(PayloadSerializerProvider.class) @@ -38,12 +40,49 @@ public String identifier() { @Override public PayloadSerializer getSerializer(Schema schema, Map tableParams) { - ObjectMapper deserializeMapper = - RowJsonUtils.newObjectMapperWith(RowJsonDeserializer.forSchema(schema)); - ObjectMapper serializeMapper = - RowJsonUtils.newObjectMapperWith(RowJsonSerializer.forSchema(schema)); - return PayloadSerializer.of( - row -> RowJsonUtils.rowToJson(serializeMapper, row).getBytes(UTF_8), - bytes -> RowJsonUtils.jsonToRow(deserializeMapper, new String(bytes, UTF_8))); + return new JsonPayloadSerializer(schema); + } + + private static class JsonPayloadSerializer implements PayloadSerializer { + private final Schema schema; + private transient volatile @Nullable ObjectMapper deserializeMapper; + private transient volatile @Nullable ObjectMapper serializeMapper; + + public JsonPayloadSerializer(Schema schema) { + this.schema = schema; + } + + private ObjectMapper getDeserializeMapper() { + if (deserializeMapper == null) { + synchronized (this) { + if (deserializeMapper == null) { + deserializeMapper = + RowJsonUtils.newObjectMapperWith(RowJsonDeserializer.forSchema(schema)); + } + } + } + return deserializeMapper; + } + + private ObjectMapper getSerializeMapper() { + if (serializeMapper == null) { + synchronized (this) { + if (serializeMapper == null) { + serializeMapper = RowJsonUtils.newObjectMapperWith(RowJsonSerializer.forSchema(schema)); + } + } + } + return serializeMapper; + } + + @Override + public byte[] serialize(Row row) { + return RowJsonUtils.rowToJson(getSerializeMapper(), row).getBytes(UTF_8); + } + + @Override + public Row deserialize(byte[] bytes) { + return RowJsonUtils.jsonToRow(getDeserializeMapper(), new String(bytes, UTF_8)); + } } }