From 2e8a82c1b46bedf40e5e3d51f03efae75d72ca48 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Mon, 11 Mar 2024 11:13:05 +0100 Subject: [PATCH] Eliminate MappableJournalSegmentReader MappableJournalSegmentReader is an indirection used by JournalSegmentReader to get a stable reader and the backend is manipulated via map()/unmap() operations. This is sort of a band-aid to lifecycle, as JournalSegment.close() eagerly closes the writer, which in turn invalidates buffer and all that jazz. This dance is quite unnecessary, as JournalSegmentReader always acquires the Segment, hence as long as it has a reader, the segment is guaranteed to be mapped -- except for the case of close(). This patch reworks the logic in couple of ways: - references are now guarding the writer itself, so as long as there are some, the writer is not closed. - createReader() now increments the reference count, which means we can directly allocate the appopriate JournalSegmentReader implementation - JournalSegmentReader.close() now routes to JournalSegment.closeReader(), which in turn releases the reference JIRA: CONTROLLER-2098 Change-Id: I29b926dec5dc1a77e7e5e51ee3e2deb4f0ca6e33 Signed-off-by: Robert Varga --- .../storage/journal/JournalSegment.java | 71 +++++------ .../storage/journal/JournalSegmentReader.java | 4 +- .../journal/MappableJournalSegmentReader.java | 118 ------------------ .../journal/SegmentedJournalReader.java | 9 +- 4 files changed, 37 insertions(+), 165 deletions(-) delete mode 100644 atomix-storage/src/main/java/io/atomix/storage/journal/MappableJournalSegmentReader.java diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegment.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegment.java index f67b4cc1b7..f5e1b83bbf 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegment.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegment.java @@ -18,7 +18,6 @@ package io.atomix.storage.journal; import io.atomix.storage.journal.index.JournalIndex; import io.atomix.storage.journal.index.SparseJournalIndex; import java.io.IOException; -import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; import java.nio.file.Files; import java.nio.file.StandardOpenOption; @@ -42,7 +41,7 @@ final class JournalSegment implements AutoCloseable { private final JournalIndex index; private final JournalSerdes namespace; private final MappableJournalSegmentWriter writer; - private final Set> readers = ConcurrentHashMap.newKeySet(); + private final Set> readers = ConcurrentHashMap.newKeySet(); private final AtomicInteger references = new AtomicInteger(); private final FileChannel channel; private boolean open = true; @@ -158,8 +157,8 @@ final class JournalSegment implements AutoCloseable { * Acquires a reference to the log segment. */ void acquire() { - if (references.getAndIncrement() == 0 && open) { - map(); + if (references.getAndIncrement() == 0 && storageLevel == StorageLevel.MAPPED) { + writer.map(); } } @@ -167,28 +166,13 @@ final class JournalSegment implements AutoCloseable { * Releases a reference to the log segment. */ void release() { - if (references.decrementAndGet() == 0 && open) { - unmap(); - } - } - - /** - * Maps the log segment into memory. - */ - private void map() { - if (storageLevel == StorageLevel.MAPPED) { - MappedByteBuffer buffer = writer.map(); - readers.forEach(reader -> reader.map(buffer)); - } - } - - /** - * Unmaps the log segment from memory. - */ - private void unmap() { - if (storageLevel == StorageLevel.MAPPED) { - writer.unmap(); - readers.forEach(reader -> reader.unmap()); + if (references.decrementAndGet() == 0) { + if (storageLevel == StorageLevel.MAPPED) { + writer.unmap(); + } + if (!open) { + finishClose(); + } } } @@ -207,14 +191,14 @@ final class JournalSegment implements AutoCloseable { * * @return A new segment reader. */ - MappableJournalSegmentReader createReader() { + JournalSegmentReader createReader() { checkOpen(); - MappableJournalSegmentReader reader = new MappableJournalSegmentReader<>(channel, this, maxEntrySize, index, - namespace); - MappedByteBuffer buffer = writer.buffer(); - if (buffer != null) { - reader.map(buffer); - } + acquire(); + + final var buffer = writer.buffer(); + final var reader = buffer == null + ? new FileChannelJournalSegmentReader<>(channel, this, maxEntrySize, index, namespace) + : new MappedJournalSegmentReader<>(buffer, this, maxEntrySize, index, namespace); readers.add(reader); return reader; } @@ -224,8 +208,10 @@ final class JournalSegment implements AutoCloseable { * * @param reader the closed segment reader */ - void closeReader(MappableJournalSegmentReader reader) { - readers.remove(reader); + void closeReader(JournalSegmentReader reader) { + if (readers.remove(reader)) { + release(); + } } /** @@ -249,10 +235,19 @@ final class JournalSegment implements AutoCloseable { */ @Override public void close() { - unmap(); - writer.close(); - readers.forEach(reader -> reader.close()); + if (!open) { + return; + } + open = false; + readers.forEach(JournalSegmentReader::close); + if (references.get() == 0) { + finishClose(); + } + } + + private void finishClose() { + writer.close(); try { channel.close(); } catch (IOException e) { diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentReader.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentReader.java index 512e36a1c5..2dddd98276 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentReader.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentReader.java @@ -20,12 +20,14 @@ abstract sealed class JournalSegmentReader implements JournalReader private final JournalIndex index; final JournalSerdes namespace; private final long firstIndex; + private final JournalSegment segment; private Indexed currentEntry; private Indexed nextEntry; JournalSegmentReader(final JournalSegment segment, final int maxEntrySize, final JournalIndex index, final JournalSerdes namespace) { + this.segment = requireNonNull(segment); this.maxEntrySize = maxEntrySize; this.index = requireNonNull(index); this.namespace = requireNonNull(namespace); @@ -100,7 +102,7 @@ abstract sealed class JournalSegmentReader implements JournalReader @Override public final void close() { - // FIXME: CONTROLLER-2098: remove this method + segment.closeReader(this); } /** diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/MappableJournalSegmentReader.java b/atomix-storage/src/main/java/io/atomix/storage/journal/MappableJournalSegmentReader.java deleted file mode 100644 index c70b36bb35..0000000000 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/MappableJournalSegmentReader.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Copyright 2018-present Open Networking Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.atomix.storage.journal; - -import io.atomix.storage.journal.index.JournalIndex; -import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; - -/** - * Mappable log segment reader. - */ -final class MappableJournalSegmentReader implements JournalReader { - private final JournalSegment segment; - private final FileChannel channel; - private final int maxEntrySize; - private final JournalIndex index; - private final JournalSerdes namespace; - private JournalSegmentReader reader; - - MappableJournalSegmentReader( - FileChannel channel, - JournalSegment segment, - int maxEntrySize, - JournalIndex index, - JournalSerdes namespace) { - this.channel = channel; - this.segment = segment; - this.maxEntrySize = maxEntrySize; - this.index = index; - this.namespace = namespace; - this.reader = new FileChannelJournalSegmentReader<>(channel, segment, maxEntrySize, index, namespace); - } - - /** - * Converts the reader to a mapped reader using the given buffer. - * - * @param buffer the mapped buffer - */ - void map(ByteBuffer buffer) { - if (!(reader instanceof MappedJournalSegmentReader)) { - JournalReader reader = this.reader; - this.reader = new MappedJournalSegmentReader<>(buffer, segment, maxEntrySize, index, namespace); - this.reader.reset(reader.getNextIndex()); - reader.close(); - } - } - - /** - * Converts the reader to an unmapped reader. - */ - void unmap() { - if (reader instanceof MappedJournalSegmentReader) { - JournalReader reader = this.reader; - this.reader = new FileChannelJournalSegmentReader<>(channel, segment, maxEntrySize, index, namespace); - this.reader.reset(reader.getNextIndex()); - reader.close(); - } - } - - @Override - public long getFirstIndex() { - return reader.getFirstIndex(); - } - - @Override - public long getCurrentIndex() { - return reader.getCurrentIndex(); - } - - @Override - public Indexed getCurrentEntry() { - return reader.getCurrentEntry(); - } - - @Override - public long getNextIndex() { - return reader.getNextIndex(); - } - - @Override - public boolean hasNext() { - return reader.hasNext(); - } - - @Override - public Indexed next() { - return reader.next(); - } - - @Override - public void reset() { - reader.reset(); - } - - @Override - public void reset(long index) { - reader.reset(index); - } - - @Override - public void close() { - reader.close(); - segment.closeReader(this); - } -} diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournalReader.java b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournalReader.java index d5201b3c26..f9e015f686 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournalReader.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournalReader.java @@ -24,14 +24,13 @@ public final class SegmentedJournalReader implements JournalReader { private final SegmentedJournal journal; private JournalSegment currentSegment; private Indexed previousEntry; - private MappableJournalSegmentReader currentReader; + private JournalSegmentReader currentReader; private final Mode mode; SegmentedJournalReader(SegmentedJournal journal, long index, Mode mode) { this.journal = journal; this.mode = mode; currentSegment = journal.getSegment(index); - currentSegment.acquire(); currentReader = currentSegment.createReader(); long nextIndex = getNextIndex(); @@ -76,10 +75,8 @@ public final class SegmentedJournalReader implements JournalReader { public void reset() { previousEntry = null; currentReader.close(); - currentSegment.release(); currentSegment = journal.getFirstSegment(); - currentSegment.acquire(); currentReader = currentSegment.createReader(); } @@ -107,10 +104,8 @@ public final class SegmentedJournalReader implements JournalReader { JournalSegment segment = journal.getSegment(index - 1); if (segment != null) { currentReader.close(); - currentSegment.release(); currentSegment = segment; - currentSegment.acquire(); currentReader = currentSegment.createReader(); } } @@ -172,10 +167,8 @@ public final class SegmentedJournalReader implements JournalReader { previousEntry = currentReader.getCurrentEntry(); currentReader.close(); - currentSegment.release(); currentSegment = nextSegment; - currentSegment.acquire(); currentReader = currentSegment.createReader(); return true; } -- 2.36.6