f9e13274059933aae38f2dc192e38fda93911ac1
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / JournalSegmentWriter.java
1 /*
2  * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.storage.journal;
17
18 import static io.atomix.storage.journal.SegmentEntry.HEADER_BYTES;
19 import static java.util.Objects.requireNonNull;
20
21 import io.atomix.storage.journal.StorageException.TooLarge;
22 import io.atomix.storage.journal.index.JournalIndex;
23 import io.atomix.storage.journal.index.Position;
24 import io.netty.buffer.ByteBuf;
25 import java.nio.MappedByteBuffer;
26 import org.eclipse.jdt.annotation.NonNull;
27 import org.eclipse.jdt.annotation.Nullable;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30
31 final class JournalSegmentWriter {
32     private static final Logger LOG = LoggerFactory.getLogger(JournalSegmentWriter.class);
33
34     private final FileWriter fileWriter;
35     final @NonNull JournalSegment segment;
36     private final @NonNull JournalIndex journalIndex;
37     final int maxSegmentSize;
38     final int maxEntrySize;
39
40     private int currentPosition;
41
42     JournalSegmentWriter(final FileWriter fileWriter, final JournalSegment segment, final int maxEntrySize,
43             final JournalIndex journalIndex) {
44         this.fileWriter = requireNonNull(fileWriter);
45         this.segment = requireNonNull(segment);
46         this.journalIndex = requireNonNull(journalIndex);
47         maxSegmentSize = segment.file().maxSize();
48         this.maxEntrySize = maxEntrySize;
49
50         // recover currentPosition and lastIndex
51         reset(Long.MAX_VALUE, null);
52     }
53
54     JournalSegmentWriter(final JournalSegmentWriter previous, final FileWriter fileWriter) {
55         segment = previous.segment;
56         journalIndex = previous.journalIndex;
57         maxSegmentSize = previous.maxSegmentSize;
58         maxEntrySize = previous.maxEntrySize;
59         currentPosition = previous.currentPosition;
60         this.fileWriter = requireNonNull(fileWriter);
61     }
62
63     /**
64      * Returns the next index to be written.
65      *
66      * @return The next index to be written.
67      */
68     long nextIndex() {
69         final var lastPosition = journalIndex.last();
70         return lastPosition != null ? lastPosition.index() + 1 : segment.firstIndex();
71     }
72
73     /**
74      * Tries to append a binary data to the journal.
75      *
76      * @param buf binary data to append
77      * @return The index of appended data, or {@code null} if segment has no space
78      */
79     Position append(final ByteBuf buf) {
80         final var length = buf.readableBytes();
81         if (length > maxEntrySize) {
82             throw new TooLarge("Serialized entry size exceeds maximum allowed bytes (" + maxEntrySize + ")");
83         }
84
85         // Store the entry index.
86         final long index = nextIndex();
87         final int position = currentPosition;
88
89         // check space available
90         final int nextPosition = position + HEADER_BYTES + length;
91         if (nextPosition >= maxSegmentSize) {
92             LOG.trace("Not enough space for {} at {}", index, position);
93             return null;
94         }
95
96         // allocate buffer and write data
97         final var writeBuffer = fileWriter.startWrite(position, length + HEADER_BYTES);
98         writeBuffer.put(HEADER_BYTES, buf.nioBuffer(), 0, length);
99
100         // Compute the checksum for the entry.
101         final var checksum = SegmentEntry.computeChecksum(writeBuffer.slice(HEADER_BYTES, length));
102
103         // Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer.
104         fileWriter.commitWrite(position, writeBuffer.putInt(0, length).putInt(Integer.BYTES, checksum));
105
106         // Update the last entry with the correct index/term/length.
107         currentPosition = nextPosition;
108         return journalIndex.index(index, position);
109     }
110
111     /**
112      * Truncates the log to the given index.
113      *
114      * @param index The index to which to truncate the log.
115      */
116     void truncate(final long index) {
117         // If the index is greater than or equal to the last index, skip the truncate.
118         if (index >= segment.lastIndex()) {
119             return;
120         }
121
122         // Truncate the index, find nearest indexed entry
123         final var nearest = journalIndex.truncate(index);
124
125         // recover position and last written
126         if (index >= segment.firstIndex()) {
127             reset(index, nearest);
128         } else {
129             currentPosition = JournalSegmentDescriptor.BYTES;
130         }
131
132         // Zero the entry header at current channel position.
133         fileWriter.writeEmptyHeader(currentPosition);
134     }
135
136     private void reset(final long maxNextIndex, final @Nullable Position position) {
137         // acquire ownership of cache and make sure reader does not see anything we've done once we're done
138         final var fileReader = fileWriter.reader();
139         try {
140             reset(fileReader, maxNextIndex, position);
141         } finally {
142             // Make sure reader does not see anything we've done
143             fileReader.invalidateCache();
144         }
145     }
146
147     private void reset(final FileReader fileReader, final long maxNextIndex, final @Nullable Position position) {
148         long nextIndex;
149         if (position != null) {
150             // look from nearest recovered index
151             nextIndex = position.index();
152             currentPosition = position.position();
153         } else {
154             // look from very beginning of the segment
155             nextIndex = segment.firstIndex();
156             currentPosition = JournalSegmentDescriptor.BYTES;
157         }
158
159         final var reader = new JournalSegmentReader(segment, fileReader, maxEntrySize);
160         reader.setPosition(currentPosition);
161
162         while (nextIndex <= maxNextIndex) {
163             final var buf = reader.readBytes();
164             if (buf == null) {
165                 break;
166             }
167
168             journalIndex.index(nextIndex++, currentPosition);
169             // Update the current position for indexing.
170             currentPosition += HEADER_BYTES + buf.readableBytes();
171         }
172     }
173
174     /**
175      * Flushes written entries to disk.
176      */
177     void flush() {
178         fileWriter.flush();
179     }
180
181     /**
182      * Closes this writer.
183      */
184     void close() {
185         fileWriter.close();
186     }
187
188     /**
189      * Returns the mapped buffer underlying the segment writer, or {@code null} if the writer does not have such a
190      * buffer.
191      *
192      * @return the mapped buffer underlying the segment writer, or {@code null}.
193      */
194     @Nullable MappedByteBuffer buffer() {
195         return fileWriter.buffer();
196     }
197
198     @NonNull JournalSegmentWriter toMapped() {
199         final var newWriter = fileWriter.toMapped();
200         return newWriter == null ? this : new JournalSegmentWriter(this, newWriter);
201     }
202
203     @NonNull JournalSegmentWriter toFileChannel() {
204         final var newWriter = fileWriter.toDisk();
205         return newWriter == null ? this : new JournalSegmentWriter(this, newWriter);
206     }
207 }