Clean up initial FileChannel writer reset's read
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / JournalSegment.java
1 /*
2  * Copyright 2017-present Open Networking Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.storage.journal;
17
18 import io.atomix.storage.journal.index.JournalIndex;
19 import io.atomix.storage.journal.index.SparseJournalIndex;
20 import java.io.File;
21 import java.io.IOException;
22 import java.nio.MappedByteBuffer;
23 import java.nio.channels.FileChannel;
24 import java.nio.file.Files;
25 import java.nio.file.StandardOpenOption;
26 import java.util.Set;
27 import java.util.concurrent.ConcurrentHashMap;
28 import java.util.concurrent.atomic.AtomicInteger;
29
30 import static com.google.common.base.MoreObjects.toStringHelper;
31 import static com.google.common.base.Preconditions.checkState;
32
33 /**
34  * Log segment.
35  *
36  * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
37  */
38 public class JournalSegment<E> implements AutoCloseable {
39   private final JournalSegmentFile file;
40   private final JournalSegmentDescriptor descriptor;
41   private final StorageLevel storageLevel;
42   private final int maxEntrySize;
43   private final JournalIndex index;
44   private final JournalSerdes namespace;
45   private final MappableJournalSegmentWriter<E> writer;
46   private final Set<MappableJournalSegmentReader<E>> readers = ConcurrentHashMap.newKeySet();
47   private final AtomicInteger references = new AtomicInteger();
48   private boolean open = true;
49
50   public JournalSegment(
51       JournalSegmentFile file,
52       JournalSegmentDescriptor descriptor,
53       StorageLevel storageLevel,
54       int maxEntrySize,
55       double indexDensity,
56       JournalSerdes namespace) {
57     this.file = file;
58     this.descriptor = descriptor;
59     this.storageLevel = storageLevel;
60     this.maxEntrySize = maxEntrySize;
61     this.index = new SparseJournalIndex(indexDensity);
62     this.namespace = namespace;
63     this.writer = new MappableJournalSegmentWriter<>(openChannel(file.file()), this, maxEntrySize, index, namespace);
64   }
65
66   private FileChannel openChannel(File file) {
67     try {
68       return FileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
69     } catch (IOException e) {
70       throw new StorageException(e);
71     }
72   }
73
74   /**
75    * Returns the segment ID.
76    *
77    * @return The segment ID.
78    */
79   public long id() {
80     return descriptor.id();
81   }
82
83   /**
84    * Returns the segment version.
85    *
86    * @return The segment version.
87    */
88   public long version() {
89     return descriptor.version();
90   }
91
92   /**
93    * Returns the segment's starting index.
94    *
95    * @return The segment's starting index.
96    */
97   public long index() {
98     return descriptor.index();
99   }
100
101   /**
102    * Returns the last index in the segment.
103    *
104    * @return The last index in the segment.
105    */
106   public long lastIndex() {
107     return writer.getLastIndex();
108   }
109
110   /**
111    * Returns the size of the segment.
112    *
113    * @return the size of the segment
114    */
115   public int size() {
116     return writer.size();
117   }
118
119   /**
120    * Returns the segment file.
121    *
122    * @return The segment file.
123    */
124   public JournalSegmentFile file() {
125     return file;
126   }
127
128   /**
129    * Returns the segment descriptor.
130    *
131    * @return The segment descriptor.
132    */
133   public JournalSegmentDescriptor descriptor() {
134     return descriptor;
135   }
136
137   /**
138    * Returns a boolean value indicating whether the segment is empty.
139    *
140    * @return Indicates whether the segment is empty.
141    */
142   public boolean isEmpty() {
143     return length() == 0;
144   }
145
146   /**
147    * Returns the segment length.
148    *
149    * @return The segment length.
150    */
151   public long length() {
152     return writer.getNextIndex() - index();
153   }
154
155   /**
156    * Acquires a reference to the log segment.
157    */
158   void acquire() {
159     if (references.getAndIncrement() == 0 && open) {
160       map();
161     }
162   }
163
164   /**
165    * Releases a reference to the log segment.
166    */
167   void release() {
168     if (references.decrementAndGet() == 0 && open) {
169       unmap();
170     }
171   }
172
173   /**
174    * Maps the log segment into memory.
175    */
176   private void map() {
177     if (storageLevel == StorageLevel.MAPPED) {
178       MappedByteBuffer buffer = writer.map();
179       readers.forEach(reader -> reader.map(buffer));
180     }
181   }
182
183   /**
184    * Unmaps the log segment from memory.
185    */
186   private void unmap() {
187     if (storageLevel == StorageLevel.MAPPED) {
188       writer.unmap();
189       readers.forEach(reader -> reader.unmap());
190     }
191   }
192
193   /**
194    * Returns the segment writer.
195    *
196    * @return The segment writer.
197    */
198   public MappableJournalSegmentWriter<E> writer() {
199     checkOpen();
200     return writer;
201   }
202
203   /**
204    * Creates a new segment reader.
205    *
206    * @return A new segment reader.
207    */
208   MappableJournalSegmentReader<E> createReader() {
209     checkOpen();
210     MappableJournalSegmentReader<E> reader = new MappableJournalSegmentReader<>(
211         openChannel(file.file()), this, maxEntrySize, index, namespace);
212     MappedByteBuffer buffer = writer.buffer();
213     if (buffer != null) {
214       reader.map(buffer);
215     }
216     readers.add(reader);
217     return reader;
218   }
219
220   /**
221    * Closes a segment reader.
222    *
223    * @param reader the closed segment reader
224    */
225   void closeReader(MappableJournalSegmentReader<E> reader) {
226     readers.remove(reader);
227   }
228
229   /**
230    * Checks whether the segment is open.
231    */
232   private void checkOpen() {
233     checkState(open, "Segment not open");
234   }
235
236   /**
237    * Returns a boolean indicating whether the segment is open.
238    *
239    * @return indicates whether the segment is open
240    */
241   public boolean isOpen() {
242     return open;
243   }
244
245   /**
246    * Closes the segment.
247    */
248   @Override
249   public void close() {
250     unmap();
251     writer.close();
252     readers.forEach(reader -> reader.close());
253     open = false;
254   }
255
256   /**
257    * Deletes the segment.
258    */
259   public void delete() {
260     try {
261       Files.deleteIfExists(file.file().toPath());
262     } catch (IOException e) {
263       throw new StorageException(e);
264     }
265   }
266
267   @Override
268   public String toString() {
269     return toStringHelper(this)
270         .add("id", id())
271         .add("version", version())
272         .add("index", index())
273         .toString();
274   }
275 }