2 * Copyright 2017-2022 Open Networking Foundation and others. All rights reserved.
3 * Copyright (c) 2024 PANTHEON.tech, s.r.o.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 package io.atomix.storage.journal;
19 import com.google.common.base.MoreObjects;
20 import io.atomix.storage.journal.index.JournalIndex;
21 import io.atomix.storage.journal.index.Position;
22 import io.atomix.storage.journal.index.SparseJournalIndex;
23 import java.io.IOException;
24 import java.nio.channels.FileChannel;
25 import java.nio.file.Files;
26 import java.nio.file.StandardOpenOption;
28 import java.util.concurrent.ConcurrentHashMap;
29 import java.util.concurrent.atomic.AtomicInteger;
30 import org.eclipse.jdt.annotation.Nullable;
35 * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
// One journal segment, generic in the entry type E. Package-private, final, AutoCloseable.
// NOTE(review): java.util.Set is used below but is not among the visible imports — confirm the full file imports it.
37 final class JournalSegment<E> implements AutoCloseable {
// Handle to the segment file; file.file().toPath() is the path this class opens and deletes.
38 private final JournalSegmentFile file;
// Segment descriptor; supplies index(), id() and version().
39 private final JournalSegmentDescriptor descriptor;
// Selects the writer implementation and gates the toMapped()/toFileChannel() swaps in acquire()/release().
40 private final StorageLevel storageLevel;
// Passed through unchanged to every writer and reader constructed by this segment.
41 private final int maxEntrySize;
// Index queried by lookup(); created as a SparseJournalIndex and shared with the writer.
42 private final JournalIndex journalIndex;
// Serdes handed to every writer and reader constructed by this segment.
43 private final JournalSerdes namespace;
// Tracked readers (ConcurrentHashMap-backed set): removed in closeReader(), closed in bulk by close().
44 private final Set<JournalSegmentReader<E>> readers = ConcurrentHashMap.newKeySet();
// Reference counter: incremented by acquire(), decremented by release().
45 private final AtomicInteger references = new AtomicInteger();
// Channel opened on the segment file in the constructor; given to the writer and to disk readers.
46 private final FileChannel channel;
// Current writer; not final because acquire()/release() replace it.
48 private JournalSegmentWriter<E> writer;
// Open flag, initially true. Plain (non-volatile) field.
49 private boolean open = true;
// Constructor parameter list and body.
// NOTE(review): the declaration line and the declarations of maxEntrySize / indexDensity
// (both used below) are not visible in this extract; neither is an assignment to the `file` field.
52 JournalSegmentFile file,
53 JournalSegmentDescriptor descriptor,
54 StorageLevel storageLevel,
57 JournalSerdes namespace) {
59 this.descriptor = descriptor;
60 this.storageLevel = storageLevel;
61 this.maxEntrySize = maxEntrySize;
62 this.namespace = namespace;
// The index is always a SparseJournalIndex built from indexDensity.
63 journalIndex = new SparseJournalIndex(indexDensity);
// Open the segment file for reading and writing, creating it when it does not exist.
65 channel = FileChannel.open(file.file().toPath(),
66 StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
// An I/O failure while opening is rethrown as a StorageException wrapping the cause.
67 } catch (IOException e) {
68 throw new StorageException(e);
// Pick the initial writer by storage level: DISK -> DiskJournalSegmentWriter, MAPPED -> MappedJournalSegmentWriter.
// NOTE(review): the remainder of this switch expression is not visible in this extract.
70 writer = switch (storageLevel) {
71 case DISK -> new DiskJournalSegmentWriter<>(channel, this, maxEntrySize, journalIndex, namespace);
72 case MAPPED -> new MappedJournalSegmentWriter<>(channel, this, maxEntrySize, journalIndex, namespace)
78 * Returns the segment's starting index.
80 * @return The segment's starting index.
// Delegates to the descriptor's index(). NOTE(review): the enclosing method signature is not visible in this extract.
83 return descriptor.index();
87 * Returns the last index in the segment.
89 * @return The last index in the segment.
// Delegates to the current writer's getLastIndex(). NOTE(review): the enclosing method signature is not visible in this extract.
92 return writer.getLastIndex();
96 * Returns the size of the segment.
98 * @return the size of the segment
// Reports the channel's current size narrowed to int.
// NOTE(review): FileChannel.size() returns long, so this cast silently truncates above Integer.MAX_VALUE;
// presumably segment sizes are bounded well below that — confirm against the segment size limit.
102 return (int) channel.size();
// An I/O failure while querying the size is rethrown as a StorageException wrapping the cause.
103 } catch (IOException e) {
104 throw new StorageException(e);
109 * Returns the segment file.
111 * @return The segment file.
// Package-private accessor typed JournalSegmentFile. NOTE(review): body not visible in this extract.
113 JournalSegmentFile file() {
118 * Returns the segment descriptor.
120 * @return The segment descriptor.
// Package-private accessor typed JournalSegmentDescriptor. NOTE(review): body not visible in this extract.
122 JournalSegmentDescriptor descriptor() {
127 * Looks up the position of the given index.
129 * @param index the index to lookup
130 * @return the position of the given index or a lesser index, or {@code null}
// Pure delegation to the journal index; the result may be null (declared @Nullable).
132 @Nullable Position lookup(long index) {
133 return journalIndex.lookup(index);
137 * Acquires a reference to the log segment.
// Increments the reference count. On the 0 -> 1 transition with MAPPED storage,
// replaces the current writer with the result of writer.toMapped().
139 private void acquire() {
// getAndIncrement() returns the previous value, so == 0 means this is the first outstanding reference.
140 if (references.getAndIncrement() == 0 && storageLevel == StorageLevel.MAPPED) {
141 writer = writer.toMapped();
146 * Releases a reference to the log segment.
// Decrements the reference count. When it reaches zero with MAPPED storage,
// replaces the current writer with the result of writer.toFileChannel().
// NOTE(review): the rest of this method is not visible in this extract.
148 private void release() {
// decrementAndGet() returns the new value, so == 0 means the last reference was just dropped.
149 if (references.decrementAndGet() == 0) {
150 if (storageLevel == StorageLevel.MAPPED) {
151 writer = writer.toFileChannel();
160 * Acquires a reference to the segment writer.
162 * @return The segment writer.
// Package-private; returns a JournalSegmentWriter<E>.
// NOTE(review): body not visible in this extract — presumably checks the open state and calls acquire(); confirm.
164 JournalSegmentWriter<E> acquireWriter() {
172 * Releases the reference to the segment writer.
// Package-private counterpart of acquireWriter().
// NOTE(review): body not visible in this extract — presumably calls release(); confirm.
174 void releaseWriter() {
179 * Creates a new segment reader.
181 * @return A new segment reader.
// Builds a reader that matches the writer's current backing.
// NOTE(review): lines between the signature and the buffer lookup, and after setPosition(), are not visible in this extract.
183 JournalSegmentReader<E> createReader() {
// A null buffer selects the channel-based reader; otherwise the reader is built over the writer's buffer.
187 final var buffer = writer.buffer();
188 final var reader = buffer == null
189 ? new DiskJournalSegmentReader<>(channel, this, maxEntrySize, namespace)
190 : new MappedJournalSegmentReader<>(buffer, this, maxEntrySize, namespace);
// Start at offset JournalSegmentDescriptor.BYTES — presumably just past the descriptor header; confirm.
191 reader.setPosition(JournalSegmentDescriptor.BYTES);
197 * Closes a segment reader.
199 * @param reader the closed segment reader
// Drops the reader from the tracked set; the guarded action runs only if the reader was actually tracked.
// NOTE(review): the guarded statement is not visible in this extract — presumably release(); confirm.
201 void closeReader(JournalSegmentReader<E> reader) {
202 if (readers.remove(reader)) {
208 * Checks whether the segment is open.
// Guard that throws IllegalStateException("Segment not open").
// NOTE(review): the condition line is not visible in this extract — presumably tests the `open` flag; confirm.
210 private void checkOpen() {
212 throw new IllegalStateException("Segment not open");
217 * Returns a boolean indicating whether the segment is open.
219 * @return indicates whether the segment is open
// Public boolean query. NOTE(review): body not visible in this extract — presumably returns the `open` flag; confirm.
221 public boolean isOpen() {
226 * Closes the segment.
// Closes the segment. NOTE(review): the lines between the signature and the reader sweep are not visible in this extract.
229 public void close() {
// Close every reader still tracked by this segment.
235 readers.forEach(JournalSegmentReader::close);
// Only proceed when no references are outstanding.
// NOTE(review): the guarded statement is not visible — presumably finishClose(); confirm.
236 if (references.get() == 0) {
// Final stage of closing. NOTE(review): the statements inside the try are not visible in this extract.
241 private void finishClose() {
// An I/O failure during the final close is rethrown as a StorageException wrapping the cause.
245 } catch (IOException e) {
246 throw new StorageException(e);
251 * Deletes the segment.
// Removes the segment file from disk; a file that is already absent is not an error (deleteIfExists).
// NOTE(review): the enclosing method signature is not visible in this extract.
255 Files.deleteIfExists(file.file().toPath());
// An I/O failure while deleting is rethrown as a StorageException wrapping the cause.
256 } catch (IOException e) {
257 throw new StorageException(e);
262 public String toString() {
263 return MoreObjects.toStringHelper(this)
264 .add("id", descriptor.id())
265 .add("version", descriptor.version())
266 .add("index", index())