import io.atomix.storage.journal.StorageException.TooLarge;
import io.atomix.storage.journal.index.JournalIndex;
-import io.atomix.storage.journal.index.Position;
-import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.io.IOException;
import java.nio.MappedByteBuffer;
import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.Nullable;
/**
* Tries to append a binary data to the journal.
*
- * @param buf binary data to append
- * @return The index of appended data, or {@code null} if segment has no space
+ * @param mapper the mapper to use
+ * @param entry the entry
+ * @return the entry size, or {@code null} if segment has no space
*/
- Position append(final ByteBuf buf) {
- final var length = buf.readableBytes();
- if (length > maxEntrySize) {
- throw new TooLarge("Serialized entry size exceeds maximum allowed bytes (" + maxEntrySize + ")");
- }
-
- // Store the entry index.
+ <T> @Nullable Integer append(final ByteBufMapper<T> mapper, final T entry) {
+ // we are appending at this index and position
final long index = nextIndex();
final int position = currentPosition;
- // check space available
- final int nextPosition = position + HEADER_BYTES + length;
- if (nextPosition >= maxSegmentSize) {
+ // Map the entry carefully: we may not have enough segment space to satisfy maxEntrySize, but most entries are
+ // way smaller than that.
+ final int bodyPosition = position + HEADER_BYTES;
+ final int avail = maxSegmentSize - bodyPosition;
+ if (avail <= 0) {
+ // we do not have enough space for the header and a byte: signal a retry
LOG.trace("Not enough space for {} at {}", index, position);
return null;
}
- // allocate buffer and write data
- final var writeBuffer = fileWriter.startWrite(position, length + HEADER_BYTES);
- writeBuffer.put(HEADER_BYTES, buf.nioBuffer(), 0, length);
+ // Entry must not exceed maxEntrySize
+ final var writeLimit = Math.min(avail, maxEntrySize);
+
+ // Allocate entry space
+ final var diskEntry = fileWriter.startWrite(position, writeLimit + HEADER_BYTES);
+ // Create a ByteBuf covering the bytes. Note we do not use slice(), as Netty will do the equivalent.
+ final var bytes = Unpooled.wrappedBuffer(diskEntry.position(HEADER_BYTES));
+ try {
+ mapper.objectToBytes(entry, bytes);
+ } catch (IOException e) {
+ // We ran out of buffer space: let's decide who's fault it is:
+ if (writeLimit == maxEntrySize) {
+ // - it is the entry and/or mapper. This is not exactly accurate, as there may be other serialization
+ // fault. This is as good as it gets.
+ throw new TooLarge("Serialized entry size exceeds maximum allowed bytes (" + maxEntrySize + ")", e);
+ }
+
+ // - it is us, as we do not have the capacity to hold maxEntrySize bytes
+ LOG.trace("Tail serialization with {} bytes available failed", writeLimit, e);
+ return null;
+ }
- // Compute the checksum for the entry.
- final var checksum = SegmentEntry.computeChecksum(writeBuffer.slice(HEADER_BYTES, length));
+ // Determine length, trim distEntry and compute checksum. We are okay with computeChecksum() consuming
+ // the buffer, as we rewind() it back.
+ final var length = bytes.readableBytes();
+ final var checksum = SegmentEntry.computeChecksum(
+ diskEntry.limit(HEADER_BYTES + length).position(HEADER_BYTES));
- // Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer.
- fileWriter.commitWrite(position, writeBuffer.putInt(0, length).putInt(Integer.BYTES, checksum));
+ // update the header and commit entry to file
+ fileWriter.commitWrite(position, diskEntry.rewind().putInt(0, length).putInt(Integer.BYTES, checksum));
// Update the last entry with the correct index/term/length.
- currentPosition = nextPosition;
- return journalIndex.index(index, position);
+ currentPosition = bodyPosition + length;
+ journalIndex.index(index, position);
+ return length;
}
/**