- private static final ByteBuffer ZERO_ENTRY_HEADER = ByteBuffer.wrap(new byte[HEADER_BYTES]);
-
- private final JournalSegmentReader<E> reader;
- private final ByteBuffer buffer;
-
- DiskJournalSegmentWriter(final FileChannel channel, final JournalSegment<E> segment, final int maxEntrySize,
- final JournalIndex index, final JournalSerdes namespace) {
- super(channel, segment, maxEntrySize, index, namespace);
-
- buffer = DiskFileReader.allocateBuffer(maxSegmentSize, maxEntrySize);
- reader = new JournalSegmentReader<>(segment,
- new DiskFileReader(segment.file().file().toPath(), channel, buffer), maxEntrySize, namespace);
- reset(0);
- }
-
- DiskJournalSegmentWriter(final JournalSegmentWriter<E> previous) {
- super(previous);
-
- buffer = DiskFileReader.allocateBuffer(maxSegmentSize, maxEntrySize);
- reader = new JournalSegmentReader<>(segment,
- new DiskFileReader(segment.file().file().toPath(), channel, buffer), maxEntrySize, namespace);
- }
-
- @Override
- MappedByteBuffer buffer() {
- return null;
- }
-
- @Override
- MappedJournalSegmentWriter<E> toMapped() {
- return new MappedJournalSegmentWriter<>(this);
- }
-
- @Override
- DiskJournalSegmentWriter<E> toFileChannel() {
- return this;
- }
-
- @Override
- JournalSegmentReader<E> reader() {
- return reader;
- }
-
- @Override
- @SuppressWarnings("unchecked")
- <T extends E> Indexed<T> append(final T entry) {
- // Store the entry index.
- final long index = getNextIndex();
-
- // Serialize the entry.
- try {
- namespace.serialize(entry, buffer.clear().position(HEADER_BYTES));
- } catch (KryoException e) {
- throw new StorageException.TooLarge("Entry size exceeds maximum allowed bytes (" + maxEntrySize + ")");
- }
- buffer.flip();
-
- final int length = buffer.limit() - HEADER_BYTES;
- // Ensure there's enough space left in the buffer to store the entry.
- if (maxSegmentSize - currentPosition < length + HEADER_BYTES) {
- throw new BufferOverflowException();
- }
-
- // If the entry length exceeds the maximum entry size then throw an exception.
- if (length > maxEntrySize) {
- throw new TooLarge("Entry size " + length + " exceeds maximum allowed bytes (" + maxEntrySize + ")");
- }
-
- // Compute the checksum for the entry.
- final var crc32 = new CRC32();
- crc32.update(buffer.slice(HEADER_BYTES, length));
-
- // Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer.
- buffer.putInt(0, length).putInt(Integer.BYTES, (int) crc32.getValue());
- try {
- channel.write(buffer, currentPosition);
- } catch (IOException e) {
- throw new StorageException(e);
- }
-
- // Update the last entry with the correct index/term/length.
- final var indexedEntry = new Indexed<E>(index, entry, length);
- lastEntry = indexedEntry;
- this.index.index(index, currentPosition);
-
- currentPosition = currentPosition + HEADER_BYTES + length;
- return (Indexed<T>) indexedEntry;
- }
-
- @Override
- void writeEmptyHeader(final int position) {
- try {
- channel.write(ZERO_ENTRY_HEADER.asReadOnlyBuffer(), position);
- } catch (IOException e) {
- throw new StorageException(e);
+ private static final ByteBuffer ZERO_ENTRY_HEADER = ByteBuffer.wrap(new byte[HEADER_BYTES]);
+
+ private final JournalSegmentReader<E> reader;
+ private final ByteBuffer buffer;
+
+ DiskJournalSegmentWriter(final FileChannel channel, final JournalSegment<E> segment, final int maxEntrySize,
+ final JournalIndex index, final JournalSerdes namespace) {
+ super(channel, segment, maxEntrySize, index, namespace);
+
+ buffer = DiskFileReader.allocateBuffer(maxSegmentSize, maxEntrySize);
+ reader = new JournalSegmentReader<>(segment,
+ new DiskFileReader(segment.file().file().toPath(), channel, buffer), maxEntrySize, namespace);
+ reset(0);
+ }
+
+ DiskJournalSegmentWriter(final JournalSegmentWriter<E> previous) {
+ super(previous);
+
+ buffer = DiskFileReader.allocateBuffer(maxSegmentSize, maxEntrySize);
+ reader = new JournalSegmentReader<>(segment,
+ new DiskFileReader(segment.file().file().toPath(), channel, buffer), maxEntrySize, namespace);
+ }
+
+ @Override
+ MappedByteBuffer buffer() {
+ return null;
+ }
+
+ @Override
+ MappedJournalSegmentWriter<E> toMapped() {
+ return new MappedJournalSegmentWriter<>(this);
+ }
+
+ @Override
+ DiskJournalSegmentWriter<E> toFileChannel() {
+ return this;
+ }
+
+ @Override
+ JournalSegmentReader<E> reader() {
+ return reader;