2 * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. All rights reserved.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package io.atomix.storage.journal;
18 import static io.atomix.storage.journal.SegmentEntry.HEADER_BYTES;
19 import static java.util.Objects.requireNonNull;
21 import io.atomix.storage.journal.index.JournalIndex;
22 import io.netty.buffer.ByteBuf;
23 import java.nio.MappedByteBuffer;
24 import org.eclipse.jdt.annotation.NonNull;
25 import org.eclipse.jdt.annotation.Nullable;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
29 final class JournalSegmentWriter {
30 private static final Logger LOG = LoggerFactory.getLogger(JournalSegmentWriter.class);
32 private final FileWriter fileWriter;
33 final @NonNull JournalSegment segment;
34 private final @NonNull JournalIndex index;
35 final int maxSegmentSize;
36 final int maxEntrySize;
38 private int currentPosition;
39 private Long lastIndex;
41 JournalSegmentWriter(final FileWriter fileWriter, final JournalSegment segment, final int maxEntrySize,
42 final JournalIndex index) {
43 this.fileWriter = requireNonNull(fileWriter);
44 this.segment = requireNonNull(segment);
45 this.index = requireNonNull(index);
46 maxSegmentSize = segment.file().maxSize();
47 this.maxEntrySize = maxEntrySize;
48 // adjust lastEntry value
52 JournalSegmentWriter(final JournalSegmentWriter previous, final FileWriter fileWriter) {
53 segment = previous.segment;
54 index = previous.index;
55 maxSegmentSize = previous.maxSegmentSize;
56 maxEntrySize = previous.maxEntrySize;
57 lastIndex = previous.lastIndex;
58 currentPosition = previous.currentPosition;
59 this.fileWriter = requireNonNull(fileWriter);
63 * Returns the last written index.
65 * @return The last written index.
68 return lastIndex != null ? lastIndex : segment.firstIndex() - 1;
72 * Returns the next index to be written.
74 * @return The next index to be written.
77 return lastIndex != null ? lastIndex + 1 : segment.firstIndex();
81 * Tries to append a binary data to the journal.
83 * @param buf binary data to append
84 * @return The index of appended data, or {@code null} if segment has no space
86 Long append(final ByteBuf buf) {
87 final var length = buf.readableBytes();
88 if (length > maxEntrySize) {
89 throw new StorageException.TooLarge("Serialized entry size exceeds maximum allowed bytes ("
90 + maxEntrySize + ")");
93 // Store the entry index.
94 final long index = getNextIndex();
95 final int position = currentPosition;
97 // check space available
98 final int nextPosition = position + HEADER_BYTES + length;
99 if (nextPosition >= maxSegmentSize) {
100 LOG.trace("Not enough space for {} at {}", index, position);
104 // allocate buffer and write data
105 final var writeBuffer = fileWriter.startWrite(position, length + HEADER_BYTES);
106 writeBuffer.put(HEADER_BYTES, buf.nioBuffer(), 0, length);
108 // Compute the checksum for the entry.
109 final var checksum = SegmentEntry.computeChecksum(writeBuffer.slice(HEADER_BYTES, length));
111 // Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer.
112 fileWriter.commitWrite(position, writeBuffer.putInt(0, length).putInt(Integer.BYTES, checksum));
114 // Update the last entry with the correct index/term/length.
115 currentPosition = nextPosition;
117 this.index.index(index, position);
123 * Resets the head of the segment to the given index.
125 * @param index the index to which to reset the head of the segment
127 void reset(final long index) {
128 // acquire ownership of cache and make sure reader does not see anything we've done once we're done
129 final var fileReader = fileWriter.reader();
131 resetWithBuffer(fileReader, index);
133 // Make sure reader does not see anything we've done
134 fileReader.invalidateCache();
138 private void resetWithBuffer(final FileReader fileReader, final long index) {
139 long nextIndex = segment.firstIndex();
141 // Clear the buffer indexes and acquire ownership of the buffer
142 currentPosition = JournalSegmentDescriptor.BYTES;
143 final var reader = new JournalSegmentReader(segment, fileReader, maxEntrySize);
144 reader.setPosition(JournalSegmentDescriptor.BYTES);
146 while (index == 0 || nextIndex <= index) {
147 final var buf = reader.readBytes();
152 lastIndex = nextIndex;
153 this.index.index(nextIndex, currentPosition);
156 // Update the current position for indexing.
157 currentPosition += HEADER_BYTES + buf.readableBytes();
162 * Truncates the log to the given index.
164 * @param index The index to which to truncate the log.
166 void truncate(final long index) {
167 // If the index is greater than or equal to the last index, skip the truncate.
168 if (index >= getLastIndex()) {
172 // Reset the last written
175 // Truncate the index.
176 this.index.truncate(index);
178 if (index < segment.firstIndex()) {
179 // Reset the writer to the first entry.
180 currentPosition = JournalSegmentDescriptor.BYTES;
182 // Reset the writer to the given index.
186 // Zero the entry header at current channel position.
187 fileWriter.writeEmptyHeader(currentPosition);
191 * Flushes written entries to disk.
198 * Closes this writer.
205 * Returns the mapped buffer underlying the segment writer, or {@code null} if the writer does not have such a
208 * @return the mapped buffer underlying the segment writer, or {@code null}.
210 @Nullable MappedByteBuffer buffer() {
211 return fileWriter.buffer();
214 @NonNull JournalSegmentWriter toMapped() {
215 final var newWriter = fileWriter.toMapped();
216 return newWriter == null ? this : new JournalSegmentWriter(this, newWriter);
219 @NonNull JournalSegmentWriter toFileChannel() {
220 final var newWriter = fileWriter.toDisk();
221 return newWriter == null ? this : new JournalSegmentWriter(this, newWriter);