Do not leak Kryo from atomix.storage
[controller.git] / third-party / atomix / storage / src / main / java / io / atomix / storage / journal / MappedJournalSegmentReader.java
1 /*
2  * Copyright 2017-present Open Networking Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.storage.journal;
17
18 import io.atomix.storage.journal.index.JournalIndex;
19 import io.atomix.storage.journal.index.Position;
20 import java.nio.BufferUnderflowException;
21 import java.nio.ByteBuffer;
22 import java.util.NoSuchElementException;
23 import java.util.zip.CRC32;
24
25 /**
26  * Log segment reader.
27  *
28  * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
29  */
30 final class MappedJournalSegmentReader<E> implements JournalReader<E> {
31   private final ByteBuffer buffer;
32   private final int maxEntrySize;
33   private final JournalIndex index;
34   private final JournalSerdes namespace;
35   private final long firstIndex;
36   private Indexed<E> currentEntry;
37   private Indexed<E> nextEntry;
38
39   MappedJournalSegmentReader(
40       ByteBuffer buffer,
41       JournalSegment<E> segment,
42       int maxEntrySize,
43       JournalIndex index,
44       JournalSerdes namespace) {
45     this.buffer = buffer.slice();
46     this.maxEntrySize = maxEntrySize;
47     this.index = index;
48     this.namespace = namespace;
49     this.firstIndex = segment.index();
50     reset();
51   }
52
53   @Override
54   public long getFirstIndex() {
55     return firstIndex;
56   }
57
58   @Override
59   public long getCurrentIndex() {
60     return currentEntry != null ? currentEntry.index() : 0;
61   }
62
63   @Override
64   public Indexed<E> getCurrentEntry() {
65     return currentEntry;
66   }
67
68   @Override
69   public long getNextIndex() {
70     return currentEntry != null ? currentEntry.index() + 1 : firstIndex;
71   }
72
73   @Override
74   public void reset(long index) {
75     reset();
76     Position position = this.index.lookup(index - 1);
77     if (position != null) {
78       currentEntry = new Indexed<>(position.index() - 1, null, 0);
79       buffer.position(position.position());
80       readNext();
81     }
82     while (getNextIndex() < index && hasNext()) {
83       next();
84     }
85   }
86
87   @Override
88   public void reset() {
89     buffer.position(JournalSegmentDescriptor.BYTES);
90     currentEntry = null;
91     nextEntry = null;
92     readNext();
93   }
94
95   @Override
96   public boolean hasNext() {
97     // If the next entry is null, check whether a next entry exists.
98     if (nextEntry == null) {
99       readNext();
100     }
101     return nextEntry != null;
102   }
103
104   @Override
105   public Indexed<E> next() {
106     if (!hasNext()) {
107       throw new NoSuchElementException();
108     }
109
110     // Set the current entry to the next entry.
111     currentEntry = nextEntry;
112
113     // Reset the next entry to null.
114     nextEntry = null;
115
116     // Read the next entry in the segment.
117     readNext();
118
119     // Return the current entry.
120     return currentEntry;
121   }
122
123   /**
124    * Reads the next entry in the segment.
125    */
126   private void readNext() {
127     // Compute the index of the next entry in the segment.
128     final long index = getNextIndex();
129
130     // Mark the buffer so it can be reset if necessary.
131     buffer.mark();
132
133     try {
134       // Read the length of the entry.
135       final int length = buffer.getInt();
136
137       // If the buffer length is zero then return.
138       if (length <= 0 || length > maxEntrySize) {
139         buffer.reset();
140         nextEntry = null;
141         return;
142       }
143
144       // Read the checksum of the entry.
145       long checksum = buffer.getInt() & 0xFFFFFFFFL;
146
147       // Compute the checksum for the entry bytes.
148       final CRC32 crc32 = new CRC32();
149       ByteBuffer slice = buffer.slice();
150       slice.limit(length);
151       crc32.update(slice);
152
153       // If the stored checksum equals the computed checksum, return the entry.
154       if (checksum == crc32.getValue()) {
155         slice.rewind();
156         E entry = namespace.deserialize(slice);
157         nextEntry = new Indexed<>(index, entry, length);
158         buffer.position(buffer.position() + length);
159       } else {
160         buffer.reset();
161         nextEntry = null;
162       }
163     } catch (BufferUnderflowException e) {
164       buffer.reset();
165       nextEntry = null;
166     }
167   }
168
169   @Override
170   public void close() {
171     // Do nothing. The writer is responsible for cleaning the mapped buffer.
172   }
173 }