Mark classes as final
[controller.git] / third-party / atomix / storage / src / main / java / io / atomix / storage / journal / FileChannelJournalSegmentReader.java
1 /*
2  * Copyright 2017-present Open Networking Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.storage.journal;
17
18 import io.atomix.storage.journal.index.JournalIndex;
19 import io.atomix.storage.journal.index.Position;
20 import io.atomix.utils.serializer.Namespace;
21
22 import java.io.IOException;
23 import java.nio.BufferUnderflowException;
24 import java.nio.ByteBuffer;
25 import java.nio.channels.FileChannel;
26 import java.util.NoSuchElementException;
27 import java.util.zip.CRC32;
28 import java.util.zip.Checksum;
29
30 /**
31  * Log segment reader.
32  *
33  * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
34  */
35 final class FileChannelJournalSegmentReader<E> implements JournalReader<E> {
36   private final FileChannel channel;
37   private final int maxEntrySize;
38   private final JournalIndex index;
39   private final Namespace namespace;
40   private final ByteBuffer memory;
41   private final long firstIndex;
42   private Indexed<E> currentEntry;
43   private Indexed<E> nextEntry;
44
45   FileChannelJournalSegmentReader(
46       FileChannel channel,
47       JournalSegment<E> segment,
48       int maxEntrySize,
49       JournalIndex index,
50       Namespace namespace) {
51     this.channel = channel;
52     this.maxEntrySize = maxEntrySize;
53     this.index = index;
54     this.namespace = namespace;
55     this.memory = ByteBuffer.allocate((maxEntrySize + Integer.BYTES + Integer.BYTES) * 2);
56     this.firstIndex = segment.index();
57     reset();
58   }
59
60   @Override
61   public long getFirstIndex() {
62     return firstIndex;
63   }
64
65   @Override
66   public long getCurrentIndex() {
67     return currentEntry != null ? currentEntry.index() : 0;
68   }
69
70   @Override
71   public Indexed<E> getCurrentEntry() {
72     return currentEntry;
73   }
74
75   @Override
76   public long getNextIndex() {
77     return currentEntry != null ? currentEntry.index() + 1 : firstIndex;
78   }
79
80   @Override
81   public void reset(long index) {
82     reset();
83     Position position = this.index.lookup(index - 1);
84     if (position != null) {
85       currentEntry = new Indexed<>(position.index() - 1, null, 0);
86       try {
87         channel.position(position.position());
88         memory.clear().flip();
89       } catch (IOException e) {
90         throw new StorageException(e);
91       }
92       readNext();
93     }
94     while (getNextIndex() < index && hasNext()) {
95       next();
96     }
97   }
98
99   @Override
100   public void reset() {
101     try {
102       channel.position(JournalSegmentDescriptor.BYTES);
103     } catch (IOException e) {
104       throw new StorageException(e);
105     }
106     memory.clear().limit(0);
107     currentEntry = null;
108     nextEntry = null;
109     readNext();
110   }
111
112   @Override
113   public boolean hasNext() {
114     // If the next entry is null, check whether a next entry exists.
115     if (nextEntry == null) {
116       readNext();
117     }
118     return nextEntry != null;
119   }
120
121   @Override
122   public Indexed<E> next() {
123     if (!hasNext()) {
124       throw new NoSuchElementException();
125     }
126
127     // Set the current entry to the next entry.
128     currentEntry = nextEntry;
129
130     // Reset the next entry to null.
131     nextEntry = null;
132
133     // Read the next entry in the segment.
134     readNext();
135
136     // Return the current entry.
137     return currentEntry;
138   }
139
140   /**
141    * Reads the next entry in the segment.
142    */
143   private void readNext() {
144     // Compute the index of the next entry in the segment.
145     final long index = getNextIndex();
146
147     try {
148       // Read more bytes from the segment if necessary.
149       if (memory.remaining() < maxEntrySize) {
150         long position = channel.position() + memory.position();
151         channel.position(position);
152         memory.clear();
153         channel.read(memory);
154         channel.position(position);
155         memory.flip();
156       }
157
158       // Mark the buffer so it can be reset if necessary.
159       memory.mark();
160
161       try {
162         // Read the length of the entry.
163         final int length = memory.getInt();
164
165         // If the buffer length is zero then return.
166         if (length <= 0 || length > maxEntrySize) {
167           memory.reset().limit(memory.position());
168           nextEntry = null;
169           return;
170         }
171
172         // Read the checksum of the entry.
173         long checksum = memory.getInt() & 0xFFFFFFFFL;
174
175         // Compute the checksum for the entry bytes.
176         final Checksum crc32 = new CRC32();
177         crc32.update(memory.array(), memory.position(), length);
178
179         // If the stored checksum equals the computed checksum, return the entry.
180         if (checksum == crc32.getValue()) {
181           int limit = memory.limit();
182           memory.limit(memory.position() + length);
183           E entry = namespace.deserialize(memory);
184           memory.limit(limit);
185           nextEntry = new Indexed<>(index, entry, length);
186         } else {
187           memory.reset().limit(memory.position());
188           nextEntry = null;
189         }
190       } catch (BufferUnderflowException e) {
191         memory.reset().limit(memory.position());
192         nextEntry = null;
193       }
194     } catch (IOException e) {
195       throw new StorageException(e);
196     }
197   }
198
199   @Override
200   public void close() {
201     // Do nothing. The parent reader manages the channel.
202   }
203 }