X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=atomix-storage%2Fsrc%2Fmain%2Fjava%2Fio%2Fatomix%2Fstorage%2Fjournal%2FFileChannelJournalSegmentWriter.java;h=f4c7ec52c3c53f7d739eff7fa30da1945010f8d1;hb=3924193f0176e3f114a2dc82e45a49a684fe2375;hp=b43aa3d443fbe08eaccc0c851bd33418c832a855;hpb=b2106c7c4fbc91f6cdfd8bf4371b723db6fdae2f;p=controller.git diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/FileChannelJournalSegmentWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/FileChannelJournalSegmentWriter.java index b43aa3d443..f4c7ec52c3 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/FileChannelJournalSegmentWriter.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/FileChannelJournalSegmentWriter.java @@ -1,5 +1,6 @@ /* - * Copyright 2017-present Open Networking Foundation + * Copyright 2017-2022 Open Networking Foundation and others. All rights reserved. + * Copyright (c) 2024 PANTHEON.tech, s.r.o. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,9 +23,9 @@ import java.io.IOException; import java.nio.BufferOverflowException; import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; import java.util.zip.CRC32; -import java.util.zip.Checksum; /** * Segment writer. @@ -41,15 +42,12 @@ import java.util.zip.Checksum; * * @author Jordan Halterman */ -class FileChannelJournalSegmentWriter implements JournalWriter { - private final FileChannel channel; - private final JournalSegment segment; - private final int maxEntrySize; - private final JournalIndex index; - private final JournalSerdes namespace; +final class FileChannelJournalSegmentWriter extends JournalSegmentWriter { + private static final ByteBuffer ZERO_ENTRY_HEADER = ByteBuffer.wrap(new byte[ENTRY_HEADER_BYTES]); + private final ByteBuffer memory; - private final long firstIndex; private Indexed lastEntry; + private long currentPosition; FileChannelJournalSegmentWriter( FileChannel channel, @@ -57,39 +55,52 @@ class FileChannelJournalSegmentWriter implements JournalWriter { int maxEntrySize, JournalIndex index, JournalSerdes namespace) { - this.channel = channel; - this.segment = segment; - this.maxEntrySize = maxEntrySize; - this.index = index; - this.memory = ByteBuffer.allocate((maxEntrySize + Integer.BYTES + Integer.BYTES) * 2); - memory.limit(0); - this.namespace = namespace; - this.firstIndex = segment.index(); + super(channel, segment, maxEntrySize, index, namespace); + memory = allocMemory(maxEntrySize); reset(0); } + FileChannelJournalSegmentWriter(JournalSegmentWriter previous, int position) { + super(previous); + memory = allocMemory(maxEntrySize); + lastEntry = previous.getLastEntry(); + currentPosition = position; + } + + private static ByteBuffer allocMemory(int maxEntrySize) { + final var buf = ByteBuffer.allocate((maxEntrySize + ENTRY_HEADER_BYTES) * 2); + buf.limit(0); + return buf; + } + + @Override + MappedByteBuffer buffer() { + return null; + } + + @Override + MappedJournalSegmentWriter toMapped() { + return new MappedJournalSegmentWriter<>(this, (int) currentPosition); + } + + @Override + FileChannelJournalSegmentWriter toFileChannel() { + return this; + } + @Override - public void reset(long index) { + void reset(long index) { long nextIndex = firstIndex; // Clear the buffer indexes. + currentPosition = JournalSegmentDescriptor.BYTES; + try { - channel.position(JournalSegmentDescriptor.BYTES); - memory.clear().flip(); - - // Record the current buffer position. - long position = channel.position(); - - // Read more bytes from the segment if necessary. - if (memory.remaining() < maxEntrySize) { - memory.clear(); - channel.read(memory); - channel.position(position); - memory.flip(); - } + // Clear memory buffer and read fist chunk + channel.read(memory.clear(), JournalSegmentDescriptor.BYTES); + memory.flip(); // Read the entry length. - memory.mark(); int length = memory.getInt(); // If the length is non-zero, read the entry. @@ -98,144 +109,99 @@ class FileChannelJournalSegmentWriter implements JournalWriter { // Read the checksum of the entry. final long checksum = memory.getInt() & 0xFFFFFFFFL; + // Slice off the entry's bytes + final ByteBuffer entryBytes = memory.slice(); + entryBytes.limit(length); + // Compute the checksum for the entry bytes. - final Checksum crc32 = new CRC32(); - crc32.update(memory.array(), memory.position(), length); - - // If the stored checksum equals the computed checksum, return the entry. - if (checksum == crc32.getValue()) { - int limit = memory.limit(); - memory.limit(memory.position() + length); - final E entry = namespace.deserialize(memory); - memory.limit(limit); - lastEntry = new Indexed<>(nextIndex, entry, length); - this.index.index(nextIndex, (int) position); - nextIndex++; - } else { + final CRC32 crc32 = new CRC32(); + crc32.update(entryBytes); + + // If the stored checksum does not equal the computed checksum, do not proceed further + if (checksum != crc32.getValue()) { break; } + entryBytes.rewind(); + final E entry = namespace.deserialize(entryBytes); + lastEntry = new Indexed<>(nextIndex, entry, length); + this.index.index(nextIndex, (int) currentPosition); + nextIndex++; + // Update the current position for indexing. - position = channel.position() + memory.position(); + currentPosition = currentPosition + ENTRY_HEADER_BYTES + length; + memory.position(memory.position() + length); // Read more bytes from the segment if necessary. if (memory.remaining() < maxEntrySize) { - channel.position(position); - memory.clear(); - channel.read(memory); - channel.position(position); + channel.read(memory.compact()); memory.flip(); } - memory.mark(); length = memory.getInt(); } - - // Reset the buffer to the previous mark. - channel.position(channel.position() + memory.reset().position()); } catch (BufferUnderflowException e) { - try { - channel.position(channel.position() + memory.reset().position()); - } catch (IOException e2) { - throw new StorageException(e2); - } + // No-op, position is only updated on success } catch (IOException e) { throw new StorageException(e); } } @Override - public long getLastIndex() { - return lastEntry != null ? lastEntry.index() : segment.index() - 1; - } - - @Override - public Indexed getLastEntry() { + Indexed getLastEntry() { return lastEntry; } - @Override - public long getNextIndex() { - if (lastEntry != null) { - return lastEntry.index() + 1; - } else { - return firstIndex; - } - } - - @Override - public void append(Indexed entry) { - final long nextIndex = getNextIndex(); - - // If the entry's index is greater than the next index in the segment, skip some entries. - if (entry.index() > nextIndex) { - throw new IndexOutOfBoundsException("Entry index is not sequential"); - } - - // If the entry's index is less than the next index, truncate the segment. - if (entry.index() < nextIndex) { - truncate(entry.index() - 1); - } - append(entry.entry()); - } - @Override @SuppressWarnings("unchecked") - public Indexed append(T entry) { + Indexed append(T entry) { // Store the entry index. final long index = getNextIndex(); + // Serialize the entry. try { - // Serialize the entry. - memory.clear(); - memory.position(Integer.BYTES + Integer.BYTES); - try { - namespace.serialize(entry, memory); - } catch (KryoException e) { - throw new StorageException.TooLarge("Entry size exceeds maximum allowed bytes (" + maxEntrySize + ")"); - } - memory.flip(); + namespace.serialize(entry, memory.clear().position(ENTRY_HEADER_BYTES)); + } catch (KryoException e) { + throw new StorageException.TooLarge("Entry size exceeds maximum allowed bytes (" + maxEntrySize + ")"); + } + memory.flip(); - final int length = memory.limit() - (Integer.BYTES + Integer.BYTES); + final int length = memory.limit() - ENTRY_HEADER_BYTES; - // Ensure there's enough space left in the buffer to store the entry. - long position = channel.position(); - if (segment.descriptor().maxSegmentSize() - position < length + Integer.BYTES + Integer.BYTES) { - throw new BufferOverflowException(); - } + // Ensure there's enough space left in the buffer to store the entry. + if (maxSegmentSize - currentPosition < length + ENTRY_HEADER_BYTES) { + throw new BufferOverflowException(); + } - // If the entry length exceeds the maximum entry size then throw an exception. - if (length > maxEntrySize) { - throw new StorageException.TooLarge("Entry size " + length + " exceeds maximum allowed bytes (" + maxEntrySize + ")"); - } + // If the entry length exceeds the maximum entry size then throw an exception. + if (length > maxEntrySize) { + throw new StorageException.TooLarge("Entry size " + length + " exceeds maximum allowed bytes (" + maxEntrySize + ")"); + } + + // Compute the checksum for the entry. + final CRC32 crc32 = new CRC32(); + crc32.update(memory.array(), ENTRY_HEADER_BYTES, memory.limit() - ENTRY_HEADER_BYTES); + final long checksum = crc32.getValue(); - // Compute the checksum for the entry. - final Checksum crc32 = new CRC32(); - crc32.update(memory.array(), Integer.BYTES + Integer.BYTES, memory.limit() - (Integer.BYTES + Integer.BYTES)); - final long checksum = crc32.getValue(); - - // Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer. - memory.putInt(0, length); - memory.putInt(Integer.BYTES, (int) checksum); - channel.write(memory); - - // Update the last entry with the correct index/term/length. - Indexed indexedEntry = new Indexed<>(index, entry, length); - this.lastEntry = indexedEntry; - this.index.index(index, (int) position); - return (Indexed) indexedEntry; + // Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer. + memory.putInt(0, length).putInt(Integer.BYTES, (int) checksum); + try { + channel.write(memory, currentPosition); } catch (IOException e) { throw new StorageException(e); } - } - @Override - public void commit(long index) { + // Update the last entry with the correct index/term/length. + Indexed indexedEntry = new Indexed<>(index, entry, length); + this.lastEntry = indexedEntry; + this.index.index(index, (int) currentPosition); + currentPosition = currentPosition + ENTRY_HEADER_BYTES + length; + return (Indexed) indexedEntry; } @Override - public void truncate(long index) { + void truncate(long index) { // If the index is greater than or equal to the last index, skip the truncate. if (index >= getLastIndex()) { return; @@ -248,30 +214,23 @@ class FileChannelJournalSegmentWriter implements JournalWriter { this.index.truncate(index); try { - if (index < segment.index()) { + if (index < firstIndex) { // Reset the writer to the first entry. - channel.position(JournalSegmentDescriptor.BYTES); + currentPosition = JournalSegmentDescriptor.BYTES; } else { // Reset the writer to the given index. reset(index); } - // Zero entries at the current position. - // FIXME: This is quite inefficient, we essentially want to zero-out part of the file, but it is not quite clear - // how much: this overwrites at least two entries. I believe we should be able to get by with wiping the - // header of the current entry. - memory.clear(); - for (int i = 0; i < memory.limit(); i++) { - memory.put(i, (byte) 0); - } - channel.write(memory, channel.position()); + // Zero the entry header at current channel position. + channel.write(ZERO_ENTRY_HEADER.asReadOnlyBuffer(), currentPosition); } catch (IOException e) { throw new StorageException(e); } } @Override - public void flush() { + void flush() { try { if (channel.isOpen()) { channel.force(true); @@ -282,7 +241,7 @@ class FileChannelJournalSegmentWriter implements JournalWriter { } @Override - public void close() { + void close() { flush(); } }