/*
* Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. All rights reserved.
*
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package io.atomix.storage.journal;
+import static com.google.common.base.Verify.verify;
import static java.util.Objects.requireNonNull;
-import io.atomix.storage.journal.index.JournalIndex;
-import io.atomix.storage.journal.index.Position;
-import java.util.NoSuchElementException;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
import org.eclipse.jdt.annotation.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-abstract sealed class JournalSegmentReader<E> implements JournalReader<E>
- permits FileChannelJournalSegmentReader, MappedJournalSegmentReader {
- final int maxEntrySize;
- private final JournalIndex index;
- final JournalSerdes namespace;
- private final long firstIndex;
-
- private Indexed<E> currentEntry;
- private Indexed<E> nextEntry;
+final class JournalSegmentReader {
+ private static final Logger LOG = LoggerFactory.getLogger(JournalSegmentReader.class);
- JournalSegmentReader(final JournalSegment<E> segment, final int maxEntrySize, final JournalIndex index,
- final JournalSerdes namespace) {
- this.maxEntrySize = maxEntrySize;
- this.index = requireNonNull(index);
- this.namespace = requireNonNull(namespace);
- firstIndex = segment.index();
- }
+ private final JournalSegment segment;
+ private final FileReader fileReader;
+ private final int maxSegmentSize;
+ private final int maxEntrySize;
- @Override
- public final long getFirstIndex() {
- return firstIndex;
- }
+ private int position;
- @Override
- public final long getCurrentIndex() {
- return currentEntry != null ? currentEntry.index() : 0;
+ JournalSegmentReader(final JournalSegment segment, final FileReader fileReader, final int maxEntrySize) {
+ this.segment = requireNonNull(segment);
+ this.fileReader = requireNonNull(fileReader);
+ maxSegmentSize = segment.file().maxSize();
+ this.maxEntrySize = maxEntrySize;
}
- @Override
- public final Indexed<E> getCurrentEntry() {
- return currentEntry;
+ /**
+ * Return the current position.
+ *
+ * @return current position.
+ */
+ int position() {
+ return position;
}
- @Override
- public final long getNextIndex() {
- return currentEntry != null ? currentEntry.index() + 1 : firstIndex;
+ /**
+ * Set the file position.
+ *
+ * @param position new position
+ */
+ void setPosition(final int position) {
+ verify(position >= JournalSegmentDescriptor.BYTES && position < maxSegmentSize,
+ "Invalid position %s", position);
+ this.position = position;
+ fileReader.invalidateCache();
}
- @Override
- public final boolean hasNext() {
- return nextEntry != null || (nextEntry = readNext()) != null;
+ /**
+ * Invalidate any cache that is present, so that the next read is coherent with the backing file.
+ */
+ void invalidateCache() {
+ fileReader.invalidateCache();
}
- @Override
- public final Indexed<E> next() {
- if (!hasNext()) {
- throw new NoSuchElementException();
+ /**
+ * Reads the next binary data block
+ *
+ * @return The binary data, or {@code null}
+ */
+ @Nullable ByteBuf readBytes() {
+ // Check if there is enough in the buffer remaining
+ final int remaining = maxSegmentSize - position - SegmentEntry.HEADER_BYTES;
+ if (remaining < 0) {
+ // Not enough space in the segment, there can never be another entry
+ return null;
}
- // Set the current entry to the next entry.
- currentEntry = nextEntry;
-
- // Reset the next entry to null.
- nextEntry = null;
-
- // Read the next entry in the segment.
- nextEntry = readNext();
-
- // Return the current entry.
- return currentEntry;
- }
+ // Calculate maximum entry length not exceeding file size nor maxEntrySize
+ final var maxLength = Math.min(remaining, maxEntrySize);
+ final var buffer = fileReader.read(position, maxLength + SegmentEntry.HEADER_BYTES);
- @Override
- public final void reset() {
- currentEntry = null;
- nextEntry = null;
- setPosition(JournalSegmentDescriptor.BYTES);
- nextEntry = readNext();
- }
-
- @Override
- public final void reset(final long index) {
- reset();
- Position position = this.index.lookup(index - 1);
- if (position != null) {
- currentEntry = new Indexed<>(position.index() - 1, null, 0);
- setPosition(position.position());
- nextEntry = readNext();
+ // Read the entry length
+ final var length = buffer.getInt(0);
+ if (length < 1 || length > maxLength) {
+ // Invalid length, make sure next read re-tries
+ invalidateCache();
+ return null;
}
- while (getNextIndex() < index && hasNext()) {
- next();
+
+ // Read the entry checksum
+ final int checksum = buffer.getInt(Integer.BYTES);
+
+ // Slice off the entry's bytes
+ final var entryBuffer = buffer.slice(SegmentEntry.HEADER_BYTES, length);
+ // If the stored checksum does not equal the computed checksum, do not proceed further
+ final var computed = SegmentEntry.computeChecksum(entryBuffer);
+ if (checksum != computed) {
+ LOG.warn("Expected checksum {}, computed {}", Integer.toHexString(checksum), Integer.toHexString(computed));
+ invalidateCache();
+ return null;
}
- }
- @Override
- public final void close() {
- // FIXME: CONTROLLER-2098: remove this method
- }
+ // update position
+ position += SegmentEntry.HEADER_BYTES + length;
- /**
- * Set the file position.
- *
- * @param position new position
- */
- abstract void setPosition(int position);
+ // rewind and return
+ return Unpooled.wrappedBuffer(entryBuffer.rewind());
+ }
/**
- * Reads the next entry in the segment.
- *
- * @return Next entry, or {@code null}
+ * Close this reader.
*/
- abstract @Nullable Indexed<E> readNext();
+ void close() {
+ segment.closeReader(this);
+ }
}