2 * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. All rights reserved.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package io.atomix.storage.journal;
18 import static io.atomix.storage.journal.SegmentEntry.HEADER_BYTES;
19 import static java.util.Objects.requireNonNull;
21 import io.atomix.storage.journal.StorageException.TooLarge;
22 import io.atomix.storage.journal.index.JournalIndex;
23 import io.atomix.storage.journal.index.Position;
24 import io.netty.buffer.ByteBuf;
25 import java.nio.MappedByteBuffer;
26 import org.eclipse.jdt.annotation.NonNull;
27 import org.eclipse.jdt.annotation.Nullable;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
31 final class JournalSegmentWriter {
32 private static final Logger LOG = LoggerFactory.getLogger(JournalSegmentWriter.class);
34 private final FileWriter fileWriter;
35 final @NonNull JournalSegment segment;
36 private final @NonNull JournalIndex journalIndex;
37 final int maxSegmentSize;
38 final int maxEntrySize;
40 private int currentPosition;
/**
 * Creates a writer for the specified segment and recovers the write position and the journal index from what
 * the segment currently contains.
 *
 * @param fileWriter the file writer to write through, must not be {@code null}
 * @param segment the segment being written, must not be {@code null}
 * @param maxEntrySize maximum number of payload bytes in a single entry
 * @param journalIndex the index to populate, must not be {@code null}
 * @throws NullPointerException if any of the reference arguments is {@code null}
 */
JournalSegmentWriter(final FileWriter fileWriter, final JournalSegment segment, final int maxEntrySize,
        final JournalIndex journalIndex) {
    this.fileWriter = requireNonNull(fileWriter);
    this.segment = requireNonNull(segment);
    this.journalIndex = requireNonNull(journalIndex);
    this.maxSegmentSize = segment.file().maxSize();
    this.maxEntrySize = maxEntrySize;

    // Scan the segment without an upper index bound: this establishes currentPosition and fills the index
    reset(Long.MAX_VALUE, null);
}
/**
 * Creates a writer which takes over the complete state of a previous writer (segment, index, size limits and
 * current write position), but performs its writes through a different file writer.
 *
 * @param previous the writer whose state is carried over
 * @param fileWriter the file writer to write through from now on, must not be {@code null}
 * @throws NullPointerException if either argument is {@code null}
 */
JournalSegmentWriter(final JournalSegmentWriter previous, final FileWriter fileWriter) {
    // No recovery scan here: position and index are inherited as-is
    this.segment = previous.segment;
    this.journalIndex = previous.journalIndex;
    this.maxSegmentSize = previous.maxSegmentSize;
    this.maxEntrySize = previous.maxEntrySize;
    this.currentPosition = previous.currentPosition;
    this.fileWriter = requireNonNull(fileWriter);
}
64 * Returns the next index to be written.
66 * @return The next index to be written.
69 final var lastPosition = journalIndex.last();
70 return lastPosition != null ? lastPosition.index() + 1 : segment.firstIndex();
74 * Tries to append a binary data to the journal.
76 * @param buf binary data to append
77 * @return The index of appended data, or {@code null} if segment has no space
79 Position append(final ByteBuf buf) {
80 final var length = buf.readableBytes();
81 if (length > maxEntrySize) {
82 throw new TooLarge("Serialized entry size exceeds maximum allowed bytes (" + maxEntrySize + ")");
85 // Store the entry index.
86 final long index = nextIndex();
87 final int position = currentPosition;
89 // check space available
90 final int nextPosition = position + HEADER_BYTES + length;
91 if (nextPosition >= maxSegmentSize) {
92 LOG.trace("Not enough space for {} at {}", index, position);
96 // allocate buffer and write data
97 final var writeBuffer = fileWriter.startWrite(position, length + HEADER_BYTES);
98 writeBuffer.put(HEADER_BYTES, buf.nioBuffer(), 0, length);
100 // Compute the checksum for the entry.
101 final var checksum = SegmentEntry.computeChecksum(writeBuffer.slice(HEADER_BYTES, length));
103 // Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer.
104 fileWriter.commitWrite(position, writeBuffer.putInt(0, length).putInt(Integer.BYTES, checksum));
106 // Update the last entry with the correct index/term/length.
107 currentPosition = nextPosition;
108 return journalIndex.index(index, position);
112 * Truncates the log to the given index.
114 * @param index The index to which to truncate the log.
116 void truncate(final long index) {
117 // If the index is greater than or equal to the last index, skip the truncate.
118 if (index >= segment.lastIndex()) {
122 // Truncate the index, find nearest indexed entry
123 final var nearest = journalIndex.truncate(index);
125 // recover position and last written
126 if (index >= segment.firstIndex()) {
127 reset(index, nearest);
129 currentPosition = JournalSegmentDescriptor.BYTES;
132 // Zero the entry header at current channel position.
133 fileWriter.writeEmptyHeader(currentPosition);
136 private void reset(final long maxNextIndex, final @Nullable Position position) {
137 // acquire ownership of cache and make sure reader does not see anything we've done once we're done
138 final var fileReader = fileWriter.reader();
140 reset(fileReader, maxNextIndex, position);
142 // Make sure reader does not see anything we've done
143 fileReader.invalidateCache();
147 private void reset(final FileReader fileReader, final long maxNextIndex, final @Nullable Position position) {
149 if (position != null) {
150 // look from nearest recovered index
151 nextIndex = position.index();
152 currentPosition = position.position();
154 // look from very beginning of the segment
155 nextIndex = segment.firstIndex();
156 currentPosition = JournalSegmentDescriptor.BYTES;
159 final var reader = new JournalSegmentReader(segment, fileReader, maxEntrySize);
160 reader.setPosition(currentPosition);
162 while (nextIndex <= maxNextIndex) {
163 final var buf = reader.readBytes();
168 journalIndex.index(nextIndex++, currentPosition);
169 // Update the current position for indexing.
170 currentPosition += HEADER_BYTES + buf.readableBytes();
175 * Flushes written entries to disk.
182 * Closes this writer.
189 * Returns the mapped buffer underlying the segment writer, or {@code null} if the writer does not have such a
192 * @return the mapped buffer underlying the segment writer, or {@code null}.
194 @Nullable MappedByteBuffer buffer() {
195 return fileWriter.buffer();
198 @NonNull JournalSegmentWriter toMapped() {
199 final var newWriter = fileWriter.toMapped();
200 return newWriter == null ? this : new JournalSegmentWriter(this, newWriter);
203 @NonNull JournalSegmentWriter toFileChannel() {
204 final var newWriter = fileWriter.toDisk();
205 return newWriter == null ? this : new JournalSegmentWriter(this, newWriter);