diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java index f053135c4dbd7..0ca787d92412d 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java @@ -122,6 +122,13 @@ final class ShuffleExternalSorter extends MemoryConsumer implements ShuffleCheck // Checksum calculator for each partition. Empty when shuffle checksum disabled. private final Checksum[] partitionChecksums; + // Small writes to DiskBlockObjectWriter will be fairly inefficient. Since there doesn't seem to + // be an API to directly transfer bytes from managed memory to the disk writer, we buffer + // data through a byte array. This array does not need to be large enough to hold a single + // record. + // It is lazily allocated on the first call to writeSortedFile() and reused across spills. + private byte[] writeBuffer; + ShuffleExternalSorter( TaskMemoryManager memoryManager, BlockManager blockManager, @@ -199,12 +206,6 @@ private void writeSortedFile(boolean isFinalFile) { writeMetricsToUse = new ShuffleWriteMetrics(); } - // Small writes to DiskBlockObjectWriter will be fairly inefficient. Since there doesn't seem to - // be an API to directly transfer bytes from managed memory to the disk writer, we buffer - // data through a byte array. This array does not need to be large enough to hold a single - // record; - final byte[] writeBuffer = new byte[diskWriteBufferSize]; - // Because this output will be read during shuffle, its compression codec must be controlled by // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use // createTempShuffleBlock here; see SPARK-3426 for more details. @@ -220,6 +221,10 @@ private void writeSortedFile(boolean isFinalFile) { // around this, we pass a dummy no-op serializer. 
final SerializerInstance ser = DummySerializerInstance.INSTANCE; + if (writeBuffer == null) { + writeBuffer = new byte[diskWriteBufferSize]; + } + int currentPartition = -1; final FileSegment committedSegment; try (DiskBlockObjectWriter writer =