X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=atomix-storage%2Fsrc%2Fmain%2Fjava%2Fio%2Fatomix%2Fstorage%2Fjournal%2FFileChannelJournalSegmentWriter.java;h=fa8b02bded1cdabff58370cf66e12aa12a9318da;hb=b1773fc588ee5ea36f3bf5378ff54aa3dbffb64e;hp=ed9ff922f7da6aa6ad816239ba4fa8a9525014c2;hpb=7dd119a473054c089c12e66780b136f11eb4f5d2;p=controller.git diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/FileChannelJournalSegmentWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/FileChannelJournalSegmentWriter.java index ed9ff922f7..fa8b02bded 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/FileChannelJournalSegmentWriter.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/FileChannelJournalSegmentWriter.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.nio.BufferOverflowException; import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; import java.util.zip.CRC32; @@ -40,17 +41,13 @@ import java.util.zip.CRC32; * * @author Jordan Halterman */ -class FileChannelJournalSegmentWriter implements JournalWriter { +final class FileChannelJournalSegmentWriter extends JournalSegmentWriter { private static final ByteBuffer ZERO_ENTRY_HEADER = ByteBuffer.wrap(new byte[Integer.BYTES + Integer.BYTES]); private final FileChannel channel; - private final JournalSegment segment; - private final int maxEntrySize; - private final JournalIndex index; - private final JournalSerdes namespace; private final ByteBuffer memory; - private final long firstIndex; private Indexed lastEntry; + private long currentPosition; FileChannelJournalSegmentWriter( FileChannel channel, @@ -58,31 +55,29 @@ class FileChannelJournalSegmentWriter implements JournalWriter { int maxEntrySize, JournalIndex index, JournalSerdes namespace) { + super(segment, maxEntrySize, index, namespace); this.channel = channel; - this.segment = segment; - this.maxEntrySize = maxEntrySize; - this.index = index; this.memory = ByteBuffer.allocate((maxEntrySize + Integer.BYTES + Integer.BYTES) * 2); memory.limit(0); - this.namespace = namespace; - this.firstIndex = segment.index(); reset(0); } + @Override + MappedByteBuffer buffer() { + return null; + } + @Override public void reset(long index) { long nextIndex = firstIndex; // Clear the buffer indexes. - try { - channel.position(JournalSegmentDescriptor.BYTES); - - // Record the current buffer position. - long position = channel.position(); + currentPosition = JournalSegmentDescriptor.BYTES; + try { // Clear memory buffer and read fist chunk memory.clear(); - channel.read(memory, position); + channel.read(memory, JournalSegmentDescriptor.BYTES); memory.flip(); // Read the entry length. @@ -110,18 +105,17 @@ class FileChannelJournalSegmentWriter implements JournalWriter { entryBytes.rewind(); final E entry = namespace.deserialize(entryBytes); lastEntry = new Indexed<>(nextIndex, entry, length); - this.index.index(nextIndex, (int) position); + this.index.index(nextIndex, (int) currentPosition); nextIndex++; // Update the current position for indexing. - position = position + Integer.BYTES + Integer.BYTES + length; - channel.position(position); + currentPosition = currentPosition + Integer.BYTES + Integer.BYTES + length; memory.position(memory.position() + length); // Read more bytes from the segment if necessary. if (memory.remaining() < maxEntrySize) { memory.clear(); - channel.read(memory, position); + channel.read(memory, currentPosition); memory.flip(); } @@ -175,53 +169,49 @@ class FileChannelJournalSegmentWriter implements JournalWriter { // Store the entry index. final long index = getNextIndex(); + // Serialize the entry. + memory.clear(); + memory.position(Integer.BYTES + Integer.BYTES); try { - // Serialize the entry. - memory.clear(); - memory.position(Integer.BYTES + Integer.BYTES); - try { - namespace.serialize(entry, memory); - } catch (KryoException e) { - throw new StorageException.TooLarge("Entry size exceeds maximum allowed bytes (" + maxEntrySize + ")"); - } - memory.flip(); + namespace.serialize(entry, memory); + } catch (KryoException e) { + throw new StorageException.TooLarge("Entry size exceeds maximum allowed bytes (" + maxEntrySize + ")"); + } + memory.flip(); - final int length = memory.limit() - (Integer.BYTES + Integer.BYTES); + final int length = memory.limit() - (Integer.BYTES + Integer.BYTES); - // Ensure there's enough space left in the buffer to store the entry. - long position = channel.position(); - if (segment.descriptor().maxSegmentSize() - position < length + Integer.BYTES + Integer.BYTES) { - throw new BufferOverflowException(); - } + // Ensure there's enough space left in the buffer to store the entry. + if (segment.descriptor().maxSegmentSize() - currentPosition < length + Integer.BYTES + Integer.BYTES) { + throw new BufferOverflowException(); + } - // If the entry length exceeds the maximum entry size then throw an exception. - if (length > maxEntrySize) { - throw new StorageException.TooLarge("Entry size " + length + " exceeds maximum allowed bytes (" + maxEntrySize + ")"); - } + // If the entry length exceeds the maximum entry size then throw an exception. + if (length > maxEntrySize) { + throw new StorageException.TooLarge("Entry size " + length + " exceeds maximum allowed bytes (" + maxEntrySize + ")"); + } - // Compute the checksum for the entry. - final CRC32 crc32 = new CRC32(); - crc32.update(memory.array(), Integer.BYTES + Integer.BYTES, memory.limit() - (Integer.BYTES + Integer.BYTES)); - final long checksum = crc32.getValue(); - - // Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer. - memory.putInt(0, length); - memory.putInt(Integer.BYTES, (int) checksum); - channel.write(memory); - - // Update the last entry with the correct index/term/length. - Indexed indexedEntry = new Indexed<>(index, entry, length); - this.lastEntry = indexedEntry; - this.index.index(index, (int) position); - return (Indexed) indexedEntry; + // Compute the checksum for the entry. + final CRC32 crc32 = new CRC32(); + crc32.update(memory.array(), Integer.BYTES + Integer.BYTES, memory.limit() - (Integer.BYTES + Integer.BYTES)); + final long checksum = crc32.getValue(); + + // Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer. + memory.putInt(0, length); + memory.putInt(Integer.BYTES, (int) checksum); + try { + channel.write(memory, currentPosition); } catch (IOException e) { throw new StorageException(e); } - } - @Override - public void commit(long index) { + // Update the last entry with the correct index/term/length. + Indexed indexedEntry = new Indexed<>(index, entry, length); + this.lastEntry = indexedEntry; + this.index.index(index, (int) currentPosition); + currentPosition = currentPosition + Integer.BYTES + Integer.BYTES + length; + return (Indexed) indexedEntry; } @Override @@ -240,14 +230,14 @@ class FileChannelJournalSegmentWriter implements JournalWriter { try { if (index < segment.index()) { // Reset the writer to the first entry. - channel.position(JournalSegmentDescriptor.BYTES); + currentPosition = JournalSegmentDescriptor.BYTES; } else { // Reset the writer to the given index. reset(index); } // Zero the entry header at current channel position. - channel.write(ZERO_ENTRY_HEADER.asReadOnlyBuffer(), channel.position()); + channel.write(ZERO_ENTRY_HEADER.asReadOnlyBuffer(), currentPosition); } catch (IOException e) { throw new StorageException(e); }