Import atomix/{storage,utils}
[controller.git] / third-party / atomix / storage / src / main / java / io / atomix / storage / journal / JournalSegment.java
1 /*
2  * Copyright 2017-present Open Networking Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.storage.journal;
17
18 import com.google.common.collect.Sets;
19 import io.atomix.storage.StorageException;
20 import io.atomix.storage.StorageLevel;
21 import io.atomix.storage.journal.index.JournalIndex;
22 import io.atomix.storage.journal.index.SparseJournalIndex;
23 import io.atomix.utils.serializer.Namespace;
24
25 import java.io.File;
26 import java.io.IOException;
27 import java.nio.MappedByteBuffer;
28 import java.nio.channels.FileChannel;
29 import java.nio.file.Files;
30 import java.nio.file.StandardOpenOption;
31 import java.util.Set;
32 import java.util.concurrent.atomic.AtomicInteger;
33
34 import static com.google.common.base.MoreObjects.toStringHelper;
35 import static com.google.common.base.Preconditions.checkState;
36
37 /**
38  * Log segment.
39  *
40  * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
41  */
42 public class JournalSegment<E> implements AutoCloseable {
43   private final JournalSegmentFile file;
44   private final JournalSegmentDescriptor descriptor;
45   private final StorageLevel storageLevel;
46   private final int maxEntrySize;
47   private final JournalIndex index;
48   private final Namespace namespace;
49   private final MappableJournalSegmentWriter<E> writer;
50   private final Set<MappableJournalSegmentReader<E>> readers = Sets.newConcurrentHashSet();
51   private final AtomicInteger references = new AtomicInteger();
52   private boolean open = true;
53
54   public JournalSegment(
55       JournalSegmentFile file,
56       JournalSegmentDescriptor descriptor,
57       StorageLevel storageLevel,
58       int maxEntrySize,
59       double indexDensity,
60       Namespace namespace) {
61     this.file = file;
62     this.descriptor = descriptor;
63     this.storageLevel = storageLevel;
64     this.maxEntrySize = maxEntrySize;
65     this.index = new SparseJournalIndex(indexDensity);
66     this.namespace = namespace;
67     this.writer = new MappableJournalSegmentWriter<>(openChannel(file.file()), this, maxEntrySize, index, namespace);
68   }
69
70   private FileChannel openChannel(File file) {
71     try {
72       return FileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
73     } catch (IOException e) {
74       throw new StorageException(e);
75     }
76   }
77
78   /**
79    * Returns the segment ID.
80    *
81    * @return The segment ID.
82    */
83   public long id() {
84     return descriptor.id();
85   }
86
87   /**
88    * Returns the segment version.
89    *
90    * @return The segment version.
91    */
92   public long version() {
93     return descriptor.version();
94   }
95
96   /**
97    * Returns the segment's starting index.
98    *
99    * @return The segment's starting index.
100    */
101   public long index() {
102     return descriptor.index();
103   }
104
105   /**
106    * Returns the last index in the segment.
107    *
108    * @return The last index in the segment.
109    */
110   public long lastIndex() {
111     return writer.getLastIndex();
112   }
113
114   /**
115    * Returns the size of the segment.
116    *
117    * @return the size of the segment
118    */
119   public int size() {
120     return writer.size();
121   }
122
123   /**
124    * Returns the segment file.
125    *
126    * @return The segment file.
127    */
128   public JournalSegmentFile file() {
129     return file;
130   }
131
132   /**
133    * Returns the segment descriptor.
134    *
135    * @return The segment descriptor.
136    */
137   public JournalSegmentDescriptor descriptor() {
138     return descriptor;
139   }
140
141   /**
142    * Returns a boolean value indicating whether the segment is empty.
143    *
144    * @return Indicates whether the segment is empty.
145    */
146   public boolean isEmpty() {
147     return length() == 0;
148   }
149
150   /**
151    * Returns the segment length.
152    *
153    * @return The segment length.
154    */
155   public long length() {
156     return writer.getNextIndex() - index();
157   }
158
159   /**
160    * Acquires a reference to the log segment.
161    */
162   void acquire() {
163     if (references.getAndIncrement() == 0 && open) {
164       map();
165     }
166   }
167
168   /**
169    * Releases a reference to the log segment.
170    */
171   void release() {
172     if (references.decrementAndGet() == 0 && open) {
173       unmap();
174     }
175   }
176
177   /**
178    * Maps the log segment into memory.
179    */
180   private void map() {
181     if (storageLevel == StorageLevel.MAPPED) {
182       MappedByteBuffer buffer = writer.map();
183       readers.forEach(reader -> reader.map(buffer));
184     }
185   }
186
187   /**
188    * Unmaps the log segment from memory.
189    */
190   private void unmap() {
191     if (storageLevel == StorageLevel.MAPPED) {
192       writer.unmap();
193       readers.forEach(reader -> reader.unmap());
194     }
195   }
196
197   /**
198    * Returns the segment writer.
199    *
200    * @return The segment writer.
201    */
202   public MappableJournalSegmentWriter<E> writer() {
203     checkOpen();
204     return writer;
205   }
206
207   /**
208    * Creates a new segment reader.
209    *
210    * @return A new segment reader.
211    */
212   MappableJournalSegmentReader<E> createReader() {
213     checkOpen();
214     MappableJournalSegmentReader<E> reader = new MappableJournalSegmentReader<>(
215         openChannel(file.file()), this, maxEntrySize, index, namespace);
216     MappedByteBuffer buffer = writer.buffer();
217     if (buffer != null) {
218       reader.map(buffer);
219     }
220     readers.add(reader);
221     return reader;
222   }
223
224   /**
225    * Closes a segment reader.
226    *
227    * @param reader the closed segment reader
228    */
229   void closeReader(MappableJournalSegmentReader<E> reader) {
230     readers.remove(reader);
231   }
232
233   /**
234    * Checks whether the segment is open.
235    */
236   private void checkOpen() {
237     checkState(open, "Segment not open");
238   }
239
240   /**
241    * Returns a boolean indicating whether the segment is open.
242    *
243    * @return indicates whether the segment is open
244    */
245   public boolean isOpen() {
246     return open;
247   }
248
249   /**
250    * Closes the segment.
251    */
252   @Override
253   public void close() {
254     unmap();
255     writer.close();
256     readers.forEach(reader -> reader.close());
257     open = false;
258   }
259
260   /**
261    * Deletes the segment.
262    */
263   public void delete() {
264     try {
265       Files.deleteIfExists(file.file().toPath());
266     } catch (IOException e) {
267       throw new StorageException(e);
268     }
269   }
270
271   @Override
272   public String toString() {
273     return toStringHelper(this)
274         .add("id", id())
275         .add("version", version())
276         .add("index", index())
277         .toString();
278   }
279 }