X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=atomix-storage%2Fsrc%2Fmain%2Fjava%2Fio%2Fatomix%2Fstorage%2Fjournal%2FMappedJournalSegmentWriter.java;h=f5d4ed9d04a9554de4ec935c6b54b85720158973;hb=d9569bc53baaf2e830144e0200f7b6baba15cbe0;hp=6ca6ab30dd72cce4a8952782375d83a2e17b3c67;hpb=c15f343c9fdf01aaff0170c58398d0b8a7259822;p=controller.git diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/MappedJournalSegmentWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/MappedJournalSegmentWriter.java index 6ca6ab30dd..f5d4ed9d04 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/MappedJournalSegmentWriter.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/MappedJournalSegmentWriter.java @@ -23,7 +23,9 @@ import java.nio.BufferOverflowException; import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; import java.util.zip.CRC32; +import org.eclipse.jdt.annotation.NonNull; /** * Segment writer. @@ -40,41 +42,44 @@ import java.util.zip.CRC32; * * @author Jordan Halterman */ -class MappedJournalSegmentWriter implements JournalWriter { - private final MappedByteBuffer mappedBuffer; +final class MappedJournalSegmentWriter extends JournalSegmentWriter { + private final @NonNull MappedByteBuffer mappedBuffer; private final ByteBuffer buffer; - private final JournalSegment segment; - private final int maxEntrySize; - private final JournalIndex index; - private final JournalSerdes namespace; - private final long firstIndex; + private Indexed lastEntry; MappedJournalSegmentWriter( - MappedByteBuffer buffer, + FileChannel channel, JournalSegment segment, int maxEntrySize, JournalIndex index, JournalSerdes namespace) { - this.mappedBuffer = buffer; - this.buffer = buffer.slice(); - this.segment = segment; - this.maxEntrySize = maxEntrySize; - this.index = index; - this.namespace = namespace; - this.firstIndex = segment.index(); + super(channel, segment, maxEntrySize, index, namespace); + try { + mappedBuffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, segment.descriptor().maxSegmentSize()); + } catch (IOException e) { + throw new StorageException(e); + } + this.buffer = mappedBuffer.slice(); reset(0); } - /** - * Returns the mapped buffer underlying the segment writer. - * - * @return the mapped buffer underlying the segment writer - */ - MappedByteBuffer buffer() { + @Override + @NonNull MappedByteBuffer buffer() { return mappedBuffer; } + @Override + MappedJournalSegmentWriter toMapped() { + return this; + } + + @Override + FileChannelJournalSegmentWriter toFileChannel() { + close(); + return new FileChannelJournalSegmentWriter<>(channel, segment, maxEntrySize, index, namespace); + } + @Override public void reset(long index) { long nextIndex = firstIndex; @@ -97,23 +102,25 @@ class MappedJournalSegmentWriter implements JournalWriter { // Read the checksum of the entry. final long checksum = buffer.getInt() & 0xFFFFFFFFL; + // Slice off the entry's bytes + final ByteBuffer entryBytes = buffer.slice(); + entryBytes.limit(length); + // Compute the checksum for the entry bytes. final CRC32 crc32 = new CRC32(); - ByteBuffer slice = buffer.slice(); - slice.limit(length); - crc32.update(slice); - - // If the stored checksum equals the computed checksum, return the entry. - if (checksum == crc32.getValue()) { - slice.rewind(); - final E entry = namespace.deserialize(slice); - lastEntry = new Indexed<>(nextIndex, entry, length); - this.index.index(nextIndex, position); - nextIndex++; - } else { + crc32.update(entryBytes); + + // If the stored checksum does not equal the computed checksum, do not proceed further + if (checksum != crc32.getValue()) { break; } + entryBytes.rewind(); + final E entry = namespace.deserialize(entryBytes); + lastEntry = new Indexed<>(nextIndex, entry, length); + this.index.index(nextIndex, position); + nextIndex++; + // Update the current position for indexing. position = buffer.position() + length; buffer.position(position); @@ -214,10 +221,6 @@ class MappedJournalSegmentWriter implements JournalWriter { return (Indexed) indexedEntry; } - @Override - public void commit(long index) { - - } @Override public void truncate(long index) { @@ -233,20 +236,18 @@ class MappedJournalSegmentWriter implements JournalWriter { this.index.truncate(index); if (index < segment.index()) { - buffer.position(JournalSegmentDescriptor.BYTES); - buffer.putInt(0); - buffer.putInt(0); + // Reset the writer to the first entry. buffer.position(JournalSegmentDescriptor.BYTES); } else { // Reset the writer to the given index. reset(index); - - // Zero entries after the given index. - int position = buffer.position(); - buffer.putInt(0); - buffer.putInt(0); - buffer.position(position); } + + // Zero the entry header at current buffer position. + int position = buffer.position(); + // Note: we issue a single putLong() instead of two putInt()s. + buffer.putLong(0); + buffer.position(position); } @Override