X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=atomix-storage%2Fsrc%2Fmain%2Fjava%2Fio%2Fatomix%2Fstorage%2Fjournal%2FFileChannelJournalSegmentWriter.java;h=fa8b02bded1cdabff58370cf66e12aa12a9318da;hb=b1773fc588ee5ea36f3bf5378ff54aa3dbffb64e;hp=025e1a841b1fe7bd3ee211b0adbc73f43b3801bd;hpb=4a74d808e33fef2e002af1acc996b7a2ba228757;p=controller.git diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/FileChannelJournalSegmentWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/FileChannelJournalSegmentWriter.java index 025e1a841b..fa8b02bded 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/FileChannelJournalSegmentWriter.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/FileChannelJournalSegmentWriter.java @@ -22,9 +22,9 @@ import java.io.IOException; import java.nio.BufferOverflowException; import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; import java.util.zip.CRC32; -import java.util.zip.Checksum; /** * Segment writer. @@ -41,17 +41,13 @@ import java.util.zip.Checksum; * * @author Jordan Halterman */ -class FileChannelJournalSegmentWriter implements JournalWriter { +final class FileChannelJournalSegmentWriter extends JournalSegmentWriter { private static final ByteBuffer ZERO_ENTRY_HEADER = ByteBuffer.wrap(new byte[Integer.BYTES + Integer.BYTES]); private final FileChannel channel; - private final JournalSegment segment; - private final int maxEntrySize; - private final JournalIndex index; - private final JournalSerdes namespace; private final ByteBuffer memory; - private final long firstIndex; private Indexed lastEntry; + private long currentPosition; FileChannelJournalSegmentWriter( FileChannel channel, @@ -59,39 +55,32 @@ class FileChannelJournalSegmentWriter implements JournalWriter { int maxEntrySize, JournalIndex index, JournalSerdes namespace) { + super(segment, maxEntrySize, index, namespace); this.channel = channel; - this.segment = segment; - this.maxEntrySize = maxEntrySize; - this.index = index; this.memory = ByteBuffer.allocate((maxEntrySize + Integer.BYTES + Integer.BYTES) * 2); memory.limit(0); - this.namespace = namespace; - this.firstIndex = segment.index(); reset(0); } + @Override + MappedByteBuffer buffer() { + return null; + } + @Override public void reset(long index) { long nextIndex = firstIndex; // Clear the buffer indexes. + currentPosition = JournalSegmentDescriptor.BYTES; + try { - channel.position(JournalSegmentDescriptor.BYTES); - memory.clear().flip(); - - // Record the current buffer position. - long position = channel.position(); - - // Read more bytes from the segment if necessary. - if (memory.remaining() < maxEntrySize) { - memory.clear(); - channel.read(memory); - channel.position(position); - memory.flip(); - } + // Clear memory buffer and read fist chunk + memory.clear(); + channel.read(memory, JournalSegmentDescriptor.BYTES); + memory.flip(); // Read the entry length. - memory.mark(); int length = memory.getInt(); // If the length is non-zero, read the entry. @@ -100,47 +89,40 @@ class FileChannelJournalSegmentWriter implements JournalWriter { // Read the checksum of the entry. final long checksum = memory.getInt() & 0xFFFFFFFFL; + // Slice off the entry's bytes + final ByteBuffer entryBytes = memory.slice(); + entryBytes.limit(length); + // Compute the checksum for the entry bytes. - final Checksum crc32 = new CRC32(); - crc32.update(memory.array(), memory.position(), length); - - // If the stored checksum equals the computed checksum, return the entry. - if (checksum == crc32.getValue()) { - int limit = memory.limit(); - memory.limit(memory.position() + length); - final E entry = namespace.deserialize(memory); - memory.limit(limit); - lastEntry = new Indexed<>(nextIndex, entry, length); - this.index.index(nextIndex, (int) position); - nextIndex++; - } else { + final CRC32 crc32 = new CRC32(); + crc32.update(entryBytes); + + // If the stored checksum does not equal the computed checksum, do not proceed further + if (checksum != crc32.getValue()) { break; } + entryBytes.rewind(); + final E entry = namespace.deserialize(entryBytes); + lastEntry = new Indexed<>(nextIndex, entry, length); + this.index.index(nextIndex, (int) currentPosition); + nextIndex++; + // Update the current position for indexing. - position = channel.position() + memory.position(); + currentPosition = currentPosition + Integer.BYTES + Integer.BYTES + length; + memory.position(memory.position() + length); // Read more bytes from the segment if necessary. if (memory.remaining() < maxEntrySize) { - channel.position(position); memory.clear(); - channel.read(memory); - channel.position(position); + channel.read(memory, currentPosition); memory.flip(); } - memory.mark(); length = memory.getInt(); } - - // Reset the buffer to the previous mark. - channel.position(channel.position() + memory.reset().position()); } catch (BufferUnderflowException e) { - try { - channel.position(channel.position() + memory.reset().position()); - } catch (IOException e2) { - throw new StorageException(e2); - } + // No-op, position is only updated on success } catch (IOException e) { throw new StorageException(e); } @@ -187,53 +169,49 @@ class FileChannelJournalSegmentWriter implements JournalWriter { // Store the entry index. final long index = getNextIndex(); + // Serialize the entry. + memory.clear(); + memory.position(Integer.BYTES + Integer.BYTES); try { - // Serialize the entry. - memory.clear(); - memory.position(Integer.BYTES + Integer.BYTES); - try { - namespace.serialize(entry, memory); - } catch (KryoException e) { - throw new StorageException.TooLarge("Entry size exceeds maximum allowed bytes (" + maxEntrySize + ")"); - } - memory.flip(); + namespace.serialize(entry, memory); + } catch (KryoException e) { + throw new StorageException.TooLarge("Entry size exceeds maximum allowed bytes (" + maxEntrySize + ")"); + } + memory.flip(); - final int length = memory.limit() - (Integer.BYTES + Integer.BYTES); + final int length = memory.limit() - (Integer.BYTES + Integer.BYTES); - // Ensure there's enough space left in the buffer to store the entry. - long position = channel.position(); - if (segment.descriptor().maxSegmentSize() - position < length + Integer.BYTES + Integer.BYTES) { - throw new BufferOverflowException(); - } + // Ensure there's enough space left in the buffer to store the entry. + if (segment.descriptor().maxSegmentSize() - currentPosition < length + Integer.BYTES + Integer.BYTES) { + throw new BufferOverflowException(); + } - // If the entry length exceeds the maximum entry size then throw an exception. - if (length > maxEntrySize) { - throw new StorageException.TooLarge("Entry size " + length + " exceeds maximum allowed bytes (" + maxEntrySize + ")"); - } + // If the entry length exceeds the maximum entry size then throw an exception. + if (length > maxEntrySize) { + throw new StorageException.TooLarge("Entry size " + length + " exceeds maximum allowed bytes (" + maxEntrySize + ")"); + } + + // Compute the checksum for the entry. + final CRC32 crc32 = new CRC32(); + crc32.update(memory.array(), Integer.BYTES + Integer.BYTES, memory.limit() - (Integer.BYTES + Integer.BYTES)); + final long checksum = crc32.getValue(); - // Compute the checksum for the entry. - final Checksum crc32 = new CRC32(); - crc32.update(memory.array(), Integer.BYTES + Integer.BYTES, memory.limit() - (Integer.BYTES + Integer.BYTES)); - final long checksum = crc32.getValue(); - - // Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer. - memory.putInt(0, length); - memory.putInt(Integer.BYTES, (int) checksum); - channel.write(memory); - - // Update the last entry with the correct index/term/length. - Indexed indexedEntry = new Indexed<>(index, entry, length); - this.lastEntry = indexedEntry; - this.index.index(index, (int) position); - return (Indexed) indexedEntry; + // Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer. + memory.putInt(0, length); + memory.putInt(Integer.BYTES, (int) checksum); + try { + channel.write(memory, currentPosition); } catch (IOException e) { throw new StorageException(e); } - } - @Override - public void commit(long index) { + // Update the last entry with the correct index/term/length. + Indexed indexedEntry = new Indexed<>(index, entry, length); + this.lastEntry = indexedEntry; + this.index.index(index, (int) currentPosition); + currentPosition = currentPosition + Integer.BYTES + Integer.BYTES + length; + return (Indexed) indexedEntry; } @Override @@ -252,14 +230,14 @@ class FileChannelJournalSegmentWriter implements JournalWriter { try { if (index < segment.index()) { // Reset the writer to the first entry. - channel.position(JournalSegmentDescriptor.BYTES); + currentPosition = JournalSegmentDescriptor.BYTES; } else { // Reset the writer to the given index. reset(index); } // Zero the entry header at current channel position. - channel.write(ZERO_ENTRY_HEADER.asReadOnlyBuffer(), channel.position()); + channel.write(ZERO_ENTRY_HEADER.asReadOnlyBuffer(), currentPosition); } catch (IOException e) { throw new StorageException(e); }