2 * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. All rights reserved.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package io.atomix.storage.journal;
18 import static io.atomix.storage.journal.SegmentEntry.HEADER_BYTES;
19 import static java.util.Objects.requireNonNull;
21 import io.atomix.storage.journal.StorageException.TooLarge;
22 import io.atomix.storage.journal.index.JournalIndex;
23 import io.netty.buffer.Unpooled;
24 import java.io.IOException;
25 import java.nio.MappedByteBuffer;
26 import org.eclipse.jdt.annotation.NonNull;
27 import org.eclipse.jdt.annotation.Nullable;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
// Writer for a single journal segment. It appends serialized entries through a FileWriter, records each entry's
// position in the JournalIndex, and tracks the position at which the next entry will be written.
31 final class JournalSegmentWriter {
32 private static final Logger LOG = LoggerFactory.getLogger(JournalSegmentWriter.class);
// Backend performing the actual writes; toMapped()/toFileChannel() build a new writer around a different FileWriter.
34 private final FileWriter fileWriter;
// Segment being written; supplies firstIndex()/lastIndex() and, in the primary constructor, the maximum size.
35 final @NonNull JournalSegment segment;
// Index mapping entry index -> position; updated on every successful append and cut back by truncate().
36 private final @NonNull JournalIndex journalIndex;
// Upper bound for positions in this segment, taken from segment.file().maxSize().
37 final int maxSegmentSize;
// Upper bound on the serialized body of a single entry (the entry header is accounted for separately).
38 final int maxEntrySize;
// Position handed to the FileWriter for the next append; advanced by append() and recomputed by truncate().
40 private int currentPosition;
// Primary constructor. Rejects null fileWriter, segment and journalIndex via requireNonNull, stores the supplied
// entry size limit and starting position, and reads the segment size limit from segment.file().maxSize().
42 JournalSegmentWriter(final FileWriter fileWriter, final JournalSegment segment, final int maxEntrySize,
43 final JournalIndex journalIndex, final int currentPosition) {
44 this.fileWriter = requireNonNull(fileWriter);
45 this.segment = requireNonNull(segment);
46 this.journalIndex = requireNonNull(journalIndex);
47 this.maxEntrySize = maxEntrySize;
48 this.currentPosition = currentPosition;
// The segment size limit is not a parameter: it always comes from the segment's backing file.
49 maxSegmentSize = segment.file().maxSize();
// Copy constructor used when switching the write backend: every piece of state (segment, index, size limits and
// current position) is taken from the previous writer, only the FileWriter is replaced.
52 JournalSegmentWriter(final JournalSegmentWriter previous, final FileWriter fileWriter) {
53 segment = previous.segment;
54 journalIndex = previous.journalIndex;
55 maxSegmentSize = previous.maxSegmentSize;
56 maxEntrySize = previous.maxEntrySize;
// Carry the write position over so the new writer continues exactly where the previous one stopped.
57 currentPosition = previous.currentPosition;
// The replacement FileWriter is the only argument that is null-checked here.
58 this.fileWriter = requireNonNull(fileWriter);
62 * Returns the next index to be written.
64 * @return The next index to be written.
// NOTE(review): the declaration line of this method is not visible here; append() invokes it as nextIndex() and
// stores the result in a long -- confirm the signature against the full file.
// Looks up the last position recorded in the journal index.
67 final var lastPosition = journalIndex.last();
// With at least one indexed entry the next index is last + 1; otherwise writing starts at the segment's first index.
68 return lastPosition != null ? lastPosition.index() + 1 : segment.firstIndex();
72 * Tries to append a binary data to the journal.
74 * @param mapper the mapper to use
75 * @param entry the entry
76 * @return the entry size, or {@code null} if segment has no space
// Serializes the entry through the mapper straight into space obtained from the FileWriter at the current position,
// then writes the entry header (length at offset 0, checksum at offset Integer.BYTES), commits the write, advances
// currentPosition and records index -> position in the journal index.
// A TooLarge exception is thrown when serialization fails while the full maxEntrySize was available.
// NOTE(review): several statements are not visible here (the guard around the first LOG.trace, the opening of the
// try block and the return statements); the comments below describe only what is visible.
78 <T> @Nullable Integer append(final ByteBufMapper<T> mapper, final T entry) {
79 // we are appending at this index and position
80 final long index = nextIndex();
81 final int position = currentPosition;
83 // Map the entry carefully: we may not have enough segment space to satisfy maxEntrySize, but most entries are
84 // way smaller than that.
// The entry body starts right after the fixed-size header; avail is what remains of the segment past that point.
85 final int bodyPosition = position + HEADER_BYTES;
86 final int avail = maxSegmentSize - bodyPosition;
// NOTE(review): the condition guarding the following trace is not visible here -- presumably a check that avail
// is not positive; confirm against the full file.
88 // we do not have enough space for the header and a byte: signal a retry
89 LOG.trace("Not enough space for {} at {}", index, position);
93 // Entry must not exceed maxEntrySize
94 final var writeLimit = Math.min(avail, maxEntrySize);
96 // Allocate entry space
// The requested region covers the header plus at most writeLimit bytes of body.
97 final var diskEntry = fileWriter.startWrite(position, writeLimit + HEADER_BYTES);
98 // Create a ByteBuf covering the bytes. Note we do not use slice(), as Netty will do the equivalent.
99 final var bytes = Unpooled.wrappedBuffer(diskEntry.position(HEADER_BYTES));
101 mapper.objectToBytes(entry, bytes);
102 } catch (IOException e) {
103 // We ran out of buffer space: let's decide whose fault it is:
// writeLimit == maxEntrySize means the segment offered the full per-entry limit, so the entry itself is blamed.
104 if (writeLimit == maxEntrySize) {
105 // - it is the entry and/or mapper. This is not exactly accurate, as there may be other serialization
106 // fault. This is as good as it gets.
107 throw new TooLarge("Serialized entry size exceeds maximum allowed bytes (" + maxEntrySize + ")", e);
110 // - it is us, as we do not have the capacity to hold maxEntrySize bytes
111 LOG.trace("Tail serialization with {} bytes available failed", writeLimit, e);
115 // Determine length, trim diskEntry and compute checksum. We are okay with computeChecksum() consuming
116 // the buffer, as we rewind() it back.
117 final var length = bytes.readableBytes();
// The checksum is computed over the body only: the buffer is limited to header + length and positioned past the header.
118 final var checksum = SegmentEntry.computeChecksum(
119 diskEntry.limit(HEADER_BYTES + length).position(HEADER_BYTES));
121 // update the header and commit entry to file
122 fileWriter.commitWrite(position, diskEntry.rewind().putInt(0, length).putInt(Integer.BYTES, checksum));
124 // Advance the write position past this entry and record index -> position in the journal index.
125 currentPosition = bodyPosition + length;
126 journalIndex.index(index, position);
131 * Truncates the log to the given index.
133 * @param index The index to which to truncate the log.
// Cuts this segment back to the given index: truncates the journal index, recomputes currentPosition and writes an
// empty entry header at the new position.
135 void truncate(final long index) {
136 // If the index is greater than or equal to the last index, skip the truncate.
137 if (index >= segment.lastIndex()) {
// NOTE(review): the body of this branch is not visible here; per the comment above it is expected to return
// without doing anything -- confirm against the full file.
141 // Truncate the index, find nearest indexed entry
142 final var nearest = journalIndex.truncate(index);
// An index before this segment's first index resets the position to JournalSegmentDescriptor.BYTES (presumably
// just past the segment descriptor -- confirm); otherwise JournalSegment.indexEntries() recomputes it from nearest.
144 currentPosition = index < segment.firstIndex() ? JournalSegmentDescriptor.BYTES
145 // recover position and last written
146 : JournalSegment.indexEntries(fileWriter, segment, maxEntrySize, journalIndex, index, nearest);
148 // Zero the entry header at current channel position.
149 fileWriter.writeEmptyHeader(currentPosition);
153 * Flushes written entries to disk.
160 * Closes this writer.
167 * Returns the mapped buffer underlying the segment writer, or {@code null} if the writer does not have such a
170 * @return the mapped buffer underlying the segment writer, or {@code null}.
// Pure delegation: returns whatever the current FileWriter reports as its buffer, which may be null.
172 @Nullable MappedByteBuffer buffer() {
173 return fileWriter.buffer();
// Returns a writer backed by the FileWriter obtained from fileWriter.toMapped(). When that call returns null this
// instance is returned unchanged; otherwise a new writer is created that copies this writer's state.
176 @NonNull JournalSegmentWriter toMapped() {
177 final var newWriter = fileWriter.toMapped();
// The copy constructor carries over segment, index, size limits and current position.
178 return newWriter == null ? this : new JournalSegmentWriter(this, newWriter);
// Counterpart of toMapped(): returns a writer backed by the FileWriter obtained from fileWriter.toDisk(). When that
// call returns null this instance is returned unchanged; otherwise a new writer is created that copies this
// writer's state.
181 @NonNull JournalSegmentWriter toFileChannel() {
182 final var newWriter = fileWriter.toDisk();
// The copy constructor carries over segment, index, size limits and current position.
183 return newWriter == null ? this : new JournalSegmentWriter(this, newWriter);