Move ENTRY_HEADER_BYTES
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / MappedJournalSegmentWriter.java
1 /*
2  * Copyright 2017-2022 Open Networking Foundation and others.  All rights reserved.
3  * Copyright (c) 2024 PANTHEON.tech, s.r.o.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 package io.atomix.storage.journal;
18
19 import static io.atomix.storage.journal.SegmentEntry.HEADER_BYTES;
20
21 import com.esotericsoftware.kryo.KryoException;
22 import io.atomix.storage.journal.index.JournalIndex;
23 import java.io.IOException;
24 import java.nio.BufferOverflowException;
25 import java.nio.BufferUnderflowException;
26 import java.nio.ByteBuffer;
27 import java.nio.MappedByteBuffer;
28 import java.nio.channels.FileChannel;
29 import java.util.zip.CRC32;
30 import org.eclipse.jdt.annotation.NonNull;
31
32 /**
33  * Segment writer.
34  * <p>
35  * The format of an entry in the log is as follows:
36  * <ul>
37  * <li>64-bit index</li>
38  * <li>8-bit boolean indicating whether a term change is contained in the entry</li>
39  * <li>64-bit optional term</li>
40  * <li>32-bit signed entry length, including the entry type ID</li>
41  * <li>8-bit signed entry type ID</li>
42  * <li>n-bit entry bytes</li>
43  * </ul>
44  *
45  * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
46  */
47 final class MappedJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
48   private final @NonNull MappedByteBuffer mappedBuffer;
49   private final ByteBuffer buffer;
50
51   private Indexed<E> lastEntry;
52
53   MappedJournalSegmentWriter(
54       FileChannel channel,
55       JournalSegment<E> segment,
56       int maxEntrySize,
57       JournalIndex index,
58       JournalSerdes namespace) {
59     super(channel, segment, maxEntrySize, index, namespace);
60     mappedBuffer = mapBuffer(channel, maxSegmentSize);
61     buffer = mappedBuffer.slice();
62     reset(0);
63   }
64
65   MappedJournalSegmentWriter(JournalSegmentWriter<E> previous, int position) {
66     super(previous);
67     mappedBuffer = mapBuffer(channel, maxSegmentSize);
68     buffer = mappedBuffer.slice().position(position);
69     lastEntry = previous.getLastEntry();
70   }
71
72   private static @NonNull MappedByteBuffer mapBuffer(FileChannel channel, int maxSegmentSize) {
73     try {
74       return channel.map(FileChannel.MapMode.READ_WRITE, 0, maxSegmentSize);
75     } catch (IOException e) {
76       throw new StorageException(e);
77     }
78   }
79
80   @Override
81   @NonNull MappedByteBuffer buffer() {
82     return mappedBuffer;
83   }
84
85   @Override
86   MappedJournalSegmentWriter<E> toMapped() {
87     return this;
88   }
89
90   @Override
91   DiskJournalSegmentWriter<E> toFileChannel() {
92     final int position = buffer.position();
93     close();
94     return new DiskJournalSegmentWriter<>(this, position);
95   }
96
97   @Override
98   void reset(long index) {
99     long nextIndex = firstIndex;
100
101     // Clear the buffer indexes.
102     buffer.position(JournalSegmentDescriptor.BYTES);
103
104     // Record the current buffer position.
105     int position = buffer.position();
106
107     // Read the entry length.
108     buffer.mark();
109
110     try {
111       int length = buffer.getInt();
112
113       // If the length is non-zero, read the entry.
114       while (0 < length && length <= maxEntrySize && (index == 0 || nextIndex <= index)) {
115
116         // Read the checksum of the entry.
117         final long checksum = buffer.getInt() & 0xFFFFFFFFL;
118
119         // Slice off the entry's bytes
120         final ByteBuffer entryBytes = buffer.slice();
121         entryBytes.limit(length);
122
123         // Compute the checksum for the entry bytes.
124         final CRC32 crc32 = new CRC32();
125         crc32.update(entryBytes);
126
127         // If the stored checksum does not equal the computed checksum, do not proceed further
128         if (checksum != crc32.getValue()) {
129           break;
130         }
131
132         entryBytes.rewind();
133         final E entry = namespace.deserialize(entryBytes);
134         lastEntry = new Indexed<>(nextIndex, entry, length);
135         this.index.index(nextIndex, position);
136         nextIndex++;
137
138         // Update the current position for indexing.
139         position = buffer.position() + length;
140         buffer.position(position);
141
142         length = buffer.mark().getInt();
143       }
144
145       // Reset the buffer to the previous mark.
146       buffer.reset();
147     } catch (BufferUnderflowException e) {
148       buffer.reset();
149     }
150   }
151
152   @Override
153   Indexed<E> getLastEntry() {
154     return lastEntry;
155   }
156
157   @Override
158   @SuppressWarnings("unchecked")
159   <T extends E> Indexed<T> append(T entry) {
160     // Store the entry index.
161     final long index = getNextIndex();
162
163     // Serialize the entry.
164     int position = buffer.position();
165     if (position + HEADER_BYTES > buffer.limit()) {
166       throw new BufferOverflowException();
167     }
168
169     buffer.position(position + HEADER_BYTES);
170
171     try {
172       namespace.serialize(entry, buffer);
173     } catch (KryoException e) {
174       throw new BufferOverflowException();
175     }
176
177     final int length = buffer.position() - (position + HEADER_BYTES);
178
179     // If the entry length exceeds the maximum entry size then throw an exception.
180     if (length > maxEntrySize) {
181       // Just reset the buffer. There's no need to zero the bytes since we haven't written the length or checksum.
182       buffer.position(position);
183       throw new StorageException.TooLarge("Entry size " + length + " exceeds maximum allowed bytes (" + maxEntrySize + ")");
184     }
185
186     // Compute the checksum for the entry.
187     final CRC32 crc32 = new CRC32();
188     buffer.position(position + HEADER_BYTES);
189     ByteBuffer slice = buffer.slice();
190     slice.limit(length);
191     crc32.update(slice);
192     final long checksum = crc32.getValue();
193
194     // Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer.
195     buffer.position(position).putInt(length).putInt((int) checksum).position(position + HEADER_BYTES + length);
196
197     // Update the last entry with the correct index/term/length.
198     Indexed<E> indexedEntry = new Indexed<>(index, entry, length);
199     this.lastEntry = indexedEntry;
200     this.index.index(index, position);
201     return (Indexed<T>) indexedEntry;
202   }
203
204   @Override
205   void truncate(long index) {
206     // If the index is greater than or equal to the last index, skip the truncate.
207     if (index >= getLastIndex()) {
208       return;
209     }
210
211     // Reset the last entry.
212     lastEntry = null;
213
214     // Truncate the index.
215     this.index.truncate(index);
216
217     if (index < firstIndex) {
218       // Reset the writer to the first entry.
219       buffer.position(JournalSegmentDescriptor.BYTES);
220     } else {
221       // Reset the writer to the given index.
222       reset(index);
223     }
224
225     // Zero the entry header at current buffer position.
226     int position = buffer.position();
227     // Note: we issue a single putLong() instead of two putInt()s.
228     buffer.putLong(0).position(position);
229   }
230
231   @Override
232   void flush() {
233     mappedBuffer.force();
234   }
235
236   @Override
237   void close() {
238     flush();
239     try {
240       BufferCleaner.freeBuffer(mappedBuffer);
241     } catch (IOException e) {
242       throw new StorageException(e);
243     }
244   }
245 }