Move entry tracking to SegmentedJournalReader
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / DiskJournalSegmentWriter.java
1 /*
2  * Copyright 2017-2022 Open Networking Foundation and others.  All rights reserved.
3  * Copyright (c) 2024 PANTHEON.tech, s.r.o.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 package io.atomix.storage.journal;
18
19 import static io.atomix.storage.journal.SegmentEntry.HEADER_BYTES;
20
21 import com.esotericsoftware.kryo.KryoException;
22 import com.google.common.annotations.VisibleForTesting;
23 import io.atomix.storage.journal.index.JournalIndex;
24 import java.io.IOException;
25 import java.nio.BufferOverflowException;
26 import java.nio.ByteBuffer;
27 import java.nio.MappedByteBuffer;
28 import java.nio.channels.FileChannel;
29 import java.nio.channels.SeekableByteChannel;
30 import java.util.zip.CRC32;
31 import org.eclipse.jdt.annotation.Nullable;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 /**
36  * Segment writer.
37  * <p>
38  * The format of an entry in the log is as follows:
39  * <ul>
40  * <li>64-bit index</li>
41  * <li>8-bit boolean indicating whether a term change is contained in the entry</li>
42  * <li>64-bit optional term</li>
43  * <li>32-bit signed entry length, including the entry type ID</li>
44  * <li>8-bit signed entry type ID</li>
45  * <li>n-bit entry bytes</li>
46  * </ul>
47  *
48  * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
49  */
50 final class DiskJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
51   private static final Logger LOG = LoggerFactory.getLogger(DiskJournalSegmentWriter.class);
52   private static final ByteBuffer ZERO_ENTRY_HEADER = ByteBuffer.wrap(new byte[HEADER_BYTES]);
53
54   private final ByteBuffer memory;
55   private Indexed<E> lastEntry;
56   private long currentPosition;
57
58   DiskJournalSegmentWriter(
59       FileChannel channel,
60       JournalSegment<E> segment,
61       int maxEntrySize,
62       JournalIndex index,
63       JournalSerdes namespace) {
64     super(channel, segment, maxEntrySize, index, namespace);
65     memory = allocMemory(maxEntrySize);
66     reset(0);
67   }
68
69   DiskJournalSegmentWriter(JournalSegmentWriter<E> previous, int position) {
70     super(previous);
71     memory = allocMemory(maxEntrySize);
72     lastEntry = previous.getLastEntry();
73     currentPosition = position;
74   }
75
76   private static ByteBuffer allocMemory(int maxEntrySize) {
77     final var buf = ByteBuffer.allocate((maxEntrySize + HEADER_BYTES) * 2);
78     buf.limit(0);
79     return buf;
80   }
81
82   @Override
83   MappedByteBuffer buffer() {
84     return null;
85   }
86
87   @Override
88   MappedJournalSegmentWriter<E> toMapped() {
89     return new MappedJournalSegmentWriter<>(this, (int) currentPosition);
90   }
91
92   @Override
93   DiskJournalSegmentWriter<E> toFileChannel() {
94     return this;
95   }
96
97   @Override
98   void reset(final long index) {
99       long nextIndex = firstIndex;
100
101       // Clear the buffer indexes.
102       currentPosition = JournalSegmentDescriptor.BYTES;
103
104       try {
105           // Clear memory buffer and read fist chunk
106           channel.read(memory.clear(), JournalSegmentDescriptor.BYTES);
107           memory.flip();
108
109           while (index == 0 || nextIndex <= index) {
110               final var entry = prepareNextEntry(channel, memory, maxEntrySize);
111               if (entry == null) {
112                   break;
113               }
114
115               final var bytes = entry.bytes();
116               final var length = bytes.remaining();
117               try {
118                   lastEntry = new Indexed<>(nextIndex, namespace.<E>deserialize(bytes), length);
119               } catch (KryoException e) {
120                   // No-op, position is only updated on success
121                   LOG.debug("Failed to deserialize entry", e);
122                   break;
123               }
124
125               this.index.index(nextIndex, (int) currentPosition);
126               nextIndex++;
127
128               // Update the current position for indexing.
129               currentPosition = currentPosition + HEADER_BYTES + length;
130               memory.position(memory.position() + length);
131           }
132       } catch (IOException e) {
133           throw new StorageException(e);
134       }
135   }
136
137   @VisibleForTesting
138   static @Nullable SegmentEntry prepareNextEntry(final SeekableByteChannel channel, final ByteBuffer memory,
139           final int maxEntrySize) throws IOException {
140       int remaining = memory.remaining();
141       boolean compacted;
142       if (remaining < HEADER_BYTES) {
143           // We do not have the header available. Move the pointer and read.
144           channel.read(memory.compact());
145           remaining = memory.flip().remaining();
146           if (remaining < HEADER_BYTES) {
147               // could happen with mis-padded segment
148               return null;
149           }
150           compacted = true;
151       } else {
152           compacted = false;
153       }
154
155       int length;
156       while (true) {
157           length = memory.mark().getInt();
158           if (length < 1 || length > maxEntrySize) {
159               // Invalid length,
160               memory.reset();
161               return null;
162           }
163
164           if (remaining >= Integer.BYTES + length) {
165               // Fast path: we have the entry properly positioned
166               break;
167           }
168
169           // Not enough data for entry, to header start
170           memory.reset();
171           if (compacted) {
172               // we have already compacted the buffer, there is just not enough data
173               return null;
174           }
175
176           // Try to read more data and check again
177           channel.read(memory.compact());
178           remaining = memory.flip().remaining();
179           compacted = true;
180       }
181
182       // Read the checksum of the entry.
183       final int checksum = memory.getInt();
184
185       // Slice off the entry's bytes
186       final var entryBytes = memory.slice();
187       entryBytes.limit(length);
188
189       // Compute the checksum for the entry bytes.
190       final var crc32 = new CRC32();
191       crc32.update(entryBytes);
192
193       // If the stored checksum does not equal the computed checksum, do not proceed further
194       final var computed = (int) crc32.getValue();
195       if (checksum != computed) {
196           LOG.warn("Expected checksum {}, computed {}", Integer.toHexString(checksum), Integer.toHexString(computed));
197           memory.reset();
198           return null;
199       }
200
201       return new SegmentEntry(checksum, entryBytes.rewind());
202   }
203
204   @Override
205   Indexed<E> getLastEntry() {
206     return lastEntry;
207   }
208
209   @Override
210   @SuppressWarnings("unchecked")
211   <T extends E> Indexed<T> append(T entry) {
212     // Store the entry index.
213     final long index = getNextIndex();
214
215     // Serialize the entry.
216     try {
217       namespace.serialize(entry, memory.clear().position(HEADER_BYTES));
218     } catch (KryoException e) {
219       throw new StorageException.TooLarge("Entry size exceeds maximum allowed bytes (" + maxEntrySize + ")");
220     }
221     memory.flip();
222
223     final int length = memory.limit() - HEADER_BYTES;
224
225     // Ensure there's enough space left in the buffer to store the entry.
226     if (maxSegmentSize - currentPosition < length + HEADER_BYTES) {
227       throw new BufferOverflowException();
228     }
229
230     // If the entry length exceeds the maximum entry size then throw an exception.
231     if (length > maxEntrySize) {
232       throw new StorageException.TooLarge("Entry size " + length + " exceeds maximum allowed bytes (" + maxEntrySize + ")");
233     }
234
235     // Compute the checksum for the entry.
236     final CRC32 crc32 = new CRC32();
237     crc32.update(memory.array(), HEADER_BYTES, memory.limit() - HEADER_BYTES);
238     final long checksum = crc32.getValue();
239
240     // Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer.
241     memory.putInt(0, length).putInt(Integer.BYTES, (int) checksum);
242     try {
243       channel.write(memory, currentPosition);
244     } catch (IOException e) {
245       throw new StorageException(e);
246     }
247
248     // Update the last entry with the correct index/term/length.
249     Indexed<E> indexedEntry = new Indexed<>(index, entry, length);
250     this.lastEntry = indexedEntry;
251     this.index.index(index, (int) currentPosition);
252
253     currentPosition = currentPosition + HEADER_BYTES + length;
254     return (Indexed<T>) indexedEntry;
255   }
256
257   @Override
258   void truncate(long index) {
259     // If the index is greater than or equal to the last index, skip the truncate.
260     if (index >= getLastIndex()) {
261       return;
262     }
263
264     // Reset the last entry.
265     lastEntry = null;
266
267     // Truncate the index.
268     this.index.truncate(index);
269
270     try {
271       if (index < firstIndex) {
272         // Reset the writer to the first entry.
273         currentPosition = JournalSegmentDescriptor.BYTES;
274       } else {
275         // Reset the writer to the given index.
276         reset(index);
277       }
278
279       // Zero the entry header at current channel position.
280       channel.write(ZERO_ENTRY_HEADER.asReadOnlyBuffer(), currentPosition);
281     } catch (IOException e) {
282       throw new StorageException(e);
283     }
284   }
285
286   @Override
287   void flush() {
288     try {
289       if (channel.isOpen()) {
290         channel.force(true);
291       }
292     } catch (IOException e) {
293       throw new StorageException(e);
294     }
295   }
296
297   @Override
298   void close() {
299     flush();
300   }
301 }