2 * Copyright 2017-2022 Open Networking Foundation and others. All rights reserved.
3 * Copyright (c) 2024 PANTHEON.tech, s.r.o.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 package io.atomix.storage.journal;
19 import static java.util.Objects.requireNonNull;
21 import com.google.common.base.MoreObjects;
22 import io.atomix.storage.journal.index.JournalIndex;
23 import io.atomix.storage.journal.index.Position;
24 import io.atomix.storage.journal.index.SparseJournalIndex;
25 import java.io.IOException;
26 import java.nio.channels.FileChannel;
27 import java.nio.file.Files;
28 import java.nio.file.StandardOpenOption;
30 import java.util.concurrent.ConcurrentHashMap;
31 import java.util.concurrent.atomic.AtomicInteger;
32 import org.eclipse.jdt.annotation.Nullable;
37 * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
39 final class JournalSegment implements AutoCloseable {
// Segment file handle; its path is used to open the channel and its descriptor is read elsewhere in this class.
40 private final JournalSegmentFile file;
// Storage level chosen at construction; compared against StorageLevel.MAPPED in acquire() and release().
41 private final StorageLevel storageLevel;
// Maximum entry size, handed unchanged to the file writers, file readers and the segment reader/writer.
42 private final int maxEntrySize;
// Index consulted by lookup(); instantiated as a SparseJournalIndex in the constructor.
43 private final JournalIndex journalIndex;
// Readers tracked by this segment, kept in a concurrent set obtained from ConcurrentHashMap.newKeySet().
44 private final Set<JournalSegmentReader> readers = ConcurrentHashMap.newKeySet();
// Reference counter, initially 0; incremented in acquire() and decremented in release().
45 private final AtomicInteger references = new AtomicInteger();
// Channel on the segment file, opened in the constructor with CREATE, READ and WRITE.
46 private final FileChannel channel;
// Current writer; not final because acquire() and release() reassign it for MAPPED storage.
48 private JournalSegmentWriter writer;
// Open flag, starts out true. NOTE(review): plain non-volatile field — confirm the intended threading model.
49 private boolean open = true;
52 final JournalSegmentFile file,
53 final StorageLevel storageLevel,
54 final int maxEntrySize,
55 final double indexDensity) {
// Both object arguments are rejected with a NullPointerException when null.
56 this.file = requireNonNull(file);
57 this.storageLevel = requireNonNull(storageLevel);
58 this.maxEntrySize = maxEntrySize;
// indexDensity is passed straight through to the sparse index implementation.
59 journalIndex = new SparseJournalIndex(indexDensity);
// Open the segment file for reading and writing, creating it when it does not exist yet.
61 channel = FileChannel.open(file.path(),
62 StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
63 } catch (IOException e) {
// An IOException from opening the channel is rethrown wrapped in a StorageException.
64 throw new StorageException(e);
// Pick the file writer implementation that matches the configured storage level.
67 final var fileWriter = switch (storageLevel) {
68 case DISK -> new DiskFileWriter(file, channel, maxEntrySize);
69 case MAPPED -> new MappedFileWriter(file, channel, maxEntrySize);
// The segment writer receives this segment and the index in addition to the chosen file writer.
71 writer = new JournalSegmentWriter(fileWriter, this, maxEntrySize, journalIndex)
72 // relinquish mapped memory
77 * Returns the segment's starting index.
79 * @return The segment's starting index.
82 return file.descriptor().index();
86 * Returns the last index in the segment.
88 * @return The last index in the segment.
91 return writer.getLastIndex();
95 * Returns the size of the segment.
97 * @return the size of the segment
// NOTE(review): FileChannel.size() returns a long; this narrowing cast silently truncates once the file
// exceeds Integer.MAX_VALUE bytes — confirm that segment files are always bounded below that.
101 return (int) channel.size();
102 } catch (IOException e) {
// The checked IOException is rethrown wrapped in a StorageException.
103 throw new StorageException(e);
108 * Returns the segment file.
110 * @return The segment file.
112 JournalSegmentFile file() {
117 * Looks up the position of the given index.
119 * @param index the index to lookup
120 * @return the position of the given index or a lesser index, or {@code null}
122 @Nullable Position lookup(final long index) {
// Straight delegation: the result (possibly null, per the @Nullable return type) comes from the journal index.
123 return journalIndex.lookup(index);
127 * Acquires a reference to the log segment.
129 private void acquire() {
// The counter is incremented unconditionally (left operand of &&). getAndIncrement() yields the previous
// count, so the branch runs only on the 0 -> 1 transition, and only for MAPPED storage.
130 if (references.getAndIncrement() == 0 && storageLevel == StorageLevel.MAPPED) {
// Swap in whatever writer toMapped() hands back.
131 writer = writer.toMapped();
136 * Releases a reference to the log segment.
138 private void release() {
// decrementAndGet() yields the new count, so this branch runs when the last reference is dropped.
139 if (references.decrementAndGet() == 0) {
140 if (storageLevel == StorageLevel.MAPPED) {
// Counterpart of acquire(): replace the writer with the one returned by toFileChannel().
141 writer = writer.toFileChannel();
150 * Acquires a reference to the segment writer.
152 * @return The segment writer.
154 JournalSegmentWriter acquireWriter() {
162 * Releases the reference to the segment writer.
164 void releaseWriter() {
169 * Creates a new segment reader.
171 * @return A new segment reader.
173 JournalSegmentReader createReader() {
// A non-null writer buffer selects the mapped reader over that buffer; otherwise reads go through the
// file channel via DiskFileReader.
177 final var buffer = writer.buffer();
178 final var fileReader = buffer != null ? new MappedFileReader(file, buffer)
179 : new DiskFileReader(file, channel, maxEntrySize);
180 final var reader = new JournalSegmentReader(this, fileReader, maxEntrySize);
// Start the reader at offset JournalSegmentDescriptor.BYTES — presumably just past the descriptor header; confirm.
181 reader.setPosition(JournalSegmentDescriptor.BYTES);
187 * Closes a segment reader.
189 * @param reader the closed segment reader
191 void closeReader(JournalSegmentReader reader) {
// Set.remove() reports whether the reader was actually present, so the guarded body is skipped for
// readers that are not (or are no longer) in the set.
192 if (readers.remove(reader)) {
198 * Checks whether the segment is open.
200 private void checkOpen() {
// NOTE(review): the condition guarding this throw is not visible here — presumably a test of the open flag; confirm.
202 throw new IllegalStateException("Segment not open");
207 * Returns a boolean indicating whether the segment is open.
209 * @return indicates whether the segment is open
211 public boolean isOpen() {
216 * Closes the segment.
219 public void close() {
// Close every reader currently held in the readers set.
225 readers.forEach(JournalSegmentReader::close);
// Taken only when no references are outstanding at this point.
226 if (references.get() == 0) {
231 private void finishClose() {
235 } catch (IOException e) {
// The checked IOException is rethrown wrapped in a StorageException.
236 throw new StorageException(e);
241 * Deletes the segment.
245 Files.deleteIfExists(file.path());
246 } catch (IOException e) {
247 throw new StorageException(e);
252 public String toString() {
253 final var descriptor = file.descriptor();
254 return MoreObjects.toStringHelper(this)
255 .add("id", descriptor.id())
256 .add("version", descriptor.version())
257 .add("index", descriptor.index())