Centralize CRC32 computation
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / JournalSegmentWriter.java
1 /*
2  * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.storage.journal;
17
18 import static io.atomix.storage.journal.SegmentEntry.HEADER_BYTES;
19 import static java.util.Objects.requireNonNull;
20
21 import io.atomix.storage.journal.index.JournalIndex;
22 import io.netty.buffer.ByteBuf;
23 import java.nio.MappedByteBuffer;
24 import org.eclipse.jdt.annotation.NonNull;
25 import org.eclipse.jdt.annotation.Nullable;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28
29 final class JournalSegmentWriter {
30     private static final Logger LOG = LoggerFactory.getLogger(JournalSegmentWriter.class);
31
32     private final FileWriter fileWriter;
33     final @NonNull JournalSegment segment;
34     private final @NonNull JournalIndex index;
35     final int maxSegmentSize;
36     final int maxEntrySize;
37
38     private int currentPosition;
39     private Long lastIndex;
40
41     JournalSegmentWriter(final FileWriter fileWriter, final JournalSegment segment, final int maxEntrySize,
42             final JournalIndex index) {
43         this.fileWriter = requireNonNull(fileWriter);
44         this.segment = requireNonNull(segment);
45         this.index = requireNonNull(index);
46         maxSegmentSize = segment.file().maxSize();
47         this.maxEntrySize = maxEntrySize;
48         // adjust lastEntry value
49         reset(0);
50     }
51
52     JournalSegmentWriter(final JournalSegmentWriter previous, final FileWriter fileWriter) {
53         segment = previous.segment;
54         index = previous.index;
55         maxSegmentSize = previous.maxSegmentSize;
56         maxEntrySize = previous.maxEntrySize;
57         lastIndex = previous.lastIndex;
58         currentPosition = previous.currentPosition;
59         this.fileWriter = requireNonNull(fileWriter);
60     }
61
62     /**
63      * Returns the last written index.
64      *
65      * @return The last written index.
66      */
67     long getLastIndex() {
68         return lastIndex != null ? lastIndex : segment.firstIndex() - 1;
69     }
70
71     /**
72      * Returns the next index to be written.
73      *
74      * @return The next index to be written.
75      */
76     long getNextIndex() {
77         return lastIndex != null ? lastIndex + 1 : segment.firstIndex();
78     }
79
80     /**
81      * Tries to append a binary data to the journal.
82      *
83      * @param buf binary data to append
84      * @return The index of appended data, or {@code null} if segment has no space
85      */
86     Long append(final ByteBuf buf) {
87         final var length = buf.readableBytes();
88         if (length > maxEntrySize) {
89             throw new StorageException.TooLarge("Serialized entry size exceeds maximum allowed bytes ("
90                 + maxEntrySize + ")");
91         }
92
93         // Store the entry index.
94         final long index = getNextIndex();
95         final int position = currentPosition;
96
97         // check space available
98         final int nextPosition = position + HEADER_BYTES + length;
99         if (nextPosition >= maxSegmentSize) {
100             LOG.trace("Not enough space for {} at {}", index, position);
101             return null;
102         }
103
104         // allocate buffer and write data
105         final var writeBuffer = fileWriter.startWrite(position, length + HEADER_BYTES);
106         writeBuffer.put(HEADER_BYTES, buf.nioBuffer(), 0, length);
107
108         // Compute the checksum for the entry.
109         final var checksum = SegmentEntry.computeChecksum(writeBuffer.slice(HEADER_BYTES, length));
110
111         // Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer.
112         fileWriter.commitWrite(position, writeBuffer.putInt(0, length).putInt(Integer.BYTES, checksum));
113
114         // Update the last entry with the correct index/term/length.
115         currentPosition = nextPosition;
116         lastIndex = index;
117         this.index.index(index, position);
118
119         return index;
120     }
121
122     /**
123      * Resets the head of the segment to the given index.
124      *
125      * @param index the index to which to reset the head of the segment
126      */
127     void reset(final long index) {
128         // acquire ownership of cache and make sure reader does not see anything we've done once we're done
129         final var fileReader = fileWriter.reader();
130         try {
131             resetWithBuffer(fileReader, index);
132         } finally {
133             // Make sure reader does not see anything we've done
134             fileReader.invalidateCache();
135         }
136     }
137
138     private void resetWithBuffer(final FileReader fileReader, final long index) {
139         long nextIndex = segment.firstIndex();
140
141         // Clear the buffer indexes and acquire ownership of the buffer
142         currentPosition = JournalSegmentDescriptor.BYTES;
143         final var reader = new JournalSegmentReader(segment, fileReader, maxEntrySize);
144         reader.setPosition(JournalSegmentDescriptor.BYTES);
145
146         while (index == 0 || nextIndex <= index) {
147             final var buf = reader.readBytes(nextIndex);
148             if (buf == null) {
149                 break;
150             }
151
152             lastIndex = nextIndex;
153             this.index.index(nextIndex, currentPosition);
154             nextIndex++;
155
156             // Update the current position for indexing.
157             currentPosition += HEADER_BYTES + buf.readableBytes();
158         }
159     }
160
161     /**
162      * Truncates the log to the given index.
163      *
164      * @param index The index to which to truncate the log.
165      */
166     void truncate(final long index) {
167         // If the index is greater than or equal to the last index, skip the truncate.
168         if (index >= getLastIndex()) {
169             return;
170         }
171
172         // Reset the last written
173         lastIndex = null;
174
175         // Truncate the index.
176         this.index.truncate(index);
177
178         if (index < segment.firstIndex()) {
179             // Reset the writer to the first entry.
180             currentPosition = JournalSegmentDescriptor.BYTES;
181         } else {
182             // Reset the writer to the given index.
183             reset(index);
184         }
185
186         // Zero the entry header at current channel position.
187         fileWriter.writeEmptyHeader(currentPosition);
188     }
189
190     /**
191      * Flushes written entries to disk.
192      */
193     void flush() {
194         fileWriter.flush();
195     }
196
197     /**
198      * Closes this writer.
199      */
200     void close() {
201         fileWriter.close();
202     }
203
204     /**
205      * Returns the mapped buffer underlying the segment writer, or {@code null} if the writer does not have such a
206      * buffer.
207      *
208      * @return the mapped buffer underlying the segment writer, or {@code null}.
209      */
210     @Nullable MappedByteBuffer buffer() {
211         return fileWriter.buffer();
212     }
213
214     @NonNull JournalSegmentWriter toMapped() {
215         final var newWriter = fileWriter.toMapped();
216         return newWriter == null ? this : new JournalSegmentWriter(this, newWriter);
217     }
218
219     @NonNull JournalSegmentWriter toFileChannel() {
220         final var newWriter = fileWriter.toDisk();
221         return newWriter == null ? this : new JournalSegmentWriter(this, newWriter);
222     }
223 }