Improve segmented journal actor metrics
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / JournalSegmentWriter.java
1 /*
2  * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.storage.journal;
17
18 import static io.atomix.storage.journal.SegmentEntry.HEADER_BYTES;
19 import static java.util.Objects.requireNonNull;
20
21 import com.esotericsoftware.kryo.KryoException;
22 import io.atomix.storage.journal.index.JournalIndex;
23 import java.nio.ByteBuffer;
24 import java.nio.MappedByteBuffer;
25 import java.nio.channels.FileChannel;
26 import java.util.zip.CRC32;
27 import org.eclipse.jdt.annotation.NonNull;
28 import org.eclipse.jdt.annotation.Nullable;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 abstract sealed class JournalSegmentWriter<E> permits DiskJournalSegmentWriter, MappedJournalSegmentWriter {
33     private static final Logger LOG = LoggerFactory.getLogger(JournalSegmentWriter.class);
34
35     final @NonNull FileChannel channel;
36     final @NonNull JournalSegment<E> segment;
37     private final @NonNull JournalIndex index;
38     final @NonNull JournalSerdes namespace;
39     final int maxSegmentSize;
40     final int maxEntrySize;
41
42     private Indexed<E> lastEntry;
43     private int currentPosition;
44
45     JournalSegmentWriter(final FileChannel channel, final JournalSegment<E> segment, final int maxEntrySize,
46             final JournalIndex index, final JournalSerdes namespace) {
47         this.channel = requireNonNull(channel);
48         this.segment = requireNonNull(segment);
49         this.index = requireNonNull(index);
50         this.namespace = requireNonNull(namespace);
51         maxSegmentSize = segment.descriptor().maxSegmentSize();
52         this.maxEntrySize = maxEntrySize;
53     }
54
55     JournalSegmentWriter(final JournalSegmentWriter<E> previous) {
56         channel = previous.channel;
57         segment = previous.segment;
58         index = previous.index;
59         namespace = previous.namespace;
60         maxSegmentSize = previous.maxSegmentSize;
61         maxEntrySize = previous.maxEntrySize;
62         lastEntry = previous.lastEntry;
63         currentPosition = previous.currentPosition;
64     }
65
66     /**
67      * Returns the last written index.
68      *
69      * @return The last written index.
70      */
71     final long getLastIndex() {
72         return lastEntry != null ? lastEntry.index() : segment.firstIndex() - 1;
73     }
74
75     /**
76      * Returns the last entry written.
77      *
78      * @return The last entry written.
79      */
80     final Indexed<E> getLastEntry() {
81         return lastEntry;
82     }
83
84     /**
85      * Returns the next index to be written.
86      *
87      * @return The next index to be written.
88      */
89     final long getNextIndex() {
90         return lastEntry != null ? lastEntry.index() + 1 : segment.firstIndex();
91     }
92
93     /**
94      * Tries to append an entry to the journal.
95      *
96      * @param entry The entry to append.
97      * @return The appended indexed entry, or {@code null} if there is not enough space available
98      */
99     final <T extends E> @Nullable Indexed<T> append(final T entry) {
100         // Store the entry index.
101         final long index = getNextIndex();
102         final int position = currentPosition;
103
104         // Serialize the entry.
105         final int bodyPosition = position + HEADER_BYTES;
106         final int avail = maxSegmentSize - bodyPosition;
107         if (avail < 0) {
108             LOG.trace("Not enough space for {} at {}", index, position);
109             return null;
110         }
111
112         final var writeLimit = Math.min(avail, maxEntrySize);
113         final var diskEntry = startWrite(position, writeLimit + HEADER_BYTES).position(HEADER_BYTES);
114         try {
115             namespace.serialize(entry, diskEntry);
116         } catch (KryoException e) {
117             if (writeLimit != maxEntrySize) {
118                 // We have not provided enough capacity, signal to roll to next segment
119                 LOG.trace("Tail serialization with {} bytes available failed", writeLimit, e);
120                 return null;
121             }
122
123             // Just reset the buffer. There's no need to zero the bytes since we haven't written the length or checksum.
124             throw new StorageException.TooLarge("Entry size exceeds maximum allowed bytes (" + maxEntrySize + ")", e);
125         }
126
127         final int length = diskEntry.position() - HEADER_BYTES;
128
129         // Compute the checksum for the entry.
130         final var crc32 = new CRC32();
131         crc32.update(diskEntry.flip().position(HEADER_BYTES));
132
133         // Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer.
134         diskEntry.putInt(0, length).putInt(Integer.BYTES, (int) crc32.getValue());
135         commitWrite(position, diskEntry.rewind());
136
137         // Update the last entry with the correct index/term/length.
138         final var indexedEntry = new Indexed<E>(index, entry, length);
139         lastEntry = indexedEntry;
140         this.index.index(index, position);
141
142         currentPosition = bodyPosition + length;
143
144         @SuppressWarnings("unchecked")
145         final var ugly = (Indexed<T>) indexedEntry;
146         return ugly;
147     }
148
149     abstract ByteBuffer startWrite(int position, int size);
150
151     abstract void commitWrite(int position, ByteBuffer entry);
152
153     /**
154      * Resets the head of the segment to the given index.
155      *
156      * @param index the index to which to reset the head of the segment
157      */
158     final void reset(final long index) {
159         // acquire ownership of cache and make sure reader does not see anything we've done once we're done
160         final var reader = reader();
161         reader.invalidateCache();
162         try {
163             resetWithBuffer(reader, index);
164         } finally {
165             // Make sure reader does not see anything we've done
166             reader.invalidateCache();
167         }
168     }
169
170     abstract JournalSegmentReader<E> reader();
171
172     private void resetWithBuffer(final JournalSegmentReader<E> reader, final long index) {
173         long nextIndex = segment.firstIndex();
174
175         // Clear the buffer indexes and acquire ownership of the buffer
176         currentPosition = JournalSegmentDescriptor.BYTES;
177         reader.setPosition(JournalSegmentDescriptor.BYTES);
178
179         while (index == 0 || nextIndex <= index) {
180             final var entry = reader.readEntry(nextIndex);
181             if (entry == null) {
182                 break;
183             }
184
185             lastEntry = entry;
186             this.index.index(nextIndex, currentPosition);
187             nextIndex++;
188
189             // Update the current position for indexing.
190             currentPosition = currentPosition + HEADER_BYTES + entry.size();
191         }
192     }
193
194     /**
195      * Truncates the log to the given index.
196      *
197      * @param index The index to which to truncate the log.
198      */
199     final void truncate(final long index) {
200         // If the index is greater than or equal to the last index, skip the truncate.
201         if (index >= getLastIndex()) {
202             return;
203         }
204
205         // Reset the last entry.
206         lastEntry = null;
207
208         // Truncate the index.
209         this.index.truncate(index);
210
211         if (index < segment.firstIndex()) {
212             // Reset the writer to the first entry.
213             currentPosition = JournalSegmentDescriptor.BYTES;
214         } else {
215             // Reset the writer to the given index.
216             reset(index);
217         }
218
219         // Zero the entry header at current channel position.
220         writeEmptyHeader(currentPosition);
221     }
222
223     /**
224      * Write {@link SegmentEntry#HEADER_BYTES} worth of zeroes at specified position.
225      *
226      * @param position position to write to
227      */
228     abstract void writeEmptyHeader(int position);
229
230     /**
231      * Flushes written entries to disk.
232      */
233     abstract void flush();
234
235     /**
236      * Closes this writer.
237      */
238     abstract void close();
239
240     /**
241      * Returns the mapped buffer underlying the segment writer, or {@code null} if the writer does not have such a
242      * buffer.
243      *
244      * @return the mapped buffer underlying the segment writer, or {@code null}.
245      */
246     abstract @Nullable MappedByteBuffer buffer();
247
248     abstract @NonNull MappedJournalSegmentWriter<E> toMapped();
249
250     abstract @NonNull DiskJournalSegmentWriter<E> toFileChannel();
251 }