2 * Copyright 2017-2022 Open Networking Foundation and others. All rights reserved.
3 * Copyright (c) 2024 PANTHEON.tech, s.r.o.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 package io.atomix.storage.journal;
19 import static java.util.Objects.requireNonNull;
21 import com.google.common.base.MoreObjects;
22 import io.atomix.storage.journal.index.JournalIndex;
23 import io.atomix.storage.journal.index.Position;
24 import io.atomix.storage.journal.index.SparseJournalIndex;
25 import java.io.IOException;
26 import java.nio.file.Files;
28 import java.util.concurrent.ConcurrentHashMap;
29 import java.util.concurrent.atomic.AtomicInteger;
30 import org.eclipse.jdt.annotation.Nullable;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
37 * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
39 final class JournalSegment {
// Class-wide SLF4J logger.
40 private static final Logger LOG = LoggerFactory.getLogger(JournalSegment.class);
// Immutable configuration captured at construction time.
42 private final JournalSegmentFile file;
43 private final StorageLevel storageLevel;
44 private final int maxEntrySize;
45 private final JournalIndex journalIndex;
// Concurrent set of tracked readers: closeReader() removes entries and the close path closes every member.
46 private final Set<JournalSegmentReader> readers = ConcurrentHashMap.newKeySet();
// Outstanding reference count, starting at zero; incremented in acquire() and decremented in release().
47 private final AtomicInteger references = new AtomicInteger();
// Deliberately not final: acquire() and release() replace the writer with its mapped / file-channel variant.
49 private JournalSegmentWriter writer;
// NOTE(review): writer and open are plain (non-volatile) fields while readers/references use
// concurrent types -- confirm the intended threading model.
50 private boolean open = true;
53 final JournalSegmentFile file,
54 final StorageLevel storageLevel,
55 final int maxEntrySize,
56 final double indexDensity) {
// Both object arguments are mandatory: requireNonNull throws NullPointerException on null.
57 this.file = requireNonNull(file);
58 this.storageLevel = requireNonNull(storageLevel);
59 this.maxEntrySize = maxEntrySize;
// The index implementation is always SparseJournalIndex, built from the supplied indexDensity.
60 journalIndex = new SparseJournalIndex(indexDensity);
// Choose the file writer implementation that matches the requested storage level.
62 final var fileWriter = switch (storageLevel) {
63 case DISK -> new DiskFileWriter(file, maxEntrySize);
64 case MAPPED -> new MappedFileWriter(file, maxEntrySize);
// The segment writer is handed this segment and shares the segment's journal index.
66 writer = new JournalSegmentWriter(fileWriter, this, maxEntrySize, journalIndex)
67 // relinquish mapped memory
72 * Returns the segment's starting index.
74 * @return The segment's starting index.
77 return file.firstIndex();
81 * Returns the last index in the segment.
83 * @return The last index in the segment.
86 return writer.getLastIndex();
90 * Returns the segment file.
92 * @return The segment file.
94 JournalSegmentFile file() {
99 * Looks up the position of the given index.
101 * @param index the index to look up
102 * @return the position of the given index or a lesser index, or {@code null}
// Pure delegation to the segment's JournalIndex; the result may be null, as the @Nullable
// return type indicates.
104 @Nullable Position lookup(final long index) {
105 return journalIndex.lookup(index);
109 * Acquires a reference to the log segment.
// Increments the reference count on every call; the writer is only touched on the 0 -> 1 transition.
111 private void acquire() {
// getAndIncrement() yields the value before the increment, so "== 0" means this call took the
// first outstanding reference. The increment is evaluated before the && short-circuit, so it
// happens for every storage level, not just MAPPED.
112 if (references.getAndIncrement() == 0 && storageLevel == StorageLevel.MAPPED) {
// Replace the current writer with whatever toMapped() returns.
113 writer = writer.toMapped();
118 * Releases a reference to the log segment.
// Counterpart of acquire(): decrements the reference count and reacts when it reaches zero.
120 private void release() {
// decrementAndGet() yields the value after the decrement, so "== 0" means the last
// outstanding reference has just been dropped.
121 if (references.decrementAndGet() == 0) {
// Mirror of acquire(): for MAPPED storage swap the writer for its toFileChannel() variant.
122 if (storageLevel == StorageLevel.MAPPED) {
123 writer = writer.toFileChannel();
132 * Acquires a reference to the segment writer.
134 * @return The segment writer.
136 JournalSegmentWriter acquireWriter() {
144 * Releases the reference to the segment writer.
146 void releaseWriter() {
151 * Creates a new segment reader.
153 * @return A new segment reader.
155 JournalSegmentReader createReader() {
// The writer's buffer decides the reader flavour: a non-null buffer is shared with a
// MappedFileReader, otherwise the reader goes through a DiskFileReader.
159 final var buffer = writer.buffer();
160 final var fileReader = buffer != null ? new MappedFileReader(file, buffer) : new DiskFileReader(file, maxEntrySize);
161 final var reader = new JournalSegmentReader(this, fileReader, maxEntrySize);
// Start reading at offset JournalSegmentDescriptor.BYTES -- presumably the size of the
// descriptor header at the start of the segment file; confirm against JournalSegmentDescriptor.
162 reader.setPosition(JournalSegmentDescriptor.BYTES);
168 * Closes a segment reader.
170 * @param reader the closed segment reader
172 void closeReader(final JournalSegmentReader reader) {
// remove() reports true only when the reader was actually tracked, so a repeated call for the
// same reader skips the guarded branch.
173 if (readers.remove(reader)) {
179 * Checks whether the segment is open.
181 private void checkOpen() {
// A closed segment is reported with an unchecked IllegalStateException, unlike I/O failures,
// which this class wraps in StorageException.
183 throw new IllegalStateException("Segment not open");
188 * Returns a boolean indicating whether the segment is open.
190 * @return indicates whether the segment is open
197 * Closes the segment.
204 LOG.debug("Closing segment: {}", this);
206 readers.forEach(JournalSegmentReader::close);
207 if (references.get() == 0) {
212 private void finishClose() {
// I/O failures are rethrown as StorageException, keeping the IOException as the cause.
216 } catch (IOException e) {
217 throw new StorageException(e);
222 * Deletes the segment.
226 LOG.debug("Deleting segment: {}", this);
228 Files.deleteIfExists(file.path());
229 } catch (IOException e) {
230 throw new StorageException(e);
235 public String toString() {
236 return MoreObjects.toStringHelper(this)
237 .add("id", file.segmentId())
238 .add("version", file.version())
239 .add("index", file.firstIndex())