2 * Copyright 2017-2022 Open Networking Foundation and others. All rights reserved.
3 * Copyright (c) 2024 PANTHEON.tech, s.r.o.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 package io.atomix.storage.journal;
19 import static java.util.Objects.requireNonNull;
21 import com.google.common.base.MoreObjects;
22 import io.atomix.storage.journal.index.JournalIndex;
23 import io.atomix.storage.journal.index.Position;
24 import io.atomix.storage.journal.index.SparseJournalIndex;
25 import java.io.IOException;
26 import java.nio.channels.FileChannel;
27 import java.nio.file.Files;
28 import java.nio.file.StandardOpenOption;
30 import java.util.concurrent.ConcurrentHashMap;
31 import java.util.concurrent.atomic.AtomicInteger;
32 import org.eclipse.jdt.annotation.Nullable;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
39 * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
// A single segment of the journal, backed by one file. Tracks open readers,
// an outstanding-reference count, and the active writer so that mapped memory
// can be acquired and released as consumers come and go.
// NOTE(review): this chunk is elided — many original lines are missing and the
// original file's line numbers are fused into the remaining text.
41 final class JournalSegment {
42 private static final Logger LOG = LoggerFactory.getLogger(JournalSegment.class);
44 private final JournalSegmentFile file;
45 private final StorageLevel storageLevel;
46 private final int maxEntrySize;
47 private final JournalIndex journalIndex;
// Concurrent set: readers may be created and closed from multiple threads.
48 private final Set<JournalSegmentReader> readers = ConcurrentHashMap.newKeySet();
49 private final AtomicInteger references = new AtomicInteger();
50 private final FileChannel channel;
// Mutable: swapped between mapped and file-channel variants (see acquire()/release()).
52 private JournalSegmentWriter writer;
53 private boolean open = true;
// Constructor fragment (header line elided in this chunk): opens the backing
// file with CREATE|READ|WRITE and builds the initial writer for the storage level.
56 final JournalSegmentFile file,
57 final StorageLevel storageLevel,
58 final int maxEntrySize,
59 final double indexDensity) {
60 this.file = requireNonNull(file);
61 this.storageLevel = requireNonNull(storageLevel);
62 this.maxEntrySize = maxEntrySize;
// Sparse index: only every N-th entry position is retained, per indexDensity.
63 journalIndex = new SparseJournalIndex(indexDensity);
65 channel = FileChannel.open(file.path(),
66 StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
// Failure to open the channel is rethrown as the unchecked StorageException.
67 } catch (IOException e) {
68 throw new StorageException(e);
// Writer backend depends on storage level: plain disk I/O vs. memory-mapped file.
71 final var fileWriter = switch (storageLevel) {
72 case DISK -> new DiskFileWriter(file, channel, maxEntrySize);
73 case MAPPED -> new MappedFileWriter(file, channel, maxEntrySize);
75 writer = new JournalSegmentWriter(fileWriter, this, maxEntrySize, journalIndex)
76 // relinquish mapped memory
// Accessor fragment (method signature elided): the segment's first index comes
// from the descriptor stored in the segment file.
81 * Returns the segment's starting index.
83 * @return The segment's starting index.
86 return file.descriptor().index();
// Accessor fragment (method signature elided): delegates to the writer, which
// tracks the index of the last entry written to this segment.
90 * Returns the last index in the segment.
92 * @return The last index in the segment.
95 return writer.getLastIndex();
// Size fragment (method signature elided): reports the current file size in
// bytes. The int cast assumes segments stay below 2 GiB — presumably bounded
// by a maximum segment size elsewhere; TODO confirm against full source.
99 * Returns the size of the segment.
101 * @return the size of the segment
105 return (int) channel.size();
// I/O failures surface as the unchecked StorageException.
106 } catch (IOException e) {
107 throw new StorageException(e);
// Simple accessor for the backing JournalSegmentFile (body elided in this chunk).
112 * Returns the segment file.
114 * @return The segment file.
116 JournalSegmentFile file() {
121 * Looks up the position of the given index.
123 * @param index the index to lookup
124 * @return the position of the given index or a lesser index, or {@code null}
// Delegates to the sparse in-memory index; because the index is sparse, the
// returned position may belong to a lesser index and the caller scans forward.
126 @Nullable Position lookup(final long index) {
127 return journalIndex.lookup(index);
131 * Acquires a reference to the log segment.
// The first acquirer of a MAPPED segment upgrades the writer to its
// memory-mapped form; subsequent acquirers only bump the count.
133 private void acquire() {
134 if (references.getAndIncrement() == 0 && storageLevel == StorageLevel.MAPPED) {
135 writer = writer.toMapped();
140 * Releases a reference to the log segment.
142 private void release() {
// When the last reference is released, a MAPPED segment downgrades its writer
// back to a plain FileChannel writer, relinquishing the mapped memory.
143 if (references.decrementAndGet() == 0) {
144 if (storageLevel == StorageLevel.MAPPED) {
145 writer = writer.toFileChannel();
154 * Acquires a reference to the segment writer.
156 * @return The segment writer.
// Body elided — presumably verifies the segment is open and calls acquire()
// before returning the writer; TODO confirm against full source.
158 JournalSegmentWriter acquireWriter() {
166 * Releases the reference to the segment writer.
// Body elided — presumably delegates to release(); TODO confirm.
168 void releaseWriter() {
173 * Creates a new segment reader.
175 * @return A new segment reader.
177 JournalSegmentReader createReader() {
// If the writer currently holds a mapped buffer, share it for reading;
// otherwise read through the FileChannel directly.
181 final var buffer = writer.buffer();
182 final var fileReader = buffer != null ? new MappedFileReader(file, buffer)
183 : new DiskFileReader(file, channel, maxEntrySize);
184 final var reader = new JournalSegmentReader(this, fileReader, maxEntrySize);
// Position the reader just past the fixed-size segment descriptor header.
185 reader.setPosition(JournalSegmentDescriptor.BYTES);
191 * Closes a segment reader.
193 * @param reader the closed segment reader
195 void closeReader(final JournalSegmentReader reader) {
// Only act if the reader was still registered — guards against double-close.
// (Rest of body elided; presumably releases the segment reference.)
196 if (readers.remove(reader)) {
202 * Checks whether the segment is open.
// Guard used before handing out readers/writers; fails fast once closed.
204 private void checkOpen() {
206 throw new IllegalStateException("Segment not open");
211 * Returns a boolean indicating whether the segment is open.
213 * @return indicates whether the segment is open
220 * Closes the segment.
227 LOG.debug("Closing segment: {}", this);
// Close all outstanding readers first; resources are finally released only
// when no references remain — otherwise presumably deferred to the last
// release (elided here); TODO confirm against full source.
229 readers.forEach(JournalSegmentReader::close);
230 if (references.get() == 0) {
// Final stage of closing: releases file resources (body largely elided);
// I/O failures surface as the unchecked StorageException.
235 private void finishClose() {
239 } catch (IOException e) {
240 throw new StorageException(e);
245 * Deletes the segment.
249 LOG.debug("Deleting segment: {}", this);
// Best-effort removal of the backing file: no error if it is already gone,
// but other I/O failures are rethrown as StorageException.
251 Files.deleteIfExists(file.path());
252 } catch (IOException e) {
253 throw new StorageException(e);
// Debug representation built from the file's descriptor (id/version/index);
// the tail of the builder chain is elided in this chunk.
258 public String toString() {
259 final var descriptor = file.descriptor();
260 return MoreObjects.toStringHelper(this)
261 .add("id", descriptor.id())
262 .add("version", descriptor.version())
263 .add("index", descriptor.index())