X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=atomix-storage%2Fsrc%2Fmain%2Fjava%2Fio%2Fatomix%2Fstorage%2Fjournal%2FFileChannelJournalSegmentWriter.java;h=fa8b02bded1cdabff58370cf66e12aa12a9318da;hb=b1773fc588ee5ea36f3bf5378ff54aa3dbffb64e;hp=ba99cf3c7e322ef5ce627cd2af29ccf1342c54cf;hpb=16bcd8d75886f4d9d8ad4e6833147aa516358adf;p=controller.git diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/FileChannelJournalSegmentWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/FileChannelJournalSegmentWriter.java index ba99cf3c7e..fa8b02bded 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/FileChannelJournalSegmentWriter.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/FileChannelJournalSegmentWriter.java @@ -22,9 +22,9 @@ import java.io.IOException; import java.nio.BufferOverflowException; import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; import java.util.zip.CRC32; -import java.util.zip.Checksum; /** * Segment writer. @@ -41,15 +41,13 @@ import java.util.zip.Checksum; * * @author Jordan Halterman */ -class FileChannelJournalSegmentWriter implements JournalWriter { +final class FileChannelJournalSegmentWriter extends JournalSegmentWriter { + private static final ByteBuffer ZERO_ENTRY_HEADER = ByteBuffer.wrap(new byte[Integer.BYTES + Integer.BYTES]); + private final FileChannel channel; - private final JournalSegment segment; - private final int maxEntrySize; - private final JournalIndex index; - private final JournalSerdes namespace; private final ByteBuffer memory; - private final long firstIndex; private Indexed lastEntry; + private long currentPosition; FileChannelJournalSegmentWriter( FileChannel channel, @@ -57,39 +55,32 @@ class FileChannelJournalSegmentWriter implements JournalWriter { int maxEntrySize, JournalIndex index, JournalSerdes namespace) { + super(segment, maxEntrySize, index, namespace); this.channel = channel; - this.segment = segment; - this.maxEntrySize = maxEntrySize; - this.index = index; this.memory = ByteBuffer.allocate((maxEntrySize + Integer.BYTES + Integer.BYTES) * 2); memory.limit(0); - this.namespace = namespace; - this.firstIndex = segment.index(); reset(0); } + @Override + MappedByteBuffer buffer() { + return null; + } + @Override public void reset(long index) { long nextIndex = firstIndex; // Clear the buffer indexes. + currentPosition = JournalSegmentDescriptor.BYTES; + try { - channel.position(JournalSegmentDescriptor.BYTES); - memory.clear().flip(); - - // Record the current buffer position. - long position = channel.position(); - - // Read more bytes from the segment if necessary. - if (memory.remaining() < maxEntrySize) { - memory.clear(); - channel.read(memory); - channel.position(position); - memory.flip(); - } + // Clear memory buffer and read fist chunk + memory.clear(); + channel.read(memory, JournalSegmentDescriptor.BYTES); + memory.flip(); // Read the entry length. - memory.mark(); int length = memory.getInt(); // If the length is non-zero, read the entry. @@ -98,47 +89,40 @@ class FileChannelJournalSegmentWriter implements JournalWriter { // Read the checksum of the entry. final long checksum = memory.getInt() & 0xFFFFFFFFL; + // Slice off the entry's bytes + final ByteBuffer entryBytes = memory.slice(); + entryBytes.limit(length); + // Compute the checksum for the entry bytes. - final Checksum crc32 = new CRC32(); - crc32.update(memory.array(), memory.position(), length); - - // If the stored checksum equals the computed checksum, return the entry. - if (checksum == crc32.getValue()) { - int limit = memory.limit(); - memory.limit(memory.position() + length); - final E entry = namespace.deserialize(memory); - memory.limit(limit); - lastEntry = new Indexed<>(nextIndex, entry, length); - this.index.index(nextIndex, (int) position); - nextIndex++; - } else { + final CRC32 crc32 = new CRC32(); + crc32.update(entryBytes); + + // If the stored checksum does not equal the computed checksum, do not proceed further + if (checksum != crc32.getValue()) { break; } + entryBytes.rewind(); + final E entry = namespace.deserialize(entryBytes); + lastEntry = new Indexed<>(nextIndex, entry, length); + this.index.index(nextIndex, (int) currentPosition); + nextIndex++; + // Update the current position for indexing. - position = channel.position() + memory.position(); + currentPosition = currentPosition + Integer.BYTES + Integer.BYTES + length; + memory.position(memory.position() + length); // Read more bytes from the segment if necessary. if (memory.remaining() < maxEntrySize) { - channel.position(position); memory.clear(); - channel.read(memory); - channel.position(position); + channel.read(memory, currentPosition); memory.flip(); } - memory.mark(); length = memory.getInt(); } - - // Reset the buffer to the previous mark. - channel.position(channel.position() + memory.reset().position()); } catch (BufferUnderflowException e) { - try { - channel.position(channel.position() + memory.reset().position()); - } catch (IOException e2) { - throw new StorageException(e2); - } + // No-op, position is only updated on success } catch (IOException e) { throw new StorageException(e); } @@ -163,45 +147,6 @@ class FileChannelJournalSegmentWriter implements JournalWriter { } } - /** - * Returns the size of the underlying buffer. - * - * @return The size of the underlying buffer. - */ - public long size() { - try { - return channel.position(); - } catch (IOException e) { - throw new StorageException(e); - } - } - - /** - * Returns a boolean indicating whether the segment is empty. - * - * @return Indicates whether the segment is empty. - */ - public boolean isEmpty() { - return lastEntry == null; - } - - /** - * Returns a boolean indicating whether the segment is full. - * - * @return Indicates whether the segment is full. - */ - public boolean isFull() { - return size() >= segment.descriptor().maxSegmentSize() - || getNextIndex() - firstIndex >= segment.descriptor().maxEntries(); - } - - /** - * Returns the first index written to the segment. - */ - public long firstIndex() { - return firstIndex; - } - @Override public void append(Indexed entry) { final long nextIndex = getNextIndex(); @@ -224,53 +169,49 @@ class FileChannelJournalSegmentWriter implements JournalWriter { // Store the entry index. final long index = getNextIndex(); + // Serialize the entry. + memory.clear(); + memory.position(Integer.BYTES + Integer.BYTES); try { - // Serialize the entry. - memory.clear(); - memory.position(Integer.BYTES + Integer.BYTES); - try { - namespace.serialize(entry, memory); - } catch (KryoException e) { - throw new StorageException.TooLarge("Entry size exceeds maximum allowed bytes (" + maxEntrySize + ")"); - } - memory.flip(); + namespace.serialize(entry, memory); + } catch (KryoException e) { + throw new StorageException.TooLarge("Entry size exceeds maximum allowed bytes (" + maxEntrySize + ")"); + } + memory.flip(); - final int length = memory.limit() - (Integer.BYTES + Integer.BYTES); + final int length = memory.limit() - (Integer.BYTES + Integer.BYTES); - // Ensure there's enough space left in the buffer to store the entry. - long position = channel.position(); - if (segment.descriptor().maxSegmentSize() - position < length + Integer.BYTES + Integer.BYTES) { - throw new BufferOverflowException(); - } + // Ensure there's enough space left in the buffer to store the entry. + if (segment.descriptor().maxSegmentSize() - currentPosition < length + Integer.BYTES + Integer.BYTES) { + throw new BufferOverflowException(); + } - // If the entry length exceeds the maximum entry size then throw an exception. - if (length > maxEntrySize) { - throw new StorageException.TooLarge("Entry size " + length + " exceeds maximum allowed bytes (" + maxEntrySize + ")"); - } + // If the entry length exceeds the maximum entry size then throw an exception. + if (length > maxEntrySize) { + throw new StorageException.TooLarge("Entry size " + length + " exceeds maximum allowed bytes (" + maxEntrySize + ")"); + } + + // Compute the checksum for the entry. + final CRC32 crc32 = new CRC32(); + crc32.update(memory.array(), Integer.BYTES + Integer.BYTES, memory.limit() - (Integer.BYTES + Integer.BYTES)); + final long checksum = crc32.getValue(); - // Compute the checksum for the entry. - final Checksum crc32 = new CRC32(); - crc32.update(memory.array(), Integer.BYTES + Integer.BYTES, memory.limit() - (Integer.BYTES + Integer.BYTES)); - final long checksum = crc32.getValue(); - - // Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer. - memory.putInt(0, length); - memory.putInt(Integer.BYTES, (int) checksum); - channel.write(memory); - - // Update the last entry with the correct index/term/length. - Indexed indexedEntry = new Indexed<>(index, entry, length); - this.lastEntry = indexedEntry; - this.index.index(index, (int) position); - return (Indexed) indexedEntry; + // Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer. + memory.putInt(0, length); + memory.putInt(Integer.BYTES, (int) checksum); + try { + channel.write(memory, currentPosition); } catch (IOException e) { throw new StorageException(e); } - } - @Override - public void commit(long index) { + // Update the last entry with the correct index/term/length. + Indexed indexedEntry = new Indexed<>(index, entry, length); + this.lastEntry = indexedEntry; + this.index.index(index, (int) currentPosition); + currentPosition = currentPosition + Integer.BYTES + Integer.BYTES + length; + return (Indexed) indexedEntry; } @Override @@ -283,39 +224,25 @@ class FileChannelJournalSegmentWriter implements JournalWriter { // Reset the last entry. lastEntry = null; - try { - // Truncate the index. - this.index.truncate(index); + // Truncate the index. + this.index.truncate(index); + try { if (index < segment.index()) { - channel.position(JournalSegmentDescriptor.BYTES); - channel.write(zero()); - channel.position(JournalSegmentDescriptor.BYTES); + // Reset the writer to the first entry. + currentPosition = JournalSegmentDescriptor.BYTES; } else { // Reset the writer to the given index. reset(index); - - // Zero entries after the given index. - long position = channel.position(); - channel.write(zero()); - channel.position(position); } + + // Zero the entry header at current channel position. + channel.write(ZERO_ENTRY_HEADER.asReadOnlyBuffer(), currentPosition); } catch (IOException e) { throw new StorageException(e); } } - /** - * Returns a zeroed out byte buffer. - */ - private ByteBuffer zero() { - memory.clear(); - for (int i = 0; i < memory.limit(); i++) { - memory.put(i, (byte) 0); - } - return memory; - } - @Override public void flush() { try {