
I have a bunch of keys and values that I want to send to our messaging queue packed into a single byte array. The byte array holds all the keys and values, should always be smaller than 50K, and consists of a header followed by the data.

Packet class:

public final class Packet implements Closeable {
  private static final int MAX_SIZE = 50000;
  private static final int HEADER_SIZE = 36;

  private final byte dataCenter;
  private final byte recordVersion;
  private final long address;
  private final long addressFrom;
  private final long addressOrigin;
  private final byte recordsPartition;
  private final byte replicated;
  private final ByteBuffer itemBuffer = ByteBuffer.allocate(MAX_SIZE);
  private int pendingItems = 0;

  public Packet(final RecordPartition recordPartition) {
    this.recordsPartition = (byte) recordPartition.getPartition();
    this.dataCenter = Utils.LOCATION.getDatacenter();
    this.recordVersion = 1;
    this.replicated = 0;
    final long packedAddress = new Data().packAddress();
    this.address = packedAddress;
    this.addressFrom = 0L;
    this.addressOrigin = packedAddress;
  }

  private void addHeader(final ByteBuffer buffer, final int items) {
    buffer.put(dataCenter).put(recordVersion).putInt(items).putInt(buffer.capacity())
        .putLong(address).putLong(addressFrom).putLong(addressOrigin).put(recordsPartition)
        .put(replicated);
  }

  private void sendData() {
    if (itemBuffer.position() == 0) {
      // no data to be sent
      return;
    }
    final ByteBuffer buffer = ByteBuffer.allocate(MAX_SIZE);
    addHeader(buffer, pendingItems);
    // below line throws "BufferOverflowException"
    buffer.put(itemBuffer);
    SendRecord.getInstance().sendToQueueAsync(address, buffer.array());
    itemBuffer.clear();
    pendingItems = 0;
  }

  public void addAndSendJunked(final byte[] key, final byte[] data) {
    if (key.length > 255) {
      return;
    }
    final byte keyLength = (byte) key.length;
    final byte dataLength = (byte) data.length;

    final int additionalSize = dataLength + keyLength + 1 + 1 + 8 + 2;
    final int newSize = itemBuffer.position() + additionalSize;
    if (newSize >= (MAX_SIZE - HEADER_SIZE)) {
      sendData();
    }
    if (additionalSize > (MAX_SIZE - HEADER_SIZE)) {
      throw new AppConfigurationException("Size of single item exceeds maximum size");
    }

    final ByteBuffer dataBuffer = ByteBuffer.wrap(data);
    final long timestamp = dataLength > 10 ? dataBuffer.getLong(2) : System.currentTimeMillis();
    // data layout
    itemBuffer.put((byte) 0).put(keyLength).put(key).putLong(timestamp).putShort(dataLength)
        .put(data);
    pendingItems++;
  }

  @Override
  public void close() {
    if (pendingItems > 0) {
      sendData();
    }
  }
}

In the above code, I get a java.nio.BufferOverflowException at buffer.put(itemBuffer); in the sendData method. I don't understand why this exception is thrown or how to fix it.

Here is how I am calling this code:

Packet packet = new Packet(partition);
packet.addAndSendJunked("hello".getBytes(StandardCharsets.UTF_8), StringUtils.EMPTY.getBytes(StandardCharsets.UTF_8));
packet.close();
  • It means your buffer is smaller than itemBuffer. Commented Mar 14, 2018 at 4:28
  • You should be able to set a breakpoint on that line and see the sizes of both. Commented Mar 14, 2018 at 4:29

1 Answer

Clearly the bytes remaining in itemBuffer plus the length of the header already put into buffer exceed buffer.capacity(), i.e. MAX_SIZE.

Ergo either there is a bug in addHeader() or whatever creates itemBuffer, or MAX_SIZE is too small.
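To make the size comparison concrete, here is a minimal sketch that reproduces the exception with plain ByteBuffers. It reuses MAX_SIZE and the 36-byte header from the question; the class name OverflowDemo, the method copyOverflows, and the zero-filled placeholder bytes are assumptions for illustration only. The 17 in main is the size of the single item from the question's call (1 + 1 + 5-byte key + 8 + 2 + 0 data bytes).

```java
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;

final class OverflowDemo {
  static final int MAX_SIZE = 50000;  // same value as in the question
  static final int HEADER_SIZE = 36;  // 1+1+4+4+8+8+8+1+1 bytes written by addHeader()

  /** Returns true if copying the unflipped item buffer overflows the outgoing buffer. */
  static boolean copyOverflows(int itemBytes) {
    ByteBuffer itemBuffer = ByteBuffer.allocate(MAX_SIZE);
    itemBuffer.put(new byte[itemBytes]);  // position = itemBytes, limit = MAX_SIZE

    ByteBuffer buffer = ByteBuffer.allocate(MAX_SIZE);
    buffer.put(new byte[HEADER_SIZE]);    // placeholder standing in for addHeader()

    // Without flip(), the source's remaining() is its unused space,
    // not the number of bytes that were written into it.
    System.out.println("source remaining = " + itemBuffer.remaining()
        + ", destination remaining = " + buffer.remaining());
    try {
      buffer.put(itemBuffer);  // throws if source remaining > destination remaining
      return false;
    } catch (BufferOverflowException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println(copyOverflows(17));  // one small item, as in the question
  }
}
```

Under these assumptions the put overflows whenever fewer than 36 bytes of items are pending. With 36 or more pending bytes it would not throw, but it would copy the unused tail of itemBuffer rather than the items themselves.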

NB

SendRecord.getInstance().sendToQueueAsync(address, buffer.array());

Here you are enqueueing the entire buffer, of MAX_SIZE bytes, even though you may not have put MAX_SIZE bytes into it. It would be better to pass buffer itself to this method and avoid the ceaseless wraps in your code.
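A small sketch of the difference between the backing array and the bytes actually written. The class ArrayLengthDemo and the helper filledBytes are hypothetical; the copy is only a fallback for the case where the receiving method must take a byte[], since the signature of sendToQueueAsync is not shown here. It assumes a heap buffer created with ByteBuffer.allocate, so the array offset is 0.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

final class ArrayLengthDemo {
  /** Copies only the bytes written so far (assumes a heap buffer with array offset 0). */
  static byte[] filledBytes(ByteBuffer buffer) {
    return Arrays.copyOf(buffer.array(), buffer.position());
  }

  public static void main(String[] args) {
    ByteBuffer buffer = ByteBuffer.allocate(50000);
    buffer.putInt(42).putLong(7L);                   // 4 + 8 = 12 bytes written

    System.out.println(buffer.array().length);       // 50000: the whole backing array
    System.out.println(filledBytes(buffer).length);  // 12: only the written bytes
  }
}
```

Passing the ByteBuffer itself avoids the extra copy, which is why that is the preferred option when the queue API allows it.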

2 Comments

After analyzing it more, I think we need to flip "itemBuffer" before putting it into "buffer". Also, how can I extract only what has been filled instead of sending everything to the "sendToQueueAsync" method?
@user1950349 That is correct: flip() must precede get() or write(), or put() into another buffer. I've already answered your last question: don't.
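The flip() step described in this comment could look roughly like the following. This is a sketch under assumptions, not the asker's final code: FlipDemo and assemble are hypothetical names, and the header is replaced by zero-filled placeholder bytes. The returned buffer is flipped as well, so a consumer that accepts a ByteBuffer reads only the filled part.

```java
import java.nio.ByteBuffer;

final class FlipDemo {
  static final int MAX_SIZE = 50000;  // same value as in the question
  static final int HEADER_SIZE = 36;  // header length used in the question

  /** Copies a placeholder header plus the pending items into a buffer ready for reading. */
  static ByteBuffer assemble(ByteBuffer itemBuffer) {
    ByteBuffer out = ByteBuffer.allocate(MAX_SIZE);
    out.put(new byte[HEADER_SIZE]);  // placeholder standing in for addHeader()

    itemBuffer.flip();               // limit = bytes written, position = 0
    out.put(itemBuffer);             // copies exactly the written bytes
    itemBuffer.clear();              // reset for the next batch

    out.flip();                      // remaining() is now header + items
    return out;
  }

  public static void main(String[] args) {
    ByteBuffer items = ByteBuffer.allocate(MAX_SIZE);
    items.put(new byte[17]);         // one 17-byte item, as in the question's call
    ByteBuffer packet = assemble(items);
    System.out.println(packet.remaining());  // 36 + 17 = 53
  }
}
```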
