* @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/
final class FileChannelJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
- private static final ByteBuffer ZERO_ENTRY_HEADER = ByteBuffer.wrap(new byte[Integer.BYTES + Integer.BYTES]);
+ private static final ByteBuffer ZERO_ENTRY_HEADER = ByteBuffer.wrap(new byte[ENTRY_HEADER_BYTES]);
private final ByteBuffer memory;
private Indexed<E> lastEntry;
}
private static ByteBuffer allocMemory(int maxEntrySize) {
- final var buf = ByteBuffer.allocate((maxEntrySize + Integer.BYTES + Integer.BYTES) * 2);
+ final var buf = ByteBuffer.allocate((maxEntrySize + ENTRY_HEADER_BYTES) * 2);
buf.limit(0);
return buf;
}
nextIndex++;
// Update the current position for indexing.
- currentPosition = currentPosition + Integer.BYTES + Integer.BYTES + length;
+ currentPosition = currentPosition + ENTRY_HEADER_BYTES + length;
memory.position(memory.position() + length);
// Read more bytes from the segment if necessary.
// Serialize the entry.
try {
- namespace.serialize(entry, memory.clear().position(Integer.BYTES + Integer.BYTES));
+ namespace.serialize(entry, memory.clear().position(ENTRY_HEADER_BYTES));
} catch (KryoException e) {
throw new StorageException.TooLarge("Entry size exceeds maximum allowed bytes (" + maxEntrySize + ")");
}
memory.flip();
- final int length = memory.limit() - (Integer.BYTES + Integer.BYTES);
+ final int length = memory.limit() - ENTRY_HEADER_BYTES;
// Ensure there's enough space left in the buffer to store the entry.
- if (maxSegmentSize - currentPosition < length + Integer.BYTES + Integer.BYTES) {
+ if (maxSegmentSize - currentPosition < length + ENTRY_HEADER_BYTES) {
throw new BufferOverflowException();
}
// Compute the checksum for the entry.
final CRC32 crc32 = new CRC32();
- crc32.update(memory.array(), Integer.BYTES + Integer.BYTES, memory.limit() - (Integer.BYTES + Integer.BYTES));
+ crc32.update(memory.array(), ENTRY_HEADER_BYTES, memory.limit() - ENTRY_HEADER_BYTES);
final long checksum = crc32.getValue();
// Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer.
this.lastEntry = indexedEntry;
this.index.index(index, (int) currentPosition);
- currentPosition = currentPosition + Integer.BYTES + Integer.BYTES + length;
+ currentPosition = currentPosition + ENTRY_HEADER_BYTES + length;
return (Indexed<T>) indexedEntry;
}
import org.eclipse.jdt.annotation.Nullable;
abstract sealed class JournalSegmentWriter<E> permits FileChannelJournalSegmentWriter, MappedJournalSegmentWriter {
+ /**
+ * The size of the header, comprising of:
+ * <ul>
+ * <li>32-bit signed entry length</li>
+ * <li>32-bit unsigned CRC32 checksum</li>
+ * </li>
+ */
+ static final int ENTRY_HEADER_BYTES = Integer.BYTES + Integer.BYTES;
+
final @NonNull FileChannel channel;
final @NonNull JournalIndex index;
final @NonNull JournalSerdes namespace;
// Serialize the entry.
int position = buffer.position();
- if (position + Integer.BYTES + Integer.BYTES > buffer.limit()) {
+ if (position + ENTRY_HEADER_BYTES > buffer.limit()) {
throw new BufferOverflowException();
}
- buffer.position(position + Integer.BYTES + Integer.BYTES);
+ buffer.position(position + ENTRY_HEADER_BYTES);
try {
namespace.serialize(entry, buffer);
throw new BufferOverflowException();
}
- final int length = buffer.position() - (position + Integer.BYTES + Integer.BYTES);
+ final int length = buffer.position() - (position + ENTRY_HEADER_BYTES);
// If the entry length exceeds the maximum entry size then throw an exception.
if (length > maxEntrySize) {
// Compute the checksum for the entry.
final CRC32 crc32 = new CRC32();
- buffer.position(position + Integer.BYTES + Integer.BYTES);
+ buffer.position(position + ENTRY_HEADER_BYTES);
ByteBuffer slice = buffer.slice();
slice.limit(length);
crc32.update(slice);
buffer.position(position);
buffer.putInt(length);
buffer.putInt((int) checksum);
- buffer.position(position + Integer.BYTES + Integer.BYTES + length);
+ buffer.position(position + ENTRY_HEADER_BYTES + length);
// Update the last entry with the correct index/term/length.
Indexed<E> indexedEntry = new Indexed<>(index, entry, length);