a7f4c5aadc5c2845723ec362f60412b47b373c83
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / JournalSegment.java
1 /*
2  * Copyright 2017-2022 Open Networking Foundation and others.  All rights reserved.
3  * Copyright (c) 2024 PANTHEON.tech, s.r.o.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 package io.atomix.storage.journal;
18
19 import static java.util.Objects.requireNonNull;
20
21 import com.google.common.base.MoreObjects;
22 import io.atomix.storage.journal.index.JournalIndex;
23 import io.atomix.storage.journal.index.Position;
24 import io.atomix.storage.journal.index.SparseJournalIndex;
25 import java.io.IOException;
26 import java.nio.channels.FileChannel;
27 import java.nio.file.Files;
28 import java.nio.file.StandardOpenOption;
29 import java.util.Set;
30 import java.util.concurrent.ConcurrentHashMap;
31 import java.util.concurrent.atomic.AtomicInteger;
32 import org.eclipse.jdt.annotation.Nullable;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 /**
37  * Log segment.
38  *
39  * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
40  */
41 final class JournalSegment {
42   private static final Logger LOG = LoggerFactory.getLogger(JournalSegment.class);
43
44   private final JournalSegmentFile file;
45   private final StorageLevel storageLevel;
46   private final int maxEntrySize;
47   private final JournalIndex journalIndex;
48   private final Set<JournalSegmentReader> readers = ConcurrentHashMap.newKeySet();
49   private final AtomicInteger references = new AtomicInteger();
50   private final FileChannel channel;
51
52   private JournalSegmentWriter writer;
53   private boolean open = true;
54
55   JournalSegment(
56       final JournalSegmentFile file,
57       final StorageLevel storageLevel,
58       final int maxEntrySize,
59       final double indexDensity) {
60     this.file = requireNonNull(file);
61     this.storageLevel = requireNonNull(storageLevel);
62     this.maxEntrySize = maxEntrySize;
63     journalIndex = new SparseJournalIndex(indexDensity);
64     try {
65       channel = FileChannel.open(file.path(),
66         StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
67     } catch (IOException e) {
68       throw new StorageException(e);
69     }
70
71     final var fileWriter = switch (storageLevel) {
72         case DISK -> new DiskFileWriter(file, channel, maxEntrySize);
73         case MAPPED -> new MappedFileWriter(file, channel, maxEntrySize);
74     };
75     writer = new JournalSegmentWriter(fileWriter, this, maxEntrySize, journalIndex)
76         // relinquish mapped memory
77         .toFileChannel();
78   }
79
80   /**
81    * Returns the segment's starting index.
82    *
83    * @return The segment's starting index.
84    */
85   long firstIndex() {
86     return file.descriptor().index();
87   }
88
89   /**
90    * Returns the last index in the segment.
91    *
92    * @return The last index in the segment.
93    */
94   long lastIndex() {
95     return writer.getLastIndex();
96   }
97
98   /**
99    * Returns the size of the segment.
100    *
101    * @return the size of the segment
102    */
103   int size() {
104     try {
105       return (int) channel.size();
106     } catch (IOException e) {
107       throw new StorageException(e);
108     }
109   }
110
111   /**
112    * Returns the segment file.
113    *
114    * @return The segment file.
115    */
116   JournalSegmentFile file() {
117     return file;
118   }
119
120   /**
121    * Looks up the position of the given index.
122    *
123    * @param index the index to lookup
124    * @return the position of the given index or a lesser index, or {@code null}
125    */
126   @Nullable Position lookup(final long index) {
127     return journalIndex.lookup(index);
128   }
129
130   /**
131    * Acquires a reference to the log segment.
132    */
133   private void acquire() {
134     if (references.getAndIncrement() == 0 && storageLevel == StorageLevel.MAPPED) {
135       writer = writer.toMapped();
136     }
137   }
138
139   /**
140    * Releases a reference to the log segment.
141    */
142   private void release() {
143     if (references.decrementAndGet() == 0) {
144       if (storageLevel == StorageLevel.MAPPED) {
145         writer = writer.toFileChannel();
146       }
147       if (!open) {
148         finishClose();
149       }
150     }
151   }
152
153   /**
154    * Acquires a reference to the segment writer.
155    *
156    * @return The segment writer.
157    */
158   JournalSegmentWriter acquireWriter() {
159     checkOpen();
160     acquire();
161
162     return writer;
163   }
164
165   /**
166    * Releases the reference to the segment writer.
167    */
168   void releaseWriter() {
169       release();
170   }
171
172   /**
173    * Creates a new segment reader.
174    *
175    * @return A new segment reader.
176    */
177   JournalSegmentReader createReader() {
178     checkOpen();
179     acquire();
180
181     final var buffer = writer.buffer();
182     final var fileReader = buffer != null ? new MappedFileReader(file, buffer)
183         : new DiskFileReader(file, channel, maxEntrySize);
184     final var reader = new JournalSegmentReader(this, fileReader, maxEntrySize);
185     reader.setPosition(JournalSegmentDescriptor.BYTES);
186     readers.add(reader);
187     return reader;
188   }
189
190   /**
191    * Closes a segment reader.
192    *
193    * @param reader the closed segment reader
194    */
195   void closeReader(final JournalSegmentReader reader) {
196     if (readers.remove(reader)) {
197       release();
198     }
199   }
200
201   /**
202    * Checks whether the segment is open.
203    */
204   private void checkOpen() {
205     if (!open) {
206       throw new IllegalStateException("Segment not open");
207     }
208   }
209
210   /**
211    * Returns a boolean indicating whether the segment is open.
212    *
213    * @return indicates whether the segment is open
214    */
215   boolean isOpen() {
216     return open;
217   }
218
219   /**
220    * Closes the segment.
221    */
222   void close() {
223     if (!open) {
224       return;
225     }
226
227     LOG.debug("Closing segment: {}", this);
228     open = false;
229     readers.forEach(JournalSegmentReader::close);
230     if (references.get() == 0) {
231       finishClose();
232     }
233   }
234
235   private void finishClose() {
236     writer.close();
237     try {
238       channel.close();
239     } catch (IOException e) {
240       throw new StorageException(e);
241     }
242   }
243
244   /**
245    * Deletes the segment.
246    */
247   void delete() {
248     close();
249     LOG.debug("Deleting segment: {}", this);
250     try {
251       Files.deleteIfExists(file.path());
252     } catch (IOException e) {
253       throw new StorageException(e);
254     }
255   }
256
257   @Override
258   public String toString() {
259     final var descriptor = file.descriptor();
260     return MoreObjects.toStringHelper(this)
261         .add("id", descriptor.id())
262         .add("version", descriptor.version())
263         .add("index", descriptor.index())
264         .toString();
265   }
266 }