Expand JournalSegmentFile semantics
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / JournalSegmentWriter.java
1 /*
2  * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.storage.journal;
17
18 import static io.atomix.storage.journal.SegmentEntry.HEADER_BYTES;
19 import static java.util.Objects.requireNonNull;
20
21 import io.atomix.storage.journal.index.JournalIndex;
22 import io.netty.buffer.ByteBuf;
23 import java.nio.MappedByteBuffer;
24 import java.util.zip.CRC32;
25 import org.eclipse.jdt.annotation.NonNull;
26 import org.eclipse.jdt.annotation.Nullable;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29
30 final class JournalSegmentWriter {
31     private static final Logger LOG = LoggerFactory.getLogger(JournalSegmentWriter.class);
32
33     private final FileWriter fileWriter;
34     final @NonNull JournalSegment segment;
35     private final @NonNull JournalIndex index;
36     final int maxSegmentSize;
37     final int maxEntrySize;
38
39     private int currentPosition;
40     private Long lastIndex;
41
42     JournalSegmentWriter(final FileWriter fileWriter, final JournalSegment segment, final int maxEntrySize,
43             final JournalIndex index) {
44         this.fileWriter = requireNonNull(fileWriter);
45         this.segment = requireNonNull(segment);
46         this.index = requireNonNull(index);
47         maxSegmentSize = segment.file().maxSize();
48         this.maxEntrySize = maxEntrySize;
49         // adjust lastEntry value
50         reset(0);
51     }
52
53     JournalSegmentWriter(final JournalSegmentWriter previous, final FileWriter fileWriter) {
54         segment = previous.segment;
55         index = previous.index;
56         maxSegmentSize = previous.maxSegmentSize;
57         maxEntrySize = previous.maxEntrySize;
58         lastIndex = previous.lastIndex;
59         currentPosition = previous.currentPosition;
60         this.fileWriter = requireNonNull(fileWriter);
61     }
62
63     /**
64      * Returns the last written index.
65      *
66      * @return The last written index.
67      */
68     long getLastIndex() {
69         return lastIndex != null ? lastIndex : segment.firstIndex() - 1;
70     }
71
72     /**
73      * Returns the next index to be written.
74      *
75      * @return The next index to be written.
76      */
77     long getNextIndex() {
78         return lastIndex != null ? lastIndex + 1 : segment.firstIndex();
79     }
80
81     /**
82      * Tries to append a binary data to the journal.
83      *
84      * @param buf binary data to append
85      * @return The index of appended data, or {@code null} if segment has no space
86      */
87     Long append(final ByteBuf buf) {
88         final var length = buf.readableBytes();
89         if (length > maxEntrySize) {
90             throw new StorageException.TooLarge("Serialized entry size exceeds maximum allowed bytes ("
91                 + maxEntrySize + ")");
92         }
93
94         // Store the entry index.
95         final long index = getNextIndex();
96         final int position = currentPosition;
97
98         // check space available
99         final int nextPosition = position + HEADER_BYTES + length;
100         if (nextPosition >= maxSegmentSize) {
101             LOG.trace("Not enough space for {} at {}", index, position);
102             return null;
103         }
104
105         // allocate buffer and write data
106         final var writeBuffer = fileWriter.startWrite(position, length + HEADER_BYTES).position(HEADER_BYTES);
107         writeBuffer.put(buf.nioBuffer());
108
109         // Compute the checksum for the entry.
110         final var crc32 = new CRC32();
111         crc32.update(writeBuffer.flip().position(HEADER_BYTES));
112
113         // Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer.
114         writeBuffer.putInt(0, length).putInt(Integer.BYTES, (int) crc32.getValue());
115         fileWriter.commitWrite(position, writeBuffer.rewind());
116
117         // Update the last entry with the correct index/term/length.
118         currentPosition = nextPosition;
119         lastIndex = index;
120         this.index.index(index, position);
121
122         return index;
123     }
124
125     /**
126      * Resets the head of the segment to the given index.
127      *
128      * @param index the index to which to reset the head of the segment
129      */
130     void reset(final long index) {
131         // acquire ownership of cache and make sure reader does not see anything we've done once we're done
132         final var fileReader = fileWriter.reader();
133         try {
134             resetWithBuffer(fileReader, index);
135         } finally {
136             // Make sure reader does not see anything we've done
137             fileReader.invalidateCache();
138         }
139     }
140
141     private void resetWithBuffer(final FileReader fileReader, final long index) {
142         long nextIndex = segment.firstIndex();
143
144         // Clear the buffer indexes and acquire ownership of the buffer
145         currentPosition = JournalSegmentDescriptor.BYTES;
146         final var reader = new JournalSegmentReader(segment, fileReader, maxEntrySize);
147         reader.setPosition(JournalSegmentDescriptor.BYTES);
148
149         while (index == 0 || nextIndex <= index) {
150             final var buf = reader.readBytes(nextIndex);
151             if (buf == null) {
152                 break;
153             }
154
155             lastIndex = nextIndex;
156             this.index.index(nextIndex, currentPosition);
157             nextIndex++;
158
159             // Update the current position for indexing.
160             currentPosition += HEADER_BYTES + buf.readableBytes();
161         }
162     }
163
164     /**
165      * Truncates the log to the given index.
166      *
167      * @param index The index to which to truncate the log.
168      */
169     void truncate(final long index) {
170         // If the index is greater than or equal to the last index, skip the truncate.
171         if (index >= getLastIndex()) {
172             return;
173         }
174
175         // Reset the last written
176         lastIndex = null;
177
178         // Truncate the index.
179         this.index.truncate(index);
180
181         if (index < segment.firstIndex()) {
182             // Reset the writer to the first entry.
183             currentPosition = JournalSegmentDescriptor.BYTES;
184         } else {
185             // Reset the writer to the given index.
186             reset(index);
187         }
188
189         // Zero the entry header at current channel position.
190         fileWriter.writeEmptyHeader(currentPosition);
191     }
192
193     /**
194      * Flushes written entries to disk.
195      */
196     void flush() {
197         fileWriter.flush();
198     }
199
200     /**
201      * Closes this writer.
202      */
203     void close() {
204         fileWriter.close();
205     }
206
207     /**
208      * Returns the mapped buffer underlying the segment writer, or {@code null} if the writer does not have such a
209      * buffer.
210      *
211      * @return the mapped buffer underlying the segment writer, or {@code null}.
212      */
213     @Nullable MappedByteBuffer buffer() {
214         return fileWriter.buffer();
215     }
216
217     @NonNull JournalSegmentWriter toMapped() {
218         final var newWriter = fileWriter.toMapped();
219         return newWriter == null ? this : new JournalSegmentWriter(this, newWriter);
220     }
221
222     @NonNull JournalSegmentWriter toFileChannel() {
223         final var newWriter = fileWriter.toDisk();
224         return newWriter == null ? this : new JournalSegmentWriter(this, newWriter);
225     }
226 }