diff --git a/server/blob/blob-aes/src/main/java/org/apache/james/blob/aes/AESBlobStoreDAO.java b/server/blob/blob-aes/src/main/java/org/apache/james/blob/aes/AESBlobStoreDAO.java index 17803c42034..16dba25b4e3 100644 --- a/server/blob/blob-aes/src/main/java/org/apache/james/blob/aes/AESBlobStoreDAO.java +++ b/server/blob/blob-aes/src/main/java/org/apache/james/blob/aes/AESBlobStoreDAO.java @@ -93,59 +93,67 @@ public InputStream decrypt(InputStream ciphertext) throws IOException { } @Override - public InputStream read(BucketName bucketName, BlobId blobId) throws ObjectStoreIOException, ObjectNotFoundException { + public InputStreamBlob read(BucketName bucketName, BlobId blobId) throws ObjectStoreIOException, ObjectNotFoundException { try { - return decrypt(underlying.read(bucketName, blobId)); + InputStreamBlob underlyingBlob = underlying.read(bucketName, blobId); + return InputStreamBlob.of(decrypt(underlyingBlob.payload()), underlyingBlob.metadata()); } catch (IOException e) { throw new ObjectStoreIOException("Error reading blob " + blobId.asString(), e); } } @Override - public Publisher readReactive(BucketName bucketName, BlobId blobId) { + public Publisher readReactive(BucketName bucketName, BlobId blobId) { return Mono.from(underlying.readReactive(bucketName, blobId)) - .map(Throwing.function(this::decrypt)); + .map(Throwing.function(inputStreamBlob -> InputStreamBlob.of(decrypt(inputStreamBlob.payload()), inputStreamBlob.metadata()))); } @Override - public Publisher readBytes(BucketName bucketName, BlobId blobId) { + public Publisher readBytes(BucketName bucketName, BlobId blobId) { return Mono.from(underlying.readBytes(bucketName, blobId)) - .map(Throwing.function(bytes -> { - InputStream inputStream = decrypt(new ByteArrayInputStream(bytes)); + .map(Throwing.function(bytesBlob -> { + InputStream inputStream = decrypt(new ByteArrayInputStream(bytesBlob.payload())); try (UnsynchronizedByteArrayOutputStream outputStream = UnsynchronizedByteArrayOutputStream.builder() - .setBufferSize(bytes.length + PBKDF2StreamingAeadFactory.SEGMENT_SIZE) + .setBufferSize(bytesBlob.payload().length + PBKDF2StreamingAeadFactory.SEGMENT_SIZE) .get()) { IOUtils.copy(inputStream, outputStream); - return outputStream.toByteArray(); + return BytesBlob.of(outputStream.toByteArray(), bytesBlob.metadata()); } })); } @Override - public Publisher save(BucketName bucketName, BlobId blobId, byte[] data) { + public Publisher save(BucketName bucketName, BlobId blobId, Blob blob) { + return switch (blob) { + case BytesBlob bytesBlob -> save(bucketName, blobId, bytesBlob.payload(), bytesBlob.metadata()); + case InputStreamBlob inputStreamBlob -> save(bucketName, blobId, inputStreamBlob.payload(), inputStreamBlob.metadata()); + case ByteSourceBlob byteSourceBlob -> save(bucketName, blobId, byteSourceBlob.payload(), byteSourceBlob.metadata()); + }; + } + + private Publisher save(BucketName bucketName, BlobId blobId, byte[] data, BlobMetadata metadata) { Preconditions.checkNotNull(bucketName); Preconditions.checkNotNull(blobId); Preconditions.checkNotNull(data); - return save(bucketName, blobId, new ByteArrayInputStream(data)); + return save(bucketName, blobId, new ByteArrayInputStream(data), metadata); } - @Override - public Publisher save(BucketName bucketName, BlobId blobId, InputStream inputStream) { + private Publisher save(BucketName bucketName, BlobId blobId, InputStream inputStream, BlobMetadata metadata) { Preconditions.checkNotNull(bucketName); Preconditions.checkNotNull(blobId); Preconditions.checkNotNull(inputStream); return Mono.usingWhen( Mono.fromCallable(() -> encrypt(inputStream)), - pair -> Mono.from(underlying.save(bucketName, blobId, byteSourceWithSize(pair.getLeft().asByteSource(), pair.getRight()))), + pair -> Mono.from(underlying.save(bucketName, blobId, byteSourceWithSize(pair.getLeft().asByteSource(), pair.getRight(), metadata))), Throwing.function(pair -> Mono.fromRunnable(Throwing.runnable(pair.getLeft()::reset)).subscribeOn(Schedulers.boundedElastic()))) .subscribeOn(Schedulers.boundedElastic()) .onErrorMap(e -> new ObjectStoreIOException("Exception occurred while saving bytearray", e)); } - private ByteSource byteSourceWithSize(ByteSource byteSource, long size) { - return new ByteSource() { + private ByteSourceBlob byteSourceWithSize(ByteSource byteSource, long size, BlobMetadata metadata) { + return ByteSourceBlob.of(new ByteSource() { @Override public InputStream openStream() throws IOException { return byteSource.openStream(); @@ -160,17 +168,16 @@ public com.google.common.base.Optional sizeIfKnown() { public long size() { return size; } - }; + }, metadata); } - @Override - public Publisher save(BucketName bucketName, BlobId blobId, ByteSource content) { + private Publisher save(BucketName bucketName, BlobId blobId, ByteSource content, BlobMetadata metadata) { Preconditions.checkNotNull(bucketName); Preconditions.checkNotNull(blobId); Preconditions.checkNotNull(content); return Mono.using(content::openStream, - in -> Mono.from(save(bucketName, blobId, in)), + in -> Mono.from(save(bucketName, blobId, in, metadata)), Throwing.consumer(InputStream::close)) .subscribeOn(Schedulers.boundedElastic()); } diff --git a/server/blob/blob-aes/src/test/java/org/apache/james/blob/aes/AESBlobStoreDAOTest.java b/server/blob/blob-aes/src/test/java/org/apache/james/blob/aes/AESBlobStoreDAOTest.java index ad58cd0a726..2cf74dee4e7 100644 --- a/server/blob/blob-aes/src/test/java/org/apache/james/blob/aes/AESBlobStoreDAOTest.java +++ b/server/blob/blob-aes/src/test/java/org/apache/james/blob/aes/AESBlobStoreDAOTest.java @@ -24,20 +24,17 @@ import static org.apache.james.blob.api.BlobStoreDAOFixture.TEST_BUCKET_NAME; import static org.assertj.core.api.Assertions.assertThat; -import java.io.ByteArrayInputStream; - import org.apache.james.blob.api.BlobStoreDAO; import org.apache.james.blob.api.BlobStoreDAOContract; +import org.apache.james.blob.api.MetadataAwareBlobStoreDAOContract; import org.apache.james.blob.memory.MemoryBlobStoreDAO; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; -import com.google.common.io.ByteSource; - import reactor.core.publisher.Mono; -class AESBlobStoreDAOTest implements BlobStoreDAOContract { +class AESBlobStoreDAOTest implements BlobStoreDAOContract, MetadataAwareBlobStoreDAOContract { private static final String SAMPLE_SALT = "c603a7327ee3dcbc031d8d34b1096c605feca5e1"; private static final CryptoConfig CRYPTO_CONFIG = CryptoConfig.builder() .salt(SAMPLE_SALT) @@ -62,25 +59,25 @@ public BlobStoreDAO testee() { void underlyingDataShouldBeEncrypted() { Mono.from(testee.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY)).block(); - byte[] bytes = Mono.from(underlying.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block(); + byte[] bytes = Mono.from(underlying.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block().payload(); assertThat(bytes).isNotEqualTo(SHORT_BYTEARRAY); } @Test void underlyingDataShouldBeEncryptedWhenUsingStream() { - Mono.from(testee.save(TEST_BUCKET_NAME, TEST_BLOB_ID, new ByteArrayInputStream(SHORT_BYTEARRAY))).block(); + Mono.from(testee.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY.asInputStream())).block(); - byte[] bytes = Mono.from(underlying.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block(); + byte[] bytes = Mono.from(underlying.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block().payload(); assertThat(bytes).isNotEqualTo(SHORT_BYTEARRAY); } @Test void underlyingDataShouldBeEncryptedWhenUsingByteSource() { - Mono.from(testee.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.wrap(SHORT_BYTEARRAY))).block(); + Mono.from(testee.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY.asByteSource())).block(); - byte[] bytes = Mono.from(underlying.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block(); + byte[] bytes = Mono.from(underlying.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block().payload(); assertThat(bytes).isNotEqualTo(SHORT_BYTEARRAY); } diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStoreDAO.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStoreDAO.java index 1e11679953c..29b0a6871e8 100644 --- a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStoreDAO.java +++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStoreDAO.java @@ -19,85 +19,234 @@ package org.apache.james.blob.api; +import java.io.ByteArrayInputStream; +import java.io.IOException; import java.io.InputStream; -import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.util.Arrays; import java.util.Collection; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; import org.reactivestreams.Publisher; +import com.google.common.base.CharMatcher; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; import com.google.common.io.ByteSource; +import com.google.common.io.FileBackedOutputStream; public interface BlobStoreDAO { - class ReactiveByteSource { - private final long size; - private final Publisher content; + record BlobMetadataName(String name) { + private static final CharMatcher CHAR_MATCHER = CharMatcher.inRange('a', 'z') + .or(CharMatcher.inRange('A', 'Z')) + .or(CharMatcher.inRange('0', '9')) + .or(CharMatcher.is('-')); - public ReactiveByteSource(long size, Publisher content) { - this.size = size; - this.content = content; + public BlobMetadataName { + Preconditions.checkArgument(CHAR_MATCHER.matchesAllOf(name), "Invalid char in metadata name. Must be a-z,A-Z,0-9 or - got " + name); + Preconditions.checkArgument(name.length() < 128, "Metadata name is too long. Size exceed 128 chars"); + name = name.toLowerCase(Locale.US); + } + } + + record BlobMetadataValue(String value) { + public BlobMetadataValue { + Preconditions.checkArgument(value.length() < 128, "Metadata value is too long. Size exceed 128 chars"); + } + } + + record ContentTransferEncoding(String value) { + public static BlobMetadataName NAME = new BlobMetadataName("content-transfer-encoding"); + public static ContentTransferEncoding ZSTD = new ContentTransferEncoding("zstd"); + + public static ContentTransferEncoding fromValue(BlobMetadataValue value) { + return new ContentTransferEncoding(value.value()); + } + + public ContentTransferEncoding { + Preconditions.checkArgument(value.length() < 128, "ContentTransferEncoding value is too long. Size exceed 128 chars"); + } + + public BlobMetadataValue asValue() { + return new BlobMetadataValue(value); + } + + } + + record BlobMetadata(Map underlyingMap) { + public static BlobMetadata empty() { + return new BlobMetadata(ImmutableMap.of()); + } + + public Optional get(BlobMetadataName name) { + return Optional.ofNullable(underlyingMap.get(name)); + } + + public BlobMetadata withMetadata(BlobMetadataName name, BlobMetadataValue value) { + return new BlobMetadata(ImmutableMap.builder() + .putAll(underlyingMap) + .put(name, value) + .build()); } - public long getSize() { - return size; + public Optional contentTransferEncoding() { + return get(ContentTransferEncoding.NAME).map(ContentTransferEncoding::fromValue); } - public Publisher getContent() { - return content; + public BlobMetadata withContentTransferEncoding(ContentTransferEncoding contentTransferEncoding) { + return withMetadata(ContentTransferEncoding.NAME, contentTransferEncoding.asValue()); } } + sealed interface Blob { + BlobMetadata metadata(); + + // Have the POJOs encode some conversions ? + InputStreamBlob asInputStream() throws IOException; + + BytesBlob asBytes() throws IOException; + + ByteSourceBlob asByteSource() throws IOException; + } + + record BytesBlob(byte[] payload, BlobMetadata metadata) implements Blob { + public static BytesBlob of(byte[] payload) { + return of(payload, BlobMetadata.empty()); + } + + public static BytesBlob of(String payload) { + return of(payload.getBytes(StandardCharsets.UTF_8), BlobMetadata.empty()); + } + + public static BytesBlob of(byte[] payload, BlobMetadata metadata) { + return new BytesBlob(payload, metadata); + } + + @Override + public InputStreamBlob asInputStream() { + return new InputStreamBlob(new ByteArrayInputStream(payload), metadata); + } + + @Override + public BytesBlob asBytes() { + return this; + } + + @Override + public ByteSourceBlob asByteSource() { + return new ByteSourceBlob(ByteSource.wrap(payload), metadata); + } + + @Override + public boolean equals(Object other) { + if (other instanceof BytesBlob(byte[] otherPayload, BlobMetadata otherMetadata)) { + return Arrays.equals(payload, otherPayload) + && metadata.equals(otherMetadata); + } + return false; + } + + @Override + public int hashCode() { + return Objects.hash(Arrays.hashCode(payload), metadata); + } + } + + record InputStreamBlob(InputStream payload, BlobMetadata metadata) implements Blob { + public static InputStreamBlob of(InputStream payload) { + return of(payload, BlobMetadata.empty()); + } + + public static InputStreamBlob of(InputStream payload, BlobMetadata metadata) { + return new InputStreamBlob(payload, metadata); + } + + private static final int FILE_THRESHOLD = 100 * 1024; + + @Override + public InputStreamBlob asInputStream() { + return this; + } + + @Override + public BytesBlob asBytes() throws IOException { + return new BytesBlob(payload.readAllBytes(), metadata); + } + + @Override + public ByteSourceBlob asByteSource() throws IOException { + try (FileBackedOutputStream fileBackedOutputStream = new FileBackedOutputStream(FILE_THRESHOLD)) { + payload.transferTo(fileBackedOutputStream); + return new ByteSourceBlob(fileBackedOutputStream.asByteSource(), metadata); + } + } + } + + record ByteSourceBlob(ByteSource payload, BlobMetadata metadata) implements Blob { + public static ByteSourceBlob of(ByteSource payload) { + return of(payload, BlobMetadata.empty()); + } + + public static ByteSourceBlob of(ByteSource payload, BlobMetadata metadata) { + return new ByteSourceBlob(payload, metadata); + } + + @Override + public InputStreamBlob asInputStream() throws IOException { + return new InputStreamBlob(payload.openStream(), metadata); + } + + @Override + public BytesBlob asBytes() throws IOException { + return new BytesBlob(payload.read(), metadata); + } + + @Override + public ByteSourceBlob asByteSource() { + return this; + } + } + /** - * Reads a Blob based on its BucketName and its BlobId. + * Reads a InputStreamBlob based on its BucketName and its BlobId. * * @throws ObjectNotFoundException when the blobId or the bucket is not found * @throws ObjectStoreIOException when an unexpected IO error occurs */ - InputStream read(BucketName bucketName, BlobId blobId) throws ObjectStoreIOException, ObjectNotFoundException; - - Publisher readReactive(BucketName bucketName, BlobId blobId); + InputStreamBlob read(BucketName bucketName, BlobId blobId) throws ObjectStoreIOException, ObjectNotFoundException; /** - * Reads a Blob based on its BucketName and its BlobId + * Reads reactively a InputStreamBlob based on its BucketName and its BlobId. * - * @return a Mono containing the content of the blob or + * @return a Publisher containing the content and metadata of the blob or * an ObjectNotFoundException in its error channel when the blobId or the bucket is not found - * or an IOObjectStoreException when an unexpected IO error occurs - */ - Publisher readBytes(BucketName bucketName, BlobId blobId); - - - /** - * Save the blob with the provided blob id, and overwrite the previous blob with the same id if it already exists - * The bucket is created if it not already exists. - * This operation should be atomic and isolated - * Two blobs having the same blobId must have the same content - * @return an empty Mono when the save succeed, - * otherwise an IOObjectStoreException in its error channel + * or an ObjectStoreIOException when an unexpected IO error occurs */ - Publisher save(BucketName bucketName, BlobId blobId, byte[] data); + Publisher readReactive(BucketName bucketName, BlobId blobId); /** - * @see #save(BucketName, BlobId, byte[]) + * Reads reactively a BytesBlob based on its BucketName and its BlobId. * - * The InputStream should be closed after the call to this method - */ - Publisher save(BucketName bucketName, BlobId blobId, InputStream inputStream); - - /** - * @see #save(BucketName, BlobId, byte[]) + * @return a Publisher containing the content and metadata of the blob or + * an ObjectNotFoundException in its error channel when the blobId or the bucket is not found + * or an ObjectStoreIOException when an unexpected IO error occurs */ - Publisher save(BucketName bucketName, BlobId blobId, ByteSource content); + Publisher readBytes(BucketName bucketName, BlobId blobId); /** - * @see #save(BucketName, BlobId, byte[]) + * Save the blob with the provided blob id, and overwrite the previous blob with the same id if it already exists. + * The bucket is created if it does not already exist. + * This operation should be atomic and isolated. + * Two blobs having the same blobId must have the same content. * - * The String is stored as UTF-8. + * @return an empty Publisher when the save succeeds, + * otherwise an ObjectStoreIOException in its error channel */ - default Publisher save(BucketName bucketName, BlobId blobId, String data) { - return save(bucketName, blobId, data.getBytes(StandardCharsets.UTF_8)); - } + Publisher save(BucketName bucketName, BlobId blobId, Blob blob); /** * Remove a Blob based on its BucketName and its BlobId. diff --git a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BlobMetadataTest.java b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BlobMetadataTest.java new file mode 100644 index 00000000000..3cabf6991a5 --- /dev/null +++ b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BlobMetadataTest.java @@ -0,0 +1,34 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ***************************************************************/ + +package org.apache.james.blob.api; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.junit.jupiter.api.Test; + +class BlobMetadataTest { + @Test + void blobMetadataNameShouldBeCaseInsensitive() { + assertThat(new BlobStoreDAO.BlobMetadataName("X-Test").name()) + .isEqualTo("x-test"); + assertThat(new BlobStoreDAO.BlobMetadataName("X-Test")) + .isEqualTo(new BlobStoreDAO.BlobMetadataName("x-test")); + } +} \ No newline at end of file diff --git a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BlobStoreDAOFixture.java b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BlobStoreDAOFixture.java index ca03d467b4e..447658b8e64 100644 --- a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BlobStoreDAOFixture.java +++ b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BlobStoreDAOFixture.java @@ -29,9 +29,9 @@ public interface BlobStoreDAOFixture { BlobId OTHER_TEST_BLOB_ID = new TestBlobId("other-test-blob-id"); String SHORT_STRING = "toto"; byte[] EMPTY_BYTEARRAY = {}; - byte[] SHORT_BYTEARRAY = SHORT_STRING.getBytes(StandardCharsets.UTF_8); - byte[] ELEVEN_KILOBYTES = Strings.repeat("2103456789\n", 1000).getBytes(StandardCharsets.UTF_8); + BlobStoreDAO.BytesBlob SHORT_BYTEARRAY = BlobStoreDAO.BytesBlob.of(SHORT_STRING.getBytes(StandardCharsets.UTF_8)); + BlobStoreDAO.BytesBlob ELEVEN_KILOBYTES = BlobStoreDAO.BytesBlob.of(Strings.repeat("2103456789\n", 1000)); String TWELVE_MEGABYTES_STRING = Strings.repeat("7893456789\r\n", 1024 * 1024); - byte[] TWELVE_MEGABYTES = TWELVE_MEGABYTES_STRING.getBytes(StandardCharsets.UTF_8); + BlobStoreDAO.BytesBlob TWELVE_MEGABYTES = BlobStoreDAO.BytesBlob.of(TWELVE_MEGABYTES_STRING); } diff --git a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BucketBlobStoreDAOContract.java b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BucketBlobStoreDAOContract.java index eb71a37cb1f..65203e0356b 100644 --- a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BucketBlobStoreDAOContract.java +++ b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BucketBlobStoreDAOContract.java @@ -29,7 +29,6 @@ import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.api.Assertions.assertThatThrownBy; -import java.io.ByteArrayInputStream; import java.time.Duration; import org.apache.james.util.concurrency.ConcurrentTestRunner; @@ -57,7 +56,7 @@ default void deleteBucketShouldDeleteExistingBucketWithItsData() { Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY)).block(); Mono.from(store.deleteBucket(TEST_BUCKET_NAME)).block(); - assertThatThrownBy(() -> store.read(TEST_BUCKET_NAME, TEST_BLOB_ID).read()) + assertThatThrownBy(() -> store.read(TEST_BUCKET_NAME, TEST_BLOB_ID).payload().read()) .isInstanceOf(ObjectNotFoundException.class); } @@ -76,7 +75,7 @@ default void deleteBucketShouldBeIdempotent() { default void saveBytesShouldThrowWhenNullBucketName() { BlobStoreDAO store = testee(); - assertThatThrownBy(() -> Mono.from(store.save(null, TEST_BLOB_ID, SHORT_BYTEARRAY)).block()) + assertThatThrownBy(() -> Mono.from(store.save(null, TEST_BLOB_ID, SHORT_BYTEARRAY.asInputStream())).block()) .isInstanceOf(NullPointerException.class); } @@ -84,15 +83,15 @@ default void saveBytesShouldThrowWhenNullBucketName() { default void saveStringShouldThrowWhenNullBucketName() { BlobStoreDAO store = testee(); - assertThatThrownBy(() -> Mono.from(store.save(null, TEST_BLOB_ID, SHORT_STRING)).block()) + assertThatThrownBy(() -> Mono.from(store.save(null, TEST_BLOB_ID, BlobStoreDAO.BytesBlob.of(SHORT_STRING))).block()) .isInstanceOf(NullPointerException.class); } @Test - default void saveInputStreamShouldThrowWhenNullBucketName() { + default void saveInputStreamShouldThrowWhenNullBucketName() throws Exception { BlobStoreDAO store = testee(); - assertThatThrownBy(() -> Mono.from(store.save(null, TEST_BLOB_ID, new ByteArrayInputStream(SHORT_BYTEARRAY))).block()) + assertThatThrownBy(() -> Mono.from(store.save(null, TEST_BLOB_ID, SHORT_BYTEARRAY.asInputStream())).block()) .isInstanceOf(NullPointerException.class); } @@ -119,7 +118,7 @@ default void readStreamShouldThrowWhenBucketDoesNotExist() { BlobStoreDAO store = testee(); Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY)).block(); - assertThatThrownBy(() -> store.read(CUSTOM_BUCKET_NAME, TEST_BLOB_ID).read()) + assertThatThrownBy(() -> store.read(CUSTOM_BUCKET_NAME, TEST_BLOB_ID).payload().read()) .isInstanceOf(ObjectNotFoundException.class); } @@ -140,8 +139,8 @@ default void shouldBeAbleToSaveDataInMultipleBuckets() { Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY)).block(); Mono.from(store.save(CUSTOM_BUCKET_NAME, OTHER_TEST_BLOB_ID, SHORT_BYTEARRAY)).block(); - byte[] bytesDefault = Mono.from(store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block(); - byte[] bytesCustom = Mono.from(store.readBytes(CUSTOM_BUCKET_NAME, OTHER_TEST_BLOB_ID)).block(); + byte[] bytesDefault = Mono.from(store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block().payload(); + byte[] bytesCustom = Mono.from(store.readBytes(CUSTOM_BUCKET_NAME, OTHER_TEST_BLOB_ID)).block().payload(); assertThat(bytesDefault).isEqualTo(bytesCustom); } @@ -155,7 +154,7 @@ default void saveConcurrentlyWithNonPreExistingBucketShouldNotFail() throws Exce Mono.from(store.save( TEST_BUCKET_NAME, new TestBlobId("id-" + threadNumber + step), - SHORT_STRING + threadNumber + step)).block())) + BlobStoreDAO.BytesBlob.of(SHORT_STRING + threadNumber + step))).block())) .threadCount(10) .operationCount(10) .runSuccessfullyWithin(Duration.ofMinutes(1)); diff --git a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DeleteBlobStoreDAOContract.java b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DeleteBlobStoreDAOContract.java index 90dc41e2453..287be04feb3 100644 --- a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DeleteBlobStoreDAOContract.java +++ b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DeleteBlobStoreDAOContract.java @@ -31,7 +31,6 @@ import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.api.Assertions.assertThatThrownBy; -import java.io.ByteArrayInputStream; import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.time.Duration; @@ -82,7 +81,7 @@ default void deleteShouldDeleteExistingBlobData() { Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY)).block(); Mono.from(store.delete(TEST_BUCKET_NAME, TEST_BLOB_ID)).block(); - assertThatThrownBy(() -> store.read(TEST_BUCKET_NAME, TEST_BLOB_ID).read()) + assertThatThrownBy(() -> store.read(TEST_BUCKET_NAME, TEST_BLOB_ID).payload().read()) .isInstanceOf(ObjectStoreException.class); } @@ -106,9 +105,9 @@ default void deleteShouldNotDeleteOtherBlobs() { Mono.from(store.delete(TEST_BUCKET_NAME, TEST_BLOB_ID)).block(); - InputStream read = store.read(TEST_BUCKET_NAME, OTHER_TEST_BLOB_ID); + InputStream read = store.read(TEST_BUCKET_NAME, OTHER_TEST_BLOB_ID).payload(); - assertThat(read).hasSameContentAs(new ByteArrayInputStream(ELEVEN_KILOBYTES)); + assertThat(read).hasSameContentAs(ELEVEN_KILOBYTES.asInputStream().payload()); } @Test @@ -121,9 +120,9 @@ default void deleteSeveralShouldDeleteAll() { Mono.from(store.delete(TEST_BUCKET_NAME, ImmutableList.of(TEST_BLOB_ID, OTHER_TEST_BLOB_ID))).block(); SoftAssertions.assertSoftly(soft -> { - soft.assertThatThrownBy(() -> store.read(TEST_BUCKET_NAME, TEST_BLOB_ID).read()) + soft.assertThatThrownBy(() -> store.read(TEST_BUCKET_NAME, TEST_BLOB_ID).payload().read()) .isInstanceOf(ObjectStoreException.class); - soft.assertThatThrownBy(() -> store.read(TEST_BUCKET_NAME, OTHER_TEST_BLOB_ID).read()) + soft.assertThatThrownBy(() -> store.read(TEST_BUCKET_NAME, OTHER_TEST_BLOB_ID).payload().read()) .isInstanceOf(ObjectStoreException.class); }); } @@ -152,14 +151,14 @@ default void deleteShouldThrowWhenNullBucketName() { default void deleteShouldNotDeleteFromOtherBucket() { BlobStoreDAO store = testee(); - Mono.from(store.save(CUSTOM_BUCKET_NAME, OTHER_TEST_BLOB_ID, "custom")).block(); + Mono.from(store.save(CUSTOM_BUCKET_NAME, OTHER_TEST_BLOB_ID, BlobStoreDAO.BytesBlob.of("custom"))).block(); Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY)).block(); Mono.from(store.delete(CUSTOM_BUCKET_NAME, OTHER_TEST_BLOB_ID)).block(); - InputStream read = store.read(TEST_BUCKET_NAME, TEST_BLOB_ID); + InputStream read = store.read(TEST_BUCKET_NAME, TEST_BLOB_ID).payload(); - assertThat(read).hasSameContentAs(new ByteArrayInputStream(SHORT_BYTEARRAY)); + assertThat(read).hasSameContentAs(SHORT_BYTEARRAY.asInputStream().payload()); } @Test @@ -171,9 +170,9 @@ default void deleteShouldNotDeleteFromOtherBucketWhenSameBlobId() { Mono.from(store.delete(TEST_BUCKET_NAME, TEST_BLOB_ID)).block(); - InputStream read = store.read(CUSTOM_BUCKET_NAME, TEST_BLOB_ID); + InputStream read = store.read(CUSTOM_BUCKET_NAME, TEST_BLOB_ID).payload(); - assertThat(read).hasSameContentAs(new ByteArrayInputStream(SHORT_BYTEARRAY)); + assertThat(read).hasSameContentAs(SHORT_BYTEARRAY.asInputStream().payload()); } @Test @@ -185,7 +184,7 @@ default void readShouldNotReadPartiallyWhenDeletingConcurrentlyBigBlob() throws ConcurrentTestRunner.builder() .operation(((threadNumber, step) -> { try { - InputStream read = store.read(TEST_BUCKET_NAME, TEST_BLOB_ID); + InputStream read = store.read(TEST_BUCKET_NAME, TEST_BLOB_ID).payload(); String string = IOUtils.toString(read, StandardCharsets.UTF_8); if (!string.equals(TWELVE_MEGABYTES_STRING)) { @@ -211,7 +210,7 @@ default void readBytesShouldNotReadPartiallyWhenDeletingConcurrentlyBigBlob() th ConcurrentTestRunner.builder() .operation(((threadNumber, step) -> { try { - byte[] read = Mono.from(store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block(); + byte[] read = Mono.from(store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block().payload(); String string = IOUtils.toString(read, StandardCharsets.UTF_8.displayName()); if (!string.equals(TWELVE_MEGABYTES_STRING)) { throw new RuntimeException("Should not read partial blob when an other thread is deleting it. Size : " + string.length()); @@ -246,8 +245,8 @@ default Mono checkConcurrentMixedOperation() { return Mono.from(testee().readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)) //assertj is very cpu-intensive, let's compute the assertion only when arrays are different - .filter(bytes -> !Arrays.equals(bytes, TWELVE_MEGABYTES)) - .doOnNext(bytes -> assertThat(bytes).isEqualTo(TWELVE_MEGABYTES)) + .filter(bytes -> !Arrays.equals(bytes.payload(), TWELVE_MEGABYTES.payload())) + .doOnNext(bytes -> assertThat(bytes.payload()).isEqualTo(TWELVE_MEGABYTES.payload())) .onErrorResume(ObjectNotFoundException.class, throwable -> Mono.empty()) .then(); } diff --git a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/MetadataAwareBlobStoreDAOContract.java b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/MetadataAwareBlobStoreDAOContract.java new file mode 100644 index 00000000000..b3e9c5667c8 --- /dev/null +++ b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/MetadataAwareBlobStoreDAOContract.java @@ -0,0 +1,76 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.blob.api; + +import static org.apache.james.blob.api.BlobStoreDAOFixture.TEST_BLOB_ID; +import static org.apache.james.blob.api.BlobStoreDAOFixture.TEST_BUCKET_NAME; +import static org.assertj.core.api.Assertions.assertThat; + +import org.junit.jupiter.api.Test; + +import reactor.core.publisher.Mono; + +public interface MetadataAwareBlobStoreDAOContract { + BlobStoreDAO testee(); + + @Test + default void readBytesShouldPreserveMetadata() { + BlobStoreDAO testee = testee(); + + BlobStoreDAO.BytesBlob bytesBlob = BlobStoreDAO.BytesBlob.of("payload".getBytes(), + BlobStoreDAO.BlobMetadata.empty() + .withMetadata(new BlobStoreDAO.BlobMetadataName("name"), new BlobStoreDAO.BlobMetadataValue("value"))); + + Mono.from(testee.save(TEST_BUCKET_NAME, TEST_BLOB_ID, bytesBlob)).block(); + + assertThat(Mono.from(testee.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block().metadata().underlyingMap()) + .containsAllEntriesOf(bytesBlob.metadata().underlyingMap()); + } + + @Test + default void readStreamShouldPreserveMetadata() { + BlobStoreDAO testee = testee(); + + BlobStoreDAO.InputStreamBlob inputStreamBlob = BlobStoreDAO.BytesBlob.of("payload".getBytes(), + BlobStoreDAO.BlobMetadata.empty() + .withMetadata(new BlobStoreDAO.BlobMetadataName("name"), new BlobStoreDAO.BlobMetadataValue("value"))) + .asInputStream(); + + Mono.from(testee.save(TEST_BUCKET_NAME, TEST_BLOB_ID, inputStreamBlob)).block(); + + assertThat(Mono.from(testee.readReactive(TEST_BUCKET_NAME, TEST_BLOB_ID)).block().metadata().underlyingMap()) + .containsAllEntriesOf(inputStreamBlob.metadata().underlyingMap()); + } + + @Test + default void readByteSourceShouldPreserveMetadata() { + BlobStoreDAO testee = testee(); + + BlobStoreDAO.ByteSourceBlob byteSourceBlob = BlobStoreDAO.BytesBlob.of("payload".getBytes(), + BlobStoreDAO.BlobMetadata.empty() + .withMetadata(new BlobStoreDAO.BlobMetadataName("name"), new BlobStoreDAO.BlobMetadataValue("value"))) + .asByteSource(); + + Mono.from(testee.save(TEST_BUCKET_NAME, TEST_BLOB_ID, byteSourceBlob)).block(); + + assertThat(Mono.from(testee.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block().metadata().underlyingMap()) + .containsAllEntriesOf(byteSourceBlob.metadata().underlyingMap()); + } +} diff --git a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/ReadSaveBlobStoreDAOContract.java b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/ReadSaveBlobStoreDAOContract.java index bf91ac9f2b7..640afb6a18e 100644 --- a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/ReadSaveBlobStoreDAOContract.java +++ b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/ReadSaveBlobStoreDAOContract.java @@ -55,44 +55,12 @@ public interface ReadSaveBlobStoreDAOContract { BlobStoreDAO testee(); - @Test - default void saveShouldThrowWhenNullData() { - BlobStoreDAO store = testee(); - - assertThatThrownBy(() -> Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, (byte[]) null)).block()) - .isInstanceOf(NullPointerException.class); - } - - @Test - default void saveShouldThrowWhenNullString() { - BlobStoreDAO store = testee(); - - assertThatThrownBy(() -> Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, (String) null)).block()) - .isInstanceOf(NullPointerException.class); - } - - @Test - default void saveShouldThrowWhenNullInputStream() { - BlobStoreDAO store = testee(); - - assertThatThrownBy(() -> Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, (InputStream) null)).block()) - .isInstanceOf(NullPointerException.class); - } - - @Test - default void saveShouldThrowWhenNullByteSource() { - BlobStoreDAO store = testee(); - - assertThatThrownBy(() -> Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, (ByteSource) null)).block()) - .isInstanceOf(NullPointerException.class); - } - @Test default void saveShouldSaveEmptyData() { BlobStoreDAO store = testee(); - Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, EMPTY_BYTEARRAY)).block(); - byte[] bytes = Mono.from(store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block(); + Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, BlobStoreDAO.BytesBlob.of(EMPTY_BYTEARRAY))).block(); + byte[] bytes = Mono.from(store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block().payload(); assertThat(bytes).isEmpty(); } @@ -101,9 +69,9 @@ default void saveShouldSaveEmptyData() { default void saveShouldSaveEmptyString() { BlobStoreDAO store = testee(); - Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, "")).block(); + Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, BlobStoreDAO.BytesBlob.of(""))).block(); - byte[] bytes = Mono.from(store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block(); + byte[] bytes = Mono.from(store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block().payload(); assertThat(new String(bytes, StandardCharsets.UTF_8)).isEmpty(); } @@ -112,9 +80,9 @@ default void saveShouldSaveEmptyString() { default void saveShouldSaveEmptyInputStream() { BlobStoreDAO store = testee(); - Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, new ByteArrayInputStream(EMPTY_BYTEARRAY))).block(); + Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, BlobStoreDAO.InputStreamBlob.of(new ByteArrayInputStream(EMPTY_BYTEARRAY)))).block(); - byte[] bytes = Mono.from(store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block(); + byte[] bytes = Mono.from(store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block().payload(); assertThat(bytes).isEmpty(); } @@ -123,9 +91,9 @@ default void saveShouldSaveEmptyInputStream() { default void saveShouldSaveEmptyByteSource() { BlobStoreDAO store = testee(); - Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.empty())).block(); + Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, BlobStoreDAO.ByteSourceBlob.of(ByteSource.empty()))).block(); - byte[] bytes = Mono.from(store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block(); + byte[] bytes = Mono.from(store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block().payload(); assertThat(bytes).isEmpty(); } @@ -144,9 +112,9 @@ default void readBytesShouldReturnSavedData() { Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY)).block(); - byte[] bytes = Mono.from(store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block(); + byte[] bytes = Mono.from(store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block().payload(); - assertThat(bytes).isEqualTo(SHORT_BYTEARRAY); + assertThat(bytes).isEqualTo(SHORT_BYTEARRAY.payload()); } @Test @@ -155,9 +123,9 @@ default void readBytesShouldReturnLongSavedData() { Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ELEVEN_KILOBYTES)).block(); - byte[] bytes = Mono.from(store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block(); + byte[] bytes = Mono.from(store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block().payload(); - assertThat(bytes).isEqualTo(ELEVEN_KILOBYTES); + assertThat(bytes).isEqualTo(ELEVEN_KILOBYTES.payload()); } @Test @@ -166,16 +134,16 @@ default void readBytesShouldReturnBigSavedData() { Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, TWELVE_MEGABYTES)).block(); - byte[] bytes = Mono.from(store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block(); + byte[] bytes = Mono.from(store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block().payload(); - assertThat(bytes).isEqualTo(TWELVE_MEGABYTES); + assertThat(bytes).isEqualTo(TWELVE_MEGABYTES.payload()); } @Test default void readStreamShouldThrowWhenNotExisting() { BlobStoreDAO store = testee(); - assertThatThrownBy(() -> store.read(TEST_BUCKET_NAME, new TestBlobId("unknown")).read()) + assertThatThrownBy(() -> store.read(TEST_BUCKET_NAME, new TestBlobId("unknown")).payload().read()) .isInstanceOf(ObjectNotFoundException.class); } @@ -195,9 +163,9 @@ default void readShouldReturnSavedData() { BlobStoreDAO store = testee(); Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY)).block(); - InputStream read = store.read(TEST_BUCKET_NAME, TEST_BLOB_ID); + InputStream read = store.read(TEST_BUCKET_NAME, TEST_BLOB_ID).payload(); - assertThat(read).hasSameContentAs(new ByteArrayInputStream(SHORT_BYTEARRAY)); + assertThat(read).hasSameContentAs(SHORT_BYTEARRAY.asInputStream().payload()); } @Test @@ -205,9 +173,9 @@ default void readShouldReturnLongSavedData() { BlobStoreDAO store = testee(); Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ELEVEN_KILOBYTES)).block(); - InputStream read = store.read(TEST_BUCKET_NAME, TEST_BLOB_ID); + InputStream read = store.read(TEST_BUCKET_NAME, TEST_BLOB_ID).payload(); - assertThat(read).hasSameContentAs(new ByteArrayInputStream(ELEVEN_KILOBYTES)); + assertThat(read).hasSameContentAs(ELEVEN_KILOBYTES.asInputStream().payload()); } @Test @@ -215,88 +183,87 @@ default void readShouldReturnBigSavedData() { BlobStoreDAO store = testee(); Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, TWELVE_MEGABYTES)).block(); - InputStream read = store.read(TEST_BUCKET_NAME, TEST_BLOB_ID); + InputStream read = store.read(TEST_BUCKET_NAME, TEST_BLOB_ID).payload(); - assertThat(read).hasSameContentAs(new ByteArrayInputStream(TWELVE_MEGABYTES)); + assertThat(read).hasSameContentAs(TWELVE_MEGABYTES.asInputStream().payload()); } @ParameterizedTest(name = "[{index}] {0}") @MethodSource("blobs") - default void saveBytesShouldBeIdempotent(String description, byte[] bytes) { + default void saveBytesShouldBeIdempotent(String description, BlobStoreDAO.BytesBlob bytes) { BlobStoreDAO store = testee(); Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, bytes)).block(); Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, bytes)).block(); - byte[] read = Mono.from(store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block(); + byte[] read = Mono.from(store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block().payload(); - assertThat(read).isEqualTo(bytes); + assertThat(read).isEqualTo(bytes.payload()); } @ParameterizedTest(name = "[{index}] {0}") @MethodSource("blobs") - default void saveByteSourceShouldBeIdempotent(String description, byte[] bytes) { + default void saveByteSourceShouldBeIdempotent(String description, BlobStoreDAO.BytesBlob bytes) { BlobStoreDAO store = testee(); - Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.wrap(bytes))).block(); - Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.wrap(bytes))).block(); + Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, bytes.asByteSource())).block(); + Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, bytes.asByteSource())).block(); - byte[] read = Mono.from(store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block(); + byte[] read = Mono.from(store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block().payload(); - assertThat(read).isEqualTo(bytes); + assertThat(read).isEqualTo(bytes.payload()); } @ParameterizedTest(name = "[{index}] {0}") @MethodSource("blobs") - default void saveInputStreamShouldBeIdempotent(String description, byte[] bytes) { + default void saveInputStreamShouldBeIdempotent(String description, BlobStoreDAO.BytesBlob bytes) { BlobStoreDAO store = testee(); - Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.wrap(bytes))).block(); - Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, new ByteArrayInputStream(bytes))).block(); + Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, bytes.asInputStream())).block(); + Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, bytes.asInputStream())).block(); - byte[] read = Mono.from(store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block(); + byte[] read = Mono.from(store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block().payload(); - assertThat(read).isEqualTo(bytes); + assertThat(read).isEqualTo(bytes.payload()); } @Test default void saveInputStreamShouldNotOverwritePreviousDataOnFailingInputStream() { BlobStoreDAO store = testee(); - Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.wrap(ELEVEN_KILOBYTES))).block(); - Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, getThrowingInputStream())) + Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ELEVEN_KILOBYTES)).block(); + Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, BlobStoreDAO.InputStreamBlob.of(getThrowingInputStream()))) .onErrorResume(throwable -> Mono.empty()).block(); - byte[] read = Mono.from(store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block(); + byte[] read = Mono.from(store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block().payload(); - assertThat(read).isEqualTo(ELEVEN_KILOBYTES); + assertThat(read).isEqualTo(ELEVEN_KILOBYTES.payload()); } @Test default void saveByteSourceShouldNotOverwritePreviousDataOnFailingInputStream() { BlobStoreDAO store = testee(); - Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.wrap(ELEVEN_KILOBYTES))).block(); - Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, new ByteSource() { + Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ELEVEN_KILOBYTES)).block(); + Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, BlobStoreDAO.ByteSourceBlob.of(new ByteSource() { @Override public InputStream openStream() throws IOException { return getThrowingInputStream(); } - })) - .onErrorResume(throwable -> Mono.empty()).block(); + }))).onErrorResume(throwable -> Mono.empty()).block(); - byte[] read = Mono.from(store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block(); + byte[] read = Mono.from(store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block().payload(); - assertThat(read).isEqualTo(ELEVEN_KILOBYTES); + assertThat(read).isEqualTo(ELEVEN_KILOBYTES.payload()); } @Test default void saveByteSourceShouldThrowOnIOException() { BlobStoreDAO store = testee(); - assertThatThrownBy(() -> Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, new ByteSource() { + assertThatThrownBy(() -> Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, BlobStoreDAO.ByteSourceBlob.of(new ByteSource() { @Override public InputStream openStream() throws IOException { return getThrowingInputStream(); } - })).block()) + }))).block()) .isInstanceOf(ObjectStoreIOException.class); } @@ -304,7 +271,8 @@ public InputStream openStream() throws IOException { default void saveInputStreamShouldThrowOnIOException() { BlobStoreDAO store = testee(); - assertThatThrownBy(() -> Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, getThrowingInputStream())).block()) + assertThatThrownBy(() -> Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, + BlobStoreDAO.InputStreamBlob.of(getThrowingInputStream()))).block()) .isInstanceOf(ObjectStoreIOException.class); } @@ -317,8 +285,8 @@ default void listShouldReturnEmptyByDefault() { @Test default void listShouldReturnPresentBlobs() { BlobStoreDAO store = testee(); - Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.wrap(ELEVEN_KILOBYTES))).block(); - Mono.from(store.save(TEST_BUCKET_NAME, OTHER_TEST_BLOB_ID, ByteSource.wrap(ELEVEN_KILOBYTES))).block(); + Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ELEVEN_KILOBYTES)).block(); + Mono.from(store.save(TEST_BUCKET_NAME, OTHER_TEST_BLOB_ID, ELEVEN_KILOBYTES)).block(); assertThat(Flux.from(testee().listBlobs(TEST_BUCKET_NAME)) .map(BlobId::asString) @@ -334,12 +302,12 @@ static Stream blobs() { @ParameterizedTest(name = "[{index}] {0}") @MethodSource(value = "blobs") - default void concurrentSaveBytesShouldReturnConsistentValues(String description, byte[] bytes) throws ExecutionException, InterruptedException { + default void concurrentSaveBytesShouldReturnConsistentValues(String description, BlobStoreDAO.BytesBlob bytes) throws ExecutionException, InterruptedException { Mono.from(testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, bytes)).block(); ConcurrentTestRunner.builder() .randomlyDistributedReactorOperations( (threadNumber, step) -> testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, bytes), - (threadNumber, step) -> checkConcurrentSaveOperation(bytes) + (threadNumber, step) -> checkConcurrentSaveOperation(bytes.payload()) ) .threadCount(10) .operationCount(20) @@ -348,12 +316,12 @@ default void concurrentSaveBytesShouldReturnConsistentValues(String description, @ParameterizedTest(name = "[{index}] {0}") @MethodSource("blobs") - default void concurrentSaveInputStreamShouldReturnConsistentValues(String description, byte[] bytes) throws ExecutionException, InterruptedException { + default void concurrentSaveInputStreamShouldReturnConsistentValues(String description, BlobStoreDAO.BytesBlob bytes) throws ExecutionException, InterruptedException { Mono.from(testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, bytes)).block(); ConcurrentTestRunner.builder() .randomlyDistributedReactorOperations( - (threadNumber, step) -> testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, new ByteArrayInputStream(bytes)), - (threadNumber, step) -> checkConcurrentSaveOperation(bytes) + (threadNumber, step) -> testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, bytes.asInputStream()), + (threadNumber, step) -> checkConcurrentSaveOperation(bytes.payload()) ) .threadCount(10) .operationCount(20) @@ -362,12 +330,12 @@ default void concurrentSaveInputStreamShouldReturnConsistentValues(String descri @ParameterizedTest(name = "[{index}] {0}") @MethodSource("blobs") - default void concurrentSaveByteSourceShouldReturnConsistentValues(String description, byte[] bytes) throws ExecutionException, InterruptedException { + default void concurrentSaveByteSourceShouldReturnConsistentValues(String description, BlobStoreDAO.BytesBlob bytes) throws ExecutionException, InterruptedException { Mono.from(testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, bytes)).block(); ConcurrentTestRunner.builder() .randomlyDistributedReactorOperations( - (threadNumber, step) -> testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.wrap(bytes)), - (threadNumber, step) -> checkConcurrentSaveOperation(bytes) + (threadNumber, step) -> testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, bytes), + (threadNumber, step) -> checkConcurrentSaveOperation(bytes.payload()) ) .threadCount(10) .operationCount(20) @@ -376,6 +344,7 @@ default void concurrentSaveByteSourceShouldReturnConsistentValues(String descrip default Mono checkConcurrentSaveOperation(byte[] expected) { return Mono.from(testee().readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)) + .map(BlobStoreDAO.BytesBlob::payload) //assertj is very cpu-intensive, let's compute the assertion only when arrays are different .filter(bytes -> !Arrays.equals(bytes, expected)) .doOnNext(bytes -> assertThat(bytes).isEqualTo(expected)) @@ -383,7 +352,7 @@ default Mono checkConcurrentSaveOperation(byte[] expected) { } default FilterInputStream getThrowingInputStream() { - return new FilterInputStream(new ByteArrayInputStream(TWELVE_MEGABYTES)) { + return new FilterInputStream(TWELVE_MEGABYTES.asInputStream().payload()) { int failingThreshold = 5; int alreadyRead = 0; @@ -412,5 +381,4 @@ public int read(byte[] b, int off, int len) throws IOException { }; } - } diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStoreDAO.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStoreDAO.java index 193e2c4dda6..5afd769d41e 100644 --- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStoreDAO.java +++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStoreDAO.java @@ -97,23 +97,32 @@ public CassandraBlobStoreDAO(CassandraDefaultBucketDAO defaultBucketDAO, } @Override - public InputStream read(BucketName bucketName, BlobId blobId) throws ObjectStoreIOException, ObjectNotFoundException { - return ReactorUtils.toInputStream(readBlobParts(bucketName, blobId)); + public InputStreamBlob read(BucketName bucketName, BlobId blobId) throws ObjectStoreIOException, ObjectNotFoundException { + return InputStreamBlob.of(ReactorUtils.toInputStream(readBlobParts(bucketName, blobId))); } @Override - public Publisher readReactive(BucketName bucketName, BlobId blobId) { + public Publisher readReactive(BucketName bucketName, BlobId blobId) { return Mono.just(read(bucketName, blobId)); } @Override - public Mono readBytes(BucketName bucketName, BlobId blobId) { + public Publisher readBytes(BucketName bucketName, BlobId blobId) { return readBlobParts(bucketName, blobId) .collectList() - .map(this::byteBuffersToBytesArray); + .map(this::byteBuffersToBytesArray) + .map(BytesBlob::of); } @Override + public Publisher save(BucketName bucketName, BlobId blobId, Blob blob) { + return switch (blob) { + case BytesBlob bytesBlob -> save(bucketName, blobId, bytesBlob.payload()); + case InputStreamBlob inputStreamBlob -> save(bucketName, blobId, inputStreamBlob.payload()); + case ByteSourceBlob byteSourceBlob -> save(bucketName, blobId, byteSourceBlob.payload()); + }; + } + public Mono save(BucketName bucketName, BlobId blobId, byte[] data) { Preconditions.checkNotNull(data); @@ -121,7 +130,6 @@ public Mono save(BucketName bucketName, BlobId blobId, byte[] data) { .flatMap(chunks -> save(bucketName, blobId, chunks)); } - @Override public Mono save(BucketName bucketName, BlobId blobId, InputStream inputStream) { Preconditions.checkNotNull(bucketName); Preconditions.checkNotNull(inputStream); @@ -132,7 +140,6 @@ public Mono save(BucketName bucketName, BlobId blobId, InputStream inputSt .onErrorMap(e -> new ObjectStoreIOException("Exception occurred while saving input stream", e)); } - @Override public Mono save(BucketName bucketName, BlobId blobId, ByteSource content) { return Mono.using(content::openBufferedStream, stream -> save(bucketName, blobId, stream), diff --git a/server/blob/blob-file/src/main/java/org/apache/james/blob/file/FileBlobStoreDAO.java b/server/blob/blob-file/src/main/java/org/apache/james/blob/file/FileBlobStoreDAO.java index 95b9b2e5e83..4f19207b919 100644 --- a/server/blob/blob-file/src/main/java/org/apache/james/blob/file/FileBlobStoreDAO.java +++ b/server/blob/blob-file/src/main/java/org/apache/james/blob/file/FileBlobStoreDAO.java @@ -65,11 +65,11 @@ public FileBlobStoreDAO(FileSystem fileSystem, BlobId.Factory blobIdFactory) thr } @Override - public InputStream read(BucketName bucketName, BlobId blobId) throws ObjectStoreIOException, ObjectNotFoundException { + public InputStreamBlob read(BucketName bucketName, BlobId blobId) throws ObjectStoreIOException, ObjectNotFoundException { File bucketRoot = getBucketRoot(bucketName); File blob = new File(bucketRoot, blobId.asString()); try { - return new FileInputStream(blob); + return InputStreamBlob.of(new FileInputStream(blob)); } catch (FileNotFoundException e) { throw new ObjectNotFoundException(String.format("Cannot locate %s within %s", blobId.asString(), bucketName.asString()), e); } @@ -88,22 +88,31 @@ private File getBucketRoot(BucketName bucketName) { } @Override - public Mono readReactive(BucketName bucketName, BlobId blobId) { + public Publisher readReactive(BucketName bucketName, BlobId blobId) { return Mono.fromCallable(() -> read(bucketName, blobId)) .subscribeOn(Schedulers.boundedElastic()); } @Override - public Mono readBytes(BucketName bucketName, BlobId blobId) { + public Publisher readBytes(BucketName bucketName, BlobId blobId) { return Mono.fromCallable(() -> { - File bucketRoot = getBucketRoot(bucketName); - File blob = new File(bucketRoot, blobId.asString()); - return FileUtils.readFileToByteArray(blob); - }).onErrorResume(NoSuchFileException.class, e -> Mono.error(new ObjectNotFoundException(String.format("Cannot locate %s within %s", blobId.asString(), bucketName.asString()), e))) - .subscribeOn(Schedulers.boundedElastic()); + File bucketRoot = getBucketRoot(bucketName); + File blob = new File(bucketRoot, blobId.asString()); + return FileUtils.readFileToByteArray(blob); + }).onErrorResume(NoSuchFileException.class, e -> Mono.error(new ObjectNotFoundException(String.format("Cannot locate %s within %s", blobId.asString(), bucketName.asString()), e))) + .subscribeOn(Schedulers.boundedElastic()) + .map(BytesBlob::of); } @Override + public Publisher save(BucketName bucketName, BlobId blobId, Blob blob) { + return switch (blob) { + case BytesBlob bytesBlob -> save(bucketName, blobId, bytesBlob.payload()); + case InputStreamBlob inputStreamBlob -> save(bucketName, blobId, inputStreamBlob.payload()); + case ByteSourceBlob byteSourceBlob -> save(bucketName, blobId, byteSourceBlob.payload()); + }; + } + public Mono save(BucketName bucketName, BlobId blobId, byte[] data) { Preconditions.checkNotNull(data); @@ -116,7 +125,6 @@ public Mono save(BucketName bucketName, BlobId blobId, byte[] data) { .then(); } - @Override public Mono save(BucketName bucketName, BlobId blobId, InputStream inputStream) { Preconditions.checkNotNull(inputStream); return Mono.fromRunnable(() -> { @@ -158,7 +166,6 @@ private void save(byte[] data, File blob) { } } - @Override public Mono save(BucketName bucketName, BlobId blobId, ByteSource content) { return Mono.fromCallable(() -> { try { diff --git a/server/blob/blob-memory/src/main/java/org/apache/james/blob/memory/MemoryBlobStoreDAO.java b/server/blob/blob-memory/src/main/java/org/apache/james/blob/memory/MemoryBlobStoreDAO.java index 22e586191d7..41af72aa157 100644 --- a/server/blob/blob-memory/src/main/java/org/apache/james/blob/memory/MemoryBlobStoreDAO.java +++ b/server/blob/blob-memory/src/main/java/org/apache/james/blob/memory/MemoryBlobStoreDAO.java @@ -19,12 +19,9 @@ package org.apache.james.blob.memory; -import java.io.ByteArrayInputStream; import java.io.IOException; -import java.io.InputStream; import java.util.Collection; -import org.apache.commons.io.IOUtils; import org.apache.james.blob.api.BlobId; import org.apache.james.blob.api.BlobStoreDAO; import org.apache.james.blob.api.BucketName; @@ -43,33 +40,57 @@ public class MemoryBlobStoreDAO implements BlobStoreDAO { - private final Table blobs; + private final Table blobs; public MemoryBlobStoreDAO() { blobs = HashBasedTable.create(); } @Override - public InputStream read(BucketName bucketName, BlobId blobId) throws ObjectStoreIOException, ObjectNotFoundException { - return readBytes(bucketName, blobId) - .map(ByteArrayInputStream::new) + public InputStreamBlob read(BucketName bucketName, BlobId blobId) throws ObjectStoreIOException, ObjectNotFoundException { + return Mono.from(readBytes(bucketName, blobId)) + .map(BytesBlob::asInputStream) .block(); } @Override - public Publisher readReactive(BucketName bucketName, BlobId blobId) { - return readBytes(bucketName, blobId) - .map(ByteArrayInputStream::new); + public Publisher readReactive(BucketName bucketName, BlobId blobId) { + return Mono.from(readBytes(bucketName, blobId)) + .map(BytesBlob::asInputStream); } @Override - public Mono readBytes(BucketName bucketName, BlobId blobId) { + public Publisher readBytes(BucketName bucketName, BlobId blobId) { return Mono.fromCallable(() -> blobs.get(bucketName, blobId)) .switchIfEmpty(Mono.error(() -> new ObjectNotFoundException(String.format("blob '%s' not found in bucket '%s'", blobId.asString(), bucketName.asString())))); } @Override - public Mono save(BucketName bucketName, BlobId blobId, byte[] data) { + public Publisher save(BucketName bucketName, BlobId blobId, Blob blob) { + Preconditions.checkNotNull(blob); + return switch (blob) { + case BytesBlob bytesBlob -> save(bucketName, blobId, bytesBlob); + case InputStreamBlob inputStreamBlob -> Mono.fromCallable(() -> { + try { + return inputStreamBlob.asBytes(); + } catch (IOException e) { + throw new ObjectStoreIOException("IOException occured", e); + } + }) + .flatMap(bytes -> save(bucketName, blobId, bytes)); + case ByteSourceBlob byteSourceBlob -> Mono.fromCallable(() -> { + try { + return byteSourceBlob.asBytes(); + } catch (IOException e) { + throw new ObjectStoreIOException("IOException occured", e); + } + }) + .map(bytes -> checkContentSize(byteSourceBlob.payload(), bytes)) + .flatMap(bytes -> save(bucketName, blobId, bytes)); + }; + } + + public Mono save(BucketName bucketName, BlobId blobId, BytesBlob data) { return Mono.fromRunnable(() -> { synchronized (blobs) { blobs.put(bucketName, blobId, data); @@ -77,36 +98,10 @@ public Mono save(BucketName bucketName, BlobId blobId, byte[] data) { }); } - @Override - public Mono save(BucketName bucketName, BlobId blobId, InputStream inputStream) { - Preconditions.checkNotNull(inputStream); - return Mono.fromCallable(() -> { - try { - return IOUtils.toByteArray(inputStream); - } catch (IOException e) { - throw new ObjectStoreIOException("IOException occured", e); - } - }) - .flatMap(bytes -> save(bucketName, blobId, bytes)); - } - - @Override - public Mono save(BucketName bucketName, BlobId blobId, ByteSource content) { - return Mono.fromCallable(() -> { - try { - return content.read(); - } catch (IOException e) { - throw new ObjectStoreIOException("IOException occured", e); - } - }) - .map(bytes -> checkContentSize(content, bytes)) - .flatMap(bytes -> save(bucketName, blobId, bytes)); - } - - private static byte[] checkContentSize(ByteSource content, byte[] bytes) { + private static BytesBlob checkContentSize(ByteSource content, BytesBlob bytes) { try { long preComputedSize = content.size(); - long realSize = bytes.length; + long realSize = bytes.payload().length; Preconditions.checkArgument(content.size() == realSize, "Difference in size between the pre-computed content can cause other blob stores to fail thus we need to test for alignment. Expecting " + realSize + " but pre-computed size was " + preComputedSize); return bytes; diff --git a/server/blob/blob-memory/src/test/java/org/apache/james/blob/memory/MemoryBlobStoreDAOTest.java b/server/blob/blob-memory/src/test/java/org/apache/james/blob/memory/MemoryBlobStoreDAOTest.java index 83748a79c5d..e55fad369d9 100644 --- a/server/blob/blob-memory/src/test/java/org/apache/james/blob/memory/MemoryBlobStoreDAOTest.java +++ b/server/blob/blob-memory/src/test/java/org/apache/james/blob/memory/MemoryBlobStoreDAOTest.java @@ -21,10 +21,11 @@ import org.apache.james.blob.api.BlobStoreDAO; import org.apache.james.blob.api.BlobStoreDAOContract; +import org.apache.james.blob.api.MetadataAwareBlobStoreDAOContract; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; -class MemoryBlobStoreDAOTest implements BlobStoreDAOContract { +class MemoryBlobStoreDAOTest implements BlobStoreDAOContract, MetadataAwareBlobStoreDAOContract { private MemoryBlobStoreDAO blobStore; diff --git a/server/blob/blob-postgres/src/main/java/org/apache/james/blob/postgres/PostgresBlobStoreDAO.java b/server/blob/blob-postgres/src/main/java/org/apache/james/blob/postgres/PostgresBlobStoreDAO.java index dd579aae9d3..1eb1a9e4d81 100644 --- a/server/blob/blob-postgres/src/main/java/org/apache/james/blob/postgres/PostgresBlobStoreDAO.java +++ b/server/blob/blob-postgres/src/main/java/org/apache/james/blob/postgres/PostgresBlobStoreDAO.java @@ -25,7 +25,6 @@ import static org.apache.james.blob.postgres.PostgresBlobStorageDataDefinition.PostgresBlobStorageTable.SIZE; import static org.apache.james.blob.postgres.PostgresBlobStorageDataDefinition.PostgresBlobStorageTable.TABLE_NAME; -import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.util.Collection; @@ -44,6 +43,7 @@ import org.apache.james.blob.api.ObjectNotFoundException; import org.apache.james.blob.api.ObjectStoreIOException; import org.jooq.impl.DSL; +import org.reactivestreams.Publisher; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; @@ -63,28 +63,37 @@ public PostgresBlobStoreDAO(PostgresExecutor postgresExecutor, BlobId.Factory bl } @Override - public InputStream read(BucketName bucketName, BlobId blobId) throws ObjectStoreIOException, ObjectNotFoundException { + public InputStreamBlob read(BucketName bucketName, BlobId blobId) throws ObjectStoreIOException, ObjectNotFoundException { return Mono.from(readReactive(bucketName, blobId)) .block(); } @Override - public Mono readReactive(BucketName bucketName, BlobId blobId) { + public Mono readReactive(BucketName bucketName, BlobId blobId) { return Mono.from(readBytes(bucketName, blobId)) - .map(ByteArrayInputStream::new); + .map(BytesBlob::asInputStream); } @Override - public Mono readBytes(BucketName bucketName, BlobId blobId) { + public Publisher readBytes(BucketName bucketName, BlobId blobId) { return postgresExecutor.executeRow(dsl -> Mono.from(dsl.select(DATA) .from(TABLE_NAME) .where(BUCKET_NAME.eq(bucketName.asString())) .and(BLOB_ID.eq(blobId.asString())))) .map(record -> record.get(DATA)) - .switchIfEmpty(Mono.error(() -> new ObjectNotFoundException("Blob " + blobId + " does not exist in bucket " + bucketName))); + .switchIfEmpty(Mono.error(() -> new ObjectNotFoundException("Blob " + blobId + " does not exist in bucket " + bucketName))) + .map(BytesBlob::of); } @Override + public Publisher save(BucketName bucketName, BlobId blobId, Blob blob) { + return switch (blob) { + case BytesBlob bytesBlob -> save(bucketName, blobId, bytesBlob.payload()); + case InputStreamBlob inputStreamBlob -> save(bucketName, blobId, inputStreamBlob.payload()); + case ByteSourceBlob byteSourceBlob -> save(bucketName, blobId, byteSourceBlob.payload()); + }; + } + public Mono save(BucketName bucketName, BlobId blobId, byte[] data) { Preconditions.checkNotNull(data); @@ -100,7 +109,6 @@ public Mono save(BucketName bucketName, BlobId blobId, byte[] data) { .set(SIZE, data.length))); } - @Override public Mono save(BucketName bucketName, BlobId blobId, InputStream inputStream) { Preconditions.checkNotNull(inputStream); @@ -113,7 +121,6 @@ public Mono save(BucketName bucketName, BlobId blobId, InputStream inputSt }).flatMap(bytes -> save(bucketName, blobId, bytes)); } - @Override public Mono save(BucketName bucketName, BlobId blobId, ByteSource content) { return Mono.fromCallable(() -> { try { diff --git a/server/blob/blob-postgres/src/test/java/org/apache/james/blob/postgres/PostgresBlobStoreDAOTest.java b/server/blob/blob-postgres/src/test/java/org/apache/james/blob/postgres/PostgresBlobStoreDAOTest.java index 7399cfcfa39..84ace19075f 100644 --- a/server/blob/blob-postgres/src/test/java/org/apache/james/blob/postgres/PostgresBlobStoreDAOTest.java +++ b/server/blob/blob-postgres/src/test/java/org/apache/james/blob/postgres/PostgresBlobStoreDAOTest.java @@ -22,7 +22,6 @@ import static org.apache.james.blob.api.BlobStoreDAOFixture.TEST_BLOB_ID; import static org.apache.james.blob.api.BlobStoreDAOFixture.TEST_BUCKET_NAME; -import java.io.ByteArrayInputStream; import java.time.Duration; import java.util.concurrent.ExecutionException; @@ -37,8 +36,6 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; -import com.google.common.io.ByteSource; - import reactor.core.publisher.Mono; class PostgresBlobStoreDAOTest implements BlobStoreDAOContract { @@ -67,12 +64,12 @@ public void listBucketsShouldReturnBucketsWithNoBlob() { @Override @ParameterizedTest(name = "[{index}] {0}") @MethodSource("blobs") - public void concurrentSaveByteSourceShouldReturnConsistentValues(String description, byte[] bytes) throws ExecutionException, InterruptedException { + public void concurrentSaveByteSourceShouldReturnConsistentValues(String description, BlobStoreDAO.BytesBlob bytes) throws ExecutionException, InterruptedException { Mono.from(testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, bytes)).block(); ConcurrentTestRunner.builder() .randomlyDistributedReactorOperations( - (threadNumber, step) -> testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.wrap(bytes)), - (threadNumber, step) -> checkConcurrentSaveOperation(bytes) + (threadNumber, step) -> testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, bytes.asByteSource()), + (threadNumber, step) -> checkConcurrentSaveOperation(bytes.payload()) ) .threadCount(5) .operationCount(10) @@ -82,12 +79,12 @@ public void concurrentSaveByteSourceShouldReturnConsistentValues(String descript @Override @ParameterizedTest(name = "[{index}] {0}") @MethodSource("blobs") - public void concurrentSaveInputStreamShouldReturnConsistentValues(String description, byte[] bytes) throws ExecutionException, InterruptedException { + public void concurrentSaveInputStreamShouldReturnConsistentValues(String description, BlobStoreDAO.BytesBlob bytes) throws ExecutionException, InterruptedException { Mono.from(testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, bytes)).block(); ConcurrentTestRunner.builder() .randomlyDistributedReactorOperations( - (threadNumber, step) -> testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, new ByteArrayInputStream(bytes)), - (threadNumber, step) -> checkConcurrentSaveOperation(bytes) + (threadNumber, step) -> testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, bytes.asInputStream()), + (threadNumber, step) -> checkConcurrentSaveOperation(bytes.payload()) ) .threadCount(5) .operationCount(10) @@ -97,12 +94,12 @@ public void concurrentSaveInputStreamShouldReturnConsistentValues(String descrip @Override @ParameterizedTest(name = "[{index}] {0}") @MethodSource(value = "blobs") - public void concurrentSaveBytesShouldReturnConsistentValues(String description, byte[] bytes) throws ExecutionException, InterruptedException { + public void concurrentSaveBytesShouldReturnConsistentValues(String description, BlobStoreDAO.BytesBlob bytes) throws ExecutionException, InterruptedException { Mono.from(testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, bytes)).block(); ConcurrentTestRunner.builder() .randomlyDistributedReactorOperations( (threadNumber, step) -> testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, bytes), - (threadNumber, step) -> checkConcurrentSaveOperation(bytes) + (threadNumber, step) -> checkConcurrentSaveOperation(bytes.payload()) ) .threadCount(5) .operationCount(10) diff --git a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java index 737393b0c96..cf942a986cc 100644 --- a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java +++ b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java @@ -136,25 +136,25 @@ public S3BlobStoreDAO(S3ClientFactory s3ClientFactory, } @Override - public InputStream read(BucketName bucketName, BlobId blobId) throws ObjectStoreIOException, ObjectNotFoundException { + public InputStreamBlob read(BucketName bucketName, BlobId blobId) throws ObjectStoreIOException, ObjectNotFoundException { BucketName resolvedBucketName = bucketNameResolver.resolve(bucketName); - return ReactorUtils.toInputStream(getObject(resolvedBucketName, blobId) + return InputStreamBlob.of(ReactorUtils.toInputStream(getObject(resolvedBucketName, blobId) .onErrorMap(NoSuchBucketException.class, e -> new ObjectNotFoundException("Bucket not found " + resolvedBucketName.asString(), e)) .onErrorMap(NoSuchKeyException.class, e -> new ObjectNotFoundException("Blob not found " + blobId.asString() + " in bucket " + resolvedBucketName.asString(), e)) .block() - .flux); + .flux)); } @Override - public Publisher readReactive(BucketName bucketName, BlobId blobId) { + public Publisher readReactive(BucketName bucketName, BlobId blobId) { BucketName resolvedBucketName = bucketNameResolver.resolve(bucketName); return getObject(resolvedBucketName, blobId) .onErrorMap(NoSuchBucketException.class, e -> new ObjectNotFoundException("Bucket not found " + resolvedBucketName.asString(), e)) .onErrorMap(NoSuchKeyException.class, e -> new ObjectNotFoundException("Blob not found " + blobId.asString() + " in bucket " + resolvedBucketName.asString(), e)) .publishOn(ReactorUtils.BLOCKING_CALL_WRAPPER) - .map(res -> ReactorUtils.toInputStream(res.flux)); + .map(res -> InputStreamBlob.of(ReactorUtils.toInputStream(res.flux))); } private static class FluxResponse { @@ -207,17 +207,17 @@ public void onStream(SdkPublisher publisher) { .switchIfEmpty(Mono.error(() -> new ObjectStoreIOException("Request was unexpectedly canceled, no GetObjectResponse")))); } - @Override - public Mono readBytes(BucketName bucketName, BlobId blobId) { + public Publisher readBytes(BucketName bucketName, BlobId blobId) { BucketName resolvedBucketName = bucketNameResolver.resolve(bucketName); return getObjectBytes(resolvedBucketName, blobId) - .onErrorMap(NoSuchBucketException.class, e -> new ObjectNotFoundException("Bucket not found " + resolvedBucketName.asString(), e)) - .onErrorMap(NoSuchKeyException.class, e -> new ObjectNotFoundException("Blob not found " + blobId.asString() + " in bucket " + resolvedBucketName.asString(), e)) - .publishOn(Schedulers.parallel()) - .map(BytesWrapper::asByteArrayUnsafe) - .onErrorMap(e -> e.getCause() instanceof OutOfMemoryError, Throwable::getCause); + .onErrorMap(NoSuchBucketException.class, e -> new ObjectNotFoundException("Bucket not found " + resolvedBucketName.asString(), e)) + .onErrorMap(NoSuchKeyException.class, e -> new ObjectNotFoundException("Blob not found " + blobId.asString() + " in bucket " + resolvedBucketName.asString(), e)) + .publishOn(Schedulers.parallel()) + .map(BytesWrapper::asByteArrayUnsafe) + .onErrorMap(e -> e.getCause() instanceof OutOfMemoryError, Throwable::getCause) + .map(BytesBlob::of); } private Mono> getObjectBytes(BucketName bucketName, BlobId blobId) { @@ -255,6 +255,14 @@ private Mono buildGetObjectRequestBuilder(BucketName b } @Override + public Publisher save(BucketName bucketName, BlobId blobId, Blob blob) { + return switch (blob) { + case BytesBlob bytesBlob -> save(bucketName, blobId, bytesBlob.payload()); + case InputStreamBlob inputStreamBlob -> save(bucketName, blobId, inputStreamBlob.payload()); + case ByteSourceBlob byteSourceBlob -> save(bucketName, blobId, byteSourceBlob.payload()); + }; + } + public Mono save(BucketName bucketName, BlobId blobId, byte[] data) { BucketName resolvedBucketName = bucketNameResolver.resolve(bucketName); @@ -266,7 +274,6 @@ public Mono save(BucketName bucketName, BlobId blobId, byte[] data) { .then(); } - @Override public Mono save(BucketName bucketName, BlobId blobId, InputStream inputStream) { Preconditions.checkNotNull(inputStream); @@ -285,7 +292,6 @@ private Mono uploadUsingFile(BucketName bucketName, BlobId blobId, InputSt .publishOn(Schedulers.parallel()); } - @Override public Mono save(BucketName bucketName, BlobId blobId, ByteSource content) { BucketName resolvedBucketName = bucketNameResolver.resolve(bucketName); diff --git a/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAOTest.java b/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAOTest.java index 6ef690eff86..684bb619714 100644 --- a/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAOTest.java +++ b/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAOTest.java @@ -26,7 +26,6 @@ import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.api.Assertions.assertThatThrownBy; -import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.util.Optional; @@ -45,8 +44,6 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; -import com.google.common.io.ByteSource; - import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.util.retry.Retry; @@ -102,8 +99,7 @@ void listingManyBlobsShouldSucceedWhenExceedingPageSize() { final int count = 1500; Flux.range(0, count) - .concatMap(i -> store.save(TEST_BUCKET_NAME, new TestBlobId("test-blob-id-" + i), - ByteSource.wrap(ELEVEN_KILOBYTES))) + .concatMap(i -> store.save(TEST_BUCKET_NAME, new TestBlobId("test-blob-id-" + i), ELEVEN_KILOBYTES)) .blockLast(); assertThat(Flux.from(testee().listBlobs(TEST_BUCKET_NAME)).count().block()) @@ -115,11 +111,11 @@ void readShouldNotLeakHttpConnexionsForUnclosedStreams() { BlobStoreDAO store = testee(); TestBlobId blobId = new TestBlobId("id"); - Mono.from(store.save(TEST_BUCKET_NAME, blobId, ByteSource.wrap(ELEVEN_KILOBYTES))).block(); + Mono.from(store.save(TEST_BUCKET_NAME, blobId, ELEVEN_KILOBYTES)).block(); assertThatCode(() -> IntStream.range(0, 256) .forEach(i -> { - InputStream inputStream = store.read(TEST_BUCKET_NAME, blobId); + InputStream inputStream = store.read(TEST_BUCKET_NAME, blobId).payload(); // Close the stream without reading it try { inputStream.close(); @@ -134,11 +130,11 @@ void readShouldFallbackToDefinedBucketWhenFailingOnDefaultOne() { BlobStoreDAO store = testee(); TestBlobId blobId = new TestBlobId("id"); - Mono.from(store.save(fallbackBucket, blobId, ByteSource.wrap(ELEVEN_KILOBYTES))).block(); + Mono.from(store.save(fallbackBucket, blobId, ELEVEN_KILOBYTES)).block(); - InputStream read = store.read(BucketName.DEFAULT, blobId); + InputStream read = store.read(BucketName.DEFAULT, blobId).payload(); - assertThat(read).hasSameContentAs(new ByteArrayInputStream(ELEVEN_KILOBYTES)); + assertThat(read).hasSameContentAs(ELEVEN_KILOBYTES.asInputStream().payload()); } @Test @@ -146,11 +142,11 @@ void readReactiveShouldFallbackToDefinedBucketWhenFailingOnDefaultOne() { BlobStoreDAO store = testee(); TestBlobId blobId = new TestBlobId("id"); - Mono.from(store.save(fallbackBucket, blobId, ByteSource.wrap(ELEVEN_KILOBYTES))).block(); + Mono.from(store.save(fallbackBucket, blobId, ELEVEN_KILOBYTES)).block(); - InputStream read = Mono.from(store.readReactive(BucketName.DEFAULT, blobId)).block(); + InputStream read = Mono.from(store.readReactive(BucketName.DEFAULT, blobId)).block().payload(); - assertThat(read).hasSameContentAs(new ByteArrayInputStream(ELEVEN_KILOBYTES)); + assertThat(read).hasSameContentAs(ELEVEN_KILOBYTES.asInputStream().payload()); } @Test @@ -158,9 +154,9 @@ void readBytesShouldFallbackToDefinedBucketWhenFailingOnDefaultOne() { BlobStoreDAO store = testee(); TestBlobId blobId = new TestBlobId("id"); - Mono.from(store.save(fallbackBucket, blobId, ByteSource.wrap(ELEVEN_KILOBYTES))).block(); + Mono.from(store.save(fallbackBucket, blobId, ELEVEN_KILOBYTES)).block(); - byte[] bytes = Mono.from(store.readBytes(BucketName.DEFAULT, blobId)).block(); + BlobStoreDAO.BytesBlob bytes = Mono.from(store.readBytes(BucketName.DEFAULT, blobId)).block(); assertThat(bytes).isEqualTo(ELEVEN_KILOBYTES); } @@ -170,7 +166,7 @@ void shouldNotReadOnFallbackBucketWhenNotReadingOnDefaultOne() { BlobStoreDAO store = testee(); TestBlobId blobId = new TestBlobId("id"); - Mono.from(store.save(TEST_BUCKET_NAME, blobId, ByteSource.wrap(ELEVEN_KILOBYTES))).block(); + Mono.from(store.save(TEST_BUCKET_NAME, blobId, ELEVEN_KILOBYTES)).block(); assertThatThrownBy(() -> store.read(BucketName.DEFAULT, blobId)) .isExactlyInstanceOf(ObjectNotFoundException.class); @@ -181,9 +177,9 @@ void shouldNotReadReactiveOnFallbackBucketWhenNotReadingOnDefaultOne() { BlobStoreDAO store = testee(); TestBlobId blobId = new TestBlobId("id"); - Mono.from(store.save(TEST_BUCKET_NAME, blobId, ByteSource.wrap(ELEVEN_KILOBYTES))).block(); + Mono.from(store.save(TEST_BUCKET_NAME, blobId, ELEVEN_KILOBYTES)).block(); - assertThatThrownBy(() -> Mono.from(store.readReactive(BucketName.DEFAULT, blobId)).block()) + assertThatThrownBy(() -> Mono.from(store.readReactive(BucketName.DEFAULT, blobId)).block().payload()) .isExactlyInstanceOf(ObjectNotFoundException.class); } @@ -192,9 +188,9 @@ void shouldNotReadBytesOnFallbackBucketWhenNotReadingOnDefaultOne() { BlobStoreDAO store = testee(); TestBlobId blobId = new TestBlobId("id"); - Mono.from(store.save(TEST_BUCKET_NAME, blobId, ByteSource.wrap(ELEVEN_KILOBYTES))).block(); + Mono.from(store.save(TEST_BUCKET_NAME, blobId, ELEVEN_KILOBYTES)).block(); - assertThatThrownBy(() -> Mono.from(store.readBytes(BucketName.DEFAULT, blobId)).block()) + assertThatThrownBy(() -> Mono.from(store.readBytes(BucketName.DEFAULT, blobId)).block().payload()) .isExactlyInstanceOf(ObjectNotFoundException.class); } } diff --git a/server/blob/blob-storage-strategy/src/main/scala/org/apache/james/server/blob/deduplication/DeDuplicationBlobStore.scala b/server/blob/blob-storage-strategy/src/main/scala/org/apache/james/server/blob/deduplication/DeDuplicationBlobStore.scala index 92dfb7f0059..4b1a13e97d2 100644 --- a/server/blob/blob-storage-strategy/src/main/scala/org/apache/james/server/blob/deduplication/DeDuplicationBlobStore.scala +++ b/server/blob/blob-storage-strategy/src/main/scala/org/apache/james/server/blob/deduplication/DeDuplicationBlobStore.scala @@ -25,6 +25,7 @@ import com.google.common.io.{BaseEncoding, ByteSource, FileBackedOutputStream} import jakarta.inject.{Inject, Named} import org.apache.commons.io.IOUtils import org.apache.james.blob.api.BlobStore.BlobIdProvider +import org.apache.james.blob.api.BlobStoreDAO.{ByteSourceBlob, BytesBlob, InputStreamBlob} import org.apache.james.blob.api.{BlobId, BlobStore, BlobStoreDAO, BucketName} import org.apache.james.server.blob.deduplication.DeDuplicationBlobStore.THREAD_SWITCH_THRESHOLD import org.reactivestreams.Publisher @@ -85,7 +86,7 @@ class DeDuplicationBlobStore @Inject()(blobStoreDAO: BlobStoreDAO, Preconditions.checkNotNull(data) SMono(blobIdProvider.apply(data)) .map(_.getT1) - .flatMap(blobId => SMono(blobStoreDAO.save(bucketName, blobId, data)) + .flatMap(blobId => SMono(blobStoreDAO.save(bucketName, blobId, BytesBlob.of(data))) .`then`(SMono.just(blobId))) } @@ -95,7 +96,7 @@ class DeDuplicationBlobStore @Inject()(blobStoreDAO: BlobStoreDAO, SMono(blobIdProvider.apply(data)) .map(_.getT1) - .flatMap(blobId => SMono(blobStoreDAO.save(bucketName, blobId, data)) + .flatMap(blobId => SMono(blobStoreDAO.save(bucketName, blobId, ByteSourceBlob.of(data))) .`then`(SMono.just(blobId))) .subscribeOn(Schedulers.boundedElastic()) } @@ -154,7 +155,7 @@ class DeDuplicationBlobStore @Inject()(blobStoreDAO: BlobStoreDAO, Mono.from(blobIdProvider(data)).subscribeOn(Schedulers.boundedElastic()) .flatMap { tuple => - SMono(blobStoreDAO.save(bucketName, tuple.getT1, tuple.getT2)) + SMono(blobStoreDAO.save(bucketName, tuple.getT1, InputStreamBlob.of(tuple.getT2))) .`then`(SMono.just(tuple.getT1)).asJava() } } @@ -162,19 +163,19 @@ class DeDuplicationBlobStore @Inject()(blobStoreDAO: BlobStoreDAO, override def readBytes(bucketName: BucketName, blobId: BlobId): Publisher[Array[Byte]] = { Preconditions.checkNotNull(bucketName) - blobStoreDAO.readBytes(bucketName, blobId) + SMono(blobStoreDAO.readBytes(bucketName, blobId)).map(_.payload()) } override def read(bucketName: BucketName, blobId: BlobId): InputStream = { Preconditions.checkNotNull(bucketName) - blobStoreDAO.read(bucketName, blobId) + blobStoreDAO.read(bucketName, blobId).payload() } override def readReactive(bucketName: BucketName, blobId: BlobId): Publisher[InputStream] = { Preconditions.checkNotNull(bucketName) - blobStoreDAO.readReactive(bucketName, blobId) + SMono(blobStoreDAO.readReactive(bucketName, blobId)).map(_.payload()) } override def getDefaultBucketName: BucketName = defaultBucketName diff --git a/server/blob/blob-storage-strategy/src/main/scala/org/apache/james/server/blob/deduplication/PassThroughBlobStore.scala b/server/blob/blob-storage-strategy/src/main/scala/org/apache/james/server/blob/deduplication/PassThroughBlobStore.scala index 26de0c5bdee..6bff2134067 100644 --- a/server/blob/blob-storage-strategy/src/main/scala/org/apache/james/server/blob/deduplication/PassThroughBlobStore.scala +++ b/server/blob/blob-storage-strategy/src/main/scala/org/apache/james/server/blob/deduplication/PassThroughBlobStore.scala @@ -19,19 +19,20 @@ package org.apache.james.server.blob.deduplication +import java.io.InputStream +import java.util.UUID + import com.google.common.base.Preconditions import com.google.common.io.ByteSource import jakarta.inject.{Inject, Named} import org.apache.james.blob.api.BlobStore.BlobIdProvider +import org.apache.james.blob.api.BlobStoreDAO.{BlobMetadata, ByteSourceBlob, BytesBlob, InputStreamBlob} import org.apache.james.blob.api.{BlobId, BlobStore, BlobStoreDAO, BucketName} import org.reactivestreams.Publisher import reactor.core.publisher.Flux -import reactor.core.scala.publisher.{SMono, tupleTwo2ScalaTuple2} +import reactor.core.scala.publisher.SMono import reactor.core.scheduler.Schedulers -import reactor.util.function.{Tuple2, Tuples} - -import java.io.{ByteArrayInputStream, InputStream} -import java.util.UUID +import reactor.util.function.Tuples class PassThroughBlobStore @Inject()(blobStoreDAO: BlobStoreDAO, @Named(BlobStore.DEFAULT_BUCKET_NAME_QUALIFIER) defaultBucketName: BucketName, @@ -54,7 +55,7 @@ class PassThroughBlobStore @Inject()(blobStoreDAO: BlobStoreDAO, Preconditions.checkNotNull(data) SMono(blobIdProvider.apply(data)) .map(_.getT1) - .flatMap(blobId => SMono(blobStoreDAO.save(bucketName, blobId, data)) + .flatMap(blobId => SMono(blobStoreDAO.save(bucketName, blobId, BytesBlob.of(data))) .`then`(SMono.just(blobId))) } @@ -64,7 +65,7 @@ class PassThroughBlobStore @Inject()(blobStoreDAO: BlobStoreDAO, SMono(blobIdProvider.apply(data)) .map(_.getT1) - .flatMap(blobId => SMono(blobStoreDAO.save(bucketName, blobId, data)) + .flatMap(blobId => SMono(blobStoreDAO.save(bucketName, blobId, ByteSourceBlob.of(data))) .`then`(SMono.just(blobId))) .subscribeOn(Schedulers.boundedElastic()) } @@ -79,10 +80,11 @@ class PassThroughBlobStore @Inject()(blobStoreDAO: BlobStoreDAO, SMono(blobIdProvider(data)) .subscribeOn(Schedulers.boundedElastic()) - .flatMap { tuple => - SMono(blobStoreDAO.save(bucketName, tuple.getT1, tuple.getT2)) + .flatMap { tuple => { + val blob: InputStreamBlob = new InputStreamBlob(tuple.getT2, BlobMetadata.empty()) + SMono(blobStoreDAO.save(bucketName, tuple.getT1, blob)) .`then`(SMono.just(tuple.getT1)) - } + }} } private def withBlobId: BlobIdProvider[InputStream] = data => @@ -95,19 +97,19 @@ class PassThroughBlobStore @Inject()(blobStoreDAO: BlobStoreDAO, override def readBytes(bucketName: BucketName, blobId: BlobId): Publisher[Array[Byte]] = { Preconditions.checkNotNull(bucketName) - blobStoreDAO.readBytes(bucketName, blobId) + SMono(blobStoreDAO.readBytes(bucketName, blobId)).map(_.payload()) } override def read(bucketName: BucketName, blobId: BlobId): InputStream = { Preconditions.checkNotNull(bucketName) - blobStoreDAO.read(bucketName, blobId) + blobStoreDAO.read(bucketName, blobId).payload() } override def readReactive(bucketName: BucketName, blobId: BlobId): Publisher[InputStream] = { Preconditions.checkNotNull(bucketName) - blobStoreDAO.readReactive(bucketName, blobId) + SMono(blobStoreDAO.readReactive(bucketName, blobId)).map(_.payload()) } override def getDefaultBucketName: BucketName = defaultBucketName diff --git a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/upload/CassandraUploadRepository.java b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/upload/CassandraUploadRepository.java index 833577dcf32..a83c9eecc0f 100644 --- a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/upload/CassandraUploadRepository.java +++ b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/upload/CassandraUploadRepository.java @@ -66,7 +66,7 @@ public Mono upload(InputStream data, ContentType contentType, Us BlobId blobId = blobIdFactory.of(uploadId.asString()); return Mono.fromCallable(() -> new CountingInputStream(data)) - .flatMap(countingInputStream -> Mono.from(blobStoreDAO.save(UPLOAD_BUCKET, blobId, countingInputStream)) + .flatMap(countingInputStream -> Mono.from(blobStoreDAO.save(UPLOAD_BUCKET, blobId, BlobStoreDAO.InputStreamBlob.of(countingInputStream))) .thenReturn(countingInputStream)) .map(countingInputStream -> new UploadDAO.UploadRepresentation(uploadId, blobId, contentType, countingInputStream.getCount(), user, clock.instant().truncatedTo(ChronoUnit.MILLIS))) @@ -78,7 +78,7 @@ public Mono upload(InputStream data, ContentType contentType, Us public Mono retrieve(UploadId id, Username user) { return uploadDAO.retrieve(user, id) .flatMap(upload -> Mono.from(blobStoreDAO.readReactive(UPLOAD_BUCKET, upload.getBlobId())) - .map(inputStream -> Upload.from(upload.toUploadMetaData(), () -> inputStream))) + .map(inputStream -> Upload.from(upload.toUploadMetaData(), inputStream::payload))) .switchIfEmpty(Mono.error(() -> new UploadNotFoundException(id))); } diff --git a/server/data/data-jmap-postgres/src/main/java/org/apache/james/jmap/postgres/upload/PostgresUploadRepository.java b/server/data/data-jmap-postgres/src/main/java/org/apache/james/jmap/postgres/upload/PostgresUploadRepository.java index 737f1f8efb2..fbec035da79 100644 --- a/server/data/data-jmap-postgres/src/main/java/org/apache/james/jmap/postgres/upload/PostgresUploadRepository.java +++ b/server/data/data-jmap-postgres/src/main/java/org/apache/james/jmap/postgres/upload/PostgresUploadRepository.java @@ -74,7 +74,7 @@ public Mono upload(InputStream data, ContentType contentType, Us PostgresUploadDAO uploadDAO = uploadDAOFactory.create(user.getDomainPart()); return Mono.fromCallable(() -> new CountingInputStream(data)) - .flatMap(countingInputStream -> Mono.from(blobStoreDAO.save(UPLOAD_BUCKET, blobId, countingInputStream)) + .flatMap(countingInputStream -> Mono.from(blobStoreDAO.save(UPLOAD_BUCKET, blobId, BlobStoreDAO.InputStreamBlob.of(countingInputStream))) .thenReturn(countingInputStream)) .map(countingInputStream -> UploadMetaData.from(uploadId, contentType, countingInputStream.getCount(), blobId, clock.instant())) .flatMap(uploadMetaData -> uploadDAO.insert(uploadMetaData, user)); @@ -84,7 +84,7 @@ public Mono upload(InputStream data, ContentType contentType, Us public Mono retrieve(UploadId id, Username user) { return uploadDAOFactory.create(user.getDomainPart()).get(id, user) .flatMap(upload -> Mono.from(blobStoreDAO.readReactive(UPLOAD_BUCKET, upload.blobId())) - .map(inputStream -> Upload.from(upload, () -> inputStream))) + .map(inputStream -> Upload.from(upload, inputStream::payload))) .switchIfEmpty(Mono.error(() -> new UploadNotFoundException(id))); } diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/memory/upload/InMemoryUploadRepository.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/memory/upload/InMemoryUploadRepository.java index c76b70c4828..72788eb724f 100644 --- a/server/data/data-jmap/src/main/java/org/apache/james/jmap/memory/upload/InMemoryUploadRepository.java +++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/memory/upload/InMemoryUploadRepository.java @@ -78,12 +78,12 @@ public Publisher upload(InputStream data, ContentType contentTyp BlobId blobId = blobIdFactory.of(uploadId.asString()); return Mono.fromCallable(() -> new CountingInputStream(data)) - .flatMap(dataAsByte -> Mono.from(blobStoreDAO.save(bucketName, blobId, dataAsByte)) - .thenReturn(dataAsByte)) - .map(dataAsByte -> { + .flatMap(countedData -> Mono.from(blobStoreDAO.save(bucketName, blobId, BlobStoreDAO.InputStreamBlob.of(countedData))) + .then(Mono.fromCallable(countedData::getCount))) + .map(count -> { Instant uploadDate = clock.instant(); - uploadStore.put(uploadId, new ImmutablePair<>(user, UploadMetaData.from(uploadId, contentType, dataAsByte.getCount(), blobId, uploadDate))); - return UploadMetaData.from(uploadId, contentType, dataAsByte.getCount(), blobId, uploadDate); + uploadStore.put(uploadId, new ImmutablePair<>(user, UploadMetaData.from(uploadId, contentType, count, blobId, uploadDate))); + return UploadMetaData.from(uploadId, contentType, count, blobId, uploadDate); }); } @@ -128,6 +128,7 @@ public Publisher deleteByUploadDateBefore(Duration expireDuration) { private Mono retrieveUpload(UploadMetaData uploadMetaData) { return Mono.from(blobStoreDAO.readBytes(bucketName, uploadMetaData.blobId())) + .map(BlobStoreDAO.BytesBlob::payload) .map(content -> Upload.from(uploadMetaData, () -> new ByteArrayInputStream(content))); } } diff --git a/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/upload/UploadRepositoryContract.scala b/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/upload/UploadRepositoryContract.scala index a4759dade58..2bdb85804e4 100644 --- a/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/upload/UploadRepositoryContract.scala +++ b/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/upload/UploadRepositoryContract.scala @@ -230,7 +230,7 @@ SMono(testee.deleteByUploadDateBefore(Duration.ofDays(7))).block(); - assertThatThrownBy(() => blobStoreDAO.read(UPLOAD_BUCKET, blobId)) + assertThatThrownBy(() => blobStoreDAO.read(UPLOAD_BUCKET, blobId).payload()) .isInstanceOf(classOf[ObjectNotFoundException]) } }