package io.atomix.storage.journal;
import com.esotericsoftware.kryo.KryoException;
+import com.google.common.annotations.VisibleForTesting;
import io.atomix.storage.journal.index.JournalIndex;
-
import java.io.IOException;
import java.nio.BufferOverflowException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
+import java.nio.channels.SeekableByteChannel;
import java.util.zip.CRC32;
/**
currentPosition = currentPosition + ENTRY_HEADER_BYTES + length;
memory.position(memory.position() + length);
- // Read more bytes from the segment if necessary.
- if (memory.remaining() < maxEntrySize) {
- channel.read(memory.compact());
- memory.flip();
- }
-
- length = memory.getInt();
+ length = prepareNextEntry(channel, memory);
}
} catch (BufferUnderflowException e) {
// No-op, position is only updated on success
}
}
+ @VisibleForTesting
+ static int prepareNextEntry(final SeekableByteChannel channel, final ByteBuffer memory) throws IOException {
+ int remaining = memory.remaining();
+ boolean compacted;
+ if (remaining < ENTRY_HEADER_BYTES) {
+ // We do not have the header available. Move the pointer and read.
+ channel.read(memory.compact());
+ remaining = memory.flip().remaining();
+ if (remaining < ENTRY_HEADER_BYTES) {
+ // could happen with mis-padded segment
+ return 0;
+ }
+ compacted = true;
+ } else {
+ compacted = false;
+ }
+
+ while (true) {
+ final int length = memory.mark().getInt();
+ if (remaining >= Integer.BYTES + length) {
+ // Fast path: we have the entry properly positioned
+ return length;
+ }
+
+ // Not enough data for entry, to header start
+ memory.reset();
+ if (compacted) {
+ // we have already compacted the buffer, there is just not enough data
+ return 0;
+ }
+
+ // Try to read more data and check again
+ channel.read(memory.compact());
+ remaining = memory.flip().remaining();
+ compacted = true;
+ }
+ }
+
@Override
Indexed<E> getLastEntry() {
return lastEntry;