2 * Copyright 2017-present Open Networking Foundation
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package io.atomix.storage.journal;
18 import io.atomix.storage.StorageException;
19 import io.atomix.storage.journal.index.JournalIndex;
20 import io.atomix.storage.journal.index.Position;
21 import io.atomix.utils.serializer.Namespace;
23 import java.io.IOException;
24 import java.nio.BufferUnderflowException;
25 import java.nio.ByteBuffer;
26 import java.nio.channels.FileChannel;
27 import java.util.NoSuchElementException;
28 import java.util.zip.CRC32;
29 import java.util.zip.Checksum;
34 * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
/**
 * {@link JournalReader} that reads the entries of one journal segment through a
 * {@link FileChannel}, staging the raw bytes in an in-memory {@link ByteBuffer}
 * before decoding them.
 *
 * @param <E> the entry type wrapped by {@link Indexed}
 */
36 class FileChannelJournalSegmentReader<E> implements JournalReader<E> {
// Channel the segment bytes are read from. close() leaves it open on purpose.
37 private final FileChannel channel;
// Largest entry length accepted by readNext(); lengths <= 0 or above this are rejected.
38 private final int maxEntrySize;
// Index consulted by reset(long) to find a position for a given entry index.
39 private final JournalIndex index;
// Deserializes entry bytes in readNext().
40 private final Namespace namespace;
// Read buffer. The constructor sizes it for two maximum-size entries, each with two int-sized fields.
41 private final ByteBuffer memory;
// Value of segment.index() captured in the constructor; getNextIndex() falls back to it.
42 private final long firstIndex;
// Entry most recently handed out by next(); null makes getCurrentIndex() return 0.
43 private Indexed<E> currentEntry;
// Entry read ahead by readNext(); hasNext() reports whether it is non-null.
44 private Indexed<E> nextEntry;
/**
 * Creates a reader over the given segment.
 *
 * <p>NOTE(review): the parameter list is only partly visible in this listing. The body
 * also reads {@code channel} and {@code maxEntrySize}, so they are presumably declared
 * on the missing parameter lines. The assignment of the {@code index} field is not
 * visible either. Confirm against the full file.
 *
 * @param segment the segment being read; its {@code index()} becomes the first index
 * @param namespace the serializer namespace used to deserialize entries
 */
46 FileChannelJournalSegmentReader(
48 JournalSegment<E> segment,
51 Namespace namespace) {
52 this.channel = channel;
53 this.maxEntrySize = maxEntrySize;
55 this.namespace = namespace;
// Room for two maximum-size entries, each preceded by two ints (readNext() reads a length and a checksum).
56 this.memory = ByteBuffer.allocate((maxEntrySize + Integer.BYTES + Integer.BYTES) * 2);
57 this.firstIndex = segment.index();
/**
 * Accessor for the first index of this reader.
 *
 * <p>NOTE(review): the method body is not visible in this listing. It presumably returns
 * the {@code firstIndex} field set in the constructor. Confirm.
 */
62 public long getFirstIndex() {
/**
 * Returns the index of the current entry.
 *
 * @return {@code currentEntry.index()}, or {@code 0} when there is no current entry
 */
67 public long getCurrentIndex() {
68 return currentEntry != null ? currentEntry.index() : 0;
/**
 * Accessor for the current entry.
 *
 * <p>NOTE(review): the method body is not visible in this listing. It presumably returns
 * the {@code currentEntry} field, which may be {@code null}. Confirm.
 */
72 public Indexed<E> getCurrentEntry() {
/**
 * Returns the index that the next entry read from the segment will carry.
 *
 * @return one past the current entry's index, or {@code firstIndex} when there is no
 *     current entry
 */
77 public long getNextIndex() {
78 return currentEntry != null ? currentEntry.index() + 1 : firstIndex;
/**
 * Repositions the reader towards the given entry index.
 *
 * <p>Looks up {@code index - 1} in the journal index. If a position is found, the channel
 * is moved to that file offset and the read buffer is emptied. The reader then advances
 * while {@link #getNextIndex()} is below {@code index} and entries remain.
 *
 * <p>NOTE(review): several lines of this method are missing from the listing (the
 * embedded numbering skips 83, 87, 92-94 and 96 onwards). The opening {@code try} and
 * the loop body are among them. Confirm against the full file.
 *
 * @param index the entry index to seek to
 * @throws StorageException if repositioning the channel fails with an {@link IOException}
 */
82 public void reset(long index) {
84 Position position = this.index.lookup(index - 1);
85 if (position != null) {
// Placeholder with no payload, one below the looked-up index, so getNextIndex() yields position.index().
86 currentEntry = new Indexed<>(position.index() - 1, null, 0);
88 channel.position(position.position());
// clear() then flip() leaves position == limit == 0, so the buffer holds no readable bytes.
89 memory.clear().flip();
90 } catch (IOException e) {
91 throw new StorageException(e);
// Step forward until the next index reaches the requested one or the segment runs out.
95 while (getNextIndex() < index && hasNext()) {
/**
 * Rewinds the reader to the start of the segment's entries.
 *
 * <p>Moves the channel to offset {@code JournalSegmentDescriptor.BYTES} and empties the
 * read buffer.
 *
 * <p>NOTE(review): that offset is presumably the first byte after the segment descriptor.
 * The statements after the buffer reset are not visible in this listing. Confirm.
 *
 * @throws StorageException if repositioning the channel fails with an {@link IOException}
 */
101 public void reset() {
103 channel.position(JournalSegmentDescriptor.BYTES);
104 } catch (IOException e) {
105 throw new StorageException(e);
// position = 0 and limit = 0: nothing is readable until readNext() refills the buffer.
107 memory.clear().limit(0);
/**
 * Reports whether another entry is available.
 *
 * <p>NOTE(review): the body of the {@code if} is not visible in this listing. It
 * presumably calls {@code readNext()} to attempt a read-ahead. Confirm.
 *
 * @return {@code true} if {@code nextEntry} is non-null after the check
 */
114 public boolean hasNext() {
115 // If the next entry is null, check whether a next entry exists.
116 if (nextEntry == null) {
119 return nextEntry != null;
/**
 * Advances the reader by one entry: the read-ahead entry becomes the current entry.
 *
 * <p>NOTE(review): the guard before the {@code throw} and the statements under the
 * "Reset", "Read" and "Return" comments are not visible in this listing (the embedded
 * numbering skips 124, 132, 135 and 138). Confirm against the full file.
 *
 * @return presumably the new current entry, going by the comment at the end of the
 *     method; the return statement itself is not visible
 * @throws NoSuchElementException when no further entry is available, going by the
 *     exception type; the guard condition is not visible
 */
123 public Indexed<E> next() {
125 throw new NoSuchElementException();
128 // Set the current entry to the next entry.
129 currentEntry = nextEntry;
131 // Reset the next entry to null.
134 // Read the next entry in the segment.
137 // Return the current entry.
142 * Reads the next entry in the segment.
/**
 * Attempts to decode the entry at {@link #getNextIndex()} from the read buffer into
 * {@code nextEntry}, refilling the buffer from the channel when it runs low.
 *
 * <p>As read here, an entry is a 4-byte length, a 4-byte CRC32 checksum, then
 * {@code length} bytes of serialized payload. When the length is out of range, the
 * checksum does not match, or the buffer underflows, the buffer is rewound to its mark
 * and its limit is cut to that point.
 *
 * <p>NOTE(review): several lines are missing from this listing (the embedded numbering
 * skips 148-149, 154, 157, 161-162, 170-171, 186, 188, 190 and 194). The buffer
 * clear/flip around the read, the {@code mark()} call, the restore of the saved limit,
 * the {@code else} branch and the {@code nextEntry = null} assignments are presumably
 * among them. Confirm against the full file.
 *
 * @throws StorageException if reading or repositioning the channel fails with an
 *     {@link IOException}
 */
144 @SuppressWarnings("unchecked")
145 private void readNext() {
146 // Compute the index of the next entry in the segment.
147 final long index = getNextIndex();
150 // Read more bytes from the segment if necessary.
151 if (memory.remaining() < maxEntrySize) {
// File offset of the first unread byte: channel position plus bytes already consumed from the buffer.
152 long position = channel.position() + memory.position();
153 channel.position(position);
155 channel.read(memory);
// Put the channel back at the refill offset so its position keeps matching the start of the buffer.
156 channel.position(position);
160 // Mark the buffer so it can be reset if necessary.
164 // Read the length of the entry.
165 final int length = memory.getInt();
167 // If the buffer length is zero then return.
// Also rejects negative lengths and lengths above maxEntrySize.
168 if (length <= 0 || length > maxEntrySize) {
// Rewind to the mark and set limit == position, leaving no readable bytes.
169 memory.reset().limit(memory.position());
174 // Read the checksum of the entry.
// Mask to an unsigned 32-bit value so it can be compared with CRC32.getValue(), which returns a long.
175 long checksum = memory.getInt() & 0xFFFFFFFFL;
177 // Compute the checksum for the entry bytes.
178 final Checksum crc32 = new CRC32();
// Reads straight from the backing array; position() is used as the array index.
179 crc32.update(memory.array(), memory.position(), length);
181 // If the stored checksum equals the computed checksum, return the entry.
182 if (checksum == crc32.getValue()) {
// Save the limit, then narrow it to the end of this entry before deserializing.
183 int limit = memory.limit();
184 memory.limit(memory.position() + length);
185 E entry = namespace.deserialize(memory);
187 nextEntry = new Indexed<>(index, entry, length);
// Same rewind-and-truncate as for an invalid length; presumably the checksum-mismatch branch.
189 memory.reset().limit(memory.position());
// A getInt() ran past the buffer limit: rewind to the mark.
192 } catch (BufferUnderflowException e) {
193 memory.reset().limit(memory.position());
196 } catch (IOException e) {
197 throw new StorageException(e);
/**
 * Closes this reader. Intentionally a no-op: the channel is owned by the parent reader,
 * so it is not closed here.
 */
202 public void close() {
203 // Do nothing. The parent reader manages the channel.