import static io.atomix.storage.journal.SegmentEntry.HEADER_BYTES;
import static java.util.Objects.requireNonNull;
+import com.esotericsoftware.kryo.KryoException;
import io.atomix.storage.journal.index.JournalIndex;
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
+import java.util.zip.CRC32;
import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.Nullable;
abstract sealed class JournalSegmentWriter<E> permits DiskJournalSegmentWriter, MappedJournalSegmentWriter {
final @NonNull FileChannel channel;
final @NonNull JournalSegment<E> segment;
- final @NonNull JournalIndex index;
+ private final @NonNull JournalIndex index;
final @NonNull JournalSerdes namespace;
final int maxSegmentSize;
final int maxEntrySize;
- // FIXME: hide these two fields
- Indexed<E> lastEntry;
- int currentPosition;
+ private Indexed<E> lastEntry;
+ private int currentPosition;
JournalSegmentWriter(final FileChannel channel, final JournalSegment<E> segment, final int maxEntrySize,
final JournalIndex index, final JournalSerdes namespace) {
* @param entry The entry to append.
* @return The appended indexed entry.
*/
- abstract <T extends E> Indexed<T> append(T entry);
+ final <T extends E> Indexed<T> append(final T entry) {
+ // Store the entry index.
+ final long index = getNextIndex();
+ final int position = currentPosition;
+
+ // Serialize the entry.
+ final int bodyPosition = position + HEADER_BYTES;
+ final int avail = maxSegmentSize - bodyPosition;
+ if (avail < 0) {
+ throw new BufferOverflowException();
+ }
+
+ final var writeLimit = Math.min(avail, maxEntrySize);
+ final var diskEntry = startWrite(position, writeLimit + HEADER_BYTES).position(HEADER_BYTES);
+ try {
+ namespace.serialize(entry, diskEntry);
+ } catch (KryoException e) {
+ if (writeLimit != maxEntrySize) {
+ // We have not provided enough capacity, signal to roll to next segment
+ throw new BufferOverflowException();
+ }
+
+ // Just reset the buffer. There's no need to zero the bytes since we haven't written the length or checksum.
+ throw new StorageException.TooLarge("Entry size exceeds maximum allowed bytes (" + maxEntrySize + ")");
+ }
+
+ final int length = diskEntry.position() - HEADER_BYTES;
+
+ // Compute the checksum for the entry.
+ final var crc32 = new CRC32();
+ crc32.update(diskEntry.flip().position(HEADER_BYTES));
+
+ // Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer.
+ diskEntry.putInt(0, length).putInt(Integer.BYTES, (int) crc32.getValue());
+ commitWrite(position, diskEntry.rewind());
+
+ // Update the last entry with the correct index/term/length.
+ final var indexedEntry = new Indexed<E>(index, entry, length);
+ lastEntry = indexedEntry;
+ this.index.index(index, position);
+
+ currentPosition = bodyPosition + length;
+
+ @SuppressWarnings("unchecked")
+ final var ugly = (Indexed<T>) indexedEntry;
+ return ugly;
+ }
+
+ abstract ByteBuffer startWrite(int position, int size);
+
+ abstract void commitWrite(int position, ByteBuffer entry);
/**
* Resets the head of the segment to the given index.
final void truncate(final long index) {
// If the index is greater than or equal to the last index, skip the truncate.
if (index >= getLastIndex()) {
- return;
+ return;
}
// Reset the last entry.
this.index.truncate(index);
if (index < segment.firstIndex()) {
- // Reset the writer to the first entry.
- currentPosition = JournalSegmentDescriptor.BYTES;
+ // Reset the writer to the first entry.
+ currentPosition = JournalSegmentDescriptor.BYTES;
} else {
- // Reset the writer to the given index.
- reset(index);
+ // Reset the writer to the given index.
+ reset(index);
}
// Zero the entry header at current channel position.