import static io.atomix.storage.journal.SegmentEntry.HEADER_BYTES;
-import com.esotericsoftware.kryo.KryoException;
-import io.atomix.storage.journal.StorageException.TooLarge;
import io.atomix.storage.journal.index.JournalIndex;
import java.io.IOException;
-import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
-import java.util.zip.CRC32;
/**
* Segment writer.
* @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/
final class DiskJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
- private static final ByteBuffer ZERO_ENTRY_HEADER = ByteBuffer.wrap(new byte[HEADER_BYTES]);
-
- private final JournalSegmentReader<E> reader;
- private final ByteBuffer buffer;
-
- DiskJournalSegmentWriter(final FileChannel channel, final JournalSegment<E> segment, final int maxEntrySize,
- final JournalIndex index, final JournalSerdes namespace) {
- super(channel, segment, maxEntrySize, index, namespace);
-
- buffer = DiskFileReader.allocateBuffer(maxSegmentSize, maxEntrySize);
- reader = new JournalSegmentReader<>(segment,
- new DiskFileReader(segment.file().file().toPath(), channel, buffer), maxEntrySize, namespace);
- reset(0);
- }
-
- DiskJournalSegmentWriter(final JournalSegmentWriter<E> previous) {
- super(previous);
-
- buffer = DiskFileReader.allocateBuffer(maxSegmentSize, maxEntrySize);
- reader = new JournalSegmentReader<>(segment,
- new DiskFileReader(segment.file().file().toPath(), channel, buffer), maxEntrySize, namespace);
- }
-
- @Override
- MappedByteBuffer buffer() {
- return null;
- }
-
- @Override
- MappedJournalSegmentWriter<E> toMapped() {
- return new MappedJournalSegmentWriter<>(this);
- }
-
- @Override
- DiskJournalSegmentWriter<E> toFileChannel() {
- return this;
- }
-
- @Override
- JournalSegmentReader<E> reader() {
- return reader;
- }
-
- @Override
- @SuppressWarnings("unchecked")
- <T extends E> Indexed<T> append(final T entry) {
- // Store the entry index.
- final long index = getNextIndex();
-
- // Serialize the entry.
- try {
- namespace.serialize(entry, buffer.clear().position(HEADER_BYTES));
- } catch (KryoException e) {
- throw new StorageException.TooLarge("Entry size exceeds maximum allowed bytes (" + maxEntrySize + ")");
- }
- buffer.flip();
-
- final int length = buffer.limit() - HEADER_BYTES;
- // Ensure there's enough space left in the buffer to store the entry.
- if (maxSegmentSize - currentPosition < length + HEADER_BYTES) {
- throw new BufferOverflowException();
- }
-
- // If the entry length exceeds the maximum entry size then throw an exception.
- if (length > maxEntrySize) {
- throw new TooLarge("Entry size " + length + " exceeds maximum allowed bytes (" + maxEntrySize + ")");
- }
-
- // Compute the checksum for the entry.
- final var crc32 = new CRC32();
- crc32.update(buffer.slice(HEADER_BYTES, length));
-
- // Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer.
- buffer.putInt(0, length).putInt(Integer.BYTES, (int) crc32.getValue());
- try {
- channel.write(buffer, currentPosition);
- } catch (IOException e) {
- throw new StorageException(e);
- }
-
- // Update the last entry with the correct index/term/length.
- final var indexedEntry = new Indexed<E>(index, entry, length);
- lastEntry = indexedEntry;
- this.index.index(index, currentPosition);
-
- currentPosition = currentPosition + HEADER_BYTES + length;
- return (Indexed<T>) indexedEntry;
- }
-
- @Override
- void writeEmptyHeader(final int position) {
- try {
- channel.write(ZERO_ENTRY_HEADER.asReadOnlyBuffer(), position);
- } catch (IOException e) {
- throw new StorageException(e);
+ private static final ByteBuffer ZERO_ENTRY_HEADER = ByteBuffer.wrap(new byte[HEADER_BYTES]);
+
+ private final JournalSegmentReader<E> reader;
+ private final ByteBuffer buffer;
+
+ DiskJournalSegmentWriter(final FileChannel channel, final JournalSegment<E> segment, final int maxEntrySize,
+ final JournalIndex index, final JournalSerdes namespace) {
+ super(channel, segment, maxEntrySize, index, namespace);
+
+ buffer = DiskFileReader.allocateBuffer(maxSegmentSize, maxEntrySize);
+ reader = new JournalSegmentReader<>(segment,
+ new DiskFileReader(segment.file().file().toPath(), channel, buffer), maxEntrySize, namespace);
+ reset(0);
+ }
+
+ DiskJournalSegmentWriter(final JournalSegmentWriter<E> previous) {
+ super(previous);
+
+ buffer = DiskFileReader.allocateBuffer(maxSegmentSize, maxEntrySize);
+ reader = new JournalSegmentReader<>(segment,
+ new DiskFileReader(segment.file().file().toPath(), channel, buffer), maxEntrySize, namespace);
+ }
+
+ @Override
+ MappedByteBuffer buffer() {
+ return null;
+ }
+
+ @Override
+ MappedJournalSegmentWriter<E> toMapped() {
+ return new MappedJournalSegmentWriter<>(this);
+ }
+
+ @Override
+ DiskJournalSegmentWriter<E> toFileChannel() {
+ return this;
+ }
+
+ @Override
+ JournalSegmentReader<E> reader() {
+ return reader;
}
- }
-
- @Override
- void flush() {
- try {
- if (channel.isOpen()) {
- channel.force(true);
- }
- } catch (IOException e) {
- throw new StorageException(e);
+
+ @Override
+ ByteBuffer startWrite(final int position, final int size) {
+ return buffer.clear().slice(0, size);
+ }
+
+ @Override
+ void commitWrite(final int position, final ByteBuffer entry) {
+ try {
+ channel.write(entry, position);
+ } catch (IOException e) {
+ throw new StorageException(e);
+ }
}
- }
- @Override
- void close() {
- flush();
- }
+ @Override
+ void writeEmptyHeader(final int position) {
+ try {
+ channel.write(ZERO_ENTRY_HEADER.asReadOnlyBuffer(), position);
+ } catch (IOException e) {
+ throw new StorageException(e);
+ }
+ }
+
+ @Override
+ void flush() {
+ try {
+ if (channel.isOpen()) {
+ channel.force(true);
+ }
+ } catch (IOException e) {
+ throw new StorageException(e);
+ }
+ }
+
+ @Override
+ void close() {
+ flush();
+ }
}
import static io.atomix.storage.journal.SegmentEntry.HEADER_BYTES;
import static java.util.Objects.requireNonNull;
+import com.esotericsoftware.kryo.KryoException;
import io.atomix.storage.journal.index.JournalIndex;
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
+import java.util.zip.CRC32;
import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.Nullable;
abstract sealed class JournalSegmentWriter<E> permits DiskJournalSegmentWriter, MappedJournalSegmentWriter {
final @NonNull FileChannel channel;
final @NonNull JournalSegment<E> segment;
- final @NonNull JournalIndex index;
+ private final @NonNull JournalIndex index;
final @NonNull JournalSerdes namespace;
final int maxSegmentSize;
final int maxEntrySize;
- // FIXME: hide these two fields
- Indexed<E> lastEntry;
- int currentPosition;
+ private Indexed<E> lastEntry;
+ private int currentPosition;
JournalSegmentWriter(final FileChannel channel, final JournalSegment<E> segment, final int maxEntrySize,
final JournalIndex index, final JournalSerdes namespace) {
* @param entry The entry to append.
* @return The appended indexed entry.
*/
- abstract <T extends E> Indexed<T> append(T entry);
+ final <T extends E> Indexed<T> append(final T entry) {
+ // Store the entry index.
+ final long index = getNextIndex();
+ final int position = currentPosition;
+
+ // Compute where the entry body will land and how much segment space remains, then serialize
+ // the entry into the buffer obtained from startWrite().
+ final int bodyPosition = position + HEADER_BYTES;
+ final int avail = maxSegmentSize - bodyPosition;
+ if (avail < 0) {
+ throw new BufferOverflowException();
+ }
+
+ final var writeLimit = Math.min(avail, maxEntrySize);
+ final var diskEntry = startWrite(position, writeLimit + HEADER_BYTES).position(HEADER_BYTES);
+ try {
+ namespace.serialize(entry, diskEntry);
+ } catch (KryoException e) {
+ if (writeLimit != maxEntrySize) {
+ // We have not provided enough capacity, signal to roll to next segment
+ throw new BufferOverflowException();
+ }
+
+ // Nothing to undo: the length/checksum header was never written, so any partially
+ // serialized bytes are invisible to readers. No need to zero them.
+ throw new StorageException.TooLarge("Entry size exceeds maximum allowed bytes (" + maxEntrySize + ")");
+ }
+
+ final int length = diskEntry.position() - HEADER_BYTES;
+
+ // Compute the checksum for the entry.
+ final var crc32 = new CRC32();
+ crc32.update(diskEntry.flip().position(HEADER_BYTES));
+
+ // Patch the body length and CRC32 checksum into the header, then hand the assembled
+ // entry to the implementation-specific commitWrite().
+ diskEntry.putInt(0, length).putInt(Integer.BYTES, (int) crc32.getValue());
+ commitWrite(position, diskEntry.rewind());
+
+ // Update the last entry with the correct index/term/length.
+ final var indexedEntry = new Indexed<E>(index, entry, length);
+ lastEntry = indexedEntry;
+ this.index.index(index, position);
+
+ currentPosition = bodyPosition + length;
+
+ @SuppressWarnings("unchecked")
+ final var ugly = (Indexed<T>) indexedEntry;
+ return ugly;
+ }
+
+ abstract ByteBuffer startWrite(int position, int size);
+
+ abstract void commitWrite(int position, ByteBuffer entry);
/**
* Resets the head of the segment to the given index.
final void truncate(final long index) {
// If the index is greater than or equal to the last index, skip the truncate.
if (index >= getLastIndex()) {
- return;
+ return;
}
// Reset the last entry.
this.index.truncate(index);
if (index < segment.firstIndex()) {
- // Reset the writer to the first entry.
- currentPosition = JournalSegmentDescriptor.BYTES;
+ // Reset the writer to the first entry.
+ currentPosition = JournalSegmentDescriptor.BYTES;
} else {
- // Reset the writer to the given index.
- reset(index);
+ // Reset the writer to the given index.
+ reset(index);
}
// Zero the entry header at current channel position.
*/
package io.atomix.storage.journal;
-import static io.atomix.storage.journal.SegmentEntry.HEADER_BYTES;
-
-import com.esotericsoftware.kryo.KryoException;
import io.atomix.storage.journal.index.JournalIndex;
import java.io.IOException;
-import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
-import java.util.zip.CRC32;
import org.eclipse.jdt.annotation.NonNull;
/**
* @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/
final class MappedJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
- private final @NonNull MappedByteBuffer mappedBuffer;
- private final JournalSegmentReader<E> reader;
- private final ByteBuffer buffer;
-
- MappedJournalSegmentWriter(final FileChannel channel, final JournalSegment<E> segment, final int maxEntrySize,
- final JournalIndex index, final JournalSerdes namespace) {
- super(channel, segment, maxEntrySize, index, namespace);
-
- mappedBuffer = mapBuffer(channel, maxSegmentSize);
- buffer = mappedBuffer.slice();
- reader = new JournalSegmentReader<>(segment, new MappedFileReader(segment.file().file().toPath(), mappedBuffer),
- maxEntrySize, namespace);
- reset(0);
- }
-
- MappedJournalSegmentWriter(final JournalSegmentWriter<E> previous) {
- super(previous);
-
- mappedBuffer = mapBuffer(channel, maxSegmentSize);
- buffer = mappedBuffer.slice();
- reader = new JournalSegmentReader<>(segment, new MappedFileReader(segment.file().file().toPath(), mappedBuffer),
- maxEntrySize, namespace);
- }
-
- private static @NonNull MappedByteBuffer mapBuffer(final FileChannel channel, final int maxSegmentSize) {
- try {
- return channel.map(FileChannel.MapMode.READ_WRITE, 0, maxSegmentSize);
- } catch (IOException e) {
- throw new StorageException(e);
+ private final @NonNull MappedByteBuffer mappedBuffer;
+ private final JournalSegmentReader<E> reader;
+ private final ByteBuffer buffer;
+
+ MappedJournalSegmentWriter(final FileChannel channel, final JournalSegment<E> segment, final int maxEntrySize,
+ final JournalIndex index, final JournalSerdes namespace) {
+ super(channel, segment, maxEntrySize, index, namespace);
+
+ mappedBuffer = mapBuffer(channel, maxSegmentSize);
+ buffer = mappedBuffer.slice();
+ reader = new JournalSegmentReader<>(segment, new MappedFileReader(segment.file().file().toPath(), mappedBuffer),
+ maxEntrySize, namespace);
+ reset(0);
+ }
+
+ MappedJournalSegmentWriter(final JournalSegmentWriter<E> previous) {
+ super(previous);
+
+ mappedBuffer = mapBuffer(channel, maxSegmentSize);
+ buffer = mappedBuffer.slice();
+ reader = new JournalSegmentReader<>(segment, new MappedFileReader(segment.file().file().toPath(), mappedBuffer),
+ maxEntrySize, namespace);
+ }
+
+ private static @NonNull MappedByteBuffer mapBuffer(final FileChannel channel, final int maxSegmentSize) {
+ try {
+ return channel.map(FileChannel.MapMode.READ_WRITE, 0, maxSegmentSize);
+ } catch (IOException e) {
+ throw new StorageException(e);
+ }
+ }
+
+ @Override
+ @NonNull MappedByteBuffer buffer() {
+ return mappedBuffer;
+ }
+
+ @Override
+ MappedJournalSegmentWriter<E> toMapped() {
+ return this;
}
- }
-
- @Override
- @NonNull MappedByteBuffer buffer() {
- return mappedBuffer;
- }
-
- @Override
- MappedJournalSegmentWriter<E> toMapped() {
- return this;
- }
-
- @Override
- DiskJournalSegmentWriter<E> toFileChannel() {
- close();
- return new DiskJournalSegmentWriter<>(this);
- }
-
- @Override
- JournalSegmentReader<E> reader() {
- return reader;
- }
-
- @Override
- @SuppressWarnings("unchecked")
- <T extends E> Indexed<T> append(final T entry) {
- // Store the entry index.
- final long index = getNextIndex();
-
- // Serialize the entry.
- final int bodyPosition = currentPosition + HEADER_BYTES;
- final int avail = maxSegmentSize - bodyPosition;
- if (avail < 0) {
- throw new BufferOverflowException();
+
+ @Override
+ DiskJournalSegmentWriter<E> toFileChannel() {
+ close();
+ return new DiskJournalSegmentWriter<>(this);
+ }
+
+ @Override
+ JournalSegmentReader<E> reader() {
+ return reader;
+ }
+
+ @Override
+ ByteBuffer startWrite(final int position, final int size) {
+ return buffer.slice(position, size);
+ }
+
+ @Override
+ void commitWrite(final int position, final ByteBuffer entry) {
+ // No-op: the slice handed out by startWrite() aliases the mapped buffer, so the
+ // serialized bytes are already write-through to the segment file.
+ }
+
+ @Override
+ void writeEmptyHeader(final int position) {
+ // Note: we issue a single putLong() instead of two putInt()s.
+ buffer.putLong(position, 0L);
}
- final var entryBytes = buffer.slice(bodyPosition, Math.min(avail, maxEntrySize));
- try {
- namespace.serialize(entry, entryBytes);
- } catch (KryoException e) {
- if (entryBytes.capacity() != maxEntrySize) {
- // We have not provided enough capacity, signal to roll to next segment
- throw new BufferOverflowException();
- }
-
- // Just reset the buffer. There's no need to zero the bytes since we haven't written the length or checksum.
- throw new StorageException.TooLarge("Entry size exceeds maximum allowed bytes (" + maxEntrySize + ")");
+ @Override
+ void flush() {
+ mappedBuffer.force();
}
- final int length = entryBytes.position();
-
- // Compute the checksum for the entry.
- final var crc32 = new CRC32();
- crc32.update(entryBytes.flip());
-
- // Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer.
- buffer.putInt(currentPosition, length).putInt(currentPosition + Integer.BYTES, (int) crc32.getValue());
-
- // Update the last entry with the correct index/term/length.
- Indexed<E> indexedEntry = new Indexed<>(index, entry, length);
- lastEntry = indexedEntry;
- this.index.index(index, currentPosition);
-
- currentPosition = currentPosition + HEADER_BYTES + length;
- return (Indexed<T>) indexedEntry;
- }
-
- @Override
- void writeEmptyHeader(final int position) {
- // Note: we issue a single putLong() instead of two putInt()s.
- buffer.putLong(position, 0L);
- }
-
- @Override
- void flush() {
- mappedBuffer.force();
- }
-
- @Override
- void close() {
- flush();
- try {
- BufferCleaner.freeBuffer(mappedBuffer);
- } catch (IOException e) {
- throw new StorageException(e);
+ @Override
+ void close() {
+ flush();
+ try {
+ BufferCleaner.freeBuffer(mappedBuffer);
+ } catch (IOException e) {
+ throw new StorageException(e);
+ }
}
- }
}