2 * Copyright 2017-2022 Open Networking Foundation and others. All rights reserved.
3 * Copyright (c) 2024 PANTHEON.tech, s.r.o.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 package io.atomix.storage.journal;
19 import static io.atomix.storage.journal.SegmentEntry.HEADER_BYTES;
21 import com.esotericsoftware.kryo.KryoException;
22 import com.google.common.annotations.VisibleForTesting;
23 import io.atomix.storage.journal.index.JournalIndex;
24 import java.io.IOException;
25 import java.nio.BufferOverflowException;
26 import java.nio.ByteBuffer;
27 import java.nio.MappedByteBuffer;
28 import java.nio.channels.FileChannel;
29 import java.nio.channels.SeekableByteChannel;
30 import java.util.zip.CRC32;
31 import org.eclipse.jdt.annotation.Nullable;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
38 * The format of an entry in the log is as follows:
40 * <li>64-bit index</li>
41 * <li>8-bit boolean indicating whether a term change is contained in the entry</li>
42 * <li>64-bit optional term</li>
43 * <li>32-bit signed entry length, including the entry type ID</li>
44 * <li>8-bit signed entry type ID</li>
45 * <li>n-bit entry bytes</li>
48 * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
50 final class DiskJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
51 private static final Logger LOG = LoggerFactory.getLogger(DiskJournalSegmentWriter.class);
52 private static final ByteBuffer ZERO_ENTRY_HEADER = ByteBuffer.wrap(new byte[HEADER_BYTES]);
54 private final ByteBuffer memory;
55 private Indexed<E> lastEntry;
56 private long currentPosition;
58 DiskJournalSegmentWriter(
60 JournalSegment<E> segment,
63 JournalSerdes namespace) {
64 super(channel, segment, maxEntrySize, index, namespace);
65 memory = allocMemory(maxEntrySize);
69 DiskJournalSegmentWriter(JournalSegmentWriter<E> previous, int position) {
71 memory = allocMemory(maxEntrySize);
72 lastEntry = previous.getLastEntry();
73 currentPosition = position;
76 private static ByteBuffer allocMemory(int maxEntrySize) {
77 final var buf = ByteBuffer.allocate((maxEntrySize + HEADER_BYTES) * 2);
83 MappedByteBuffer buffer() {
88 MappedJournalSegmentWriter<E> toMapped() {
89 return new MappedJournalSegmentWriter<>(this, (int) currentPosition);
93 DiskJournalSegmentWriter<E> toFileChannel() {
98 void reset(final long index) {
99 long nextIndex = firstIndex;
101 // Clear the buffer indexes.
102 currentPosition = JournalSegmentDescriptor.BYTES;
105 // Clear memory buffer and read first chunk
106 channel.read(memory.clear(), JournalSegmentDescriptor.BYTES);
109 while (index == 0 || nextIndex <= index) {
110 final var entry = prepareNextEntry(channel, memory, maxEntrySize);
115 final var bytes = entry.bytes();
116 final var length = bytes.remaining();
118 lastEntry = new Indexed<>(nextIndex, namespace.<E>deserialize(bytes), length);
119 } catch (KryoException e) {
120 // No-op, position is only updated on success
121 LOG.debug("Failed to deserialize entry", e);
125 this.index.index(nextIndex, (int) currentPosition);
128 // Update the current position for indexing.
129 currentPosition = currentPosition + HEADER_BYTES + length;
130 memory.position(memory.position() + length);
132 } catch (IOException e) {
133 throw new StorageException(e);
138 static @Nullable SegmentEntry prepareNextEntry(final SeekableByteChannel channel, final ByteBuffer memory,
139 final int maxEntrySize) throws IOException {
140 int remaining = memory.remaining();
142 if (remaining < HEADER_BYTES) {
143 // We do not have the header available. Move the pointer and read.
144 channel.read(memory.compact());
145 remaining = memory.flip().remaining();
146 if (remaining < HEADER_BYTES) {
147 // could happen with mis-padded segment
157 length = memory.mark().getInt();
158 if (length < 1 || length > maxEntrySize) {
164 if (remaining >= Integer.BYTES + length) {
165 // Fast path: we have the entry properly positioned
169 // Not enough data for entry, rewind to header start
172 // we have already compacted the buffer, there is just not enough data
176 // Try to read more data and check again
177 channel.read(memory.compact());
178 remaining = memory.flip().remaining();
182 // Read the checksum of the entry.
183 final int checksum = memory.getInt();
185 // Slice off the entry's bytes
186 final var entryBytes = memory.slice();
187 entryBytes.limit(length);
189 // Compute the checksum for the entry bytes.
190 final var crc32 = new CRC32();
191 crc32.update(entryBytes);
193 // If the stored checksum does not equal the computed checksum, do not proceed further
194 final var computed = (int) crc32.getValue();
195 if (checksum != computed) {
196 LOG.warn("Expected checksum {}, computed {}", Integer.toHexString(checksum), Integer.toHexString(computed));
201 return new SegmentEntry(checksum, entryBytes.rewind());
205 Indexed<E> getLastEntry() {
210 @SuppressWarnings("unchecked")
211 <T extends E> Indexed<T> append(T entry) {
212 // Store the entry index.
213 final long index = getNextIndex();
215 // Serialize the entry.
217 namespace.serialize(entry, memory.clear().position(HEADER_BYTES));
218 } catch (KryoException e) {
219 throw new StorageException.TooLarge("Entry size exceeds maximum allowed bytes (" + maxEntrySize + ")");
223 final int length = memory.limit() - HEADER_BYTES;
225 // Ensure there's enough space left in the buffer to store the entry.
226 if (maxSegmentSize - currentPosition < length + HEADER_BYTES) {
227 throw new BufferOverflowException();
230 // If the entry length exceeds the maximum entry size then throw an exception.
231 if (length > maxEntrySize) {
232 throw new StorageException.TooLarge("Entry size " + length + " exceeds maximum allowed bytes (" + maxEntrySize + ")");
235 // Compute the checksum for the entry.
236 final CRC32 crc32 = new CRC32();
237 crc32.update(memory.array(), HEADER_BYTES, memory.limit() - HEADER_BYTES);
238 final long checksum = crc32.getValue();
240 // Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer.
241 memory.putInt(0, length).putInt(Integer.BYTES, (int) checksum);
243 channel.write(memory, currentPosition);
244 } catch (IOException e) {
245 throw new StorageException(e);
248 // Update the last entry with the correct index/entry/length.
249 Indexed<E> indexedEntry = new Indexed<>(index, entry, length);
250 this.lastEntry = indexedEntry;
251 this.index.index(index, (int) currentPosition);
253 currentPosition = currentPosition + HEADER_BYTES + length;
254 return (Indexed<T>) indexedEntry;
258 void truncate(long index) {
259 // If the index is greater than or equal to the last index, skip the truncate.
260 if (index >= getLastIndex()) {
264 // Reset the last entry.
267 // Truncate the index.
268 this.index.truncate(index);
271 if (index < firstIndex) {
272 // Reset the writer to the first entry.
273 currentPosition = JournalSegmentDescriptor.BYTES;
275 // Reset the writer to the given index.
279 // Zero the entry header at current channel position.
280 channel.write(ZERO_ENTRY_HEADER.asReadOnlyBuffer(), currentPosition);
281 } catch (IOException e) {
282 throw new StorageException(e);
289 if (channel.isOpen()) {
292 } catch (IOException e) {
293 throw new StorageException(e);