X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=atomix-storage%2Fsrc%2Fmain%2Fjava%2Fio%2Fatomix%2Fstorage%2Fjournal%2FMappedJournalSegmentWriter.java;h=99180c5840321165696796a45234f5dcfcbb237e;hb=479ecb0109aaa1c8682dd083600b777cbd68dc07;hp=6ca6ab30dd72cce4a8952782375d83a2e17b3c67;hpb=c15f343c9fdf01aaff0170c58398d0b8a7259822;p=controller.git diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/MappedJournalSegmentWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/MappedJournalSegmentWriter.java index 6ca6ab30dd..99180c5840 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/MappedJournalSegmentWriter.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/MappedJournalSegmentWriter.java @@ -1,5 +1,6 @@ /* - * Copyright 2017-present Open Networking Foundation + * Copyright 2017-2022 Open Networking Foundation and others. All rights reserved. + * Copyright (c) 2024 PANTHEON.tech, s.r.o. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,15 +16,18 @@ */ package io.atomix.storage.journal; +import static io.atomix.storage.journal.SegmentEntry.HEADER_BYTES; + import com.esotericsoftware.kryo.KryoException; import io.atomix.storage.journal.index.JournalIndex; - import java.io.IOException; import java.nio.BufferOverflowException; import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; import java.util.zip.CRC32; +import org.eclipse.jdt.annotation.NonNull; /** * Segment writer. @@ -40,43 +44,58 @@ import java.util.zip.CRC32; * * @author Jordan Halterman */ -class MappedJournalSegmentWriter implements JournalWriter { - private final MappedByteBuffer mappedBuffer; +final class MappedJournalSegmentWriter extends JournalSegmentWriter { + private final @NonNull MappedByteBuffer mappedBuffer; private final ByteBuffer buffer; - private final JournalSegment segment; - private final int maxEntrySize; - private final JournalIndex index; - private final JournalSerdes namespace; - private final long firstIndex; + private Indexed lastEntry; MappedJournalSegmentWriter( - MappedByteBuffer buffer, + FileChannel channel, JournalSegment segment, int maxEntrySize, JournalIndex index, JournalSerdes namespace) { - this.mappedBuffer = buffer; - this.buffer = buffer.slice(); - this.segment = segment; - this.maxEntrySize = maxEntrySize; - this.index = index; - this.namespace = namespace; - this.firstIndex = segment.index(); + super(channel, segment, maxEntrySize, index, namespace); + mappedBuffer = mapBuffer(channel, maxSegmentSize); + buffer = mappedBuffer.slice(); reset(0); } - /** - * Returns the mapped buffer underlying the segment writer. - * - * @return the mapped buffer underlying the segment writer - */ - MappedByteBuffer buffer() { + MappedJournalSegmentWriter(JournalSegmentWriter previous, int position) { + super(previous); + mappedBuffer = mapBuffer(channel, maxSegmentSize); + buffer = mappedBuffer.slice().position(position); + lastEntry = previous.getLastEntry(); + } + + private static @NonNull MappedByteBuffer mapBuffer(FileChannel channel, int maxSegmentSize) { + try { + return channel.map(FileChannel.MapMode.READ_WRITE, 0, maxSegmentSize); + } catch (IOException e) { + throw new StorageException(e); + } + } + + @Override + @NonNull MappedByteBuffer buffer() { return mappedBuffer; } @Override - public void reset(long index) { + MappedJournalSegmentWriter toMapped() { + return this; + } + + @Override + DiskJournalSegmentWriter toFileChannel() { + final int position = buffer.position(); + close(); + return new DiskJournalSegmentWriter<>(this, position); + } + + @Override + void reset(long index) { long nextIndex = firstIndex; // Clear the buffer indexes. @@ -97,29 +116,30 @@ class MappedJournalSegmentWriter implements JournalWriter { // Read the checksum of the entry. final long checksum = buffer.getInt() & 0xFFFFFFFFL; + // Slice off the entry's bytes + final ByteBuffer entryBytes = buffer.slice(); + entryBytes.limit(length); + // Compute the checksum for the entry bytes. final CRC32 crc32 = new CRC32(); - ByteBuffer slice = buffer.slice(); - slice.limit(length); - crc32.update(slice); - - // If the stored checksum equals the computed checksum, return the entry. - if (checksum == crc32.getValue()) { - slice.rewind(); - final E entry = namespace.deserialize(slice); - lastEntry = new Indexed<>(nextIndex, entry, length); - this.index.index(nextIndex, position); - nextIndex++; - } else { + crc32.update(entryBytes); + + // If the stored checksum does not equal the computed checksum, do not proceed further + if (checksum != crc32.getValue()) { break; } + entryBytes.rewind(); + final E entry = namespace.deserialize(entryBytes); + lastEntry = new Indexed<>(nextIndex, entry, length); + this.index.index(nextIndex, position); + nextIndex++; + // Update the current position for indexing. position = buffer.position() + length; buffer.position(position); - buffer.mark(); - length = buffer.getInt(); + length = buffer.mark().getInt(); } // Reset the buffer to the previous mark. @@ -130,53 +150,23 @@ class MappedJournalSegmentWriter implements JournalWriter { } @Override - public long getLastIndex() { - return lastEntry != null ? lastEntry.index() : segment.index() - 1; - } - - @Override - public Indexed getLastEntry() { + Indexed getLastEntry() { return lastEntry; } - @Override - public long getNextIndex() { - if (lastEntry != null) { - return lastEntry.index() + 1; - } else { - return firstIndex; - } - } - - @Override - public void append(Indexed entry) { - final long nextIndex = getNextIndex(); - - // If the entry's index is greater than the next index in the segment, skip some entries. - if (entry.index() > nextIndex) { - throw new IndexOutOfBoundsException("Entry index is not sequential"); - } - - // If the entry's index is less than the next index, truncate the segment. - if (entry.index() < nextIndex) { - truncate(entry.index() - 1); - } - append(entry.entry()); - } - @Override @SuppressWarnings("unchecked") - public Indexed append(T entry) { + Indexed append(T entry) { // Store the entry index. final long index = getNextIndex(); // Serialize the entry. int position = buffer.position(); - if (position + Integer.BYTES + Integer.BYTES > buffer.limit()) { + if (position + HEADER_BYTES > buffer.limit()) { throw new BufferOverflowException(); } - buffer.position(position + Integer.BYTES + Integer.BYTES); + buffer.position(position + HEADER_BYTES); try { namespace.serialize(entry, buffer); @@ -184,7 +174,7 @@ class MappedJournalSegmentWriter implements JournalWriter { throw new BufferOverflowException(); } - final int length = buffer.position() - (position + Integer.BYTES + Integer.BYTES); + final int length = buffer.position() - (position + HEADER_BYTES); // If the entry length exceeds the maximum entry size then throw an exception. if (length > maxEntrySize) { @@ -195,17 +185,14 @@ class MappedJournalSegmentWriter implements JournalWriter { // Compute the checksum for the entry. final CRC32 crc32 = new CRC32(); - buffer.position(position + Integer.BYTES + Integer.BYTES); + buffer.position(position + HEADER_BYTES); ByteBuffer slice = buffer.slice(); slice.limit(length); crc32.update(slice); final long checksum = crc32.getValue(); // Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer. - buffer.position(position); - buffer.putInt(length); - buffer.putInt((int) checksum); - buffer.position(position + Integer.BYTES + Integer.BYTES + length); + buffer.position(position).putInt(length).putInt((int) checksum).position(position + HEADER_BYTES + length); // Update the last entry with the correct index/term/length. Indexed indexedEntry = new Indexed<>(index, entry, length); @@ -215,12 +202,7 @@ class MappedJournalSegmentWriter implements JournalWriter { } @Override - public void commit(long index) { - - } - - @Override - public void truncate(long index) { + void truncate(long index) { // If the index is greater than or equal to the last index, skip the truncate. if (index >= getLastIndex()) { return; @@ -232,30 +214,27 @@ class MappedJournalSegmentWriter implements JournalWriter { // Truncate the index. this.index.truncate(index); - if (index < segment.index()) { - buffer.position(JournalSegmentDescriptor.BYTES); - buffer.putInt(0); - buffer.putInt(0); + if (index < firstIndex) { + // Reset the writer to the first entry. buffer.position(JournalSegmentDescriptor.BYTES); } else { // Reset the writer to the given index. reset(index); - - // Zero entries after the given index. - int position = buffer.position(); - buffer.putInt(0); - buffer.putInt(0); - buffer.position(position); } + + // Zero the entry header at current buffer position. + int position = buffer.position(); + // Note: we issue a single putLong() instead of two putInt()s. + buffer.putLong(0).position(position); } @Override - public void flush() { + void flush() { mappedBuffer.force(); } @Override - public void close() { + void close() { flush(); try { BufferCleaner.freeBuffer(mappedBuffer);