package io.atomix.storage.journal;
import com.esotericsoftware.kryo.KryoException;
+import com.google.common.annotations.VisibleForTesting;
import io.atomix.storage.journal.index.JournalIndex;
-
import java.io.IOException;
import java.nio.BufferOverflowException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
+import java.nio.channels.SeekableByteChannel;
import java.util.zip.CRC32;
/**
currentPosition = currentPosition + ENTRY_HEADER_BYTES + length;
memory.position(memory.position() + length);
- // Read more bytes from the segment if necessary.
- if (memory.remaining() < maxEntrySize) {
- channel.read(memory.compact());
- memory.flip();
- }
-
- length = memory.getInt();
+ length = prepareNextEntry(channel, memory);
}
} catch (BufferUnderflowException e) {
// No-op, position is only updated on success
}
}
+ @VisibleForTesting
+ static int prepareNextEntry(final SeekableByteChannel channel, final ByteBuffer memory) throws IOException {
+ int remaining = memory.remaining();
+ boolean compacted;
+ if (remaining < ENTRY_HEADER_BYTES) {
+ // We do not have the header available. Move the pointer and read.
+ channel.read(memory.compact());
+ remaining = memory.flip().remaining();
+ if (remaining < ENTRY_HEADER_BYTES) {
+ // could happen with mis-padded segment
+ return 0;
+ }
+ compacted = true;
+ } else {
+ compacted = false;
+ }
+
+ while (true) {
+ final int length = memory.mark().getInt();
+ if (remaining >= Integer.BYTES + length) {
+ // Fast path: we have the entry properly positioned
+ return length;
+ }
+
+ // Not enough data for entry, to header start
+ memory.reset();
+ if (compacted) {
+ // we have already compacted the buffer, there is just not enough data
+ return 0;
+ }
+
+ // Try to read more data and check again
+ channel.read(memory.compact());
+ remaining = memory.flip().remaining();
+ compacted = true;
+ }
+ }
+
@Override
Indexed<E> getLastEntry() {
return lastEntry;
--- /dev/null
+/*
+ * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package io.atomix.storage.journal;
+
+import static io.atomix.storage.journal.DiskJournalSegmentWriter.prepareNextEntry;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+
+import java.nio.ByteBuffer;
+import java.nio.channels.SeekableByteChannel;
+import java.util.function.ToIntFunction;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class DiskJournalSegmentWriterTest {
+ private static final int BUFFER_SIZE = 56;
+
+ @Mock
+ private SeekableByteChannel channel;
+
+ private final ByteBuffer buffer = ByteBuffer.wrap(new byte[BUFFER_SIZE]);
+
+ @Test
+ void testReadFastPath() throws Exception {
+ buffer.putInt(42).putInt(0).put(new byte[42]).flip();
+
+ assertEquals(42, prepareNextEntry(channel, buffer));
+ assertEquals(4, buffer.position());
+ assertEquals(46, buffer.remaining());
+ }
+
+ @Test
+ void testEmptyBufferEndOfFile() throws Exception {
+ buffer.position(BUFFER_SIZE);
+
+ prepareRead(buf -> {
+ assertEquals(0, buf.position());
+ return 0;
+ });
+
+ assertEquals(0, prepareNextEntry(channel, buffer));
+ assertEquals(0, buffer.remaining());
+ }
+
+ @Test
+ void testEmptyBuffer() throws Exception {
+ buffer.position(BUFFER_SIZE);
+
+ prepareRead(buf -> {
+ assertEquals(0, buf.position());
+ buf.putInt(20).putInt(0).put(new byte[20]);
+ return 28;
+ });
+
+ assertEquals(20, prepareNextEntry(channel, buffer));
+ assertEquals(4, buffer.position());
+ assertEquals(24, buffer.remaining());
+ }
+
+ @Test
+ void testEmptyBufferNotEnough() throws Exception {
+ buffer.position(BUFFER_SIZE);
+
+ prepareRead(buf -> {
+ assertEquals(0, buf.position());
+ buf.putInt(42).putInt(0).put(new byte[20]);
+ return 28;
+ });
+
+ assertEquals(0, prepareNextEntry(channel, buffer));
+ assertEquals(0, buffer.position());
+ assertEquals(28, buffer.remaining());
+ }
+
+ @Test
+ void testHeaderWithNotEnough() throws Exception {
+ buffer.putInt(42).putInt(0).put(new byte[20]).flip();
+
+ prepareRead(buf -> {
+ assertEquals(28, buf.position());
+ return 0;
+ });
+
+ assertEquals(0, prepareNextEntry(channel, buffer));
+ assertEquals(28, buffer.remaining());
+ assertEquals(0, buffer.position());
+ }
+
+ final void prepareRead(final ToIntFunction<ByteBuffer> onRead) throws Exception {
+ doAnswer(invocation -> onRead.applyAsInt(invocation.getArgument(0))).when(channel).read(any());
+ }
+}