Import atomix/{storage,utils}
[controller.git] / third-party / atomix / storage / src / main / java / io / atomix / storage / journal / FileChannelJournalSegmentReader.java
1 /*
2  * Copyright 2017-present Open Networking Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.storage.journal;
17
18 import io.atomix.storage.StorageException;
19 import io.atomix.storage.journal.index.JournalIndex;
20 import io.atomix.storage.journal.index.Position;
21 import io.atomix.utils.serializer.Namespace;
22
23 import java.io.IOException;
24 import java.nio.BufferUnderflowException;
25 import java.nio.ByteBuffer;
26 import java.nio.channels.FileChannel;
27 import java.util.NoSuchElementException;
28 import java.util.zip.CRC32;
29 import java.util.zip.Checksum;
30
31 /**
32  * Log segment reader.
33  *
34  * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
35  */
36 class FileChannelJournalSegmentReader<E> implements JournalReader<E> {
37   private final FileChannel channel;
38   private final int maxEntrySize;
39   private final JournalIndex index;
40   private final Namespace namespace;
41   private final ByteBuffer memory;
42   private final long firstIndex;
43   private Indexed<E> currentEntry;
44   private Indexed<E> nextEntry;
45
46   FileChannelJournalSegmentReader(
47       FileChannel channel,
48       JournalSegment<E> segment,
49       int maxEntrySize,
50       JournalIndex index,
51       Namespace namespace) {
52     this.channel = channel;
53     this.maxEntrySize = maxEntrySize;
54     this.index = index;
55     this.namespace = namespace;
56     this.memory = ByteBuffer.allocate((maxEntrySize + Integer.BYTES + Integer.BYTES) * 2);
57     this.firstIndex = segment.index();
58     reset();
59   }
60
61   @Override
62   public long getFirstIndex() {
63     return firstIndex;
64   }
65
66   @Override
67   public long getCurrentIndex() {
68     return currentEntry != null ? currentEntry.index() : 0;
69   }
70
71   @Override
72   public Indexed<E> getCurrentEntry() {
73     return currentEntry;
74   }
75
76   @Override
77   public long getNextIndex() {
78     return currentEntry != null ? currentEntry.index() + 1 : firstIndex;
79   }
80
81   @Override
82   public void reset(long index) {
83     reset();
84     Position position = this.index.lookup(index - 1);
85     if (position != null) {
86       currentEntry = new Indexed<>(position.index() - 1, null, 0);
87       try {
88         channel.position(position.position());
89         memory.clear().flip();
90       } catch (IOException e) {
91         throw new StorageException(e);
92       }
93       readNext();
94     }
95     while (getNextIndex() < index && hasNext()) {
96       next();
97     }
98   }
99
100   @Override
101   public void reset() {
102     try {
103       channel.position(JournalSegmentDescriptor.BYTES);
104     } catch (IOException e) {
105       throw new StorageException(e);
106     }
107     memory.clear().limit(0);
108     currentEntry = null;
109     nextEntry = null;
110     readNext();
111   }
112
113   @Override
114   public boolean hasNext() {
115     // If the next entry is null, check whether a next entry exists.
116     if (nextEntry == null) {
117       readNext();
118     }
119     return nextEntry != null;
120   }
121
122   @Override
123   public Indexed<E> next() {
124     if (!hasNext()) {
125       throw new NoSuchElementException();
126     }
127
128     // Set the current entry to the next entry.
129     currentEntry = nextEntry;
130
131     // Reset the next entry to null.
132     nextEntry = null;
133
134     // Read the next entry in the segment.
135     readNext();
136
137     // Return the current entry.
138     return currentEntry;
139   }
140
141   /**
142    * Reads the next entry in the segment.
143    */
144   @SuppressWarnings("unchecked")
145   private void readNext() {
146     // Compute the index of the next entry in the segment.
147     final long index = getNextIndex();
148
149     try {
150       // Read more bytes from the segment if necessary.
151       if (memory.remaining() < maxEntrySize) {
152         long position = channel.position() + memory.position();
153         channel.position(position);
154         memory.clear();
155         channel.read(memory);
156         channel.position(position);
157         memory.flip();
158       }
159
160       // Mark the buffer so it can be reset if necessary.
161       memory.mark();
162
163       try {
164         // Read the length of the entry.
165         final int length = memory.getInt();
166
167         // If the buffer length is zero then return.
168         if (length <= 0 || length > maxEntrySize) {
169           memory.reset().limit(memory.position());
170           nextEntry = null;
171           return;
172         }
173
174         // Read the checksum of the entry.
175         long checksum = memory.getInt() & 0xFFFFFFFFL;
176
177         // Compute the checksum for the entry bytes.
178         final Checksum crc32 = new CRC32();
179         crc32.update(memory.array(), memory.position(), length);
180
181         // If the stored checksum equals the computed checksum, return the entry.
182         if (checksum == crc32.getValue()) {
183           int limit = memory.limit();
184           memory.limit(memory.position() + length);
185           E entry = namespace.deserialize(memory);
186           memory.limit(limit);
187           nextEntry = new Indexed<>(index, entry, length);
188         } else {
189           memory.reset().limit(memory.position());
190           nextEntry = null;
191         }
192       } catch (BufferUnderflowException e) {
193         memory.reset().limit(memory.position());
194         nextEntry = null;
195       }
196     } catch (IOException e) {
197       throw new StorageException(e);
198     }
199   }
200
201   @Override
202   public void close() {
203     // Do nothing. The parent reader manages the channel.
204   }
205 }