Clean up initial FileChannel writer reset's read
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / FileChannelJournalSegmentWriter.java
1 /*
2  * Copyright 2017-present Open Networking Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.storage.journal;
17
18 import com.esotericsoftware.kryo.KryoException;
19 import io.atomix.storage.journal.index.JournalIndex;
20
21 import java.io.IOException;
22 import java.nio.BufferOverflowException;
23 import java.nio.BufferUnderflowException;
24 import java.nio.ByteBuffer;
25 import java.nio.channels.FileChannel;
26 import java.util.zip.CRC32;
27 import java.util.zip.Checksum;
28
29 /**
30  * Segment writer.
31  * <p>
32  * The format of an entry in the log is as follows:
33  * <ul>
34  * <li>64-bit index</li>
35  * <li>8-bit boolean indicating whether a term change is contained in the entry</li>
36  * <li>64-bit optional term</li>
37  * <li>32-bit signed entry length, including the entry type ID</li>
38  * <li>8-bit signed entry type ID</li>
39  * <li>n-bit entry bytes</li>
40  * </ul>
41  *
42  * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
43  */
44 class FileChannelJournalSegmentWriter<E> implements JournalWriter<E> {
45   private static final ByteBuffer ZERO_ENTRY_HEADER = ByteBuffer.wrap(new byte[Integer.BYTES + Integer.BYTES]);
46
47   private final FileChannel channel;
48   private final JournalSegment<E> segment;
49   private final int maxEntrySize;
50   private final JournalIndex index;
51   private final JournalSerdes namespace;
52   private final ByteBuffer memory;
53   private final long firstIndex;
54   private Indexed<E> lastEntry;
55
56   FileChannelJournalSegmentWriter(
57       FileChannel channel,
58       JournalSegment<E> segment,
59       int maxEntrySize,
60       JournalIndex index,
61       JournalSerdes namespace) {
62     this.channel = channel;
63     this.segment = segment;
64     this.maxEntrySize = maxEntrySize;
65     this.index = index;
66     this.memory = ByteBuffer.allocate((maxEntrySize + Integer.BYTES + Integer.BYTES) * 2);
67     memory.limit(0);
68     this.namespace = namespace;
69     this.firstIndex = segment.index();
70     reset(0);
71   }
72
73   @Override
74   public void reset(long index) {
75     long nextIndex = firstIndex;
76
77     // Clear the buffer indexes.
78     try {
79       channel.position(JournalSegmentDescriptor.BYTES);
80
81       // Record the current buffer position.
82       long position = channel.position();
83
84       // Clear memory buffer and read fist chunk
85       memory.clear();
86       channel.read(memory, position);
87       memory.flip();
88
89       // Read the entry length.
90       memory.mark();
91       int length = memory.getInt();
92
93       // If the length is non-zero, read the entry.
94       while (0 < length && length <= maxEntrySize && (index == 0 || nextIndex <= index)) {
95
96         // Read the checksum of the entry.
97         final long checksum = memory.getInt() & 0xFFFFFFFFL;
98
99         // Compute the checksum for the entry bytes.
100         final Checksum crc32 = new CRC32();
101         crc32.update(memory.array(), memory.position(), length);
102
103         // If the stored checksum equals the computed checksum, return the entry.
104         if (checksum == crc32.getValue()) {
105           int limit = memory.limit();
106           memory.limit(memory.position() + length);
107           final E entry = namespace.deserialize(memory);
108           memory.limit(limit);
109           lastEntry = new Indexed<>(nextIndex, entry, length);
110           this.index.index(nextIndex, (int) position);
111           nextIndex++;
112         } else {
113           break;
114         }
115
116         // Update the current position for indexing.
117         position = channel.position() + memory.position();
118
119         // Read more bytes from the segment if necessary.
120         if (memory.remaining() < maxEntrySize) {
121           channel.position(position);
122           memory.clear();
123           channel.read(memory);
124           channel.position(position);
125           memory.flip();
126         }
127
128         memory.mark();
129         length = memory.getInt();
130       }
131
132       // Reset the buffer to the previous mark.
133       channel.position(channel.position() + memory.reset().position());
134     } catch (BufferUnderflowException e) {
135       try {
136         channel.position(channel.position() + memory.reset().position());
137       } catch (IOException e2) {
138         throw new StorageException(e2);
139       }
140     } catch (IOException e) {
141       throw new StorageException(e);
142     }
143   }
144
145   @Override
146   public long getLastIndex() {
147     return lastEntry != null ? lastEntry.index() : segment.index() - 1;
148   }
149
150   @Override
151   public Indexed<E> getLastEntry() {
152     return lastEntry;
153   }
154
155   @Override
156   public long getNextIndex() {
157     if (lastEntry != null) {
158       return lastEntry.index() + 1;
159     } else {
160       return firstIndex;
161     }
162   }
163
164   @Override
165   public void append(Indexed<E> entry) {
166     final long nextIndex = getNextIndex();
167
168     // If the entry's index is greater than the next index in the segment, skip some entries.
169     if (entry.index() > nextIndex) {
170       throw new IndexOutOfBoundsException("Entry index is not sequential");
171     }
172
173     // If the entry's index is less than the next index, truncate the segment.
174     if (entry.index() < nextIndex) {
175       truncate(entry.index() - 1);
176     }
177     append(entry.entry());
178   }
179
180   @Override
181   @SuppressWarnings("unchecked")
182   public <T extends E> Indexed<T> append(T entry) {
183     // Store the entry index.
184     final long index = getNextIndex();
185
186     try {
187       // Serialize the entry.
188       memory.clear();
189       memory.position(Integer.BYTES + Integer.BYTES);
190       try {
191         namespace.serialize(entry, memory);
192       } catch (KryoException e) {
193         throw new StorageException.TooLarge("Entry size exceeds maximum allowed bytes (" + maxEntrySize + ")");
194       }
195       memory.flip();
196
197       final int length = memory.limit() - (Integer.BYTES + Integer.BYTES);
198
199       // Ensure there's enough space left in the buffer to store the entry.
200       long position = channel.position();
201       if (segment.descriptor().maxSegmentSize() - position < length + Integer.BYTES + Integer.BYTES) {
202         throw new BufferOverflowException();
203       }
204
205       // If the entry length exceeds the maximum entry size then throw an exception.
206       if (length > maxEntrySize) {
207         throw new StorageException.TooLarge("Entry size " + length + " exceeds maximum allowed bytes (" + maxEntrySize + ")");
208       }
209
210       // Compute the checksum for the entry.
211       final Checksum crc32 = new CRC32();
212       crc32.update(memory.array(), Integer.BYTES + Integer.BYTES, memory.limit() - (Integer.BYTES + Integer.BYTES));
213       final long checksum = crc32.getValue();
214
215       // Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer.
216       memory.putInt(0, length);
217       memory.putInt(Integer.BYTES, (int) checksum);
218       channel.write(memory);
219
220       // Update the last entry with the correct index/term/length.
221       Indexed<E> indexedEntry = new Indexed<>(index, entry, length);
222       this.lastEntry = indexedEntry;
223       this.index.index(index, (int) position);
224       return (Indexed<T>) indexedEntry;
225     } catch (IOException e) {
226       throw new StorageException(e);
227     }
228   }
229
230   @Override
231   public void commit(long index) {
232
233   }
234
235   @Override
236   public void truncate(long index) {
237     // If the index is greater than or equal to the last index, skip the truncate.
238     if (index >= getLastIndex()) {
239       return;
240     }
241
242     // Reset the last entry.
243     lastEntry = null;
244
245     // Truncate the index.
246     this.index.truncate(index);
247
248     try {
249       if (index < segment.index()) {
250         // Reset the writer to the first entry.
251         channel.position(JournalSegmentDescriptor.BYTES);
252       } else {
253         // Reset the writer to the given index.
254         reset(index);
255       }
256
257       // Zero the entry header at current channel position.
258       channel.write(ZERO_ENTRY_HEADER.asReadOnlyBuffer(), channel.position());
259     } catch (IOException e) {
260       throw new StorageException(e);
261     }
262   }
263
264   @Override
265   public void flush() {
266     try {
267       if (channel.isOpen()) {
268         channel.force(true);
269       }
270     } catch (IOException e) {
271       throw new StorageException(e);
272     }
273   }
274
275   @Override
276   public void close() {
277     flush();
278   }
279 }