From 7997055cc3a82e0bfe753a4e2dbcd8af59d9113d Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Tue, 7 May 2024 15:11:23 +0200 Subject: [PATCH] Refactor SegmentedJournalWriter.reset() We have two methods reset() and truncate(), both of which take an index, without a real documented distinction. Document reset() as taking the index to read/write next and instantiate a guard against attempts to use reset(0) -- as 0 is not a valid next index. JIRA: CONTROLLER-2100 Change-Id: I8a6b366fdb0827ab3cd5a494e7e9f5a741983264 Signed-off-by: Robert Varga --- .../atomix/storage/journal/ByteBufReader.java | 2 +- .../atomix/storage/journal/ByteBufWriter.java | 5 +-- .../atomix/storage/journal/JournalReader.java | 2 +- .../atomix/storage/journal/JournalWriter.java | 4 ++- .../journal/SegmentedByteBufWriter.java | 33 ++++++++++++------- .../journal/SegmentedJournalWriter.java | 10 +++--- 6 files changed, 34 insertions(+), 22 deletions(-) diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufReader.java b/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufReader.java index 1ebe81eef6..205857533d 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufReader.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufReader.java @@ -71,7 +71,7 @@ public interface ByteBufReader extends AutoCloseable { /** * Resets the reader to the given index. * - * @param index The index to which to reset the reader + * @param index the next index to read */ void reset(long index); diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufWriter.java index 7211a8844d..2f85cc6163 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufWriter.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufWriter.java @@ -56,9 +56,9 @@ public interface ByteBufWriter { /** * Resets the head of the journal to the given index. * - * @param index The index to which to reset the head of the journal + * @param index the next index to write + * @throws IndexOutOfBoundsException if the journal cannot be reset to specified index */ - // FIXME: reconcile with reader's reset and truncate() // FIXME: throws IOException void reset(long index); @@ -66,6 +66,7 @@ public interface ByteBufWriter { * Truncates the log to the given index. * * @param index The index to which to truncate the log. + * @throws IndexOutOfBoundsException if the journal cannot be reset to specified index */ // FIXME: reconcile with reset() // FIXME: throws IOException diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalReader.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalReader.java index 635f6248c4..8f49aa2ebc 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalReader.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalReader.java @@ -88,7 +88,7 @@ public interface JournalReader extends AutoCloseable { /** * Resets the reader to the given index. * - * @param index The index to which to reset the reader. + * @param index the next index to read */ void reset(long index); diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalWriter.java index ba7c5821aa..ae65778185 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalWriter.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalWriter.java @@ -55,7 +55,8 @@ public interface JournalWriter { /** * Resets the head of the journal to the given index. * - * @param index the index to which to reset the head of the journal + * @param index the next index to write + * @throws IndexOutOfBoundsException if the journal cannot be reset to specified index */ // FIXME: reconcile with reader's reset and truncate() void reset(long index); @@ -64,6 +65,7 @@ public interface JournalWriter { * Truncates the log to the given index. * * @param index The index to which to truncate the log. + * @throws IndexOutOfBoundsException if the journal cannot be reset to specified index */ // FIXME: reconcile with reset() void truncate(long index); diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufWriter.java index 0d942dd085..42c00e59fd 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufWriter.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufWriter.java @@ -46,18 +46,6 @@ final class SegmentedByteBufWriter implements ByteBufWriter { return currentWriter.getNextIndex(); } - @Override - public void reset(final long index) { - if (index > currentSegment.firstIndex()) { - currentSegment.releaseWriter(); - currentSegment = journal.resetSegments(index); - currentWriter = currentSegment.acquireWriter(); - } else { - truncate(index - 1); - } - journal.resetHead(index); - } - @Override public void commit(final long index) { if (index > journal.getCommitIndex()) { @@ -82,12 +70,33 @@ final class SegmentedByteBufWriter implements ByteBufWriter { return verifyNotNull(currentWriter.append(buf)); } + @Override + public void reset(final long index) { + final long commitIndex = journal.getCommitIndex(); + if (index <= commitIndex) { + // also catches index == 0, which is not a valid next index + throw new IndexOutOfBoundsException("Cannot reset to: " + index + ", committed index: " + commitIndex); + } + + if (index > currentSegment.firstIndex()) { + currentSegment.releaseWriter(); + currentSegment = journal.resetSegments(index); + currentWriter = currentSegment.acquireWriter(); + } else { + checkedTruncate(index - 1); + } + journal.resetHead(index); + } + @Override public void truncate(final long index) { if (index < journal.getCommitIndex()) { throw new IndexOutOfBoundsException("Cannot truncate committed index: " + index); } + checkedTruncate(index); + } + private void checkedTruncate(final long index) { // Delete all segments with first indexes greater than the given index. while (index < currentSegment.firstIndex() && currentSegment != journal.firstSegment()) { currentSegment.releaseWriter(); diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournalWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournalWriter.java index 7c331ccb24..80c352ead0 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournalWriter.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournalWriter.java @@ -40,11 +40,6 @@ final class SegmentedJournalWriter implements JournalWriter { return writer.nextIndex(); } - @Override - public void reset(final long index) { - writer.reset(index); - } - @Override public void commit(final long index) { writer.commit(index); @@ -56,6 +51,11 @@ final class SegmentedJournalWriter implements JournalWriter { return new Indexed<>(writer.append(buf), entry, buf.readableBytes()); } + @Override + public void reset(final long index) { + writer.reset(index); + } + @Override public void truncate(final long index) { writer.truncate(index); -- 2.36.6