Improve ByteBufMapper.objectToBytes() contract
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / JournalSegmentWriter.java
1 /*
2  * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.storage.journal;
17
18 import static io.atomix.storage.journal.SegmentEntry.HEADER_BYTES;
19 import static java.util.Objects.requireNonNull;
20
21 import io.atomix.storage.journal.StorageException.TooLarge;
22 import io.atomix.storage.journal.index.JournalIndex;
23 import io.netty.buffer.Unpooled;
24 import java.io.EOFException;
25 import java.io.IOException;
26 import java.nio.MappedByteBuffer;
27 import org.eclipse.jdt.annotation.NonNull;
28 import org.eclipse.jdt.annotation.Nullable;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 final class JournalSegmentWriter {
33     private static final Logger LOG = LoggerFactory.getLogger(JournalSegmentWriter.class);
34
35     private final FileWriter fileWriter;
36     final @NonNull JournalSegment segment;
37     private final @NonNull JournalIndex journalIndex;
38     final int maxSegmentSize;
39     final int maxEntrySize;
40
41     private int currentPosition;
42
43     JournalSegmentWriter(final FileWriter fileWriter, final JournalSegment segment, final int maxEntrySize,
44             final JournalIndex journalIndex, final int currentPosition) {
45         this.fileWriter = requireNonNull(fileWriter);
46         this.segment = requireNonNull(segment);
47         this.journalIndex = requireNonNull(journalIndex);
48         this.maxEntrySize = maxEntrySize;
49         this.currentPosition = currentPosition;
50         maxSegmentSize = segment.file().maxSize();
51     }
52
53     JournalSegmentWriter(final JournalSegmentWriter previous, final FileWriter fileWriter) {
54         segment = previous.segment;
55         journalIndex = previous.journalIndex;
56         maxSegmentSize = previous.maxSegmentSize;
57         maxEntrySize = previous.maxEntrySize;
58         currentPosition = previous.currentPosition;
59         this.fileWriter = requireNonNull(fileWriter);
60     }
61
62     /**
63      * Returns the next index to be written.
64      *
65      * @return The next index to be written.
66      */
67     long nextIndex() {
68         final var lastPosition = journalIndex.last();
69         return lastPosition != null ? lastPosition.index() + 1 : segment.firstIndex();
70     }
71
72     /**
73      * Tries to append a binary data to the journal.
74      *
75      * @param mapper the mapper to use
76      * @param entry the entry
77      * @return the entry size, or {@code null} if segment has no space
78      */
79     <T> @Nullable Integer append(final ByteBufMapper<T> mapper, final T entry) {
80         // we are appending at this index and position
81         final long index = nextIndex();
82         final int position = currentPosition;
83
84         // Map the entry carefully: we may not have enough segment space to satisfy maxEntrySize, but most entries are
85         // way smaller than that.
86         final int bodyPosition = position + HEADER_BYTES;
87         final int avail = maxSegmentSize - bodyPosition;
88         if (avail <= 0) {
89             // we do not have enough space for the header and a byte: signal a retry
90             LOG.trace("Not enough space for {} at {}", index, position);
91             return null;
92         }
93
94         // Entry must not exceed maxEntrySize
95         final var writeLimit = Math.min(avail, maxEntrySize);
96
97         // Allocate entry space
98         final var diskEntry = fileWriter.startWrite(position, writeLimit + HEADER_BYTES);
99         // Create a ByteBuf covering the bytes. Note we do not use slice(), as Netty will do the equivalent.
100         final var bytes = Unpooled.wrappedBuffer(diskEntry.position(HEADER_BYTES));
101         try {
102             mapper.objectToBytes(entry, bytes);
103         } catch (EOFException e) {
104             // We ran out of buffer space: let's decide who's fault it is:
105             if (writeLimit == maxEntrySize) {
106                 // - it is the entry and/or mapper. This is not exactly accurate, as there may be other serialization
107                 //   fault. This is as good as it gets.
108                 throw new TooLarge("Serialized entry size exceeds maximum allowed bytes (" + maxEntrySize + ")", e);
109             }
110
111             // - it is us, as we do not have the capacity to hold maxEntrySize bytes
112             LOG.trace("Tail serialization with {} bytes available failed", writeLimit, e);
113             return null;
114         } catch (IOException e) {
115             throw new StorageException(e);
116         }
117
118         // Determine length, trim distEntry and compute checksum. We are okay with computeChecksum() consuming
119         // the buffer, as we rewind() it back.
120         final var length = bytes.readableBytes();
121         final var checksum = SegmentEntry.computeChecksum(
122             diskEntry.limit(HEADER_BYTES + length).position(HEADER_BYTES));
123
124         // update the header and commit entry to file
125         fileWriter.commitWrite(position, diskEntry.rewind().putInt(0, length).putInt(Integer.BYTES, checksum));
126
127         // Update the last entry with the correct index/term/length.
128         currentPosition = bodyPosition + length;
129         journalIndex.index(index, position);
130         return length;
131     }
132
133     /**
134      * Truncates the log to the given index.
135      *
136      * @param index The index to which to truncate the log.
137      */
138     void truncate(final long index) {
139         // If the index is greater than or equal to the last index, skip the truncate.
140         if (index >= segment.lastIndex()) {
141             return;
142         }
143
144         // Truncate the index, find nearest indexed entry
145         final var nearest = journalIndex.truncate(index);
146
147         currentPosition = index < segment.firstIndex() ? JournalSegmentDescriptor.BYTES
148             // recover position and last written
149             : JournalSegment.indexEntries(fileWriter, segment, maxEntrySize, journalIndex, index, nearest);
150
151         // Zero the entry header at current channel position.
152         fileWriter.writeEmptyHeader(currentPosition);
153     }
154
155     /**
156      * Flushes written entries to disk.
157      */
158     void flush() {
159         fileWriter.flush();
160     }
161
162     /**
163      * Closes this writer.
164      */
165     void close() {
166         fileWriter.close();
167     }
168
169     /**
170      * Returns the mapped buffer underlying the segment writer, or {@code null} if the writer does not have such a
171      * buffer.
172      *
173      * @return the mapped buffer underlying the segment writer, or {@code null}.
174      */
175     @Nullable MappedByteBuffer buffer() {
176         return fileWriter.buffer();
177     }
178
179     @NonNull JournalSegmentWriter toMapped() {
180         final var newWriter = fileWriter.toMapped();
181         return newWriter == null ? this : new JournalSegmentWriter(this, newWriter);
182     }
183
184     @NonNull JournalSegmentWriter toFileChannel() {
185         final var newWriter = fileWriter.toDisk();
186         return newWriter == null ? this : new JournalSegmentWriter(this, newWriter);
187     }
188 }