Reduce position changes during read
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / FileChannelJournalSegmentWriter.java
1 /*
2  * Copyright 2017-present Open Networking Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.storage.journal;
17
18 import com.esotericsoftware.kryo.KryoException;
19 import io.atomix.storage.journal.index.JournalIndex;
20
21 import java.io.IOException;
22 import java.nio.BufferOverflowException;
23 import java.nio.BufferUnderflowException;
24 import java.nio.ByteBuffer;
25 import java.nio.channels.FileChannel;
26 import java.util.zip.CRC32;
27 import java.util.zip.Checksum;
28
29 /**
30  * Segment writer.
31  * <p>
32  * The format of an entry in the log is as follows:
33  * <ul>
34  * <li>64-bit index</li>
35  * <li>8-bit boolean indicating whether a term change is contained in the entry</li>
36  * <li>64-bit optional term</li>
37  * <li>32-bit signed entry length, including the entry type ID</li>
38  * <li>8-bit signed entry type ID</li>
39  * <li>n-bit entry bytes</li>
40  * </ul>
41  *
42  * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
43  */
44 class FileChannelJournalSegmentWriter<E> implements JournalWriter<E> {
45   private static final ByteBuffer ZERO_ENTRY_HEADER = ByteBuffer.wrap(new byte[Integer.BYTES + Integer.BYTES]);
46
47   private final FileChannel channel;
48   private final JournalSegment<E> segment;
49   private final int maxEntrySize;
50   private final JournalIndex index;
51   private final JournalSerdes namespace;
52   private final ByteBuffer memory;
53   private final long firstIndex;
54   private Indexed<E> lastEntry;
55
56   FileChannelJournalSegmentWriter(
57       FileChannel channel,
58       JournalSegment<E> segment,
59       int maxEntrySize,
60       JournalIndex index,
61       JournalSerdes namespace) {
62     this.channel = channel;
63     this.segment = segment;
64     this.maxEntrySize = maxEntrySize;
65     this.index = index;
66     this.memory = ByteBuffer.allocate((maxEntrySize + Integer.BYTES + Integer.BYTES) * 2);
67     memory.limit(0);
68     this.namespace = namespace;
69     this.firstIndex = segment.index();
70     reset(0);
71   }
72
73   @Override
74   public void reset(long index) {
75     long nextIndex = firstIndex;
76
77     // Clear the buffer indexes.
78     try {
79       channel.position(JournalSegmentDescriptor.BYTES);
80
81       // Record the current buffer position.
82       long position = channel.position();
83
84       // Clear memory buffer and read fist chunk
85       memory.clear();
86       channel.read(memory, position);
87       memory.flip();
88
89       // Read the entry length.
90       memory.mark();
91       int length = memory.getInt();
92
93       // If the length is non-zero, read the entry.
94       while (0 < length && length <= maxEntrySize && (index == 0 || nextIndex <= index)) {
95
96         // Read the checksum of the entry.
97         final long checksum = memory.getInt() & 0xFFFFFFFFL;
98
99         // Compute the checksum for the entry bytes.
100         final Checksum crc32 = new CRC32();
101         crc32.update(memory.array(), memory.position(), length);
102
103         // If the stored checksum equals the computed checksum, return the entry.
104         if (checksum == crc32.getValue()) {
105           int limit = memory.limit();
106           memory.limit(memory.position() + length);
107           final E entry = namespace.deserialize(memory);
108           memory.limit(limit);
109           lastEntry = new Indexed<>(nextIndex, entry, length);
110           this.index.index(nextIndex, (int) position);
111           nextIndex++;
112         } else {
113           break;
114         }
115
116         // Update the current position for indexing.
117         position = channel.position() + memory.position();
118
119         // Read more bytes from the segment if necessary.
120         if (memory.remaining() < maxEntrySize) {
121           memory.clear();
122           channel.position(position);
123           channel.read(memory, position);
124           memory.flip();
125         }
126
127         memory.mark();
128         length = memory.getInt();
129       }
130
131       // Reset the buffer to the previous mark.
132       channel.position(channel.position() + memory.reset().position());
133     } catch (BufferUnderflowException e) {
134       try {
135         channel.position(channel.position() + memory.reset().position());
136       } catch (IOException e2) {
137         throw new StorageException(e2);
138       }
139     } catch (IOException e) {
140       throw new StorageException(e);
141     }
142   }
143
144   @Override
145   public long getLastIndex() {
146     return lastEntry != null ? lastEntry.index() : segment.index() - 1;
147   }
148
149   @Override
150   public Indexed<E> getLastEntry() {
151     return lastEntry;
152   }
153
154   @Override
155   public long getNextIndex() {
156     if (lastEntry != null) {
157       return lastEntry.index() + 1;
158     } else {
159       return firstIndex;
160     }
161   }
162
163   @Override
164   public void append(Indexed<E> entry) {
165     final long nextIndex = getNextIndex();
166
167     // If the entry's index is greater than the next index in the segment, skip some entries.
168     if (entry.index() > nextIndex) {
169       throw new IndexOutOfBoundsException("Entry index is not sequential");
170     }
171
172     // If the entry's index is less than the next index, truncate the segment.
173     if (entry.index() < nextIndex) {
174       truncate(entry.index() - 1);
175     }
176     append(entry.entry());
177   }
178
179   @Override
180   @SuppressWarnings("unchecked")
181   public <T extends E> Indexed<T> append(T entry) {
182     // Store the entry index.
183     final long index = getNextIndex();
184
185     try {
186       // Serialize the entry.
187       memory.clear();
188       memory.position(Integer.BYTES + Integer.BYTES);
189       try {
190         namespace.serialize(entry, memory);
191       } catch (KryoException e) {
192         throw new StorageException.TooLarge("Entry size exceeds maximum allowed bytes (" + maxEntrySize + ")");
193       }
194       memory.flip();
195
196       final int length = memory.limit() - (Integer.BYTES + Integer.BYTES);
197
198       // Ensure there's enough space left in the buffer to store the entry.
199       long position = channel.position();
200       if (segment.descriptor().maxSegmentSize() - position < length + Integer.BYTES + Integer.BYTES) {
201         throw new BufferOverflowException();
202       }
203
204       // If the entry length exceeds the maximum entry size then throw an exception.
205       if (length > maxEntrySize) {
206         throw new StorageException.TooLarge("Entry size " + length + " exceeds maximum allowed bytes (" + maxEntrySize + ")");
207       }
208
209       // Compute the checksum for the entry.
210       final Checksum crc32 = new CRC32();
211       crc32.update(memory.array(), Integer.BYTES + Integer.BYTES, memory.limit() - (Integer.BYTES + Integer.BYTES));
212       final long checksum = crc32.getValue();
213
214       // Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer.
215       memory.putInt(0, length);
216       memory.putInt(Integer.BYTES, (int) checksum);
217       channel.write(memory);
218
219       // Update the last entry with the correct index/term/length.
220       Indexed<E> indexedEntry = new Indexed<>(index, entry, length);
221       this.lastEntry = indexedEntry;
222       this.index.index(index, (int) position);
223       return (Indexed<T>) indexedEntry;
224     } catch (IOException e) {
225       throw new StorageException(e);
226     }
227   }
228
229   @Override
230   public void commit(long index) {
231
232   }
233
234   @Override
235   public void truncate(long index) {
236     // If the index is greater than or equal to the last index, skip the truncate.
237     if (index >= getLastIndex()) {
238       return;
239     }
240
241     // Reset the last entry.
242     lastEntry = null;
243
244     // Truncate the index.
245     this.index.truncate(index);
246
247     try {
248       if (index < segment.index()) {
249         // Reset the writer to the first entry.
250         channel.position(JournalSegmentDescriptor.BYTES);
251       } else {
252         // Reset the writer to the given index.
253         reset(index);
254       }
255
256       // Zero the entry header at current channel position.
257       channel.write(ZERO_ENTRY_HEADER.asReadOnlyBuffer(), channel.position());
258     } catch (IOException e) {
259       throw new StorageException(e);
260     }
261   }
262
263   @Override
264   public void flush() {
265     try {
266       if (channel.isOpen()) {
267         channel.force(true);
268       }
269     } catch (IOException e) {
270       throw new StorageException(e);
271     }
272   }
273
274   @Override
275   public void close() {
276     flush();
277   }
278 }