X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=atomix-storage%2Fsrc%2Fmain%2Fjava%2Fio%2Fatomix%2Fstorage%2Fjournal%2FMappedJournalSegmentWriter.java;h=99180c5840321165696796a45234f5dcfcbb237e;hb=479ecb0109aaa1c8682dd083600b777cbd68dc07;hp=f5d4ed9d04a9554de4ec935c6b54b85720158973;hpb=d9569bc53baaf2e830144e0200f7b6baba15cbe0;p=controller.git diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/MappedJournalSegmentWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/MappedJournalSegmentWriter.java index f5d4ed9d04..99180c5840 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/MappedJournalSegmentWriter.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/MappedJournalSegmentWriter.java @@ -1,5 +1,6 @@ /* - * Copyright 2017-present Open Networking Foundation + * Copyright 2017-2022 Open Networking Foundation and others. All rights reserved. + * Copyright (c) 2024 PANTHEON.tech, s.r.o. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,9 +16,10 @@ */ package io.atomix.storage.journal; +import static io.atomix.storage.journal.SegmentEntry.HEADER_BYTES; + import com.esotericsoftware.kryo.KryoException; import io.atomix.storage.journal.index.JournalIndex; - import java.io.IOException; import java.nio.BufferOverflowException; import java.nio.BufferUnderflowException; @@ -55,13 +57,24 @@ final class MappedJournalSegmentWriter extends JournalSegmentWriter { JournalIndex index, JournalSerdes namespace) { super(channel, segment, maxEntrySize, index, namespace); + mappedBuffer = mapBuffer(channel, maxSegmentSize); + buffer = mappedBuffer.slice(); + reset(0); + } + + MappedJournalSegmentWriter(JournalSegmentWriter previous, int position) { + super(previous); + mappedBuffer = mapBuffer(channel, maxSegmentSize); + buffer = mappedBuffer.slice().position(position); + lastEntry = previous.getLastEntry(); + } + + private static @NonNull MappedByteBuffer mapBuffer(FileChannel channel, int maxSegmentSize) { try { - mappedBuffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, segment.descriptor().maxSegmentSize()); + return channel.map(FileChannel.MapMode.READ_WRITE, 0, maxSegmentSize); } catch (IOException e) { throw new StorageException(e); } - this.buffer = mappedBuffer.slice(); - reset(0); } @Override @@ -75,13 +88,14 @@ final class MappedJournalSegmentWriter extends JournalSegmentWriter { } @Override - FileChannelJournalSegmentWriter toFileChannel() { + DiskJournalSegmentWriter toFileChannel() { + final int position = buffer.position(); close(); - return new FileChannelJournalSegmentWriter<>(channel, segment, maxEntrySize, index, namespace); + return new DiskJournalSegmentWriter<>(this, position); } @Override - public void reset(long index) { + void reset(long index) { long nextIndex = firstIndex; // Clear the buffer indexes. @@ -125,8 +139,7 @@ final class MappedJournalSegmentWriter extends JournalSegmentWriter { position = buffer.position() + length; buffer.position(position); - buffer.mark(); - length = buffer.getInt(); + length = buffer.mark().getInt(); } // Reset the buffer to the previous mark. @@ -137,53 +150,23 @@ final class MappedJournalSegmentWriter extends JournalSegmentWriter { } @Override - public long getLastIndex() { - return lastEntry != null ? lastEntry.index() : segment.index() - 1; - } - - @Override - public Indexed getLastEntry() { + Indexed getLastEntry() { return lastEntry; } - @Override - public long getNextIndex() { - if (lastEntry != null) { - return lastEntry.index() + 1; - } else { - return firstIndex; - } - } - - @Override - public void append(Indexed entry) { - final long nextIndex = getNextIndex(); - - // If the entry's index is greater than the next index in the segment, skip some entries. - if (entry.index() > nextIndex) { - throw new IndexOutOfBoundsException("Entry index is not sequential"); - } - - // If the entry's index is less than the next index, truncate the segment. - if (entry.index() < nextIndex) { - truncate(entry.index() - 1); - } - append(entry.entry()); - } - @Override @SuppressWarnings("unchecked") - public Indexed append(T entry) { + Indexed append(T entry) { // Store the entry index. final long index = getNextIndex(); // Serialize the entry. int position = buffer.position(); - if (position + Integer.BYTES + Integer.BYTES > buffer.limit()) { + if (position + HEADER_BYTES > buffer.limit()) { throw new BufferOverflowException(); } - buffer.position(position + Integer.BYTES + Integer.BYTES); + buffer.position(position + HEADER_BYTES); try { namespace.serialize(entry, buffer); @@ -191,7 +174,7 @@ final class MappedJournalSegmentWriter extends JournalSegmentWriter { throw new BufferOverflowException(); } - final int length = buffer.position() - (position + Integer.BYTES + Integer.BYTES); + final int length = buffer.position() - (position + HEADER_BYTES); // If the entry length exceeds the maximum entry size then throw an exception. if (length > maxEntrySize) { @@ -202,17 +185,14 @@ final class MappedJournalSegmentWriter extends JournalSegmentWriter { // Compute the checksum for the entry. final CRC32 crc32 = new CRC32(); - buffer.position(position + Integer.BYTES + Integer.BYTES); + buffer.position(position + HEADER_BYTES); ByteBuffer slice = buffer.slice(); slice.limit(length); crc32.update(slice); final long checksum = crc32.getValue(); // Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer. - buffer.position(position); - buffer.putInt(length); - buffer.putInt((int) checksum); - buffer.position(position + Integer.BYTES + Integer.BYTES + length); + buffer.position(position).putInt(length).putInt((int) checksum).position(position + HEADER_BYTES + length); // Update the last entry with the correct index/term/length. Indexed indexedEntry = new Indexed<>(index, entry, length); @@ -221,9 +201,8 @@ final class MappedJournalSegmentWriter extends JournalSegmentWriter { return (Indexed) indexedEntry; } - @Override - public void truncate(long index) { + void truncate(long index) { // If the index is greater than or equal to the last index, skip the truncate. if (index >= getLastIndex()) { return; @@ -235,7 +214,7 @@ final class MappedJournalSegmentWriter extends JournalSegmentWriter { // Truncate the index. this.index.truncate(index); - if (index < segment.index()) { + if (index < firstIndex) { // Reset the writer to the first entry. buffer.position(JournalSegmentDescriptor.BYTES); } else { @@ -246,17 +225,16 @@ final class MappedJournalSegmentWriter extends JournalSegmentWriter { // Zero the entry header at current buffer position. int position = buffer.position(); // Note: we issue a single putLong() instead of two putInt()s. - buffer.putLong(0); - buffer.position(position); + buffer.putLong(0).position(position); } @Override - public void flush() { + void flush() { mappedBuffer.force(); } @Override - public void close() { + void close() { flush(); try { BufferCleaner.freeBuffer(mappedBuffer);