import static;
import com.esotericsoftware.kryo.KryoException;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
-import java.nio.channels.SeekableByteChannel;
-import org.eclipse.jdt.annotation.Nullable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
* Segment writer.
* @author <a href="">Jordan Halterman</a>
final class DiskJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
- private static final Logger LOG = LoggerFactory.getLogger(DiskJournalSegmentWriter.class);
private static final ByteBuffer ZERO_ENTRY_HEADER = ByteBuffer.wrap(new byte[HEADER_BYTES]);
- private final ByteBuffer memory;
+ private final JournalSegmentReader<E> reader;
+ private final ByteBuffer buffer;
private Indexed<E> lastEntry;
private long currentPosition;
- DiskJournalSegmentWriter(
- FileChannel channel,
- JournalSegment<E> segment,
- int maxEntrySize,
- JournalIndex index,
- JournalSerdes namespace) {
+ DiskJournalSegmentWriter(final FileChannel channel, final JournalSegment<E> segment, final int maxEntrySize,
+ final JournalIndex index, final JournalSerdes namespace) {
super(channel, segment, maxEntrySize, index, namespace);
- memory = allocMemory(maxEntrySize);
+ buffer = DiskFileReader.allocateBuffer(maxSegmentSize, maxEntrySize);
+ final var fileReader = new DiskFileReader(segment.file().file().toPath(), channel, maxSegmentSize, maxEntrySize);
+ reader = new JournalSegmentReader<>(segment, fileReader, maxEntrySize, namespace);
- DiskJournalSegmentWriter(JournalSegmentWriter<E> previous, int position) {
+ DiskJournalSegmentWriter(final JournalSegmentWriter<E> previous, final int position) {
- memory = allocMemory(maxEntrySize);
+ buffer = DiskFileReader.allocateBuffer(maxSegmentSize, maxEntrySize);
+ final var fileReader = new DiskFileReader(segment.file().file().toPath(), channel, maxSegmentSize, maxEntrySize);
+ reader = new JournalSegmentReader<>(segment, fileReader, maxEntrySize, namespace);
lastEntry = previous.getLastEntry();
currentPosition = position;
- private static ByteBuffer allocMemory(int maxEntrySize) {
- final var buf = ByteBuffer.allocate((maxEntrySize + HEADER_BYTES) * 2);
- buf.limit(0);
- return buf;
- }
MappedByteBuffer buffer() {
return null;
void reset(final long index) {
- long nextIndex = firstIndex;
- // Clear the buffer indexes.
- currentPosition = JournalSegmentDescriptor.BYTES;
+ // acquire ownership of cache and make sure reader does not see anything we've done once we're done
+ reader.invalidateCache();
try {
- // Clear memory buffer and read fist chunk
-, JournalSegmentDescriptor.BYTES);
- memory.flip();
- while (index == 0 || nextIndex <= index) {
- final var entry = prepareNextEntry(channel, memory, maxEntrySize);
- if (entry == null) {
- break;
- }
- final var bytes = entry.bytes();
- final var length = bytes.remaining();
- try {
- lastEntry = new Indexed<>(nextIndex, namespace.<E>deserialize(bytes), length);
- } catch (KryoException e) {
- // No-op, position is only updated on success
- LOG.debug("Failed to deserialize entry", e);
- break;
- }
- this.index.index(nextIndex, (int) currentPosition);
- nextIndex++;
- // Update the current position for indexing.
- currentPosition = currentPosition + HEADER_BYTES + length;
- memory.position(memory.position() + length);
- }
- } catch (IOException e) {
- throw new StorageException(e);
+ resetWithBuffer(index);
+ } finally {
+ // Make sure reader does not see anything we've done
+ reader.invalidateCache();
- @VisibleForTesting
- static @Nullable SegmentEntry prepareNextEntry(final SeekableByteChannel channel, final ByteBuffer memory,
- final int maxEntrySize) throws IOException {
- int remaining = memory.remaining();
- boolean compacted;
- if (remaining < HEADER_BYTES) {
- // We do not have the header available. Move the pointer and read.
- remaining = memory.flip().remaining();
- if (remaining < HEADER_BYTES) {
- // could happen with mis-padded segment
- return null;
- }
- compacted = true;
- } else {
- compacted = false;
- }
+ private void resetWithBuffer(final long index) {
+ long nextIndex = firstIndex;
- int length;
- while (true) {
- length = memory.mark().getInt();
- if (length < 1 || length > maxEntrySize) {
- // Invalid length,
- memory.reset();
- return null;
- }
+ // Clear the buffer indexes and acquire ownership of the buffer
+ currentPosition = JournalSegmentDescriptor.BYTES;
+ reader.setPosition(JournalSegmentDescriptor.BYTES);
- if (remaining >= Integer.BYTES + length) {
- // Fast path: we have the entry properly positioned
+ while (index == 0 || nextIndex <= index) {
+ final var entry = reader.readEntry(nextIndex);
+ if (entry == null) {
- // Not enough data for entry, to header start
- memory.reset();
- if (compacted) {
- // we have already compacted the buffer, there is just not enough data
- return null;
- }
+ lastEntry = entry;
+ this.index.index(nextIndex, (int) currentPosition);
+ nextIndex++;
- // Try to read more data and check again
- remaining = memory.flip().remaining();
- compacted = true;
+ // Update the current position for indexing.
+ currentPosition = currentPosition + HEADER_BYTES + entry.size();
- // Read the checksum of the entry.
- final int checksum = memory.getInt();
- // Slice off the entry's bytes
- final var entryBytes = memory.slice();
- entryBytes.limit(length);
- // Compute the checksum for the entry bytes.
- final var crc32 = new CRC32();
- crc32.update(entryBytes);
- // If the stored checksum does not equal the computed checksum, do not proceed further
- final var computed = (int) crc32.getValue();
- if (checksum != computed) {
- LOG.warn("Expected checksum {}, computed {}", Integer.toHexString(checksum), Integer.toHexString(computed));
- memory.reset();
- return null;
- }
- return new SegmentEntry(checksum, entryBytes.rewind());
- <T extends E> Indexed<T> append(T entry) {
- // Store the entry index.
- final long index = getNextIndex();
- // Serialize the entry.
- try {
- namespace.serialize(entry, memory.clear().position(HEADER_BYTES));
- } catch (KryoException e) {
- throw new StorageException.TooLarge("Entry size exceeds maximum allowed bytes (" + maxEntrySize + ")");
- }
- memory.flip();
+ <T extends E> Indexed<T> append(final T entry) {
+ // Store the entry index.
+ final long index = getNextIndex();
- final int length = memory.limit() - HEADER_BYTES;
+ // Serialize the entry.
+ try {
+ namespace.serialize(entry, buffer.clear().position(HEADER_BYTES));
+ } catch (KryoException e) {
+ throw new StorageException.TooLarge("Entry size exceeds maximum allowed bytes (" + maxEntrySize + ")");
+ }
+ buffer.flip();
- // Ensure there's enough space left in the buffer to store the entry.
- if (maxSegmentSize - currentPosition < length + HEADER_BYTES) {
- throw new BufferOverflowException();
- }
+ final int length = buffer.limit() - HEADER_BYTES;
+ // Ensure there's enough space left in the buffer to store the entry.
+ if (maxSegmentSize - currentPosition < length + HEADER_BYTES) {
+ throw new BufferOverflowException();
+ }
- // If the entry length exceeds the maximum entry size then throw an exception.
- if (length > maxEntrySize) {
- throw new StorageException.TooLarge("Entry size " + length + " exceeds maximum allowed bytes (" + maxEntrySize + ")");
- }
+ // If the entry length exceeds the maximum entry size then throw an exception.
+ if (length > maxEntrySize) {
+ throw new TooLarge("Entry size " + length + " exceeds maximum allowed bytes (" + maxEntrySize + ")");
+ }
- // Compute the checksum for the entry.
- final CRC32 crc32 = new CRC32();
- crc32.update(memory.array(), HEADER_BYTES, memory.limit() - HEADER_BYTES);
- final long checksum = crc32.getValue();
+ // Compute the checksum for the entry.
+ final var crc32 = new CRC32();
+ crc32.update(buffer.slice(HEADER_BYTES, length));
- // Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer.
- memory.putInt(0, length).putInt(Integer.BYTES, (int) checksum);
- try {
- channel.write(memory, currentPosition);
- } catch (IOException e) {
- throw new StorageException(e);
- }
+ // Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer.
+ buffer.putInt(0, length).putInt(Integer.BYTES, (int) crc32.getValue());
+ try {
+ channel.write(buffer, currentPosition);
+ } catch (IOException e) {
+ throw new StorageException(e);
+ }
- // Update the last entry with the correct index/term/length.
- Indexed<E> indexedEntry = new Indexed<>(index, entry, length);
- this.lastEntry = indexedEntry;
- this.index.index(index, (int) currentPosition);
+ // Update the last entry with the correct index/term/length.
+ final var indexedEntry = new Indexed<E>(index, entry, length);
+ lastEntry = indexedEntry;
+ this.index.index(index, (int) currentPosition);
- currentPosition = currentPosition + HEADER_BYTES + length;
- return (Indexed<T>) indexedEntry;
+ currentPosition = currentPosition + HEADER_BYTES + length;
+ return (Indexed<T>) indexedEntry;
- void truncate(long index) {
+ void truncate(final long index) {
// If the index is greater than or equal to the last index, skip the truncate.
if (index >= getLastIndex()) {
+++ /dev/null
- * Copyright (c) 2024, s.r.o. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at
- */
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doAnswer;
-import java.nio.ByteBuffer;
-import java.nio.channels.SeekableByteChannel;
-import java.util.function.ToIntFunction;
-import org.eclipse.jdt.annotation.Nullable;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.Mock;
-import org.mockito.junit.jupiter.MockitoExtension;
-class DiskJournalSegmentWriterTest {
- private static final int BUFFER_SIZE = 56;
- private static final int MAX_ENTRY_SIZE = 42;
- @Mock
- private SeekableByteChannel channel;
- private final ByteBuffer buffer = ByteBuffer.wrap(new byte[BUFFER_SIZE]);
- @Test
- void testReadFastPath() throws Exception {
- buffer.putInt(42).putInt(0xE46F28FB).put(new byte[42]).flip();
- final var entry = prepareNextEntry(channel, buffer);
- assertNotNull(entry);
- assertEquals(42, entry.bytes().remaining());
- assertEquals(8, buffer.position());
- assertEquals(42, buffer.remaining());
- }
- @Test
- void testEmptyBufferEndOfFile() throws Exception {
- buffer.position(BUFFER_SIZE);
- prepareRead(buf -> {
- assertEquals(0, buf.position());
- return 0;
- });
- assertNull(prepareNextEntry(channel, buffer));
- assertEquals(0, buffer.remaining());
- }
- @Test
- void testEmptyBuffer() throws Exception {
- buffer.position(BUFFER_SIZE);
- prepareRead(buf -> {
- assertEquals(0, buf.position());
- buf.putInt(20).putInt(0x0FD59B8D).put(new byte[20]);
- return 28;
- });
- final var entry = prepareNextEntry(channel, buffer);
- assertNotNull(entry);
- assertEquals(20, entry.bytes().remaining());
- assertEquals(8, buffer.position());
- assertEquals(20, buffer.remaining());
- }
- @Test
- void testEmptyBufferNotEnough() throws Exception {
- buffer.position(BUFFER_SIZE);
- prepareRead(buf -> {
- assertEquals(0, buf.position());
- buf.putInt(42).putInt(0).put(new byte[20]);
- return 28;
- });
- assertNull(prepareNextEntry(channel, buffer));
- assertEquals(0, buffer.position());
- assertEquals(28, buffer.remaining());
- }
- @Test
- void testHeaderWithNotEnough() throws Exception {
- buffer.putInt(42).putInt(0).put(new byte[20]).flip();
- prepareRead(buf -> {
- assertEquals(28, buf.position());
- return 0;
- });
- assertNull(prepareNextEntry(channel, buffer));
- assertEquals(28, buffer.remaining());
- assertEquals(0, buffer.position());
- }
- final void prepareRead(final ToIntFunction<ByteBuffer> onRead) throws Exception {
- doAnswer(invocation -> onRead.applyAsInt(invocation.getArgument(0))).when(channel).read(any());
- }
- private static @Nullable SegmentEntry prepareNextEntry(final SeekableByteChannel channel, final ByteBuffer memory)
- throws IOException {
- return DiskJournalSegmentWriter.prepareNextEntry(channel, memory, MAX_ENTRY_SIZE);
- }