Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,13 @@ final class ShuffleExternalSorter extends MemoryConsumer implements ShuffleCheck
// Checksum calculator for each partition. Empty when shuffle checksum disabled.
private final Checksum[] partitionChecksums;

// Small writes to DiskBlockObjectWriter will be fairly inefficient. Since there doesn't seem to
// be an API to directly transfer bytes from managed memory to the disk writer, we buffer
// data through a byte array. This array does not need to be large enough to hold a single
// record;
// Lazily allocated on the first call to writeSortedFile() and reused across spills.
private byte[] writeBuffer;

ShuffleExternalSorter(
TaskMemoryManager memoryManager,
BlockManager blockManager,
Expand Down Expand Up @@ -199,12 +206,6 @@ private void writeSortedFile(boolean isFinalFile) {
writeMetricsToUse = new ShuffleWriteMetrics();
}

// Small writes to DiskBlockObjectWriter will be fairly inefficient. Since there doesn't seem to
// be an API to directly transfer bytes from managed memory to the disk writer, we buffer
// data through a byte array. This array does not need to be large enough to hold a single
// record;
final byte[] writeBuffer = new byte[diskWriteBufferSize];

// Because this output will be read during shuffle, its compression codec must be controlled by
// spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
// createTempShuffleBlock here; see SPARK-3426 for more details.
Expand All @@ -220,6 +221,10 @@ private void writeSortedFile(boolean isFinalFile) {
// around this, we pass a dummy no-op serializer.
final SerializerInstance ser = DummySerializerInstance.INSTANCE;

if (writeBuffer == null) {
writeBuffer = new byte[diskWriteBufferSize];
}

int currentPartition = -1;
final FileSegment committedSegment;
try (DiskBlockObjectWriter writer =
Expand Down