2 * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. All rights reserved.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package io.atomix.storage.journal;
18 import static io.atomix.storage.journal.SegmentEntry.HEADER_BYTES;
19 import static java.util.Objects.requireNonNull;
21 import io.atomix.storage.journal.index.JournalIndex;
22 import io.netty.buffer.ByteBuf;
23 import java.nio.MappedByteBuffer;
24 import java.util.zip.CRC32;
25 import org.eclipse.jdt.annotation.NonNull;
26 import org.eclipse.jdt.annotation.Nullable;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
30 final class JournalSegmentWriter {
31 private static final Logger LOG = LoggerFactory.getLogger(JournalSegmentWriter.class);
33 private final FileWriter fileWriter;
34 final @NonNull JournalSegment segment;
35 private final @NonNull JournalIndex index;
36 final int maxSegmentSize;
37 final int maxEntrySize;
39 private int currentPosition;
40 private Long lastIndex;
42 JournalSegmentWriter(final FileWriter fileWriter, final JournalSegment segment, final int maxEntrySize,
43 final JournalIndex index) {
44 this.fileWriter = requireNonNull(fileWriter);
45 this.segment = requireNonNull(segment);
46 this.index = requireNonNull(index);
47 maxSegmentSize = segment.file().maxSize();
48 this.maxEntrySize = maxEntrySize;
49 // adjust lastEntry value
53 JournalSegmentWriter(final JournalSegmentWriter previous, final FileWriter fileWriter) {
54 segment = previous.segment;
55 index = previous.index;
56 maxSegmentSize = previous.maxSegmentSize;
57 maxEntrySize = previous.maxEntrySize;
58 lastIndex = previous.lastIndex;
59 currentPosition = previous.currentPosition;
60 this.fileWriter = requireNonNull(fileWriter);
64 * Returns the last written index.
66 * @return The last written index.
69 return lastIndex != null ? lastIndex : segment.firstIndex() - 1;
73 * Returns the next index to be written.
75 * @return The next index to be written.
78 return lastIndex != null ? lastIndex + 1 : segment.firstIndex();
82 * Tries to append a binary data to the journal.
84 * @param buf binary data to append
85 * @return The index of appended data, or {@code null} if segment has no space
87 Long append(final ByteBuf buf) {
88 final var length = buf.readableBytes();
89 if (length > maxEntrySize) {
90 throw new StorageException.TooLarge("Serialized entry size exceeds maximum allowed bytes ("
91 + maxEntrySize + ")");
94 // Store the entry index.
95 final long index = getNextIndex();
96 final int position = currentPosition;
98 // check space available
99 final int nextPosition = position + HEADER_BYTES + length;
100 if (nextPosition >= maxSegmentSize) {
101 LOG.trace("Not enough space for {} at {}", index, position);
105 // allocate buffer and write data
106 final var writeBuffer = fileWriter.startWrite(position, length + HEADER_BYTES).position(HEADER_BYTES);
107 writeBuffer.put(buf.nioBuffer());
109 // Compute the checksum for the entry.
110 final var crc32 = new CRC32();
111 crc32.update(writeBuffer.flip().position(HEADER_BYTES));
113 // Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer.
114 writeBuffer.putInt(0, length).putInt(Integer.BYTES, (int) crc32.getValue());
115 fileWriter.commitWrite(position, writeBuffer.rewind());
117 // Update the last entry with the correct index/term/length.
118 currentPosition = nextPosition;
120 this.index.index(index, position);
126 * Resets the head of the segment to the given index.
128 * @param index the index to which to reset the head of the segment
130 void reset(final long index) {
131 // acquire ownership of cache and make sure reader does not see anything we've done once we're done
132 final var fileReader = fileWriter.reader();
134 resetWithBuffer(fileReader, index);
136 // Make sure reader does not see anything we've done
137 fileReader.invalidateCache();
141 private void resetWithBuffer(final FileReader fileReader, final long index) {
142 long nextIndex = segment.firstIndex();
144 // Clear the buffer indexes and acquire ownership of the buffer
145 currentPosition = JournalSegmentDescriptor.BYTES;
146 final var reader = new JournalSegmentReader(segment, fileReader, maxEntrySize);
147 reader.setPosition(JournalSegmentDescriptor.BYTES);
149 while (index == 0 || nextIndex <= index) {
150 final var buf = reader.readBytes(nextIndex);
155 lastIndex = nextIndex;
156 this.index.index(nextIndex, currentPosition);
159 // Update the current position for indexing.
160 currentPosition += HEADER_BYTES + buf.readableBytes();
165 * Truncates the log to the given index.
167 * @param index The index to which to truncate the log.
169 void truncate(final long index) {
170 // If the index is greater than or equal to the last index, skip the truncate.
171 if (index >= getLastIndex()) {
175 // Reset the last written
178 // Truncate the index.
179 this.index.truncate(index);
181 if (index < segment.firstIndex()) {
182 // Reset the writer to the first entry.
183 currentPosition = JournalSegmentDescriptor.BYTES;
185 // Reset the writer to the given index.
189 // Zero the entry header at current channel position.
190 fileWriter.writeEmptyHeader(currentPosition);
194 * Flushes written entries to disk.
201 * Closes this writer.
208 * Returns the mapped buffer underlying the segment writer, or {@code null} if the writer does not have such a
211 * @return the mapped buffer underlying the segment writer, or {@code null}.
213 @Nullable MappedByteBuffer buffer() {
214 return fileWriter.buffer();
217 @NonNull JournalSegmentWriter toMapped() {
218 final var newWriter = fileWriter.toMapped();
219 return newWriter == null ? this : new JournalSegmentWriter(this, newWriter);
222 @NonNull JournalSegmentWriter toFileChannel() {
223 final var newWriter = fileWriter.toDisk();
224 return newWriter == null ? this : new JournalSegmentWriter(this, newWriter);