f6a7e9498786d58ca7771d161d2706c506f252a6
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / JournalSegmentWriter.java
1 /*
2  * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.storage.journal;
17
18 import static io.atomix.storage.journal.SegmentEntry.HEADER_BYTES;
19 import static java.util.Objects.requireNonNull;
20
21 import io.atomix.storage.journal.index.JournalIndex;
22 import java.nio.MappedByteBuffer;
23 import java.nio.channels.FileChannel;
24 import org.eclipse.jdt.annotation.NonNull;
25 import org.eclipse.jdt.annotation.Nullable;
26
27 abstract sealed class JournalSegmentWriter<E> permits DiskJournalSegmentWriter, MappedJournalSegmentWriter {
28     final @NonNull FileChannel channel;
29     final @NonNull JournalSegment<E> segment;
30     final @NonNull JournalIndex index;
31     final @NonNull JournalSerdes namespace;
32     final int maxSegmentSize;
33     final int maxEntrySize;
34     final long firstIndex;
35
36     // FIXME: hide these two fields
37     Indexed<E> lastEntry;
38     int currentPosition;
39
40     JournalSegmentWriter(final FileChannel channel, final JournalSegment<E> segment, final int maxEntrySize,
41             final JournalIndex index, final JournalSerdes namespace) {
42         this.channel = requireNonNull(channel);
43         this.segment = requireNonNull(segment);
44         this.index = requireNonNull(index);
45         this.namespace = requireNonNull(namespace);
46         maxSegmentSize = segment.descriptor().maxSegmentSize();
47         this.maxEntrySize = maxEntrySize;
48         firstIndex = segment.index();
49     }
50
51     JournalSegmentWriter(final JournalSegmentWriter<E> previous) {
52         channel = previous.channel;
53         segment = previous.segment;
54         index = previous.index;
55         namespace = previous.namespace;
56         maxSegmentSize = previous.maxSegmentSize;
57         maxEntrySize = previous.maxEntrySize;
58         firstIndex = previous.firstIndex;
59         lastEntry = previous.lastEntry;
60         currentPosition = previous.currentPosition;
61     }
62
63     /**
64      * Returns the last written index.
65      *
66      * @return The last written index.
67      */
68     final long getLastIndex() {
69         return lastEntry != null ? lastEntry.index() : firstIndex - 1;
70     }
71
72     /**
73      * Returns the last entry written.
74      *
75      * @return The last entry written.
76      */
77     final Indexed<E> getLastEntry() {
78         return lastEntry;
79     }
80
81     /**
82      * Returns the next index to be written.
83      *
84      * @return The next index to be written.
85      */
86     final long getNextIndex() {
87         return lastEntry != null ? lastEntry.index() + 1 : firstIndex;
88     }
89
90     /**
91      * Appends an entry to the journal.
92      *
93      * @param entry The entry to append.
94      * @return The appended indexed entry.
95      */
96     abstract <T extends E> Indexed<T> append(T entry);
97
98     /**
99      * Resets the head of the segment to the given index.
100      *
101      * @param index the index to which to reset the head of the segment
102      */
103     final void reset(final long index) {
104         // acquire ownership of cache and make sure reader does not see anything we've done once we're done
105         final var reader = reader();
106         reader.invalidateCache();
107         try {
108             resetWithBuffer(reader, index);
109         } finally {
110             // Make sure reader does not see anything we've done
111             reader.invalidateCache();
112         }
113     }
114
115     abstract JournalSegmentReader<E> reader();
116
117     private void resetWithBuffer(final JournalSegmentReader<E> reader, final long index) {
118         long nextIndex = firstIndex;
119
120         // Clear the buffer indexes and acquire ownership of the buffer
121         currentPosition = JournalSegmentDescriptor.BYTES;
122         reader.setPosition(JournalSegmentDescriptor.BYTES);
123
124         while (index == 0 || nextIndex <= index) {
125             final var entry = reader.readEntry(nextIndex);
126             if (entry == null) {
127                 break;
128             }
129
130             lastEntry = entry;
131             this.index.index(nextIndex, currentPosition);
132             nextIndex++;
133
134             // Update the current position for indexing.
135             currentPosition = currentPosition + HEADER_BYTES + entry.size();
136         }
137     }
138
139     /**
140      * Truncates the log to the given index.
141      *
142      * @param index The index to which to truncate the log.
143      */
144     abstract void truncate(long index);
145
146     /**
147      * Flushes written entries to disk.
148      */
149     abstract void flush();
150
151     /**
152      * Closes this writer.
153      */
154     abstract void close();
155
156     /**
157      * Returns the mapped buffer underlying the segment writer, or {@code null} if the writer does not have such a
158      * buffer.
159      *
160      * @return the mapped buffer underlying the segment writer, or {@code null}.
161      */
162     abstract @Nullable MappedByteBuffer buffer();
163
164     abstract @NonNull MappedJournalSegmentWriter<E> toMapped();
165
166     abstract @NonNull DiskJournalSegmentWriter<E> toFileChannel();
167 }