Utilize segment index to recover writer state after truncate
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / JournalSegmentReader.java
index 512e36a1c57e5c9fe56aacc6fe0997f63e2fcbb6..aa4c0da18a9e59ea5b8cef851c992a63256b6143 100644 (file)
 /*
  * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.  All rights reserved.
  *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package io.atomix.storage.journal;
 
+import static com.google.common.base.Verify.verify;
 import static java.util.Objects.requireNonNull;
 
-import io.atomix.storage.journal.index.JournalIndex;
-import io.atomix.storage.journal.index.Position;
-import java.util.NoSuchElementException;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 import org.eclipse.jdt.annotation.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-abstract sealed class JournalSegmentReader<E> implements JournalReader<E>
-        permits FileChannelJournalSegmentReader, MappedJournalSegmentReader {
-    final int maxEntrySize;
-    private final JournalIndex index;
-    final JournalSerdes namespace;
-    private final long firstIndex;
-
-    private Indexed<E> currentEntry;
-    private Indexed<E> nextEntry;
+final class JournalSegmentReader {
+    private static final Logger LOG = LoggerFactory.getLogger(JournalSegmentReader.class);
 
-    JournalSegmentReader(final JournalSegment<E> segment, final int maxEntrySize, final JournalIndex index,
-            final JournalSerdes namespace) {
-        this.maxEntrySize = maxEntrySize;
-        this.index = requireNonNull(index);
-        this.namespace = requireNonNull(namespace);
-        firstIndex = segment.index();
-    }
+    private final JournalSegment segment;
+    private final FileReader fileReader;
+    private final int maxSegmentSize;
+    private final int maxEntrySize;
 
-    @Override
-    public final long getFirstIndex() {
-        return firstIndex;
-    }
+    private int position;
 
-    @Override
-    public final long getCurrentIndex() {
-        return currentEntry != null ? currentEntry.index() : 0;
+    JournalSegmentReader(final JournalSegment segment, final FileReader fileReader, final int maxEntrySize) {
+        this.segment = requireNonNull(segment);
+        this.fileReader = requireNonNull(fileReader);
+        maxSegmentSize = segment.file().maxSize();
+        this.maxEntrySize = maxEntrySize;
     }
 
-    @Override
-    public final Indexed<E> getCurrentEntry() {
-        return currentEntry;
+    /**
+     * Return the current position.
+     *
+     * @return current position.
+     */
+    int position() {
+        return position;
     }
 
-    @Override
-    public final long getNextIndex() {
-        return currentEntry != null ? currentEntry.index() + 1 : firstIndex;
+    /**
+     * Set the file position.
+     *
+     * @param position new position
+     */
+    void setPosition(final int position) {
+        verify(position >= JournalSegmentDescriptor.BYTES && position < maxSegmentSize,
+            "Invalid position %s", position);
+        this.position = position;
+        fileReader.invalidateCache();
     }
 
-    @Override
-    public final boolean hasNext() {
-        return nextEntry != null || (nextEntry = readNext()) != null;
+    /**
+     * Invalidate any cache that is present, so that the next read is coherent with the backing file.
+     */
+    void invalidateCache() {
+        fileReader.invalidateCache();
     }
 
-    @Override
-    public final Indexed<E> next() {
-        if (!hasNext()) {
-            throw new NoSuchElementException();
+    /**
+     * Reads the next binary data block
+     *
+     * @return The binary data, or {@code null}
+     */
+    @Nullable ByteBuf readBytes() {
+        // Check if there is enough in the buffer remaining
+        final int remaining = maxSegmentSize - position - SegmentEntry.HEADER_BYTES;
+        if (remaining < 0) {
+            // Not enough space in the segment, there can never be another entry
+            return null;
         }
 
-        // Set the current entry to the next entry.
-        currentEntry = nextEntry;
-
-        // Reset the next entry to null.
-        nextEntry = null;
-
-        // Read the next entry in the segment.
-        nextEntry = readNext();
-
-        // Return the current entry.
-        return currentEntry;
-    }
+        // Calculate maximum entry length not exceeding file size nor maxEntrySize
+        final var maxLength = Math.min(remaining, maxEntrySize);
+        final var buffer = fileReader.read(position, maxLength + SegmentEntry.HEADER_BYTES);
 
-    @Override
-    public final void reset() {
-        currentEntry = null;
-        nextEntry = null;
-        setPosition(JournalSegmentDescriptor.BYTES);
-        nextEntry = readNext();
-    }
-
-    @Override
-    public final void reset(final long index) {
-        reset();
-        Position position = this.index.lookup(index - 1);
-        if (position != null) {
-            currentEntry = new Indexed<>(position.index() - 1, null, 0);
-            setPosition(position.position());
-            nextEntry = readNext();
+        // Read the entry length
+        final var length = buffer.getInt(0);
+        if (length < 1 || length > maxLength) {
+            // Invalid length, make sure next read re-tries
+            invalidateCache();
+            return null;
         }
-        while (getNextIndex() < index && hasNext()) {
-            next();
+
+        // Read the entry checksum
+        final int checksum = buffer.getInt(Integer.BYTES);
+
+        // Slice off the entry's bytes
+        final var entryBuffer = buffer.slice(SegmentEntry.HEADER_BYTES, length);
+        // If the stored checksum does not equal the computed checksum, do not proceed further
+        final var computed = SegmentEntry.computeChecksum(entryBuffer);
+        if (checksum != computed) {
+            LOG.warn("Expected checksum {}, computed {}", Integer.toHexString(checksum), Integer.toHexString(computed));
+            invalidateCache();
+            return null;
         }
-    }
 
-    @Override
-    public final void close() {
-        // FIXME: CONTROLLER-2098: remove this method
-    }
+        // update position
+        position += SegmentEntry.HEADER_BYTES + length;
 
-    /**
-     * Set the file position.
-     *
-     * @param position new position
-     */
-    abstract void setPosition(int position);
+        // rewind and return
+        return Unpooled.wrappedBuffer(entryBuffer.rewind());
+    }
 
     /**
-     * Reads the next entry in the segment.
-     *
-     * @return Next entry, or {@code null}
+     * Close this reader.
      */
-    abstract @Nullable Indexed<E> readNext();
+    void close() {
+        segment.closeReader(this);
+    }
 }