X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=atomix-storage%2Fsrc%2Fmain%2Fjava%2Fio%2Fatomix%2Fstorage%2Fjournal%2FDiskJournalSegmentWriter.java;h=e51de40687c9723ff74dc3340975f2260875003a;hp=c46b55cac8ecf4bac6098647b3964b696b8d3ba6;hb=130476cd353b68f05e5dc26f64eade38fc1c68e1;hpb=4d1e330341bcd0788aca06505426e61ba25e6e1e diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/DiskJournalSegmentWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/DiskJournalSegmentWriter.java index c46b55cac8..e51de40687 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/DiskJournalSegmentWriter.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/DiskJournalSegmentWriter.java @@ -19,18 +19,14 @@ package io.atomix.storage.journal; import static io.atomix.storage.journal.SegmentEntry.HEADER_BYTES; import com.esotericsoftware.kryo.KryoException; -import com.google.common.annotations.VisibleForTesting; +import io.atomix.storage.journal.StorageException.TooLarge; import io.atomix.storage.journal.index.JournalIndex; import java.io.IOException; import java.nio.BufferOverflowException; import java.nio.ByteBuffer; import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; -import java.nio.channels.SeekableByteChannel; import java.util.zip.CRC32; -import org.eclipse.jdt.annotation.Nullable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Segment writer. @@ -48,37 +44,34 @@ import org.slf4j.LoggerFactory; * @author Jordan Halterman */ final class DiskJournalSegmentWriter extends JournalSegmentWriter { - private static final Logger LOG = LoggerFactory.getLogger(DiskJournalSegmentWriter.class); private static final ByteBuffer ZERO_ENTRY_HEADER = ByteBuffer.wrap(new byte[HEADER_BYTES]); - private final ByteBuffer memory; + private final JournalSegmentReader reader; + private final ByteBuffer buffer; + private Indexed lastEntry; private long currentPosition; - DiskJournalSegmentWriter( - FileChannel channel, - JournalSegment segment, - int maxEntrySize, - JournalIndex index, - JournalSerdes namespace) { + DiskJournalSegmentWriter(final FileChannel channel, final JournalSegment segment, final int maxEntrySize, + final JournalIndex index, final JournalSerdes namespace) { super(channel, segment, maxEntrySize, index, namespace); - memory = allocMemory(maxEntrySize); + + buffer = DiskFileReader.allocateBuffer(maxSegmentSize, maxEntrySize); + final var fileReader = new DiskFileReader(segment.file().file().toPath(), channel, maxSegmentSize, maxEntrySize); + reader = new JournalSegmentReader<>(segment, fileReader, maxEntrySize, namespace); reset(0); } - DiskJournalSegmentWriter(JournalSegmentWriter previous, int position) { + DiskJournalSegmentWriter(final JournalSegmentWriter previous, final int position) { super(previous); - memory = allocMemory(maxEntrySize); + + buffer = DiskFileReader.allocateBuffer(maxSegmentSize, maxEntrySize); + final var fileReader = new DiskFileReader(segment.file().file().toPath(), channel, maxSegmentSize, maxEntrySize); + reader = new JournalSegmentReader<>(segment, fileReader, maxEntrySize, namespace); lastEntry = previous.getLastEntry(); currentPosition = position; } - private static ByteBuffer allocMemory(int maxEntrySize) { - final var buf = ByteBuffer.allocate((maxEntrySize + HEADER_BYTES) * 2); - buf.limit(0); - return buf; - } - @Override MappedByteBuffer buffer() { return null; @@ -96,109 +89,36 @@ final class DiskJournalSegmentWriter extends JournalSegmentWriter { @Override void reset(final long index) { - long nextIndex = firstIndex; - - // Clear the buffer indexes. - currentPosition = JournalSegmentDescriptor.BYTES; - + // acquire ownership of cache and make sure reader does not see anything we've done once we're done + reader.invalidateCache(); try { - // Clear memory buffer and read fist chunk - channel.read(memory.clear(), JournalSegmentDescriptor.BYTES); - memory.flip(); - - while (index == 0 || nextIndex <= index) { - final var entry = prepareNextEntry(channel, memory, maxEntrySize); - if (entry == null) { - break; - } - - final var bytes = entry.bytes(); - final var length = bytes.remaining(); - try { - lastEntry = new Indexed<>(nextIndex, namespace.deserialize(bytes), length); - } catch (KryoException e) { - // No-op, position is only updated on success - LOG.debug("Failed to deserialize entry", e); - break; - } - - this.index.index(nextIndex, (int) currentPosition); - nextIndex++; - - // Update the current position for indexing. - currentPosition = currentPosition + HEADER_BYTES + length; - memory.position(memory.position() + length); - } - } catch (IOException e) { - throw new StorageException(e); + resetWithBuffer(index); + } finally { + // Make sure reader does not see anything we've done + reader.invalidateCache(); } } - @VisibleForTesting - static @Nullable SegmentEntry prepareNextEntry(final SeekableByteChannel channel, final ByteBuffer memory, - final int maxEntrySize) throws IOException { - int remaining = memory.remaining(); - boolean compacted; - if (remaining < HEADER_BYTES) { - // We do not have the header available. Move the pointer and read. - channel.read(memory.compact()); - remaining = memory.flip().remaining(); - if (remaining < HEADER_BYTES) { - // could happen with mis-padded segment - return null; - } - compacted = true; - } else { - compacted = false; - } + private void resetWithBuffer(final long index) { + long nextIndex = firstIndex; - int length; - while (true) { - length = memory.mark().getInt(); - if (length < 1 || length > maxEntrySize) { - // Invalid length, - memory.reset(); - return null; - } + // Clear the buffer indexes and acquire ownership of the buffer + currentPosition = JournalSegmentDescriptor.BYTES; + reader.setPosition(JournalSegmentDescriptor.BYTES); - if (remaining >= Integer.BYTES + length) { - // Fast path: we have the entry properly positioned + while (index == 0 || nextIndex <= index) { + final var entry = reader.readEntry(nextIndex); + if (entry == null) { break; } - // Not enough data for entry, to header start - memory.reset(); - if (compacted) { - // we have already compacted the buffer, there is just not enough data - return null; - } + lastEntry = entry; + this.index.index(nextIndex, (int) currentPosition); + nextIndex++; - // Try to read more data and check again - channel.read(memory.compact()); - remaining = memory.flip().remaining(); - compacted = true; + // Update the current position for indexing. + currentPosition = currentPosition + HEADER_BYTES + entry.size(); } - - // Read the checksum of the entry. - final int checksum = memory.getInt(); - - // Slice off the entry's bytes - final var entryBytes = memory.slice(); - entryBytes.limit(length); - - // Compute the checksum for the entry bytes. - final var crc32 = new CRC32(); - crc32.update(entryBytes); - - // If the stored checksum does not equal the computed checksum, do not proceed further - final var computed = (int) crc32.getValue(); - if (checksum != computed) { - LOG.warn("Expected checksum {}, computed {}", Integer.toHexString(checksum), Integer.toHexString(computed)); - memory.reset(); - return null; - } - - return new SegmentEntry(checksum, entryBytes.rewind()); } @Override @@ -208,54 +128,52 @@ final class DiskJournalSegmentWriter extends JournalSegmentWriter { @Override @SuppressWarnings("unchecked") - Indexed append(T entry) { - // Store the entry index. - final long index = getNextIndex(); - - // Serialize the entry. - try { - namespace.serialize(entry, memory.clear().position(HEADER_BYTES)); - } catch (KryoException e) { - throw new StorageException.TooLarge("Entry size exceeds maximum allowed bytes (" + maxEntrySize + ")"); - } - memory.flip(); + Indexed append(final T entry) { + // Store the entry index. + final long index = getNextIndex(); - final int length = memory.limit() - HEADER_BYTES; + // Serialize the entry. + try { + namespace.serialize(entry, buffer.clear().position(HEADER_BYTES)); + } catch (KryoException e) { + throw new StorageException.TooLarge("Entry size exceeds maximum allowed bytes (" + maxEntrySize + ")"); + } + buffer.flip(); - // Ensure there's enough space left in the buffer to store the entry. - if (maxSegmentSize - currentPosition < length + HEADER_BYTES) { - throw new BufferOverflowException(); - } + final int length = buffer.limit() - HEADER_BYTES; + // Ensure there's enough space left in the buffer to store the entry. + if (maxSegmentSize - currentPosition < length + HEADER_BYTES) { + throw new BufferOverflowException(); + } - // If the entry length exceeds the maximum entry size then throw an exception. - if (length > maxEntrySize) { - throw new StorageException.TooLarge("Entry size " + length + " exceeds maximum allowed bytes (" + maxEntrySize + ")"); - } + // If the entry length exceeds the maximum entry size then throw an exception. + if (length > maxEntrySize) { + throw new TooLarge("Entry size " + length + " exceeds maximum allowed bytes (" + maxEntrySize + ")"); + } - // Compute the checksum for the entry. - final CRC32 crc32 = new CRC32(); - crc32.update(memory.array(), HEADER_BYTES, memory.limit() - HEADER_BYTES); - final long checksum = crc32.getValue(); + // Compute the checksum for the entry. + final var crc32 = new CRC32(); + crc32.update(buffer.slice(HEADER_BYTES, length)); - // Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer. - memory.putInt(0, length).putInt(Integer.BYTES, (int) checksum); - try { - channel.write(memory, currentPosition); - } catch (IOException e) { - throw new StorageException(e); - } + // Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer. + buffer.putInt(0, length).putInt(Integer.BYTES, (int) crc32.getValue()); + try { + channel.write(buffer, currentPosition); + } catch (IOException e) { + throw new StorageException(e); + } - // Update the last entry with the correct index/term/length. - Indexed indexedEntry = new Indexed<>(index, entry, length); - this.lastEntry = indexedEntry; - this.index.index(index, (int) currentPosition); + // Update the last entry with the correct index/term/length. + final var indexedEntry = new Indexed(index, entry, length); + lastEntry = indexedEntry; + this.index.index(index, (int) currentPosition); - currentPosition = currentPosition + HEADER_BYTES + length; - return (Indexed) indexedEntry; + currentPosition = currentPosition + HEADER_BYTES + length; + return (Indexed) indexedEntry; } @Override - void truncate(long index) { + void truncate(final long index) { // If the index is greater than or equal to the last index, skip the truncate. if (index >= getLastIndex()) { return;