X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=atomix-storage%2Fsrc%2Fmain%2Fjava%2Fio%2Fatomix%2Fstorage%2Fjournal%2FMappedJournalSegmentWriter.java;h=a9fb5b408834119333cd0644821223db0bbf9ee1;hb=c0fc330d05dc2a3257ea358e763f946c77f1a1fa;hp=99180c5840321165696796a45234f5dcfcbb237e;hpb=479ecb0109aaa1c8682dd083600b777cbd68dc07;p=controller.git diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/MappedJournalSegmentWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/MappedJournalSegmentWriter.java index 99180c5840..a9fb5b4088 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/MappedJournalSegmentWriter.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/MappedJournalSegmentWriter.java @@ -22,7 +22,6 @@ import com.esotericsoftware.kryo.KryoException; import io.atomix.storage.journal.index.JournalIndex; import java.io.IOException; import java.nio.BufferOverflowException; -import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; @@ -49,27 +48,29 @@ final class MappedJournalSegmentWriter extends JournalSegmentWriter { private final ByteBuffer buffer; private Indexed lastEntry; + private int currentPosition; MappedJournalSegmentWriter( - FileChannel channel, - JournalSegment segment, - int maxEntrySize, - JournalIndex index, - JournalSerdes namespace) { + final FileChannel channel, + final JournalSegment segment, + final int maxEntrySize, + final JournalIndex index, + final JournalSerdes namespace) { super(channel, segment, maxEntrySize, index, namespace); mappedBuffer = mapBuffer(channel, maxSegmentSize); buffer = mappedBuffer.slice(); reset(0); } - MappedJournalSegmentWriter(JournalSegmentWriter previous, int position) { + MappedJournalSegmentWriter(final JournalSegmentWriter previous, final int position) { super(previous); mappedBuffer = mapBuffer(channel, maxSegmentSize); - buffer = mappedBuffer.slice().position(position); + buffer = mappedBuffer.slice(); + currentPosition = position; lastEntry = previous.getLastEntry(); } - private static @NonNull MappedByteBuffer mapBuffer(FileChannel channel, int maxSegmentSize) { + private static @NonNull MappedByteBuffer mapBuffer(final FileChannel channel, final int maxSegmentSize) { try { return channel.map(FileChannel.MapMode.READ_WRITE, 0, maxSegmentSize); } catch (IOException e) { @@ -89,63 +90,50 @@ final class MappedJournalSegmentWriter extends JournalSegmentWriter { @Override DiskJournalSegmentWriter toFileChannel() { - final int position = buffer.position(); close(); - return new DiskJournalSegmentWriter<>(this, position); + return new DiskJournalSegmentWriter<>(this, currentPosition); } @Override - void reset(long index) { + void reset(final long index) { long nextIndex = firstIndex; // Clear the buffer indexes. - buffer.position(JournalSegmentDescriptor.BYTES); - - // Record the current buffer position. - int position = buffer.position(); + currentPosition = JournalSegmentDescriptor.BYTES; - // Read the entry length. - buffer.mark(); - - try { - int length = buffer.getInt(); + int length = buffer.getInt(currentPosition); - // If the length is non-zero, read the entry. - while (0 < length && length <= maxEntrySize && (index == 0 || nextIndex <= index)) { + // If the length is non-zero, read the entry. + while (0 < length && length <= maxEntrySize && (index == 0 || nextIndex <= index)) { - // Read the checksum of the entry. - final long checksum = buffer.getInt() & 0xFFFFFFFFL; + // Read the checksum of the entry. + final long checksum = buffer.getInt(currentPosition + Integer.BYTES); - // Slice off the entry's bytes - final ByteBuffer entryBytes = buffer.slice(); - entryBytes.limit(length); + // Slice off the entry's bytes + final var entryBytes = buffer.slice(currentPosition + SegmentEntry.HEADER_BYTES, length); - // Compute the checksum for the entry bytes. - final CRC32 crc32 = new CRC32(); - crc32.update(entryBytes); + // Compute the checksum for the entry bytes. + final var crc32 = new CRC32(); + crc32.update(entryBytes); - // If the stored checksum does not equal the computed checksum, do not proceed further - if (checksum != crc32.getValue()) { + // If the stored checksum does not equal the computed checksum, do not proceed further + if (checksum != (int) crc32.getValue()) { break; - } + } - entryBytes.rewind(); - final E entry = namespace.deserialize(entryBytes); - lastEntry = new Indexed<>(nextIndex, entry, length); - this.index.index(nextIndex, position); - nextIndex++; + entryBytes.rewind(); + final E entry = namespace.deserialize(entryBytes); + lastEntry = new Indexed<>(nextIndex, entry, length); + this.index.index(nextIndex, currentPosition); + nextIndex++; - // Update the current position for indexing. - position = buffer.position() + length; - buffer.position(position); + // Update the current position for indexing. + currentPosition = currentPosition + SegmentEntry.HEADER_BYTES + length; - length = buffer.mark().getInt(); + if (currentPosition + SegmentEntry.HEADER_BYTES >= maxSegmentSize) { + break; } - - // Reset the buffer to the previous mark. - buffer.reset(); - } catch (BufferUnderflowException e) { - buffer.reset(); + length = buffer.getInt(currentPosition); } } @@ -156,53 +144,50 @@ final class MappedJournalSegmentWriter extends JournalSegmentWriter { @Override @SuppressWarnings("unchecked") - Indexed append(T entry) { + Indexed append(final T entry) { // Store the entry index. final long index = getNextIndex(); // Serialize the entry. - int position = buffer.position(); - if (position + HEADER_BYTES > buffer.limit()) { + final int bodyPosition = currentPosition + HEADER_BYTES; + final int avail = maxSegmentSize - bodyPosition; + if (avail < 0) { throw new BufferOverflowException(); } - buffer.position(position + HEADER_BYTES); - + final var entryBytes = buffer.slice(bodyPosition, Math.min(avail, maxEntrySize)); try { - namespace.serialize(entry, buffer); + namespace.serialize(entry, entryBytes); } catch (KryoException e) { - throw new BufferOverflowException(); - } - - final int length = buffer.position() - (position + HEADER_BYTES); + if (entryBytes.capacity() != maxEntrySize) { + // We have not provided enough capacity, signal to roll to next segment + throw new BufferOverflowException(); + } - // If the entry length exceeds the maximum entry size then throw an exception. - if (length > maxEntrySize) { // Just reset the buffer. There's no need to zero the bytes since we haven't written the length or checksum. - buffer.position(position); - throw new StorageException.TooLarge("Entry size " + length + " exceeds maximum allowed bytes (" + maxEntrySize + ")"); + throw new StorageException.TooLarge("Entry size exceeds maximum allowed bytes (" + maxEntrySize + ")"); } + final int length = entryBytes.position(); + // Compute the checksum for the entry. - final CRC32 crc32 = new CRC32(); - buffer.position(position + HEADER_BYTES); - ByteBuffer slice = buffer.slice(); - slice.limit(length); - crc32.update(slice); - final long checksum = crc32.getValue(); + final var crc32 = new CRC32(); + crc32.update(entryBytes.flip()); // Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer. - buffer.position(position).putInt(length).putInt((int) checksum).position(position + HEADER_BYTES + length); + buffer.putInt(currentPosition, length).putInt(currentPosition + Integer.BYTES, (int) crc32.getValue()); // Update the last entry with the correct index/term/length. Indexed indexedEntry = new Indexed<>(index, entry, length); - this.lastEntry = indexedEntry; - this.index.index(index, position); + lastEntry = indexedEntry; + this.index.index(index, currentPosition); + + currentPosition = currentPosition + HEADER_BYTES + length; return (Indexed) indexedEntry; } @Override - void truncate(long index) { + void truncate(final long index) { // If the index is greater than or equal to the last index, skip the truncate. if (index >= getLastIndex()) { return; @@ -216,16 +201,15 @@ final class MappedJournalSegmentWriter extends JournalSegmentWriter { if (index < firstIndex) { // Reset the writer to the first entry. - buffer.position(JournalSegmentDescriptor.BYTES); + currentPosition = JournalSegmentDescriptor.BYTES; } else { // Reset the writer to the given index. reset(index); } // Zero the entry header at current buffer position. - int position = buffer.position(); // Note: we issue a single putLong() instead of two putInt()s. - buffer.putLong(0).position(position); + buffer.putLong(currentPosition, 0L); } @Override