import io.atomix.storage.journal.index.JournalIndex;
import java.io.IOException;
import java.nio.BufferOverflowException;
-import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SeekableByteChannel;
import java.util.zip.CRC32;
+import org.eclipse.jdt.annotation.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Segment writer.
* @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/
final class DiskJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
+ private static final Logger LOG = LoggerFactory.getLogger(DiskJournalSegmentWriter.class);
private static final ByteBuffer ZERO_ENTRY_HEADER = ByteBuffer.wrap(new byte[ENTRY_HEADER_BYTES]);
private final ByteBuffer memory;
}
@Override
- void reset(long index) {
- long nextIndex = firstIndex;
-
- // Clear the buffer indexes.
- currentPosition = JournalSegmentDescriptor.BYTES;
-
- try {
- // Clear memory buffer and read fist chunk
- channel.read(memory.clear(), JournalSegmentDescriptor.BYTES);
- memory.flip();
-
- // Read the entry length.
- int length = memory.getInt();
-
- // If the length is non-zero, read the entry.
- while (0 < length && length <= maxEntrySize && (index == 0 || nextIndex <= index)) {
-
- // Read the checksum of the entry.
- final long checksum = memory.getInt() & 0xFFFFFFFFL;
-
- // Slice off the entry's bytes
- final ByteBuffer entryBytes = memory.slice();
- entryBytes.limit(length);
-
- // Compute the checksum for the entry bytes.
- final CRC32 crc32 = new CRC32();
- crc32.update(entryBytes);
-
- // If the stored checksum does not equal the computed checksum, do not proceed further
- if (checksum != crc32.getValue()) {
- break;
- }
-
- entryBytes.rewind();
- final E entry = namespace.deserialize(entryBytes);
- lastEntry = new Indexed<>(nextIndex, entry, length);
- this.index.index(nextIndex, (int) currentPosition);
- nextIndex++;
-
- // Update the current position for indexing.
- currentPosition = currentPosition + ENTRY_HEADER_BYTES + length;
- memory.position(memory.position() + length);
-
- length = prepareNextEntry(channel, memory);
+ void reset(final long index) {
+ long nextIndex = firstIndex;
+
+ // Clear the buffer indexes.
+ currentPosition = JournalSegmentDescriptor.BYTES;
+
+ try {
+ // Clear memory buffer and read fist chunk
+ channel.read(memory.clear(), JournalSegmentDescriptor.BYTES);
+ memory.flip();
+
+ while (index == 0 || nextIndex <= index) {
+ final var entry = prepareNextEntry(channel, memory, maxEntrySize);
+ if (entry == null) {
+ break;
+ }
+
+ final var bytes = entry.bytes();
+ final var length = bytes.remaining();
+ try {
+ lastEntry = new Indexed<>(nextIndex, namespace.<E>deserialize(bytes), length);
+ } catch (KryoException e) {
+ // No-op, position is only updated on success
+ LOG.debug("Failed to deserialize entry", e);
+ break;
+ }
+
+ this.index.index(nextIndex, (int) currentPosition);
+ nextIndex++;
+
+ // Update the current position for indexing.
+ currentPosition = currentPosition + ENTRY_HEADER_BYTES + length;
+ memory.position(memory.position() + length);
+ }
+ } catch (IOException e) {
+ throw new StorageException(e);
}
- } catch (BufferUnderflowException e) {
- // No-op, position is only updated on success
- } catch (IOException e) {
- throw new StorageException(e);
- }
}
@VisibleForTesting
- static int prepareNextEntry(final SeekableByteChannel channel, final ByteBuffer memory) throws IOException {
+ static @Nullable SegmentEntry prepareNextEntry(final SeekableByteChannel channel, final ByteBuffer memory,
+ final int maxEntrySize) throws IOException {
int remaining = memory.remaining();
boolean compacted;
if (remaining < ENTRY_HEADER_BYTES) {
remaining = memory.flip().remaining();
if (remaining < ENTRY_HEADER_BYTES) {
// could happen with mis-padded segment
- return 0;
+ return null;
}
compacted = true;
} else {
compacted = false;
}
+ int length;
while (true) {
- final int length = memory.mark().getInt();
+ length = memory.mark().getInt();
+ if (length < 1 || length > maxEntrySize) {
+ // Invalid length,
+ memory.reset();
+ return null;
+ }
+
if (remaining >= Integer.BYTES + length) {
// Fast path: we have the entry properly positioned
- return length;
+ break;
}
// Not enough data for entry, to header start
memory.reset();
if (compacted) {
// we have already compacted the buffer, there is just not enough data
- return 0;
+ return null;
}
// Try to read more data and check again
remaining = memory.flip().remaining();
compacted = true;
}
+
+ // Read the checksum of the entry.
+ final int checksum = memory.getInt();
+
+ // Slice off the entry's bytes
+ final var entryBytes = memory.slice();
+ entryBytes.limit(length);
+
+ // Compute the checksum for the entry bytes.
+ final var crc32 = new CRC32();
+ crc32.update(entryBytes);
+
+ // If the stored checksum does not equal the computed checksum, do not proceed further
+ final var computed = (int) crc32.getValue();
+ if (checksum != computed) {
+ LOG.warn("Expected checksum {}, computed {}", Integer.toHexString(checksum), Integer.toHexString(computed));
+ memory.reset();
+ return null;
+ }
+
+ return new SegmentEntry(checksum, entryBytes.rewind());
}
@Override
*/
package io.atomix.storage.journal;
-import static io.atomix.storage.journal.DiskJournalSegmentWriter.prepareNextEntry;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
import java.util.function.ToIntFunction;
+import org.eclipse.jdt.annotation.Nullable;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
@ExtendWith(MockitoExtension.class)
class DiskJournalSegmentWriterTest {
private static final int BUFFER_SIZE = 56;
+ private static final int MAX_ENTRY_SIZE = 42;
@Mock
private SeekableByteChannel channel;
@Test
void testReadFastPath() throws Exception {
- buffer.putInt(42).putInt(0).put(new byte[42]).flip();
+ buffer.putInt(42).putInt(0xE46F28FB).put(new byte[42]).flip();
- assertEquals(42, prepareNextEntry(channel, buffer));
- assertEquals(4, buffer.position());
- assertEquals(46, buffer.remaining());
+ final var entry = prepareNextEntry(channel, buffer);
+ assertNotNull(entry);
+ assertEquals(42, entry.bytes().remaining());
+ assertEquals(8, buffer.position());
+ assertEquals(42, buffer.remaining());
}
@Test
return 0;
});
- assertEquals(0, prepareNextEntry(channel, buffer));
+ assertNull(prepareNextEntry(channel, buffer));
assertEquals(0, buffer.remaining());
}
prepareRead(buf -> {
assertEquals(0, buf.position());
- buf.putInt(20).putInt(0).put(new byte[20]);
+ buf.putInt(20).putInt(0x0FD59B8D).put(new byte[20]);
return 28;
});
- assertEquals(20, prepareNextEntry(channel, buffer));
- assertEquals(4, buffer.position());
- assertEquals(24, buffer.remaining());
+ final var entry = prepareNextEntry(channel, buffer);
+ assertNotNull(entry);
+ assertEquals(20, entry.bytes().remaining());
+ assertEquals(8, buffer.position());
+ assertEquals(20, buffer.remaining());
}
@Test
return 28;
});
- assertEquals(0, prepareNextEntry(channel, buffer));
+ assertNull(prepareNextEntry(channel, buffer));
assertEquals(0, buffer.position());
assertEquals(28, buffer.remaining());
}
return 0;
});
- assertEquals(0, prepareNextEntry(channel, buffer));
+ assertNull(prepareNextEntry(channel, buffer));
assertEquals(28, buffer.remaining());
assertEquals(0, buffer.position());
}
final void prepareRead(final ToIntFunction<ByteBuffer> onRead) throws Exception {
doAnswer(invocation -> onRead.applyAsInt(invocation.getArgument(0))).when(channel).read(any());
}
+
+ private static @Nullable SegmentEntry prepareNextEntry(final SeekableByteChannel channel, final ByteBuffer memory)
+ throws IOException {
+ return DiskJournalSegmentWriter.prepareNextEntry(channel, memory, MAX_ENTRY_SIZE);
+ }
}