X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=atomix-storage%2Fsrc%2Fmain%2Fjava%2Fio%2Fatomix%2Fstorage%2Fjournal%2FJournalSegmentWriter.java;fp=atomix-storage%2Fsrc%2Fmain%2Fjava%2Fio%2Fatomix%2Fstorage%2Fjournal%2FJournalSegmentWriter.java;h=f00e0daddd542aa42d0db489b075f85d514dc198;hb=a35bb2b0d7c688eaa83c38996373b1784e513694;hp=c2e0a258c9c4a650b62c73b548a5dc49f61e938b;hpb=285074c11981fdc96ba8922563ea5e74036e2afd;p=controller.git diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java index c2e0a258c9..f00e0daddd 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java @@ -20,19 +20,17 @@ import static java.util.Objects.requireNonNull; import io.atomix.storage.journal.index.JournalIndex; import io.netty.buffer.ByteBuf; -import java.nio.ByteBuffer; import java.nio.MappedByteBuffer; -import java.nio.channels.FileChannel; import java.util.zip.CRC32; import org.eclipse.jdt.annotation.NonNull; import org.eclipse.jdt.annotation.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -abstract sealed class JournalSegmentWriter permits DiskJournalSegmentWriter, MappedJournalSegmentWriter { +final class JournalSegmentWriter { private static final Logger LOG = LoggerFactory.getLogger(JournalSegmentWriter.class); - final @NonNull FileChannel channel; + private final FileWriter fileWriter; final @NonNull JournalSegment segment; private final @NonNull JournalIndex index; final int maxSegmentSize; @@ -42,17 +40,18 @@ abstract sealed class JournalSegmentWriter permits DiskJournalSegmentWriter, Map private Long lastIndex; private ByteBuf lastWritten; - JournalSegmentWriter(final FileChannel channel, final JournalSegment segment, final int maxEntrySize, + JournalSegmentWriter(final FileWriter fileWriter, final JournalSegment segment, final int maxEntrySize, final JournalIndex index) { - this.channel = requireNonNull(channel); + this.fileWriter = requireNonNull(fileWriter); this.segment = requireNonNull(segment); this.index = requireNonNull(index); maxSegmentSize = segment.descriptor().maxSegmentSize(); this.maxEntrySize = maxEntrySize; + // adjust lastEntry value + reset(0); } - JournalSegmentWriter(final JournalSegmentWriter previous) { - channel = previous.channel; + JournalSegmentWriter(final JournalSegmentWriter previous, final FileWriter fileWriter) { segment = previous.segment; index = previous.index; maxSegmentSize = previous.maxSegmentSize; @@ -60,6 +59,7 @@ abstract sealed class JournalSegmentWriter permits DiskJournalSegmentWriter, Map lastWritten = previous.lastWritten; lastIndex = previous.lastIndex; currentPosition = previous.currentPosition; + this.fileWriter = requireNonNull(fileWriter); } /** @@ -114,7 +114,7 @@ abstract sealed class JournalSegmentWriter permits DiskJournalSegmentWriter, Map } // allocate buffer and write data - final var writeBuffer = startWrite(position, length + HEADER_BYTES).position(HEADER_BYTES); + final var writeBuffer = fileWriter.startWrite(position, length + HEADER_BYTES).position(HEADER_BYTES); writeBuffer.put(buf.nioBuffer()); // Compute the checksum for the entry. @@ -123,7 +123,7 @@ abstract sealed class JournalSegmentWriter permits DiskJournalSegmentWriter, Map // Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer. writeBuffer.putInt(0, length).putInt(Integer.BYTES, (int) crc32.getValue()); - commitWrite(position, writeBuffer.rewind()); + fileWriter.commitWrite(position, writeBuffer.rewind()); // Update the last entry with the correct index/term/length. currentPosition = nextPosition; @@ -134,10 +134,6 @@ abstract sealed class JournalSegmentWriter permits DiskJournalSegmentWriter, Map return index; } - abstract ByteBuffer startWrite(int position, int size); - - abstract void commitWrite(int position, ByteBuffer entry); - /** * Resets the head of the segment to the given index. * @@ -145,23 +141,21 @@ abstract sealed class JournalSegmentWriter permits DiskJournalSegmentWriter, Map */ final void reset(final long index) { // acquire ownership of cache and make sure reader does not see anything we've done once we're done - final var reader = reader(); - reader.invalidateCache(); + final var fileReader = fileWriter.reader(); try { - resetWithBuffer(reader, index); + resetWithBuffer(fileReader, index); } finally { // Make sure reader does not see anything we've done - reader.invalidateCache(); + fileReader.invalidateCache(); } } - abstract JournalSegmentReader reader(); - - private void resetWithBuffer(final JournalSegmentReader reader, final long index) { + private void resetWithBuffer(final FileReader fileReader, final long index) { long nextIndex = segment.firstIndex(); // Clear the buffer indexes and acquire ownership of the buffer currentPosition = JournalSegmentDescriptor.BYTES; + final var reader = new JournalSegmentReader(segment, fileReader, maxEntrySize); reader.setPosition(JournalSegmentDescriptor.BYTES); while (index == 0 || nextIndex <= index) { @@ -207,25 +201,22 @@ abstract sealed class JournalSegmentWriter permits DiskJournalSegmentWriter, Map } // Zero the entry header at current channel position. - writeEmptyHeader(currentPosition); + fileWriter.writeEmptyHeader(currentPosition); } - /** - * Write {@link SegmentEntry#HEADER_BYTES} worth of zeroes at specified position. - * - * @param position position to write to - */ - abstract void writeEmptyHeader(int position); - /** * Flushes written entries to disk. */ - abstract void flush(); + void flush() { + fileWriter.flush(); + } /** * Closes this writer. */ - abstract void close(); + void close() { + fileWriter.close(); + } /** * Returns the mapped buffer underlying the segment writer, or {@code null} if the writer does not have such a @@ -233,9 +224,17 @@ abstract sealed class JournalSegmentWriter permits DiskJournalSegmentWriter, Map * * @return the mapped buffer underlying the segment writer, or {@code null}. */ - abstract @Nullable MappedByteBuffer buffer(); + @Nullable MappedByteBuffer buffer() { + return fileWriter.buffer(); + } - abstract @NonNull MappedJournalSegmentWriter toMapped(); + @NonNull JournalSegmentWriter toMapped() { + final var newWriter = fileWriter.toMapped(); + return newWriter == null ? this : new JournalSegmentWriter(this, newWriter); + } - abstract @NonNull DiskJournalSegmentWriter toFileChannel(); + @NonNull JournalSegmentWriter toFileChannel() { + final var newWriter = fileWriter.toDisk(); + return newWriter == null ? this : new JournalSegmentWriter(this, newWriter); + } }