46d7a659568eb0e3b7703adc461cb1d1ed1c9d06
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / FileChannelJournalSegmentReader.java
1 /*
2  * Copyright 2017-present Open Networking Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.storage.journal;
17
18 import io.atomix.storage.journal.index.JournalIndex;
19 import io.atomix.storage.journal.index.Position;
20 import java.io.IOException;
21 import java.nio.BufferUnderflowException;
22 import java.nio.ByteBuffer;
23 import java.nio.channels.FileChannel;
24 import java.util.NoSuchElementException;
25 import java.util.zip.CRC32;
26 import java.util.zip.Checksum;
27
28 /**
29  * Log segment reader.
30  *
31  * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
32  */
33 final class FileChannelJournalSegmentReader<E> implements JournalReader<E> {
34   private final FileChannel channel;
35   private final int maxEntrySize;
36   private final JournalIndex index;
37   private final JournalSerdes namespace;
38   private final ByteBuffer memory;
39   private final long firstIndex;
40   private Indexed<E> currentEntry;
41   private Indexed<E> nextEntry;
42   private long currentPosition;
43
44   FileChannelJournalSegmentReader(
45       FileChannel channel,
46       JournalSegment<E> segment,
47       int maxEntrySize,
48       JournalIndex index,
49       JournalSerdes namespace) {
50     this.channel = channel;
51     this.maxEntrySize = maxEntrySize;
52     this.index = index;
53     this.namespace = namespace;
54     this.memory = ByteBuffer.allocate((maxEntrySize + Integer.BYTES + Integer.BYTES) * 2);
55     this.firstIndex = segment.index();
56     reset();
57   }
58
59   @Override
60   public long getFirstIndex() {
61     return firstIndex;
62   }
63
64   @Override
65   public long getCurrentIndex() {
66     return currentEntry != null ? currentEntry.index() : 0;
67   }
68
69   @Override
70   public Indexed<E> getCurrentEntry() {
71     return currentEntry;
72   }
73
74   @Override
75   public long getNextIndex() {
76     return currentEntry != null ? currentEntry.index() + 1 : firstIndex;
77   }
78
79   @Override
80   public void reset(long index) {
81     reset();
82     Position position = this.index.lookup(index - 1);
83     if (position != null) {
84       currentEntry = new Indexed<>(position.index() - 1, null, 0);
85       currentPosition = position.position();
86       memory.clear().flip();
87       readNext();
88     }
89     while (getNextIndex() < index && hasNext()) {
90       next();
91     }
92   }
93
94   @Override
95   public void reset() {
96     currentPosition = JournalSegmentDescriptor.BYTES;
97     memory.clear().limit(0);
98     currentEntry = null;
99     nextEntry = null;
100     readNext();
101   }
102
103   @Override
104   public boolean hasNext() {
105     // If the next entry is null, check whether a next entry exists.
106     if (nextEntry == null) {
107       readNext();
108     }
109     return nextEntry != null;
110   }
111
112   @Override
113   public Indexed<E> next() {
114     if (!hasNext()) {
115       throw new NoSuchElementException();
116     }
117
118     // Set the current entry to the next entry.
119     currentEntry = nextEntry;
120
121     // Reset the next entry to null.
122     nextEntry = null;
123
124     // Read the next entry in the segment.
125     readNext();
126
127     // Return the current entry.
128     return currentEntry;
129   }
130
131   /**
132    * Reads the next entry in the segment.
133    */
134   private void readNext() {
135     // Compute the index of the next entry in the segment.
136     final long index = getNextIndex();
137
138     try {
139       // Read more bytes from the segment if necessary.
140       if (memory.remaining() < maxEntrySize) {
141         long position = currentPosition + memory.position();
142         memory.clear();
143         channel.read(memory, position);
144         currentPosition = position;
145         memory.flip();
146       }
147
148       // Mark the buffer so it can be reset if necessary.
149       memory.mark();
150
151       try {
152         // Read the length of the entry.
153         final int length = memory.getInt();
154
155         // If the buffer length is zero then return.
156         if (length <= 0 || length > maxEntrySize) {
157           memory.reset().limit(memory.position());
158           nextEntry = null;
159           return;
160         }
161
162         // Read the checksum of the entry.
163         long checksum = memory.getInt() & 0xFFFFFFFFL;
164
165         // Compute the checksum for the entry bytes.
166         final Checksum crc32 = new CRC32();
167         crc32.update(memory.array(), memory.position(), length);
168
169         // If the stored checksum equals the computed checksum, return the entry.
170         if (checksum == crc32.getValue()) {
171           int limit = memory.limit();
172           memory.limit(memory.position() + length);
173           E entry = namespace.deserialize(memory);
174           memory.limit(limit);
175           nextEntry = new Indexed<>(index, entry, length);
176         } else {
177           memory.reset().limit(memory.position());
178           nextEntry = null;
179         }
180       } catch (BufferUnderflowException e) {
181         memory.reset().limit(memory.position());
182         nextEntry = null;
183       }
184     } catch (IOException e) {
185       throw new StorageException(e);
186     }
187   }
188
189   @Override
190   public void close() {
191     // Do nothing. The parent reader manages the channel.
192   }
193 }