Bump versions 9.0.4-SNAPSHOT
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / JournalSegment.java
1 /*
2  * Copyright 2017-2022 Open Networking Foundation and others.  All rights reserved.
3  * Copyright (c) 2024 PANTHEON.tech, s.r.o.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 package io.atomix.storage.journal;
18
19 import static io.atomix.storage.journal.SegmentEntry.HEADER_BYTES;
20 import static java.util.Objects.requireNonNull;
21
22 import com.google.common.base.MoreObjects;
23 import io.atomix.storage.journal.index.JournalIndex;
24 import io.atomix.storage.journal.index.Position;
25 import io.atomix.storage.journal.index.SparseJournalIndex;
26 import java.io.IOException;
27 import java.nio.file.Files;
28 import java.util.Set;
29 import java.util.concurrent.ConcurrentHashMap;
30 import java.util.concurrent.atomic.AtomicInteger;
31 import org.eclipse.jdt.annotation.NonNull;
32 import org.eclipse.jdt.annotation.NonNullByDefault;
33 import org.eclipse.jdt.annotation.Nullable;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 /**
38  * Log segment.
39  *
40  * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
41  */
42 final class JournalSegment {
43     /**
44      * Encapsulation of a {@link JournalSegment}'s state.
45      */
46     sealed interface State {
47         // Marker interface
48     }
49
50     /**
51      * Journal segment is active, i.e. there is a associated with it.
52      */
53     @NonNullByDefault
54     record Active(FileAccess access, JournalSegmentWriter writer) implements State {
55         Active {
56             requireNonNull(access);
57             requireNonNull(writer);
58         }
59
60         Inactive deactivate() {
61             final var inactive = new Inactive(writer.currentPosition());
62             access.close();
63             return inactive;
64         }
65     }
66
67     /**
68      * Journal segment is inactive, i.e. there is no writer associated with it.
69      */
70     @NonNullByDefault
71     record Inactive(int position) implements State {
72         Active activate(final JournalSegment segment) throws IOException {
73             final var access = segment.file.newAccess(segment.storageLevel, segment.maxEntrySize);
74             return new Active(access, new JournalSegmentWriter(access.newFileWriter(), segment, segment.journalIndex,
75                 this));
76         }
77     }
78
79     private static final Logger LOG = LoggerFactory.getLogger(JournalSegment.class);
80
81     private final Set<JournalSegmentReader> readers = ConcurrentHashMap.newKeySet();
82     private final AtomicInteger references = new AtomicInteger();
83     private final @NonNull JournalSegmentFile file;
84     private final @NonNull StorageLevel storageLevel;
85     private final @NonNull JournalIndex journalIndex;
86     private final int maxEntrySize;
87
88     private State state;
89     private boolean open = true;
90
91     JournalSegment(
92         final JournalSegmentFile file,
93         final StorageLevel storageLevel,
94         final int maxEntrySize,
95         final double indexDensity) {
96         this.file = requireNonNull(file);
97         this.storageLevel = requireNonNull(storageLevel);
98         this.maxEntrySize = maxEntrySize;
99
100         journalIndex = new SparseJournalIndex(indexDensity);
101
102         try (var tmpAccess = file.newAccess(storageLevel, maxEntrySize)) {
103             final var fileReader = tmpAccess.newFileReader();
104             state = new Inactive(indexEntries(fileReader, this, maxEntrySize, journalIndex, Long.MAX_VALUE, null));
105         } catch (IOException e) {
106             throw new StorageException(e);
107         }
108     }
109
110     /**
111      * Returns the segment's starting index.
112      *
113      * @return The segment's starting index.
114      */
115     long firstIndex() {
116         return file.firstIndex();
117     }
118
119     /**
120      * Returns the last index in the segment.
121      *
122      * @return The last index in the segment.
123      */
124     long lastIndex() {
125         final var lastPosition = journalIndex.last();
126         return lastPosition != null ? lastPosition.index() : firstIndex() - 1;
127     }
128
129     /**
130      * Returns the segment file.
131      *
132      * @return The segment file.
133      */
134     JournalSegmentFile file() {
135         return file;
136     }
137
138     /**
139      * Looks up the position of the given index.
140      *
141      * @param index the index to lookup
142      * @return the position of the given index or a lesser index, or {@code null}
143      */
144     @Nullable Position lookup(final long index) {
145         return journalIndex.lookup(index);
146     }
147
148     /**
149      * Acquires a reference to the log segment.
150      */
151     private Active acquire() {
152         return references.getAndIncrement() == 0 ? activate() : (Active) state;
153     }
154
155     private Active activate() {
156         final Active ret;
157         try {
158             ret = ((Inactive) state).activate(this);
159         } catch (IOException e) {
160             throw new StorageException(e);
161         }
162         state = ret;
163         return ret;
164     }
165
166     /**
167      * Releases a reference to the log segment.
168      */
169     private void release() {
170         if (references.decrementAndGet() == 0) {
171             state = ((Active) state).deactivate();
172             if (!open) {
173                 finishClose();
174             }
175         }
176     }
177
178     /**
179      * Acquires a reference to the segment writer.
180      *
181      * @return The segment writer.
182      */
183     JournalSegmentWriter acquireWriter() {
184         checkOpen();
185         return acquire().writer();
186     }
187
188     /**
189      * Releases the reference to the segment writer.
190      */
191     void releaseWriter() {
192         release();
193     }
194
195     /**
196      * Creates a new segment reader.
197      *
198      * @return A new segment reader.
199      */
200     JournalSegmentReader createReader() {
201         checkOpen();
202
203         final var reader = new JournalSegmentReader(this, acquire().access().newFileReader(), maxEntrySize);
204         reader.setPosition(JournalSegmentDescriptor.BYTES);
205         readers.add(reader);
206         return reader;
207     }
208
209     /**
210      * Closes a segment reader.
211      *
212      * @param reader the closed segment reader
213      */
214     void closeReader(final JournalSegmentReader reader) {
215         if (readers.remove(reader)) {
216             release();
217         }
218     }
219
220     /**
221      * Checks whether the segment is open.
222      */
223     private void checkOpen() {
224         if (!open) {
225             throw new IllegalStateException("Segment not open");
226         }
227     }
228
229     /**
230      * Returns a boolean indicating whether the segment is open.
231      *
232      * @return indicates whether the segment is open
233      */
234     boolean isOpen() {
235         return open;
236     }
237
238     /**
239      * Closes the segment.
240      */
241     void close() {
242         if (!open) {
243             return;
244         }
245
246         LOG.debug("Closing segment: {}", this);
247         open = false;
248         readers.forEach(JournalSegmentReader::close);
249         if (references.get() == 0) {
250             finishClose();
251         }
252     }
253
254     private void finishClose() {
255         try {
256             file.close();
257         } catch (IOException e) {
258             throw new StorageException(e);
259         }
260     }
261
262     /**
263      * Deletes the segment.
264      */
265     void delete() {
266         close();
267         LOG.debug("Deleting segment: {}", this);
268         try {
269             Files.deleteIfExists(file.path());
270         } catch (IOException e) {
271             throw new StorageException(e);
272         }
273     }
274
275     @Override
276     public String toString() {
277         return MoreObjects.toStringHelper(this)
278             .add("id", file.segmentId())
279             .add("version", file.version())
280             .add("index", file.firstIndex())
281             .toString();
282     }
283
284     static int indexEntries(final FileWriter fileWriter, final JournalSegment segment, final JournalIndex journalIndex,
285             final long maxNextIndex, final @Nullable Position start) {
286         // acquire ownership of cache and make sure reader does not see anything we've done once we're done
287         final var fileReader = fileWriter.reader();
288         try {
289             return indexEntries(fileReader, segment, fileWriter.maxEntrySize(), journalIndex, maxNextIndex, start);
290         } finally {
291             // Make sure reader does not see anything we've done
292             fileReader.invalidateCache();
293         }
294     }
295
296     private static int indexEntries(final FileReader fileReader, final JournalSegment segment, final int maxEntrySize,
297             final JournalIndex journalIndex, final long maxNextIndex, final @Nullable Position start) {
298         int position;
299         long nextIndex;
300         if (start != null) {
301             // look from nearest recovered index
302             nextIndex = start.index();
303             position = start.position();
304         } else {
305             // look from very beginning of the segment
306             nextIndex = segment.firstIndex();
307             position = JournalSegmentDescriptor.BYTES;
308         }
309
310         final var reader = new JournalSegmentReader(segment, fileReader, maxEntrySize);
311         reader.setPosition(position);
312
313         while (nextIndex <= maxNextIndex) {
314             final var buf = reader.readBytes();
315             if (buf == null) {
316                 break;
317             }
318
319             journalIndex.index(nextIndex++, position);
320             // Update the current position for indexing.
321             position += HEADER_BYTES + buf.readableBytes();
322         }
323
324         return position;
325     }
326 }