/*
 * Copyright 2017-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 package io.atomix.storage.journal;
18 import io.atomix.storage.journal.index.JournalIndex;
19 import io.atomix.storage.journal.index.SparseJournalIndex;
21 import java.io.IOException;
22 import java.nio.MappedByteBuffer;
23 import java.nio.channels.FileChannel;
24 import java.nio.file.Files;
25 import java.nio.file.StandardOpenOption;
27 import java.util.concurrent.ConcurrentHashMap;
28 import java.util.concurrent.atomic.AtomicInteger;
30 import static com.google.common.base.MoreObjects.toStringHelper;
31 import static com.google.common.base.Preconditions.checkState;
/**
 * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
 */
38 public class JournalSegment<E> implements AutoCloseable {
39 private final JournalSegmentFile file;
40 private final JournalSegmentDescriptor descriptor;
41 private final StorageLevel storageLevel;
42 private final int maxEntrySize;
43 private final JournalIndex index;
44 private final JournalSerdes namespace;
45 private final MappableJournalSegmentWriter<E> writer;
46 private final Set<MappableJournalSegmentReader<E>> readers = ConcurrentHashMap.newKeySet();
47 private final AtomicInteger references = new AtomicInteger();
48 private boolean open = true;
50 public JournalSegment(
51 JournalSegmentFile file,
52 JournalSegmentDescriptor descriptor,
53 StorageLevel storageLevel,
56 JournalSerdes namespace) {
58 this.descriptor = descriptor;
59 this.storageLevel = storageLevel;
60 this.maxEntrySize = maxEntrySize;
61 this.index = new SparseJournalIndex(indexDensity);
62 this.namespace = namespace;
63 this.writer = new MappableJournalSegmentWriter<>(openChannel(file.file()), this, maxEntrySize, index, namespace);
66 private FileChannel openChannel(File file) {
68 return FileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
69 } catch (IOException e) {
70 throw new StorageException(e);
/**
 * Returns the segment ID.
 *
 * @return The segment ID.
 */
80 return descriptor.id();
84 * Returns the segment version.
86 * @return The segment version.
88 public long version() {
89 return descriptor.version();
/**
 * Returns the segment's starting index.
 *
 * @return The segment's starting index.
 */
98 return descriptor.index();
102 * Returns the last index in the segment.
104 * @return The last index in the segment.
106 public long lastIndex() {
107 return writer.getLastIndex();
/**
 * Returns the size of the segment.
 *
 * @return the size of the segment
 */
116 return writer.size();
/**
 * Returns the segment file.
 *
 * @return The segment file.
 */
124 public JournalSegmentFile file() {
/**
 * Returns the segment descriptor.
 *
 * @return The segment descriptor.
 */
133 public JournalSegmentDescriptor descriptor() {
138 * Returns a boolean value indicating whether the segment is empty.
140 * @return Indicates whether the segment is empty.
142 public boolean isEmpty() {
143 return length() == 0;
147 * Returns the segment length.
149 * @return The segment length.
151 public long length() {
152 return writer.getNextIndex() - index();
/**
 * Acquires a reference to the log segment.
 */
159 if (references.getAndIncrement() == 0 && open) {
/**
 * Releases a reference to the log segment.
 */
168 if (references.decrementAndGet() == 0 && open) {
/**
 * Maps the log segment into memory.
 */
177 if (storageLevel == StorageLevel.MAPPED) {
178 MappedByteBuffer buffer = writer.map();
179 readers.forEach(reader -> reader.map(buffer));
/**
 * Unmaps the log segment from memory.
 */
186 private void unmap() {
187 if (storageLevel == StorageLevel.MAPPED) {
189 readers.forEach(reader -> reader.unmap());
/**
 * Returns the segment writer.
 *
 * @return The segment writer.
 */
198 public MappableJournalSegmentWriter<E> writer() {
/**
 * Creates a new segment reader.
 *
 * @return A new segment reader.
 */
208 MappableJournalSegmentReader<E> createReader() {
210 MappableJournalSegmentReader<E> reader = new MappableJournalSegmentReader<>(
211 openChannel(file.file()), this, maxEntrySize, index, namespace);
212 MappedByteBuffer buffer = writer.buffer();
213 if (buffer != null) {
221 * Closes a segment reader.
223 * @param reader the closed segment reader
225 void closeReader(MappableJournalSegmentReader<E> reader) {
226 readers.remove(reader);
230 * Checks whether the segment is open.
232 private void checkOpen() {
233 checkState(open, "Segment not open");
/**
 * Returns a boolean indicating whether the segment is open.
 *
 * @return indicates whether the segment is open
 */
241 public boolean isOpen() {
/**
 * Closes the segment.
 */
249 public void close() {
252 readers.forEach(reader -> reader.close());
257 * Deletes the segment.
259 public void delete() {
261 Files.deleteIfExists(file.file().toPath());
262 } catch (IOException e) {
263 throw new StorageException(e);
268 public String toString() {
269 return toStringHelper(this)
271 .add("version", version())
272 .add("index", index())