Lower reader buffering
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / JournalSegment.java
1 /*
2  * Copyright 2017-2022 Open Networking Foundation and others.  All rights reserved.
3  * Copyright (c) 2024 PANTHEON.tech, s.r.o.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 package io.atomix.storage.journal;
18
19 import com.google.common.base.MoreObjects;
20 import io.atomix.storage.journal.index.JournalIndex;
21 import io.atomix.storage.journal.index.Position;
22 import io.atomix.storage.journal.index.SparseJournalIndex;
23 import java.io.IOException;
24 import java.nio.channels.FileChannel;
25 import java.nio.file.Files;
26 import java.nio.file.StandardOpenOption;
27 import java.util.Set;
28 import java.util.concurrent.ConcurrentHashMap;
29 import java.util.concurrent.atomic.AtomicInteger;
30 import org.eclipse.jdt.annotation.Nullable;
31
32 /**
33  * Log segment.
34  *
35  * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
36  */
37 final class JournalSegment<E> implements AutoCloseable {
38   private final JournalSegmentFile file;
39   private final JournalSegmentDescriptor descriptor;
40   private final StorageLevel storageLevel;
41   private final int maxEntrySize;
42   private final JournalIndex journalIndex;
43   private final JournalSerdes namespace;
44   private final Set<JournalSegmentReader<E>> readers = ConcurrentHashMap.newKeySet();
45   private final AtomicInteger references = new AtomicInteger();
46   private final FileChannel channel;
47
48   private JournalSegmentWriter<E> writer;
49   private boolean open = true;
50
51   JournalSegment(
52       JournalSegmentFile file,
53       JournalSegmentDescriptor descriptor,
54       StorageLevel storageLevel,
55       int maxEntrySize,
56       double indexDensity,
57       JournalSerdes namespace) {
58     this.file = file;
59     this.descriptor = descriptor;
60     this.storageLevel = storageLevel;
61     this.maxEntrySize = maxEntrySize;
62     this.namespace = namespace;
63     journalIndex = new SparseJournalIndex(indexDensity);
64     try {
65       channel = FileChannel.open(file.file().toPath(),
66         StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
67     } catch (IOException e) {
68       throw new StorageException(e);
69     }
70     writer = switch (storageLevel) {
71         case DISK -> new DiskJournalSegmentWriter<>(channel, this, maxEntrySize, journalIndex, namespace);
72         case MAPPED -> new MappedJournalSegmentWriter<>(channel, this, maxEntrySize, journalIndex, namespace)
73             .toFileChannel();
74     };
75   }
76
77   /**
78    * Returns the segment's starting index.
79    *
80    * @return The segment's starting index.
81    */
82   long index() {
83     return descriptor.index();
84   }
85
86   /**
87    * Returns the last index in the segment.
88    *
89    * @return The last index in the segment.
90    */
91   long lastIndex() {
92     return writer.getLastIndex();
93   }
94
95   /**
96    * Returns the size of the segment.
97    *
98    * @return the size of the segment
99    */
100   int size() {
101     try {
102       return (int) channel.size();
103     } catch (IOException e) {
104       throw new StorageException(e);
105     }
106   }
107
108   /**
109    * Returns the segment file.
110    *
111    * @return The segment file.
112    */
113   JournalSegmentFile file() {
114     return file;
115   }
116
117   /**
118    * Returns the segment descriptor.
119    *
120    * @return The segment descriptor.
121    */
122   JournalSegmentDescriptor descriptor() {
123     return descriptor;
124   }
125
126   /**
127    * Looks up the position of the given index.
128    *
129    * @param index the index to lookup
130    * @return the position of the given index or a lesser index, or {@code null}
131    */
132   @Nullable Position lookup(long index) {
133     return journalIndex.lookup(index);
134   }
135
136   /**
137    * Acquires a reference to the log segment.
138    */
139   private void acquire() {
140     if (references.getAndIncrement() == 0 && storageLevel == StorageLevel.MAPPED) {
141       writer = writer.toMapped();
142     }
143   }
144
145   /**
146    * Releases a reference to the log segment.
147    */
148   private void release() {
149     if (references.decrementAndGet() == 0) {
150       if (storageLevel == StorageLevel.MAPPED) {
151         writer = writer.toFileChannel();
152       }
153       if (!open) {
154         finishClose();
155       }
156     }
157   }
158
159   /**
160    * Acquires a reference to the segment writer.
161    *
162    * @return The segment writer.
163    */
164   JournalSegmentWriter<E> acquireWriter() {
165     checkOpen();
166     acquire();
167
168     return writer;
169   }
170
171   /**
172    * Releases the reference to the segment writer.
173    */
174   void releaseWriter() {
175       release();
176   }
177
178   /**
179    * Creates a new segment reader.
180    *
181    * @return A new segment reader.
182    */
183   JournalSegmentReader<E> createReader() {
184     checkOpen();
185     acquire();
186
187     final var buffer = writer.buffer();
188     final var path = file.file().toPath();
189     final var fileReader = buffer != null ? new MappedFileReader(path, buffer)
190         : new DiskFileReader(path, channel, descriptor.maxSegmentSize(), maxEntrySize);
191     final var reader = new JournalSegmentReader<>(this, fileReader, maxEntrySize, namespace);
192     reader.setPosition(JournalSegmentDescriptor.BYTES);
193     readers.add(reader);
194     return reader;
195   }
196
197   /**
198    * Closes a segment reader.
199    *
200    * @param reader the closed segment reader
201    */
202   void closeReader(JournalSegmentReader<E> reader) {
203     if (readers.remove(reader)) {
204       release();
205     }
206   }
207
208   /**
209    * Checks whether the segment is open.
210    */
211   private void checkOpen() {
212     if (!open) {
213       throw new IllegalStateException("Segment not open");
214     }
215   }
216
217   /**
218    * Returns a boolean indicating whether the segment is open.
219    *
220    * @return indicates whether the segment is open
221    */
222   public boolean isOpen() {
223     return open;
224   }
225
226   /**
227    * Closes the segment.
228    */
229   @Override
230   public void close() {
231     if (!open) {
232       return;
233     }
234
235     open = false;
236     readers.forEach(JournalSegmentReader::close);
237     if (references.get() == 0) {
238       finishClose();
239     }
240   }
241
242   private void finishClose() {
243     writer.close();
244     try {
245       channel.close();
246     } catch (IOException e) {
247       throw new StorageException(e);
248     }
249   }
250
251   /**
252    * Deletes the segment.
253    */
254   void delete() {
255     try {
256       Files.deleteIfExists(file.file().toPath());
257     } catch (IOException e) {
258       throw new StorageException(e);
259     }
260   }
261
262   @Override
263   public String toString() {
264     return MoreObjects.toStringHelper(this)
265         .add("id", descriptor.id())
266         .add("version", descriptor.version())
267         .add("index", index())
268         .toString();
269   }
270 }