696ab45cdeb06cb7e895941cc8bbb94b61d902c2
[controller.git] / third-party / atomix / storage / src / main / java / io / atomix / storage / journal / FileChannelJournalSegmentReader.java
1 /*
2  * Copyright 2017-present Open Networking Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.storage.journal;
17
18 import io.atomix.storage.journal.index.JournalIndex;
19 import io.atomix.storage.journal.index.Position;
20 import java.io.IOException;
21 import java.nio.BufferUnderflowException;
22 import java.nio.ByteBuffer;
23 import java.nio.channels.FileChannel;
24 import java.util.NoSuchElementException;
25 import java.util.zip.CRC32;
26 import java.util.zip.Checksum;
27
28 /**
29  * Log segment reader.
30  *
31  * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
32  */
33 final class FileChannelJournalSegmentReader<E> implements JournalReader<E> {
34   private final FileChannel channel;
35   private final int maxEntrySize;
36   private final JournalIndex index;
37   private final JournalSerdes namespace;
38   private final ByteBuffer memory;
39   private final long firstIndex;
40   private Indexed<E> currentEntry;
41   private Indexed<E> nextEntry;
42
43   FileChannelJournalSegmentReader(
44       FileChannel channel,
45       JournalSegment<E> segment,
46       int maxEntrySize,
47       JournalIndex index,
48       JournalSerdes namespace) {
49     this.channel = channel;
50     this.maxEntrySize = maxEntrySize;
51     this.index = index;
52     this.namespace = namespace;
53     this.memory = ByteBuffer.allocate((maxEntrySize + Integer.BYTES + Integer.BYTES) * 2);
54     this.firstIndex = segment.index();
55     reset();
56   }
57
58   @Override
59   public long getFirstIndex() {
60     return firstIndex;
61   }
62
63   @Override
64   public long getCurrentIndex() {
65     return currentEntry != null ? currentEntry.index() : 0;
66   }
67
68   @Override
69   public Indexed<E> getCurrentEntry() {
70     return currentEntry;
71   }
72
73   @Override
74   public long getNextIndex() {
75     return currentEntry != null ? currentEntry.index() + 1 : firstIndex;
76   }
77
78   @Override
79   public void reset(long index) {
80     reset();
81     Position position = this.index.lookup(index - 1);
82     if (position != null) {
83       currentEntry = new Indexed<>(position.index() - 1, null, 0);
84       try {
85         channel.position(position.position());
86         memory.clear().flip();
87       } catch (IOException e) {
88         throw new StorageException(e);
89       }
90       readNext();
91     }
92     while (getNextIndex() < index && hasNext()) {
93       next();
94     }
95   }
96
97   @Override
98   public void reset() {
99     try {
100       channel.position(JournalSegmentDescriptor.BYTES);
101     } catch (IOException e) {
102       throw new StorageException(e);
103     }
104     memory.clear().limit(0);
105     currentEntry = null;
106     nextEntry = null;
107     readNext();
108   }
109
110   @Override
111   public boolean hasNext() {
112     // If the next entry is null, check whether a next entry exists.
113     if (nextEntry == null) {
114       readNext();
115     }
116     return nextEntry != null;
117   }
118
119   @Override
120   public Indexed<E> next() {
121     if (!hasNext()) {
122       throw new NoSuchElementException();
123     }
124
125     // Set the current entry to the next entry.
126     currentEntry = nextEntry;
127
128     // Reset the next entry to null.
129     nextEntry = null;
130
131     // Read the next entry in the segment.
132     readNext();
133
134     // Return the current entry.
135     return currentEntry;
136   }
137
138   /**
139    * Reads the next entry in the segment.
140    */
141   private void readNext() {
142     // Compute the index of the next entry in the segment.
143     final long index = getNextIndex();
144
145     try {
146       // Read more bytes from the segment if necessary.
147       if (memory.remaining() < maxEntrySize) {
148         long position = channel.position() + memory.position();
149         channel.position(position);
150         memory.clear();
151         channel.read(memory);
152         channel.position(position);
153         memory.flip();
154       }
155
156       // Mark the buffer so it can be reset if necessary.
157       memory.mark();
158
159       try {
160         // Read the length of the entry.
161         final int length = memory.getInt();
162
163         // If the buffer length is zero then return.
164         if (length <= 0 || length > maxEntrySize) {
165           memory.reset().limit(memory.position());
166           nextEntry = null;
167           return;
168         }
169
170         // Read the checksum of the entry.
171         long checksum = memory.getInt() & 0xFFFFFFFFL;
172
173         // Compute the checksum for the entry bytes.
174         final Checksum crc32 = new CRC32();
175         crc32.update(memory.array(), memory.position(), length);
176
177         // If the stored checksum equals the computed checksum, return the entry.
178         if (checksum == crc32.getValue()) {
179           int limit = memory.limit();
180           memory.limit(memory.position() + length);
181           E entry = namespace.deserialize(memory);
182           memory.limit(limit);
183           nextEntry = new Indexed<>(index, entry, length);
184         } else {
185           memory.reset().limit(memory.position());
186           nextEntry = null;
187         }
188       } catch (BufferUnderflowException e) {
189         memory.reset().limit(memory.position());
190         nextEntry = null;
191       }
192     } catch (IOException e) {
193       throw new StorageException(e);
194     }
195   }
196
197   @Override
198   public void close() {
199     // Do nothing. The parent reader manages the channel.
200   }
201 }