Expand JournalSegmentFile semantics
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / JournalSegment.java
1 /*
2  * Copyright 2017-2022 Open Networking Foundation and others.  All rights reserved.
3  * Copyright (c) 2024 PANTHEON.tech, s.r.o.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 package io.atomix.storage.journal;
18
19 import static java.util.Objects.requireNonNull;
20
21 import com.google.common.base.MoreObjects;
22 import io.atomix.storage.journal.index.JournalIndex;
23 import io.atomix.storage.journal.index.Position;
24 import io.atomix.storage.journal.index.SparseJournalIndex;
25 import java.io.IOException;
26 import java.nio.channels.FileChannel;
27 import java.nio.file.Files;
28 import java.nio.file.StandardOpenOption;
29 import java.util.Set;
30 import java.util.concurrent.ConcurrentHashMap;
31 import java.util.concurrent.atomic.AtomicInteger;
32 import org.eclipse.jdt.annotation.Nullable;
33
34 /**
35  * Log segment.
36  *
37  * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
38  */
39 final class JournalSegment implements AutoCloseable {
40   private final JournalSegmentFile file;
41   private final StorageLevel storageLevel;
42   private final int maxEntrySize;
43   private final JournalIndex journalIndex;
44   private final Set<JournalSegmentReader> readers = ConcurrentHashMap.newKeySet();
45   private final AtomicInteger references = new AtomicInteger();
46   private final FileChannel channel;
47
48   private JournalSegmentWriter writer;
49   private boolean open = true;
50
51   JournalSegment(
52       final JournalSegmentFile file,
53       final StorageLevel storageLevel,
54       final int maxEntrySize,
55       final double indexDensity) {
56     this.file = requireNonNull(file);
57     this.storageLevel = requireNonNull(storageLevel);
58     this.maxEntrySize = maxEntrySize;
59     journalIndex = new SparseJournalIndex(indexDensity);
60     try {
61       channel = FileChannel.open(file.path(),
62         StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
63     } catch (IOException e) {
64       throw new StorageException(e);
65     }
66
67     final var fileWriter = switch (storageLevel) {
68         case DISK -> new DiskFileWriter(file, channel, maxEntrySize);
69         case MAPPED -> new MappedFileWriter(file, channel, maxEntrySize);
70     };
71     writer = new JournalSegmentWriter(fileWriter, this, maxEntrySize, journalIndex)
72         // relinquish mapped memory
73         .toFileChannel();
74   }
75
76   /**
77    * Returns the segment's starting index.
78    *
79    * @return The segment's starting index.
80    */
81   long firstIndex() {
82     return file.descriptor().index();
83   }
84
85   /**
86    * Returns the last index in the segment.
87    *
88    * @return The last index in the segment.
89    */
90   long lastIndex() {
91     return writer.getLastIndex();
92   }
93
94   /**
95    * Returns the size of the segment.
96    *
97    * @return the size of the segment
98    */
99   int size() {
100     try {
101       return (int) channel.size();
102     } catch (IOException e) {
103       throw new StorageException(e);
104     }
105   }
106
107   /**
108    * Returns the segment file.
109    *
110    * @return The segment file.
111    */
112   JournalSegmentFile file() {
113     return file;
114   }
115
116   /**
117    * Looks up the position of the given index.
118    *
119    * @param index the index to lookup
120    * @return the position of the given index or a lesser index, or {@code null}
121    */
122   @Nullable Position lookup(final long index) {
123     return journalIndex.lookup(index);
124   }
125
126   /**
127    * Acquires a reference to the log segment.
128    */
129   private void acquire() {
130     if (references.getAndIncrement() == 0 && storageLevel == StorageLevel.MAPPED) {
131       writer = writer.toMapped();
132     }
133   }
134
135   /**
136    * Releases a reference to the log segment.
137    */
138   private void release() {
139     if (references.decrementAndGet() == 0) {
140       if (storageLevel == StorageLevel.MAPPED) {
141         writer = writer.toFileChannel();
142       }
143       if (!open) {
144         finishClose();
145       }
146     }
147   }
148
149   /**
150    * Acquires a reference to the segment writer.
151    *
152    * @return The segment writer.
153    */
154   JournalSegmentWriter acquireWriter() {
155     checkOpen();
156     acquire();
157
158     return writer;
159   }
160
161   /**
162    * Releases the reference to the segment writer.
163    */
164   void releaseWriter() {
165       release();
166   }
167
168   /**
169    * Creates a new segment reader.
170    *
171    * @return A new segment reader.
172    */
173   JournalSegmentReader createReader() {
174     checkOpen();
175     acquire();
176
177     final var buffer = writer.buffer();
178     final var fileReader = buffer != null ? new MappedFileReader(file, buffer)
179         : new DiskFileReader(file, channel, maxEntrySize);
180     final var reader = new JournalSegmentReader(this, fileReader, maxEntrySize);
181     reader.setPosition(JournalSegmentDescriptor.BYTES);
182     readers.add(reader);
183     return reader;
184   }
185
186   /**
187    * Closes a segment reader.
188    *
189    * @param reader the closed segment reader
190    */
191   void closeReader(JournalSegmentReader reader) {
192     if (readers.remove(reader)) {
193       release();
194     }
195   }
196
197   /**
198    * Checks whether the segment is open.
199    */
200   private void checkOpen() {
201     if (!open) {
202       throw new IllegalStateException("Segment not open");
203     }
204   }
205
206   /**
207    * Returns a boolean indicating whether the segment is open.
208    *
209    * @return indicates whether the segment is open
210    */
211   public boolean isOpen() {
212     return open;
213   }
214
215   /**
216    * Closes the segment.
217    */
218   @Override
219   public void close() {
220     if (!open) {
221       return;
222     }
223
224     open = false;
225     readers.forEach(JournalSegmentReader::close);
226     if (references.get() == 0) {
227       finishClose();
228     }
229   }
230
231   private void finishClose() {
232     writer.close();
233     try {
234       channel.close();
235     } catch (IOException e) {
236       throw new StorageException(e);
237     }
238   }
239
240   /**
241    * Deletes the segment.
242    */
243   void delete() {
244     try {
245       Files.deleteIfExists(file.path());
246     } catch (IOException e) {
247       throw new StorageException(e);
248     }
249   }
250
251   @Override
252   public String toString() {
253     final var descriptor = file.descriptor();
254     return MoreObjects.toStringHelper(this)
255         .add("id", descriptor.id())
256         .add("version", descriptor.version())
257         .add("index", descriptor.index())
258         .toString();
259   }
260 }