2 * Copyright 2017-present Open Networking Foundation
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package io.atomix.storage.journal;
18 import io.atomix.storage.journal.index.JournalIndex;
19 import io.atomix.storage.journal.index.Position;
20 import java.nio.BufferUnderflowException;
21 import java.nio.ByteBuffer;
22 import java.util.NoSuchElementException;
23 import java.util.zip.CRC32;
28 * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
// Journal reader that reads entries of type E from a ByteBuffer.
30 final class MappedJournalSegmentReader<E> implements JournalReader<E> {
// Buffer the entries are read from; the constructor stores a slice of the buffer it is given.
31 private final ByteBuffer buffer;
// Upper bound that readNext() checks each entry's length prefix against.
32 private final int maxEntrySize;
// Index consulted by reset(long) to look up a starting position.
33 private final JournalIndex index;
// Used by readNext() to deserialize entry bytes into E.
34 private final JournalSerdes namespace;
// Value returned by getNextIndex() while there is no current entry; taken from segment.index().
35 private final long firstIndex;
// Entry the reader is currently positioned on, or null.
36 private Indexed<E> currentEntry;
// Entry read ahead by readNext(), or null when none is buffered.
37 private Indexed<E> nextEntry;
// Creates a reader for the given segment.
39 MappedJournalSegmentReader(
41 JournalSegment<E> segment,
44 JournalSerdes namespace) {
// slice() shares the buffer's content but gives this reader its own position, limit and mark.
45 this.buffer = buffer.slice();
46 this.maxEntrySize = maxEntrySize;
48 this.namespace = namespace;
// The first index this reader can report is the segment's index.
49 this.firstIndex = segment.index();
54 public long getFirstIndex() {
// Returns the index of the current entry, or 0 when currentEntry is null.
59 public long getCurrentIndex() {
60 return currentEntry != null ? currentEntry.index() : 0;
64 public Indexed<E> getCurrentEntry() {
// Returns one past the current entry's index, or firstIndex when currentEntry is null.
// readNext() assigns this value as the index of the entry it reads.
69 public long getNextIndex() {
70 return currentEntry != null ? currentEntry.index() + 1 : firstIndex;
// Repositions the reader relative to the given index, using the journal index as a shortcut when it has a hit.
74 public void reset(long index) {
// Look up a recorded position for index - 1.
// NOTE(review): exact lookup semantics are defined by JournalIndex (presumably nearest entry at or before) - confirm.
76 Position position = this.index.lookup(index - 1);
77 if (position != null) {
// Placeholder entry (null payload, length 0) indexed one below the hit, so getNextIndex() yields position.index().
78 currentEntry = new Indexed<>(position.index() - 1, null, 0);
// Move the buffer to the byte offset recorded for the hit.
79 buffer.position(position.position());
// Keep advancing while the next index is still below the target and another entry is available.
82 while (getNextIndex() < index && hasNext()) {
89 buffer.position(JournalSegmentDescriptor.BYTES);
// Returns true when an entry is buffered in nextEntry.
96 public boolean hasNext() {
97 // If the next entry is null, check whether a next entry exists.
98 if (nextEntry == null) {
101 return nextEntry != null;
// Promotes the buffered next entry to the current entry.
// NOTE(review): NoSuchElementException is thrown on a guarded path - presumably when no next entry exists; confirm.
105 public Indexed<E> next() {
107 throw new NoSuchElementException();
110 // Set the current entry to the next entry.
111 currentEntry = nextEntry;
113 // Reset the next entry to null.
116 // Read the next entry in the segment.
119 // Return the current entry.
124 * Reads the next entry in the segment.
// Tries to decode the entry at the buffer's current position and, on success, stores it in nextEntry.
// Layout read here: a 4-byte length, a 4-byte checksum, then `length` bytes that are deserialized.
126 private void readNext() {
127 // Compute the index of the next entry in the segment.
128 final long index = getNextIndex();
130 // Mark the buffer so it can be reset if necessary.
134 // Read the length of the entry.
135 final int length = buffer.getInt();
137 // If the length is not positive or exceeds maxEntrySize, there is no valid entry here, so return.
138 if (length <= 0 || length > maxEntrySize) {
144 // Read the checksum of the entry.
// Masking widens the stored 32-bit value to an unsigned long, matching the range of CRC32.getValue().
145 long checksum = buffer.getInt() & 0xFFFFFFFFL;
147 // Compute the checksum for the entry bytes.
148 final CRC32 crc32 = new CRC32();
// Independent view starting at the current position, so the main buffer's position is not moved by it.
149 ByteBuffer slice = buffer.slice();
153 // If the stored checksum equals the computed checksum, return the entry.
154 if (checksum == crc32.getValue()) {
156 E entry = namespace.deserialize(slice);
157 nextEntry = new Indexed<>(index, entry, length);
// Advance the main buffer past the entry's `length` bytes.
158 buffer.position(buffer.position() + length);
// Raised, for example, by the relative getInt() reads above when fewer than four bytes remain.
163 } catch (BufferUnderflowException e) {
// Closing the reader is a no-op.
170 public void close() {
171 // Do nothing. The writer is responsible for cleaning the mapped buffer.