Make JournalSegmentWriter.getNextIndex() final
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / JournalSegment.java
1 /*
2  * Copyright 2017-present Open Networking Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.storage.journal;
17
18 import com.google.common.base.MoreObjects;
19 import io.atomix.storage.journal.index.JournalIndex;
20 import io.atomix.storage.journal.index.SparseJournalIndex;
21 import java.io.IOException;
22 import java.nio.channels.FileChannel;
23 import java.nio.file.Files;
24 import java.nio.file.StandardOpenOption;
25 import java.util.Set;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.atomic.AtomicInteger;
28
29 /**
30  * Log segment.
31  *
32  * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
33  */
34 final class JournalSegment<E> implements AutoCloseable {
35   private final JournalSegmentFile file;
36   private final JournalSegmentDescriptor descriptor;
37   private final StorageLevel storageLevel;
38   private final int maxEntrySize;
39   private final JournalIndex index;
40   private final JournalSerdes namespace;
41   private final Set<JournalSegmentReader<E>> readers = ConcurrentHashMap.newKeySet();
42   private final AtomicInteger references = new AtomicInteger();
43   private final FileChannel channel;
44
45   private JournalSegmentWriter<E> writer;
46   private boolean open = true;
47
48   JournalSegment(
49       JournalSegmentFile file,
50       JournalSegmentDescriptor descriptor,
51       StorageLevel storageLevel,
52       int maxEntrySize,
53       double indexDensity,
54       JournalSerdes namespace) {
55     this.file = file;
56     this.descriptor = descriptor;
57     this.storageLevel = storageLevel;
58     this.maxEntrySize = maxEntrySize;
59     this.namespace = namespace;
60     index = new SparseJournalIndex(indexDensity);
61     try {
62       channel = FileChannel.open(file.file().toPath(),
63         StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
64     } catch (IOException e) {
65       throw new StorageException(e);
66     }
67     writer = switch (storageLevel) {
68         case DISK -> new FileChannelJournalSegmentWriter<>(channel, this, maxEntrySize, index, namespace);
69         case MAPPED -> new MappedJournalSegmentWriter<>(channel, this, maxEntrySize, index, namespace).toFileChannel();
70     };
71   }
72
73   /**
74    * Returns the segment's starting index.
75    *
76    * @return The segment's starting index.
77    */
78   long index() {
79     return descriptor.index();
80   }
81
82   /**
83    * Returns the last index in the segment.
84    *
85    * @return The last index in the segment.
86    */
87   long lastIndex() {
88     return writer.getLastIndex();
89   }
90
91   /**
92    * Returns the size of the segment.
93    *
94    * @return the size of the segment
95    */
96   int size() {
97     try {
98       return (int) channel.size();
99     } catch (IOException e) {
100       throw new StorageException(e);
101     }
102   }
103
104   /**
105    * Returns the segment file.
106    *
107    * @return The segment file.
108    */
109   JournalSegmentFile file() {
110     return file;
111   }
112
113   /**
114    * Returns the segment descriptor.
115    *
116    * @return The segment descriptor.
117    */
118   JournalSegmentDescriptor descriptor() {
119     return descriptor;
120   }
121
122   /**
123    * Acquires a reference to the log segment.
124    */
125   private void acquire() {
126     if (references.getAndIncrement() == 0 && storageLevel == StorageLevel.MAPPED) {
127       writer = writer.toMapped();
128     }
129   }
130
131   /**
132    * Releases a reference to the log segment.
133    */
134   private void release() {
135     if (references.decrementAndGet() == 0) {
136       if (storageLevel == StorageLevel.MAPPED) {
137         writer = writer.toFileChannel();
138       }
139       if (!open) {
140         finishClose();
141       }
142     }
143   }
144
145   /**
146    * Acquires a reference to the segment writer.
147    *
148    * @return The segment writer.
149    */
150   JournalSegmentWriter<E> acquireWriter() {
151     checkOpen();
152     acquire();
153
154     return writer;
155   }
156
157   /**
158    * Releases the reference to the segment writer.
159    */
160   void releaseWriter() {
161       release();
162   }
163
164   /**
165    * Creates a new segment reader.
166    *
167    * @return A new segment reader.
168    */
169   JournalSegmentReader<E> createReader() {
170     checkOpen();
171     acquire();
172
173     final var buffer = writer.buffer();
174     final var reader = buffer == null
175       ? new FileChannelJournalSegmentReader<>(channel, this, maxEntrySize, index, namespace)
176         : new MappedJournalSegmentReader<>(buffer, this, maxEntrySize, index, namespace);
177     readers.add(reader);
178     return reader;
179   }
180
181   /**
182    * Closes a segment reader.
183    *
184    * @param reader the closed segment reader
185    */
186   void closeReader(JournalSegmentReader<E> reader) {
187     if (readers.remove(reader)) {
188       release();
189     }
190   }
191
192   /**
193    * Checks whether the segment is open.
194    */
195   private void checkOpen() {
196     if (!open) {
197       throw new IllegalStateException("Segment not open");
198     }
199   }
200
201   /**
202    * Returns a boolean indicating whether the segment is open.
203    *
204    * @return indicates whether the segment is open
205    */
206   public boolean isOpen() {
207     return open;
208   }
209
210   /**
211    * Closes the segment.
212    */
213   @Override
214   public void close() {
215     if (!open) {
216       return;
217     }
218
219     open = false;
220     readers.forEach(JournalSegmentReader::close);
221     if (references.get() == 0) {
222       finishClose();
223     }
224   }
225
226   private void finishClose() {
227     writer.close();
228     try {
229       channel.close();
230     } catch (IOException e) {
231       throw new StorageException(e);
232     }
233   }
234
235   /**
236    * Deletes the segment.
237    */
238   void delete() {
239     try {
240       Files.deleteIfExists(file.file().toPath());
241     } catch (IOException e) {
242       throw new StorageException(e);
243     }
244   }
245
246   @Override
247   public String toString() {
248     return MoreObjects.toStringHelper(this)
249         .add("id", descriptor.id())
250         .add("version", descriptor.version())
251         .add("index", index())
252         .toString();
253   }
254 }