import static io.atomix.storage.journal.SegmentEntry.HEADER_BYTES;
import static java.util.Objects.requireNonNull;
-import com.esotericsoftware.kryo.KryoException;
import io.atomix.storage.journal.index.JournalIndex;
+import io.netty.buffer.ByteBuf;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-abstract sealed class JournalSegmentWriter<E> permits DiskJournalSegmentWriter, MappedJournalSegmentWriter {
+abstract sealed class JournalSegmentWriter permits DiskJournalSegmentWriter, MappedJournalSegmentWriter {
private static final Logger LOG = LoggerFactory.getLogger(JournalSegmentWriter.class);
final @NonNull FileChannel channel;
- final @NonNull JournalSegment<E> segment;
+ final @NonNull JournalSegment segment;
private final @NonNull JournalIndex index;
- final @NonNull JournalSerdes namespace;
final int maxSegmentSize;
final int maxEntrySize;
- private Indexed<E> lastEntry;
private int currentPosition;
+ private Long lastIndex;
+ private ByteBuf lastWritten;
- JournalSegmentWriter(final FileChannel channel, final JournalSegment<E> segment, final int maxEntrySize,
- final JournalIndex index, final JournalSerdes namespace) {
+ JournalSegmentWriter(final FileChannel channel, final JournalSegment segment, final int maxEntrySize,
+ final JournalIndex index) {
this.channel = requireNonNull(channel);
this.segment = requireNonNull(segment);
this.index = requireNonNull(index);
- this.namespace = requireNonNull(namespace);
maxSegmentSize = segment.descriptor().maxSegmentSize();
this.maxEntrySize = maxEntrySize;
}
- JournalSegmentWriter(final JournalSegmentWriter<E> previous) {
+ JournalSegmentWriter(final JournalSegmentWriter previous) {
channel = previous.channel;
segment = previous.segment;
index = previous.index;
- namespace = previous.namespace;
maxSegmentSize = previous.maxSegmentSize;
maxEntrySize = previous.maxEntrySize;
- lastEntry = previous.lastEntry;
+ lastWritten = previous.lastWritten;
+ lastIndex = previous.lastIndex;
currentPosition = previous.currentPosition;
}
* @return The last written index.
*/
final long getLastIndex() {
- return lastEntry != null ? lastEntry.index() : segment.firstIndex() - 1;
+ return lastIndex != null ? lastIndex : segment.firstIndex() - 1;
}
/**
- * Returns the last entry written.
+ * Returns the last data written.
*
- * @return The last entry written.
+ * @return The last data written.
*/
- final Indexed<E> getLastEntry() {
- return lastEntry;
+ final ByteBuf getLastWritten() {
+ return lastWritten == null ? null : lastWritten.slice();
}
/**
* @return The next index to be written.
*/
final long getNextIndex() {
- return lastEntry != null ? lastEntry.index() + 1 : segment.firstIndex();
+ return lastIndex != null ? lastIndex + 1 : segment.firstIndex();
}
/**
- * Tries to append an entry to the journal.
+ * Tries to append a binary data to the journal.
*
- * @param entry The entry to append.
- * @return The appended indexed entry, or {@code null} if there is not enough space available
+ * @param buf binary data to append
+ * @return The index of appended data, or {@code null} if segment has no space
*/
- final <T extends E> @Nullable Indexed<T> append(final T entry) {
+ final Long append(final ByteBuf buf) {
+ final var length = buf.readableBytes();
+ if (length > maxEntrySize) {
+ throw new StorageException.TooLarge("Serialized entry size exceeds maximum allowed bytes ("
+ + maxEntrySize + ")");
+ }
+
// Store the entry index.
final long index = getNextIndex();
final int position = currentPosition;
- // Serialize the entry.
- final int bodyPosition = position + HEADER_BYTES;
- final int avail = maxSegmentSize - bodyPosition;
- if (avail < 0) {
+ // check space available
+ final int nextPosition = position + HEADER_BYTES + length;
+ if (nextPosition >= maxSegmentSize) {
LOG.trace("Not enough space for {} at {}", index, position);
return null;
}
- final var writeLimit = Math.min(avail, maxEntrySize);
- final var diskEntry = startWrite(position, writeLimit + HEADER_BYTES).position(HEADER_BYTES);
- try {
- namespace.serialize(entry, diskEntry);
- } catch (KryoException e) {
- if (writeLimit != maxEntrySize) {
- // We have not provided enough capacity, signal to roll to next segment
- LOG.trace("Tail serialization with {} bytes available failed", writeLimit, e);
- return null;
- }
-
- // Just reset the buffer. There's no need to zero the bytes since we haven't written the length or checksum.
- throw new StorageException.TooLarge("Entry size exceeds maximum allowed bytes (" + maxEntrySize + ")", e);
- }
-
- final int length = diskEntry.position() - HEADER_BYTES;
+ // allocate buffer and write data
+ final var writeBuffer = startWrite(position, length + HEADER_BYTES).position(HEADER_BYTES);
+ writeBuffer.put(buf.nioBuffer());
// Compute the checksum for the entry.
final var crc32 = new CRC32();
- crc32.update(diskEntry.flip().position(HEADER_BYTES));
+ crc32.update(writeBuffer.flip().position(HEADER_BYTES));
// Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer.
- diskEntry.putInt(0, length).putInt(Integer.BYTES, (int) crc32.getValue());
- commitWrite(position, diskEntry.rewind());
+ writeBuffer.putInt(0, length).putInt(Integer.BYTES, (int) crc32.getValue());
+ commitWrite(position, writeBuffer.rewind());
// Update the last entry with the correct index/term/length.
- final var indexedEntry = new Indexed<E>(index, entry, length);
- lastEntry = indexedEntry;
+ currentPosition = nextPosition;
+ lastWritten = buf;
+ lastIndex = index;
this.index.index(index, position);
- currentPosition = bodyPosition + length;
-
- @SuppressWarnings("unchecked")
- final var ugly = (Indexed<T>) indexedEntry;
- return ugly;
+ return index;
}
abstract ByteBuffer startWrite(int position, int size);
}
}
- abstract JournalSegmentReader<E> reader();
+ abstract JournalSegmentReader reader();
- private void resetWithBuffer(final JournalSegmentReader<E> reader, final long index) {
+ private void resetWithBuffer(final JournalSegmentReader reader, final long index) {
long nextIndex = segment.firstIndex();
// Clear the buffer indexes and acquire ownership of the buffer
reader.setPosition(JournalSegmentDescriptor.BYTES);
while (index == 0 || nextIndex <= index) {
- final var entry = reader.readEntry(nextIndex);
- if (entry == null) {
+ final var buf = reader.readBytes(nextIndex);
+ if (buf == null) {
break;
}
- lastEntry = entry;
+ lastWritten = buf;
+ lastIndex = nextIndex;
this.index.index(nextIndex, currentPosition);
nextIndex++;
// Update the current position for indexing.
- currentPosition = currentPosition + HEADER_BYTES + entry.size();
+ currentPosition += HEADER_BYTES + buf.readableBytes();
}
}
return;
}
- // Reset the last entry.
- lastEntry = null;
+ // Reset the last written
+ lastIndex = null;
+ lastWritten = null;
// Truncate the index.
this.index.truncate(index);
*/
abstract @Nullable MappedByteBuffer buffer();
- abstract @NonNull MappedJournalSegmentWriter<E> toMapped();
+ abstract @NonNull MappedJournalSegmentWriter toMapped();
- abstract @NonNull DiskJournalSegmentWriter<E> toFileChannel();
+ abstract @NonNull DiskJournalSegmentWriter toFileChannel();
}