/*
 * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 package io.atomix.storage.journal;
18 import static io.atomix.storage.journal.SegmentEntry.HEADER_BYTES;
19 import static java.util.Objects.requireNonNull;
21 import com.esotericsoftware.kryo.KryoException;
22 import io.atomix.storage.journal.index.JournalIndex;
23 import java.nio.ByteBuffer;
24 import java.nio.MappedByteBuffer;
25 import java.nio.channels.FileChannel;
26 import java.util.zip.CRC32;
27 import org.eclipse.jdt.annotation.NonNull;
28 import org.eclipse.jdt.annotation.Nullable;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
32 abstract sealed class JournalSegmentWriter<E> permits DiskJournalSegmentWriter, MappedJournalSegmentWriter {
33 private static final Logger LOG = LoggerFactory.getLogger(JournalSegmentWriter.class);
35 final @NonNull FileChannel channel;
36 final @NonNull JournalSegment<E> segment;
37 private final @NonNull JournalIndex index;
38 final @NonNull JournalSerdes namespace;
39 final int maxSegmentSize;
40 final int maxEntrySize;
42 private Indexed<E> lastEntry;
43 private int currentPosition;
45 JournalSegmentWriter(final FileChannel channel, final JournalSegment<E> segment, final int maxEntrySize,
46 final JournalIndex index, final JournalSerdes namespace) {
47 this.channel = requireNonNull(channel);
48 this.segment = requireNonNull(segment);
49 this.index = requireNonNull(index);
50 this.namespace = requireNonNull(namespace);
51 maxSegmentSize = segment.descriptor().maxSegmentSize();
52 this.maxEntrySize = maxEntrySize;
55 JournalSegmentWriter(final JournalSegmentWriter<E> previous) {
56 channel = previous.channel;
57 segment = previous.segment;
58 index = previous.index;
59 namespace = previous.namespace;
60 maxSegmentSize = previous.maxSegmentSize;
61 maxEntrySize = previous.maxEntrySize;
62 lastEntry = previous.lastEntry;
63 currentPosition = previous.currentPosition;
67 * Returns the last written index.
69 * @return The last written index.
71 final long getLastIndex() {
72 return lastEntry != null ? lastEntry.index() : segment.firstIndex() - 1;
76 * Returns the last entry written.
78 * @return The last entry written.
80 final Indexed<E> getLastEntry() {
85 * Returns the next index to be written.
87 * @return The next index to be written.
89 final long getNextIndex() {
90 return lastEntry != null ? lastEntry.index() + 1 : segment.firstIndex();
94 * Tries to append an entry to the journal.
96 * @param entry The entry to append.
97 * @return The appended indexed entry, or {@code null} if there is not enough space available
99 final <T extends E> @Nullable Indexed<T> append(final T entry) {
100 // Store the entry index.
101 final long index = getNextIndex();
102 final int position = currentPosition;
104 // Serialize the entry.
105 final int bodyPosition = position + HEADER_BYTES;
106 final int avail = maxSegmentSize - bodyPosition;
108 LOG.trace("Not enough space for {} at {}", index, position);
112 final var writeLimit = Math.min(avail, maxEntrySize);
113 final var diskEntry = startWrite(position, writeLimit + HEADER_BYTES).position(HEADER_BYTES);
115 namespace.serialize(entry, diskEntry);
116 } catch (KryoException e) {
117 if (writeLimit != maxEntrySize) {
118 // We have not provided enough capacity, signal to roll to next segment
119 LOG.trace("Tail serialization with {} bytes available failed", writeLimit, e);
123 // Just reset the buffer. There's no need to zero the bytes since we haven't written the length or checksum.
124 throw new StorageException.TooLarge("Entry size exceeds maximum allowed bytes (" + maxEntrySize + ")", e);
127 final int length = diskEntry.position() - HEADER_BYTES;
129 // Compute the checksum for the entry.
130 final var crc32 = new CRC32();
131 crc32.update(diskEntry.flip().position(HEADER_BYTES));
133 // Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer.
134 diskEntry.putInt(0, length).putInt(Integer.BYTES, (int) crc32.getValue());
135 commitWrite(position, diskEntry.rewind());
137 // Update the last entry with the correct index/term/length.
138 final var indexedEntry = new Indexed<E>(index, entry, length);
139 lastEntry = indexedEntry;
140 this.index.index(index, position);
142 currentPosition = bodyPosition + length;
144 @SuppressWarnings("unchecked")
145 final var ugly = (Indexed<T>) indexedEntry;
149 abstract ByteBuffer startWrite(int position, int size);
151 abstract void commitWrite(int position, ByteBuffer entry);
154 * Resets the head of the segment to the given index.
156 * @param index the index to which to reset the head of the segment
158 final void reset(final long index) {
159 // acquire ownership of cache and make sure reader does not see anything we've done once we're done
160 final var reader = reader();
161 reader.invalidateCache();
163 resetWithBuffer(reader, index);
165 // Make sure reader does not see anything we've done
166 reader.invalidateCache();
170 abstract JournalSegmentReader<E> reader();
172 private void resetWithBuffer(final JournalSegmentReader<E> reader, final long index) {
173 long nextIndex = segment.firstIndex();
175 // Clear the buffer indexes and acquire ownership of the buffer
176 currentPosition = JournalSegmentDescriptor.BYTES;
177 reader.setPosition(JournalSegmentDescriptor.BYTES);
179 while (index == 0 || nextIndex <= index) {
180 final var entry = reader.readEntry(nextIndex);
186 this.index.index(nextIndex, currentPosition);
189 // Update the current position for indexing.
190 currentPosition = currentPosition + HEADER_BYTES + entry.size();
195 * Truncates the log to the given index.
197 * @param index The index to which to truncate the log.
199 final void truncate(final long index) {
200 // If the index is greater than or equal to the last index, skip the truncate.
201 if (index >= getLastIndex()) {
205 // Reset the last entry.
208 // Truncate the index.
209 this.index.truncate(index);
211 if (index < segment.firstIndex()) {
212 // Reset the writer to the first entry.
213 currentPosition = JournalSegmentDescriptor.BYTES;
215 // Reset the writer to the given index.
219 // Zero the entry header at current channel position.
220 writeEmptyHeader(currentPosition);
224 * Write {@link SegmentEntry#HEADER_BYTES} worth of zeroes at specified position.
226 * @param position position to write to
228 abstract void writeEmptyHeader(int position);
231 * Flushes written entries to disk.
233 abstract void flush();
236 * Closes this writer.
238 abstract void close();
241 * Returns the mapped buffer underlying the segment writer, or {@code null} if the writer does not have such a
244 * @return the mapped buffer underlying the segment writer, or {@code null}.
246 abstract @Nullable MappedByteBuffer buffer();
248 abstract @NonNull MappedJournalSegmentWriter<E> toMapped();
250 abstract @NonNull DiskJournalSegmentWriter<E> toFileChannel();