*/
package io.atomix.storage.journal;
+import static com.google.common.base.Verify.verify;
+import static java.util.Objects.requireNonNull;
+
import java.io.IOException;
-import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
-import java.util.zip.CRC32;
/**
* Log segment reader.
* @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/
final class DiskJournalSegmentReader<E> extends JournalSegmentReader<E> {
- private final FileChannel channel;
- private final ByteBuffer memory;
- private long currentPosition;
-
- DiskJournalSegmentReader(
- FileChannel channel,
- JournalSegment<E> segment,
- int maxEntrySize,
- JournalSerdes namespace) {
- super(segment, maxEntrySize, namespace);
- this.channel = channel;
- this.memory = ByteBuffer.allocate((maxEntrySize + SegmentEntry.HEADER_BYTES) * 2);
- }
+ private final FileChannel channel;
+ private final ByteBuffer buffer;
- @Override
- void setPosition(int position) {
- currentPosition = position;
- memory.clear().flip();
- }
+ // tracks the file offset (in terms of FileChannel.position()) that the first byte of buffer maps to
+ private int bufferPosition;
- @Override
- Indexed<E> readEntry(final long index) {
- try {
- // Read more bytes from the segment if necessary.
- if (memory.remaining() < maxEntrySize) {
- long position = currentPosition + memory.position();
- channel.read(memory.clear(), position);
- currentPosition = position;
- memory.flip();
- }
+ DiskJournalSegmentReader(final FileChannel channel, final JournalSegment<E> segment, final int maxEntrySize,
+ final JournalSerdes namespace) {
+ super(segment, maxEntrySize, namespace);
+ this.channel = requireNonNull(channel);
+ buffer = ByteBuffer.allocate((maxEntrySize + SegmentEntry.HEADER_BYTES) * 2).flip();
+ bufferPosition = 0;
+ }
- // Mark the buffer so it can be reset if necessary.
- memory.mark();
+    @Override
+    void invalidateCache() {
+        // Drop all buffered bytes: after clear() followed by flip() both position and limit are 0
+        buffer.clear();
+        buffer.flip();
+        // ... and restart the buffer-to-file mapping at offset 0
+        bufferPosition = 0;
+    }
- try {
- // Read the length of the entry.
- final int length = memory.getInt();
+    /**
+     * Return {@code size} bytes starting at file offset {@code position}, serving them from the buffer when they
+     * are already present and reading from the channel otherwise.
+     *
+     * @param position file offset of the first requested byte
+     * @param size number of bytes requested
+     * @return a read-only view of the requested bytes
+     */
+    @Override
+    ByteBuffer read(final int position, final int size) {
+        // Offset of the requested position relative to the buffer's first byte: non-negative when the request
+        // starts at or after the start of the buffered region, negative when it lies before it.
+        final int seek = position - bufferPosition;
+        return seek >= 0 ? forwardAndRead(seek, position, size) : rewindAndRead(-seek, position, size);
+    }
- // If the buffer length is zero then return.
- if (length <= 0 || length > maxEntrySize) {
- memory.reset().limit(memory.position());
- return null;
+
+    /**
+     * Serve a read which starts at or after the buffer's first byte.
+     *
+     * @param seek offset of {@code position} relative to the buffer's first byte, non-negative
+     * @param position file offset of the first requested byte
+     * @param size number of bytes requested
+     * @return a read-only view of the requested bytes
+     */
+    private ByteBuffer forwardAndRead(final int seek, final int position, final int size) {
+        // Number of buffered bytes at or after the requested position. Zero or negative when the request starts
+        // at or beyond the end of the buffered region.
+        final int available = buffer.limit() - seek;
+        if (available >= size) {
+            // fast path: we have the requested region
+            return buffer.slice(seek, size).asReadOnlyBuffer();
         }
- // Read the checksum of the entry.
- long checksum = memory.getInt() & 0xFFFFFFFFL;
+
+        if (available > 0) {
+            // We need to read more data, but let's salvage what we can:
+            // - set buffer position to seek, which means it points to the same byte as position
+            // - run compact, which moves everything between seek and limit onto the beginning of buffer and
+            //   sets it up to receive more bytes
+            // - continue reading in the file right after the salvaged bytes
+            buffer.position(seek).compact();
+            readAtLeast(position + available, size - available);
+        } else {
+            // Nothing buffered overlaps the request (seek may even lie past the limit): refill from scratch
+            buffer.clear();
+            readAtLeast(position, size);
+        }
+        return setAndSlice(position, size);
+    }
- // Compute the checksum for the entry bytes.
- final CRC32 crc32 = new CRC32();
- crc32.update(memory.array(), memory.position(), length);
+    ByteBuffer rewindAndRead(final int rewindBy, final int position, final int size) {
+        // TODO: This is the lazy way out. Ideally we would work out how much of the buffer can be salvaged and
+        //       adjust limit/position around the read accordingly. For now the buffered content is discarded and
+        //       the buffer is refilled starting at the requested position.
+        buffer.clear();
+        readAtLeast(position, size);
+
+        // The buffer's first byte now corresponds to the requested position
+        bufferPosition = position;
+        return buffer.slice(0, size).asReadOnlyBuffer();
+    }
- // If the stored checksum equals the computed checksum, return the entry.
- if (checksum == crc32.getValue()) {
- int limit = memory.limit();
- memory.limit(memory.position() + length);
- E entry = namespace.deserialize(memory);
- memory.limit(limit);
- return new Indexed<>(index, entry, length);
- } else {
- memory.reset().limit(memory.position());
- return null;
+ void readAtLeast(final int readPosition, final int readAtLeast) { // fill buffer from file offset readPosition
+ final int bytesRead;
+ try {
+ bytesRead = channel.read(buffer, readPosition); // positional read into buffer's remaining space
} catch (IOException e) {
throw new StorageException(e);
}
- } catch (BufferUnderflowException e) {
- memory.reset().limit(memory.position());
- return null;
- }
- } catch (IOException e) {
- throw new StorageException(e);
+ verify(bytesRead >= readAtLeast, "Short read %s, expected %s", bytesRead, readAtLeast); // NOTE(review): only one read() is issued, any short read fails here
+ buffer.flip(); // make the buffered bytes readable: limit = end of data, position = 0
+ }
+
+ private ByteBuffer setAndSlice(final int position, final int size) { // record buffer origin, return first size bytes
+ bufferPosition = position; // the buffer's first byte now corresponds to this file offset
+ return buffer.slice(0, size).asReadOnlyBuffer(); // read-only view sharing the buffer's content
}
- }
}
*/
package io.atomix.storage.journal;
+import static com.google.common.base.Verify.verify;
import static java.util.Objects.requireNonNull;
+import com.esotericsoftware.kryo.KryoException;
+import java.nio.ByteBuffer;
+import java.util.zip.CRC32;
import org.eclipse.jdt.annotation.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
abstract sealed class JournalSegmentReader<E> permits DiskJournalSegmentReader, MappedJournalSegmentReader {
- final int maxEntrySize;
- final JournalSerdes namespace;
+ private static final Logger LOG = LoggerFactory.getLogger(JournalSegmentReader.class);
+
private final JournalSegment<E> segment;
+ private final JournalSerdes namespace;
+ private final int maxSegmentSize;
+ private final int maxEntrySize;
+
+ private int position;
JournalSegmentReader(final JournalSegment<E> segment, final int maxEntrySize, final JournalSerdes namespace) {
this.segment = requireNonNull(segment);
+ maxSegmentSize = segment.descriptor().maxSegmentSize(); // upper bound used by setPosition() and readEntry()
this.maxEntrySize = maxEntrySize;
this.namespace = requireNonNull(namespace);
}
/**
- * Close this reader.
+ * Return the current position, i.e. the offset from which {@link #readEntry(long)} reads the next entry.
+ *
+ * @return current position.
*/
- final void close() {
- segment.closeReader(this);
+ final int position() {
+ return position;
}
/**
*
* @param position new position
*/
- abstract void setPosition(int position);
+    final void setPosition(final int position) {
+        // Acceptable positions are at least JournalSegmentDescriptor.BYTES and strictly below maxSegmentSize
+        final boolean valid = position >= JournalSegmentDescriptor.BYTES && position < maxSegmentSize;
+        verify(valid, "Invalid position %s", position);
+
+        this.position = position;
+        // Make sure the next read is coherent with the backing file
+        invalidateCache();
+    }
/**
- * Reads the entry at specified index.
+ * Invalidate any cache that is present, so that the next read is coherent with the backing file. Invoked from
+ * {@link #setPosition(int)} and whenever {@link #readEntry(long)} rejects what it has read.
+ */
+ abstract void invalidateCache();
+
+ /**
+ * Reads the next entry, assigning it specified index.
*
* @param index entry index
* @return The entry, or {@code null}
*/
- abstract @Nullable Indexed<E> readEntry(long index);
+    final @Nullable Indexed<E> readEntry(final long index) {
+        // Bytes left in the segment for an entry body once its header has been accounted for
+        final int bodySpace = maxSegmentSize - position - SegmentEntry.HEADER_BYTES;
+        if (bodySpace < 0) {
+            // Not even a header fits into the rest of the segment: no further entry can exist
+            return null;
+        }
+
+        // An entry body may neither run past the end of the segment nor exceed maxEntrySize
+        final int lengthLimit = Math.min(bodySpace, maxEntrySize);
+        final ByteBuffer bytes = read(position, lengthLimit + SegmentEntry.HEADER_BYTES);
+
+        // The header starts with the entry length
+        final int length = bytes.getInt(0);
+        if (length < 1 || length > lengthLimit) {
+            // Not a valid length: drop the cache so that the next attempt re-reads the backing file
+            invalidateCache();
+            return null;
+        }
+
+        // The stored checksum follows the length, the entry's bytes follow the header
+        final int storedChecksum = bytes.getInt(Integer.BYTES);
+        final ByteBuffer entryBytes = bytes.slice(SegmentEntry.HEADER_BYTES, length);
+
+        // CRC32 over the entry's bytes, narrowed to int for comparison with the stored value
+        final CRC32 crc32 = new CRC32();
+        crc32.update(entryBytes);
+        final int computedChecksum = (int) crc32.getValue();
+        if (storedChecksum != computedChecksum) {
+            // A mismatch means the bytes cannot be trusted, do not proceed further
+            LOG.warn("Expected checksum {}, computed {}", Integer.toHexString(storedChecksum),
+                Integer.toHexString(computedChecksum));
+            invalidateCache();
+            return null;
+        }
+
+        // Computing the checksum consumed entryBytes, hence rewind them before deserializing
+        final E entry;
+        try {
+            entry = namespace.deserialize(entryBytes.rewind());
+        } catch (KryoException e) {
+            // TODO: promote this to a hard error, as it should never happen
+            LOG.debug("Failed to deserialize entry", e);
+            invalidateCache();
+            return null;
+        }
+
+        // Success: move past the header and the entry's bytes
+        position += SegmentEntry.HEADER_BYTES + length;
+        return new Indexed<>(index, entry, length);
+    }
+
+ /**
+ * Read some bytes at the specified position. The sum of position and size is guaranteed not to exceed
+ * {@link #maxSegmentSize}.
+ *
+ * @param position position of the entry header
+ * @param size number of bytes to read, guaranteed not to exceed {@link #maxEntrySize} plus
+ * {@link SegmentEntry#HEADER_BYTES}
+ * @return resulting buffer
+ */
+ abstract ByteBuffer read(int position, int size);
+
+ /**
+ * Close this reader by reporting it to the segment it was created for, via {@code closeReader}.
+ */
+ final void close() {
+ segment.closeReader(this);
+ }
}
*/
package io.atomix.storage.journal;
-import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
-import java.util.zip.CRC32;
/**
* Log segment reader.
* @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/
final class MappedJournalSegmentReader<E> extends JournalSegmentReader<E> {
- private final ByteBuffer buffer;
+ private final ByteBuffer buffer;
- MappedJournalSegmentReader(
- ByteBuffer buffer,
- JournalSegment<E> segment,
- int maxEntrySize,
- JournalSerdes namespace) {
- super(segment, maxEntrySize, namespace);
- this.buffer = buffer.slice();
- }
-
- @Override
- void setPosition(int position) {
- buffer.position(position);
- }
-
- @Override
- Indexed<E> readEntry(final long index) {
- // Mark the buffer so it can be reset if necessary.
- buffer.mark();
-
- try {
- // Read the length of the entry.
- final int length = buffer.getInt();
-
- // If the buffer length is zero then return.
- if (length <= 0 || length > maxEntrySize) {
- buffer.reset();
- return null;
- }
-
- // Read the checksum of the entry.
- long checksum = buffer.getInt() & 0xFFFFFFFFL;
+ MappedJournalSegmentReader(final ByteBuffer buffer, final JournalSegment<E> segment, final int maxEntrySize,
+ final JournalSerdes namespace) {
+ super(segment, maxEntrySize, namespace);
+ this.buffer = buffer.slice().asReadOnlyBuffer();
+ }
- // Compute the checksum for the entry bytes.
- final CRC32 crc32 = new CRC32();
- ByteBuffer slice = buffer.slice();
- slice.limit(length);
- crc32.update(slice);
+ @Override
+ void invalidateCache() {
+ // No-op: read() slices the mapping directly, nothing is cached and the mapping is guaranteed to be coherent
+ }
- // If the stored checksum equals the computed checksum, return the entry.
- if (checksum == crc32.getValue()) {
- slice.rewind();
- E entry = namespace.deserialize(slice);
- buffer.position(buffer.position() + length);
- return new Indexed<>(index, entry, length);
- } else {
- buffer.reset();
- return null;
- }
- } catch (BufferUnderflowException e) {
- buffer.reset();
- return null;
+ @Override
+ ByteBuffer read(final int position, final int size) {
+ return buffer.slice(position, size); // view into the read-only buffer, no bytes are copied
}
- }
}