2 * Copyright 2017-2022 Open Networking Foundation and others. All rights reserved.
3 * Copyright (c) 2024 PANTHEON.tech, s.r.o.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 package io.atomix.storage.journal;
19 import static io.atomix.storage.journal.SegmentEntry.HEADER_BYTES;
20 import static java.util.Objects.requireNonNull;
22 import com.google.common.base.MoreObjects;
23 import io.atomix.storage.journal.index.JournalIndex;
24 import io.atomix.storage.journal.index.Position;
25 import io.atomix.storage.journal.index.SparseJournalIndex;
26 import java.io.IOException;
27 import java.nio.file.Files;
29 import java.util.concurrent.ConcurrentHashMap;
30 import java.util.concurrent.atomic.AtomicInteger;
31 import org.eclipse.jdt.annotation.NonNull;
32 import org.eclipse.jdt.annotation.NonNullByDefault;
33 import org.eclipse.jdt.annotation.Nullable;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
40 * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
42 final class JournalSegment {
44 * Encapsulation of a {@link JournalSegment}'s state.
46 sealed interface State {
51 * Journal segment is active, i.e. there is a writer associated with it.
54 record Active(FileAccess access, JournalSegmentWriter writer) implements State {
// Neither component may be null: requireNonNull throws NullPointerException otherwise.
56 requireNonNull(access);
57 requireNonNull(writer);
60 Inactive deactivate() {
// The replacement Inactive state records the writer's current position.
61 final var inactive = new Inactive(writer.currentPosition());
68 * Journal segment is inactive, i.e. there is no writer associated with it.
71 record Inactive(int position) implements State {
72 Active activate(final JournalSegment segment) throws IOException {
// Open a new access on the segment's file, using the segment's storage level and maximum entry size.
73 final var access = segment.file.newAccess(segment.storageLevel, segment.maxEntrySize);
// The Active state pairs that access with a writer built on the access' file writer and the segment's index.
74 return new Active(access, new JournalSegmentWriter(access.newFileWriter(), segment, segment.journalIndex,
79 private static final Logger LOG = LoggerFactory.getLogger(JournalSegment.class);
81 private final Set<JournalSegmentReader> readers = ConcurrentHashMap.newKeySet();
82 private final AtomicInteger references = new AtomicInteger();
83 private final @NonNull JournalSegmentFile file;
84 private final @NonNull StorageLevel storageLevel;
85 private final @NonNull JournalIndex journalIndex;
86 private final int maxEntrySize;
89 private boolean open = true;
92 final JournalSegmentFile file,
93 final StorageLevel storageLevel,
94 final int maxEntrySize,
95 final double indexDensity) {
96 this.file = requireNonNull(file);
97 this.storageLevel = requireNonNull(storageLevel);
98 this.maxEntrySize = maxEntrySize;
100 journalIndex = new SparseJournalIndex(indexDensity);
102 try (var tmpAccess = file.newAccess(storageLevel, maxEntrySize)) {
103 final var fileReader = tmpAccess.newFileReader();
104 state = new Inactive(indexEntries(fileReader, this, maxEntrySize, journalIndex, Long.MAX_VALUE, null));
105 } catch (IOException e) {
106 throw new StorageException(e);
111 * Returns the segment's starting index.
113 * @return The segment's starting index.
116 return file.firstIndex();
120 * Returns the last index in the segment.
122 * @return The last index in the segment.
125 final var lastPosition = journalIndex.last();
126 return lastPosition != null ? lastPosition.index() : firstIndex() - 1;
130 * Returns the segment file.
132 * @return The segment file.
134 JournalSegmentFile file() {
139 * Looks up the position of the given index.
141 * @param index the index to lookup
142 * @return the position of the given index or a lesser index, or {@code null}
144 @Nullable Position lookup(final long index) {
145 return journalIndex.lookup(index);
149 * Acquires a reference to the log segment.
151 private Active acquire() {
// Bump the reference count. The caller that observes a previous count of zero activates the
// segment; every other caller reuses the current state, which is cast to Active.
// NOTE(review): the count is incremented before activate() runs, and activate() can throw
// StorageException; nothing in this method rolls the count back in that case -- confirm whether
// a later acquire() could then hit the (Active) cast while the state is still Inactive.
152 return references.getAndIncrement() == 0 ? activate() : (Active) state;
155 private Active activate() {
// The current state is cast to Inactive and asked to build the Active state for this segment.
158 ret = ((Inactive) state).activate(this);
// I/O failures are rethrown as StorageException with the original exception as the cause.
159 } catch (IOException e) {
160 throw new StorageException(e);
167 * Releases a reference to the log segment.
169 private void release() {
// Drop one reference; only the release that brings the count to zero deactivates the segment.
170 if (references.decrementAndGet() == 0) {
// The state is cast to Active and replaced with whatever its deactivate() returns.
171 state = ((Active) state).deactivate();
179 * Acquires a reference to the segment writer.
181 * @return The segment writer.
183 JournalSegmentWriter acquireWriter() {
// Takes a segment reference via acquire() and hands out the writer held by the Active state.
185 return acquire().writer();
189 * Releases the reference to the segment writer.
191 void releaseWriter() {
196 * Creates a new segment reader.
198 * @return A new segment reader.
200 JournalSegmentReader createReader() {
// Each reader takes its own segment reference via acquire() and reads through a new file reader
// obtained from the Active state's access.
203 final var reader = new JournalSegmentReader(this, acquire().access().newFileReader(), maxEntrySize);
// Start reading at offset JournalSegmentDescriptor.BYTES -- presumably just past the segment
// descriptor; TODO confirm.
204 reader.setPosition(JournalSegmentDescriptor.BYTES);
210 * Closes a segment reader.
212 * @param reader the closed segment reader
214 void closeReader(final JournalSegmentReader reader) {
// Proceed only if this reader was still tracked in the readers set; remove() returns false otherwise.
215 if (readers.remove(reader)) {
221 * Checks whether the segment is open.
223 private void checkOpen() {
225 throw new IllegalStateException("Segment not open");
230 * Returns a boolean indicating whether the segment is open.
232 * @return indicates whether the segment is open
239 * Closes the segment.
246 LOG.debug("Closing segment: {}", this);
248 readers.forEach(JournalSegmentReader::close);
249 if (references.get() == 0) {
254 private void finishClose() {
257 } catch (IOException e) {
258 throw new StorageException(e);
263 * Deletes the segment.
267 LOG.debug("Deleting segment: {}", this);
269 Files.deleteIfExists(file.path());
270 } catch (IOException e) {
271 throw new StorageException(e);
276 public String toString() {
// Built with Guava's MoreObjects helper from three properties of the segment file:
// its segment id, its version and its first index.
277 return MoreObjects.toStringHelper(this)
278 .add("id", file.segmentId())
279 .add("version", file.version())
280 .add("index", file.firstIndex())
284 static int indexEntries(final FileWriter fileWriter, final JournalSegment segment, final JournalIndex journalIndex,
285 final long maxNextIndex, final @Nullable Position start) {
286 // acquire ownership of cache and make sure reader does not see anything we've done once we're done
287 final var fileReader = fileWriter.reader();
// Delegate to the FileReader-based overload, taking the maximum entry size from the writer.
289 return indexEntries(fileReader, segment, fileWriter.maxEntrySize(), journalIndex, maxNextIndex, start);
291 // Make sure reader does not see anything we've done
292 fileReader.invalidateCache();
296 private static int indexEntries(final FileReader fileReader, final JournalSegment segment, final int maxEntrySize,
297 final JournalIndex journalIndex, final long maxNextIndex, final @Nullable Position start) {
301 // look from nearest recovered index
302 nextIndex = start.index();
303 position = start.position();
305 // look from very beginning of the segment
306 nextIndex = segment.firstIndex();
307 position = JournalSegmentDescriptor.BYTES;
310 final var reader = new JournalSegmentReader(segment, fileReader, maxEntrySize);
311 reader.setPosition(position);
313 while (nextIndex <= maxNextIndex) {
314 final var buf = reader.readBytes();
319 journalIndex.index(nextIndex++, position);
320 // Update the current position for indexing.
321 position += HEADER_BYTES + buf.readableBytes();