/*
 * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 package io.atomix.storage.journal;
18 import static io.atomix.storage.journal.SegmentEntry.HEADER_BYTES;
19 import static java.util.Objects.requireNonNull;
21 import io.atomix.storage.journal.index.JournalIndex;
22 import java.nio.MappedByteBuffer;
23 import java.nio.channels.FileChannel;
24 import org.eclipse.jdt.annotation.NonNull;
25 import org.eclipse.jdt.annotation.Nullable;
27 abstract sealed class JournalSegmentWriter<E> permits DiskJournalSegmentWriter, MappedJournalSegmentWriter {
28 final @NonNull FileChannel channel;
29 final @NonNull JournalSegment<E> segment;
30 final @NonNull JournalIndex index;
31 final @NonNull JournalSerdes namespace;
32 final int maxSegmentSize;
33 final int maxEntrySize;
34 final long firstIndex;
36 // FIXME: hide these two fields
40 JournalSegmentWriter(final FileChannel channel, final JournalSegment<E> segment, final int maxEntrySize,
41 final JournalIndex index, final JournalSerdes namespace) {
42 this.channel = requireNonNull(channel);
43 this.segment = requireNonNull(segment);
44 this.index = requireNonNull(index);
45 this.namespace = requireNonNull(namespace);
46 maxSegmentSize = segment.descriptor().maxSegmentSize();
47 this.maxEntrySize = maxEntrySize;
48 firstIndex = segment.index();
51 JournalSegmentWriter(final JournalSegmentWriter<E> previous) {
52 channel = previous.channel;
53 segment = previous.segment;
54 index = previous.index;
55 namespace = previous.namespace;
56 maxSegmentSize = previous.maxSegmentSize;
57 maxEntrySize = previous.maxEntrySize;
58 firstIndex = previous.firstIndex;
59 lastEntry = previous.lastEntry;
60 currentPosition = previous.currentPosition;
64 * Returns the last written index.
66 * @return The last written index.
68 final long getLastIndex() {
69 return lastEntry != null ? lastEntry.index() : firstIndex - 1;
73 * Returns the last entry written.
75 * @return The last entry written.
77 final Indexed<E> getLastEntry() {
82 * Returns the next index to be written.
84 * @return The next index to be written.
86 final long getNextIndex() {
87 return lastEntry != null ? lastEntry.index() + 1 : firstIndex;
91 * Appends an entry to the journal.
93 * @param entry The entry to append.
94 * @return The appended indexed entry.
96 abstract <T extends E> Indexed<T> append(T entry);
99 * Resets the head of the segment to the given index.
101 * @param index the index to which to reset the head of the segment
103 final void reset(final long index) {
104 // acquire ownership of cache and make sure reader does not see anything we've done once we're done
105 final var reader = reader();
106 reader.invalidateCache();
108 resetWithBuffer(reader, index);
110 // Make sure reader does not see anything we've done
111 reader.invalidateCache();
115 abstract JournalSegmentReader<E> reader();
117 private void resetWithBuffer(final JournalSegmentReader<E> reader, final long index) {
118 long nextIndex = firstIndex;
120 // Clear the buffer indexes and acquire ownership of the buffer
121 currentPosition = JournalSegmentDescriptor.BYTES;
122 reader.setPosition(JournalSegmentDescriptor.BYTES);
124 while (index == 0 || nextIndex <= index) {
125 final var entry = reader.readEntry(nextIndex);
131 this.index.index(nextIndex, currentPosition);
134 // Update the current position for indexing.
135 currentPosition = currentPosition + HEADER_BYTES + entry.size();
140 * Truncates the log to the given index.
142 * @param index The index to which to truncate the log.
144 abstract void truncate(long index);
147 * Flushes written entries to disk.
149 abstract void flush();
152 * Closes this writer.
154 abstract void close();
157 * Returns the mapped buffer underlying the segment writer, or {@code null} if the writer does not have such a
160 * @return the mapped buffer underlying the segment writer, or {@code null}.
162 abstract @Nullable MappedByteBuffer buffer();
164 abstract @NonNull MappedJournalSegmentWriter<E> toMapped();
166 abstract @NonNull DiskJournalSegmentWriter<E> toFileChannel();