Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,7 @@ private void validateFrameSize(int frameSize) throws IOException {
if (wireFormat != null) {
maxFrameSize = wireFormat.getMaxFrameSize();
}

if (frameSize > maxFrameSize) {
throw IOExceptionSupport.createFrameSizeException(frameSize, maxFrameSize);
}
AmqpWireFormat.validateFrameSize(frameSize, maxFrameSize);
}

public void setWireFormat(AmqpWireFormat wireFormat) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.wireformat.WireFormat;
import org.fusesource.hawtbuf.Buffer;
import org.slf4j.Logger;
Expand Down Expand Up @@ -109,11 +110,7 @@ public Object unmarshal(DataInput dataIn) throws IOException {
return new AmqpHeader(magic, false);
} else {
int size = dataIn.readInt();
if (size > maxFrameSize) {
throw new AmqpProtocolException("Frame size exceeded max frame length.");
} else if (size <= 0) {
throw new AmqpProtocolException("Frame size value was invalid: " + size);
}
validateFrameSize(size, maxFrameSize);
Buffer frame = new Buffer(size);
frame.bigEndianEditor().writeInt(size);
frame.readFrom(dataIn);
Expand Down Expand Up @@ -261,4 +258,14 @@ public int getIdleTimeout() {
public void setIdleTimeout(int idelTimeout) {
this.idelTimeout = idelTimeout;
}

static void validateFrameSize(int frameSize, long maxFrameSize) throws IOException {
if (frameSize < 0) {
throw new AmqpProtocolException("Frame size of " + frameSize + " exceeds the maximum frame configured or supported frame size limit");
} else if (Integer.toUnsignedLong(frameSize) > maxFrameSize) {
throw IOExceptionSupport.createFrameSizeException(frameSize, maxFrameSize);
} else if (Integer.compareUnsigned(frameSize, 8) < 0) {
throw new AmqpProtocolException("Frame size of " + frameSize + " is smaller than the minimally viable frame size value");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -57,6 +59,7 @@ public void onFrame(Object frame) {
frames.add(frame);
}
});
amqpWireFormat.setMaxFrameSize(AmqpWireFormat.DEFAULT_MAX_FRAME_SIZE);
codec.setWireFormat(amqpWireFormat);
}

Expand Down Expand Up @@ -345,6 +348,49 @@ public void testReadPartialPayload() throws Exception {
assertEquals(2, frames.size());
}

@Test
public void testNegativeFrameSize() throws Exception {
AmqpHeader inputHeader = new AmqpHeader();

DataByteArrayOutputStream output = new DataByteArrayOutputStream();
output.write(inputHeader.getBuffer());
// the value is unsigned so negative means too large
output.writeInt(-100);
output.close();

IOException e = assertThrows(IOException.class, () -> codec.parse(output.toBuffer().toByteBuffer()));
assertTrue(e.getMessage().contains("exceeds the maximum frame configured or supported frame size limit"));
}

@Test
public void testFrameSizeTooSmall() throws Exception {
AmqpHeader inputHeader = new AmqpHeader();

DataByteArrayOutputStream output = new DataByteArrayOutputStream();
output.write(inputHeader.getBuffer());
// less than 8 is too small
output.writeInt(3);
output.close();

IOException e = assertThrows(IOException.class, () -> codec.parse(output.toBuffer().toByteBuffer()));
assertTrue(e.getMessage().contains("is smaller than the minimally viable frame size value"));
}

@Test
public void testFrameSizeTooLarge() throws Exception {
amqpWireFormat.setMaxFrameSize(100);
AmqpHeader inputHeader = new AmqpHeader();

DataByteArrayOutputStream output = new DataByteArrayOutputStream();
output.write(inputHeader.getBuffer());
// less than 300 is larger than 100 maxFrameSize
output.writeInt(300);
output.close();

IOException e = assertThrows(IOException.class, () -> codec.parse(output.toBuffer().toByteBuffer()));
assertTrue(e.getMessage().contains("larger than max allowed"));
}

private void assertHeadersEqual(AmqpHeader expected, AmqpHeader actual) {
assertTrue(expected.getBuffer().equals(actual.getBuffer()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,29 @@
package org.apache.activemq.transport.amqp.protocol;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.activemq.transport.amqp.AmqpHeader;
import org.apache.activemq.transport.amqp.AmqpWireFormat;
import org.apache.activemq.transport.amqp.AmqpWireFormat.ResetListener;
import org.apache.activemq.util.ByteSequence;
import org.junit.Before;
import org.junit.Test;

public class AmqpWireFormatTest {

private final AmqpWireFormat wireFormat = new AmqpWireFormat();

@Before
public void setUp() throws Exception {
wireFormat.setMaxFrameSize(AmqpWireFormat.DEFAULT_MAX_FRAME_SIZE);
}

@Test
public void testWhenSaslNotAllowedNonSaslHeaderIsInvliad() {
wireFormat.setAllowNonSaslConnections(false);
Expand Down Expand Up @@ -78,4 +88,39 @@ public void onProtocolReset() {
wireFormat.resetMagicRead();
assertTrue(reset.get());
}


@Test
public void testNegativeFrameSize() throws Exception {
AmqpHeader inputHeader = new AmqpHeader();
wireFormat.unmarshal(new ByteSequence(inputHeader.getBuffer().toByteArray()));

ByteSequence bs = new ByteSequence(ByteBuffer.allocate(Integer.BYTES).putInt(-100).array());
IOException e = assertThrows(IOException.class, () -> wireFormat.unmarshal(bs));
assertTrue(e.getMessage().contains("exceeds the maximum frame configured or supported frame size limit"));
}

@Test
public void testFrameSizeTooSmall() throws Exception {
AmqpHeader inputHeader = new AmqpHeader();
wireFormat.unmarshal(new ByteSequence(inputHeader.getBuffer().toByteArray()));

// less than 8
ByteSequence bs = new ByteSequence(ByteBuffer.allocate(Integer.BYTES).putInt(3).array());
IOException e = assertThrows(IOException.class, () -> wireFormat.unmarshal(bs));
assertTrue(e.getMessage().contains("is smaller than the minimally viable frame size value"));
}

@Test
public void testFrameSizeTooLarge() throws Exception {
wireFormat.setMaxFrameSize(100);
AmqpHeader inputHeader = new AmqpHeader();
wireFormat.unmarshal(new ByteSequence(inputHeader.getBuffer().toByteArray()));

// size 300 is larger than maxFrameSize
ByteSequence bs = new ByteSequence(ByteBuffer.allocate(Integer.BYTES).putInt(300).array());
IOException e = assertThrows(IOException.class, () -> wireFormat.unmarshal(bs));
assertTrue(e.getMessage().contains("larger than max allowed"));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ public synchronized Object unmarshal(ByteSequence sequence) throws IOException {
if (maxFrameSizeEnabled && size > maxFrameSize) {
throw IOExceptionSupport.createFrameSizeException(size, maxFrameSize);
}
// This will also verify size is not negative
context.setFrameSize(size);
}
return doUnmarshal(bytesIn);
Expand Down Expand Up @@ -286,11 +287,12 @@ public Object unmarshal(DataInput dis) throws IOException {
final var context = new MarshallingContext();
marshallingContext.set(context);

if (!sizePrefixDisabled) {
if (!sizePrefixDisabled) {
int size = dis.readInt();
if (maxFrameSizeEnabled && size > maxFrameSize) {
throw IOExceptionSupport.createFrameSizeException(size, maxFrameSize);
}
// This will also verify size is not negative
context.setFrameSize(size);
}
return doUnmarshal(dis);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,12 @@ protected void processCommand(ByteBuffer plain) throws Exception {
}
}

// This isn't strictly necessary as ByteBuffer.allocate() would also throw the same
// IllegalArgumentException but this provides a better error.
if (nextFrameSize < 0) {
throw new IllegalArgumentException("Frame size value " + nextFrameSize + " may not be negative.");
}

// now we got the data, lets reallocate and store the size for the marshaler.
// if there's more data in plain, then the next call will start processing it.
currentBuffer = ByteBuffer.allocate(nextFrameSize + 4);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,12 @@ protected void serviceRead() {
}
}

// This isn't strictly necessary as ByteBuffer.allocateDirect() would also throw the same
// IllegalArgumentException, but this provides a better error. OpenWire
if (nextFrameSize < 0) {
throw new IllegalArgumentException("Frame size value " + nextFrameSize + " may not be negative.");
}

if (nextFrameSize > inputBuffer.capacity()) {
currentBuffer = ByteBuffer.allocateDirect(nextFrameSize);
currentBuffer.putInt(nextFrameSize);
Expand Down
Loading