X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=atomix-storage%2Fsrc%2Fmain%2Fjava%2Fio%2Fatomix%2Fstorage%2Fjournal%2FMappedJournalSegmentWriter.java;h=9f437b6125f4c68d8ea126434f1b805da5c9c478;hb=32bb8e8275884dd0e6dee40b02785c2e606a0914;hp=7ff58590d996729c51e7b8c2612f7a23e82deed5;hpb=b004038a282786fc29b4ba7fc9f8874debe21afd;p=controller.git diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/MappedJournalSegmentWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/MappedJournalSegmentWriter.java index 7ff58590d9..9f437b6125 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/MappedJournalSegmentWriter.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/MappedJournalSegmentWriter.java @@ -1,5 +1,6 @@ /* - * Copyright 2017-present Open Networking Foundation + * Copyright 2017-2022 Open Networking Foundation and others. All rights reserved. + * Copyright (c) 2024 PANTHEON.tech, s.r.o. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,15 +16,17 @@ */ package io.atomix.storage.journal; +import static io.atomix.storage.journal.SegmentEntry.HEADER_BYTES; + import com.esotericsoftware.kryo.KryoException; import io.atomix.storage.journal.index.JournalIndex; - import java.io.IOException; import java.nio.BufferOverflowException; -import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; import java.util.zip.CRC32; +import org.eclipse.jdt.annotation.NonNull; /** * Segment writer. @@ -40,187 +43,106 @@ import java.util.zip.CRC32; * * @author Jordan Halterman */ -class MappedJournalSegmentWriter implements JournalWriter { - private final MappedByteBuffer mappedBuffer; +final class MappedJournalSegmentWriter extends JournalSegmentWriter { + private final @NonNull MappedByteBuffer mappedBuffer; + private final JournalSegmentReader reader; private final ByteBuffer buffer; - private final JournalSegment segment; - private final int maxEntrySize; - private final JournalIndex index; - private final JournalSerdes namespace; - private final long firstIndex; - private Indexed lastEntry; - MappedJournalSegmentWriter( - MappedByteBuffer buffer, - JournalSegment segment, - int maxEntrySize, - JournalIndex index, - JournalSerdes namespace) { - this.mappedBuffer = buffer; - this.buffer = buffer.slice(); - this.segment = segment; - this.maxEntrySize = maxEntrySize; - this.index = index; - this.namespace = namespace; - this.firstIndex = segment.index(); - reset(0); - } + MappedJournalSegmentWriter(final FileChannel channel, final JournalSegment segment, final int maxEntrySize, + final JournalIndex index, final JournalSerdes namespace) { + super(channel, segment, maxEntrySize, index, namespace); - /** - * Returns the mapped buffer underlying the segment writer. - * - * @return the mapped buffer underlying the segment writer - */ - MappedByteBuffer buffer() { - return mappedBuffer; + mappedBuffer = mapBuffer(channel, maxSegmentSize); + buffer = mappedBuffer.slice(); + reader = new JournalSegmentReader<>(segment, new MappedFileReader(segment.file().file().toPath(), mappedBuffer), + maxEntrySize, namespace); + reset(0); } - @Override - public void reset(long index) { - long nextIndex = firstIndex; - - // Clear the buffer indexes. - buffer.position(JournalSegmentDescriptor.BYTES); + MappedJournalSegmentWriter(final JournalSegmentWriter previous) { + super(previous); - // Record the current buffer position. - int position = buffer.position(); - - // Read the entry length. - buffer.mark(); + mappedBuffer = mapBuffer(channel, maxSegmentSize); + buffer = mappedBuffer.slice(); + reader = new JournalSegmentReader<>(segment, new MappedFileReader(segment.file().file().toPath(), mappedBuffer), + maxEntrySize, namespace); + } + private static @NonNull MappedByteBuffer mapBuffer(final FileChannel channel, final int maxSegmentSize) { try { - int length = buffer.getInt(); - - // If the length is non-zero, read the entry. - while (0 < length && length <= maxEntrySize && (index == 0 || nextIndex <= index)) { - - // Read the checksum of the entry. - final long checksum = buffer.getInt() & 0xFFFFFFFFL; - - // Compute the checksum for the entry bytes. - final CRC32 crc32 = new CRC32(); - ByteBuffer slice = buffer.slice(); - slice.limit(length); - crc32.update(slice); - - // If the stored checksum does not equal the computed checksum, do not proceed further - if (checksum != crc32.getValue()) { - break; - } - - slice.rewind(); - final E entry = namespace.deserialize(slice); - lastEntry = new Indexed<>(nextIndex, entry, length); - this.index.index(nextIndex, position); - nextIndex++; - - // Update the current position for indexing. - position = buffer.position() + length; - buffer.position(position); - - buffer.mark(); - length = buffer.getInt(); - } - - // Reset the buffer to the previous mark. - buffer.reset(); - } catch (BufferUnderflowException e) { - buffer.reset(); + return channel.map(FileChannel.MapMode.READ_WRITE, 0, maxSegmentSize); + } catch (IOException e) { + throw new StorageException(e); } } @Override - public long getLastIndex() { - return lastEntry != null ? lastEntry.index() : segment.index() - 1; + @NonNull MappedByteBuffer buffer() { + return mappedBuffer; } @Override - public Indexed getLastEntry() { - return lastEntry; + MappedJournalSegmentWriter toMapped() { + return this; } @Override - public long getNextIndex() { - if (lastEntry != null) { - return lastEntry.index() + 1; - } else { - return firstIndex; - } + DiskJournalSegmentWriter toFileChannel() { + close(); + return new DiskJournalSegmentWriter<>(this); } @Override - public void append(Indexed entry) { - final long nextIndex = getNextIndex(); - - // If the entry's index is greater than the next index in the segment, skip some entries. - if (entry.index() > nextIndex) { - throw new IndexOutOfBoundsException("Entry index is not sequential"); - } - - // If the entry's index is less than the next index, truncate the segment. - if (entry.index() < nextIndex) { - truncate(entry.index() - 1); - } - append(entry.entry()); + JournalSegmentReader reader() { + return reader; } @Override @SuppressWarnings("unchecked") - public Indexed append(T entry) { + Indexed append(final T entry) { // Store the entry index. final long index = getNextIndex(); // Serialize the entry. - int position = buffer.position(); - if (position + Integer.BYTES + Integer.BYTES > buffer.limit()) { + final int bodyPosition = currentPosition + HEADER_BYTES; + final int avail = maxSegmentSize - bodyPosition; + if (avail < 0) { throw new BufferOverflowException(); } - buffer.position(position + Integer.BYTES + Integer.BYTES); - + final var entryBytes = buffer.slice(bodyPosition, Math.min(avail, maxEntrySize)); try { - namespace.serialize(entry, buffer); + namespace.serialize(entry, entryBytes); } catch (KryoException e) { - throw new BufferOverflowException(); - } - - final int length = buffer.position() - (position + Integer.BYTES + Integer.BYTES); + if (entryBytes.capacity() != maxEntrySize) { + // We have not provided enough capacity, signal to roll to next segment + throw new BufferOverflowException(); + } - // If the entry length exceeds the maximum entry size then throw an exception. - if (length > maxEntrySize) { // Just reset the buffer. There's no need to zero the bytes since we haven't written the length or checksum. - buffer.position(position); - throw new StorageException.TooLarge("Entry size " + length + " exceeds maximum allowed bytes (" + maxEntrySize + ")"); + throw new StorageException.TooLarge("Entry size exceeds maximum allowed bytes (" + maxEntrySize + ")"); } + final int length = entryBytes.position(); + // Compute the checksum for the entry. - final CRC32 crc32 = new CRC32(); - buffer.position(position + Integer.BYTES + Integer.BYTES); - ByteBuffer slice = buffer.slice(); - slice.limit(length); - crc32.update(slice); - final long checksum = crc32.getValue(); + final var crc32 = new CRC32(); + crc32.update(entryBytes.flip()); // Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer. - buffer.position(position); - buffer.putInt(length); - buffer.putInt((int) checksum); - buffer.position(position + Integer.BYTES + Integer.BYTES + length); + buffer.putInt(currentPosition, length).putInt(currentPosition + Integer.BYTES, (int) crc32.getValue()); // Update the last entry with the correct index/term/length. Indexed indexedEntry = new Indexed<>(index, entry, length); - this.lastEntry = indexedEntry; - this.index.index(index, position); - return (Indexed) indexedEntry; - } - - @Override - public void commit(long index) { + lastEntry = indexedEntry; + this.index.index(index, currentPosition); + currentPosition = currentPosition + HEADER_BYTES + length; + return (Indexed) indexedEntry; } @Override - public void truncate(long index) { + void truncate(final long index) { // If the index is greater than or equal to the last index, skip the truncate. if (index >= getLastIndex()) { return; @@ -232,28 +154,26 @@ class MappedJournalSegmentWriter implements JournalWriter { // Truncate the index. this.index.truncate(index); - if (index < segment.index()) { + if (index < firstIndex) { // Reset the writer to the first entry. - buffer.position(JournalSegmentDescriptor.BYTES); + currentPosition = JournalSegmentDescriptor.BYTES; } else { // Reset the writer to the given index. reset(index); } // Zero the entry header at current buffer position. - int position = buffer.position(); // Note: we issue a single putLong() instead of two putInt()s. - buffer.putLong(0); - buffer.position(position); + buffer.putLong(currentPosition, 0L); } @Override - public void flush() { + void flush() { mappedBuffer.force(); } @Override - public void close() { + void close() { flush(); try { BufferCleaner.freeBuffer(mappedBuffer);