Move entry tracking to SegmentedJournalReader
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / JournalSegment.java
1 /*
2  * Copyright 2017-2022 Open Networking Foundation and others.  All rights reserved.
3  * Copyright (c) 2024 PANTHEON.tech, s.r.o.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 package io.atomix.storage.journal;
18
19 import com.google.common.base.MoreObjects;
20 import io.atomix.storage.journal.index.JournalIndex;
21 import io.atomix.storage.journal.index.Position;
22 import io.atomix.storage.journal.index.SparseJournalIndex;
23 import java.io.IOException;
24 import java.nio.channels.FileChannel;
25 import java.nio.file.Files;
26 import java.nio.file.StandardOpenOption;
27 import java.util.Set;
28 import java.util.concurrent.ConcurrentHashMap;
29 import java.util.concurrent.atomic.AtomicInteger;
30 import org.eclipse.jdt.annotation.Nullable;
31
32 /**
33  * Log segment.
34  *
35  * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
36  */
37 final class JournalSegment<E> implements AutoCloseable {
38   private final JournalSegmentFile file;
39   private final JournalSegmentDescriptor descriptor;
40   private final StorageLevel storageLevel;
41   private final int maxEntrySize;
42   private final JournalIndex journalIndex;
43   private final JournalSerdes namespace;
44   private final Set<JournalSegmentReader<E>> readers = ConcurrentHashMap.newKeySet();
45   private final AtomicInteger references = new AtomicInteger();
46   private final FileChannel channel;
47
48   private JournalSegmentWriter<E> writer;
49   private boolean open = true;
50
51   JournalSegment(
52       JournalSegmentFile file,
53       JournalSegmentDescriptor descriptor,
54       StorageLevel storageLevel,
55       int maxEntrySize,
56       double indexDensity,
57       JournalSerdes namespace) {
58     this.file = file;
59     this.descriptor = descriptor;
60     this.storageLevel = storageLevel;
61     this.maxEntrySize = maxEntrySize;
62     this.namespace = namespace;
63     journalIndex = new SparseJournalIndex(indexDensity);
64     try {
65       channel = FileChannel.open(file.file().toPath(),
66         StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
67     } catch (IOException e) {
68       throw new StorageException(e);
69     }
70     writer = switch (storageLevel) {
71         case DISK -> new DiskJournalSegmentWriter<>(channel, this, maxEntrySize, journalIndex, namespace);
72         case MAPPED -> new MappedJournalSegmentWriter<>(channel, this, maxEntrySize, journalIndex, namespace)
73             .toFileChannel();
74     };
75   }
76
77   /**
78    * Returns the segment's starting index.
79    *
80    * @return The segment's starting index.
81    */
82   long index() {
83     return descriptor.index();
84   }
85
86   /**
87    * Returns the last index in the segment.
88    *
89    * @return The last index in the segment.
90    */
91   long lastIndex() {
92     return writer.getLastIndex();
93   }
94
95   /**
96    * Returns the size of the segment.
97    *
98    * @return the size of the segment
99    */
100   int size() {
101     try {
102       return (int) channel.size();
103     } catch (IOException e) {
104       throw new StorageException(e);
105     }
106   }
107
108   /**
109    * Returns the segment file.
110    *
111    * @return The segment file.
112    */
113   JournalSegmentFile file() {
114     return file;
115   }
116
117   /**
118    * Returns the segment descriptor.
119    *
120    * @return The segment descriptor.
121    */
122   JournalSegmentDescriptor descriptor() {
123     return descriptor;
124   }
125
126   /**
127    * Looks up the position of the given index.
128    *
129    * @param index the index to lookup
130    * @return the position of the given index or a lesser index, or {@code null}
131    */
132   @Nullable Position lookup(long index) {
133     return journalIndex.lookup(index);
134   }
135
136   /**
137    * Acquires a reference to the log segment.
138    */
139   private void acquire() {
140     if (references.getAndIncrement() == 0 && storageLevel == StorageLevel.MAPPED) {
141       writer = writer.toMapped();
142     }
143   }
144
145   /**
146    * Releases a reference to the log segment.
147    */
148   private void release() {
149     if (references.decrementAndGet() == 0) {
150       if (storageLevel == StorageLevel.MAPPED) {
151         writer = writer.toFileChannel();
152       }
153       if (!open) {
154         finishClose();
155       }
156     }
157   }
158
159   /**
160    * Acquires a reference to the segment writer.
161    *
162    * @return The segment writer.
163    */
164   JournalSegmentWriter<E> acquireWriter() {
165     checkOpen();
166     acquire();
167
168     return writer;
169   }
170
171   /**
172    * Releases the reference to the segment writer.
173    */
174   void releaseWriter() {
175       release();
176   }
177
178   /**
179    * Creates a new segment reader.
180    *
181    * @return A new segment reader.
182    */
183   JournalSegmentReader<E> createReader() {
184     checkOpen();
185     acquire();
186
187     final var buffer = writer.buffer();
188     final var reader = buffer == null
189       ? new DiskJournalSegmentReader<>(channel, this, maxEntrySize, namespace)
190         : new MappedJournalSegmentReader<>(buffer, this, maxEntrySize, namespace);
191     reader.setPosition(JournalSegmentDescriptor.BYTES);
192     readers.add(reader);
193     return reader;
194   }
195
196   /**
197    * Closes a segment reader.
198    *
199    * @param reader the closed segment reader
200    */
201   void closeReader(JournalSegmentReader<E> reader) {
202     if (readers.remove(reader)) {
203       release();
204     }
205   }
206
207   /**
208    * Checks whether the segment is open.
209    */
210   private void checkOpen() {
211     if (!open) {
212       throw new IllegalStateException("Segment not open");
213     }
214   }
215
216   /**
217    * Returns a boolean indicating whether the segment is open.
218    *
219    * @return indicates whether the segment is open
220    */
221   public boolean isOpen() {
222     return open;
223   }
224
225   /**
226    * Closes the segment.
227    */
228   @Override
229   public void close() {
230     if (!open) {
231       return;
232     }
233
234     open = false;
235     readers.forEach(JournalSegmentReader::close);
236     if (references.get() == 0) {
237       finishClose();
238     }
239   }
240
241   private void finishClose() {
242     writer.close();
243     try {
244       channel.close();
245     } catch (IOException e) {
246       throw new StorageException(e);
247     }
248   }
249
250   /**
251    * Deletes the segment.
252    */
253   void delete() {
254     try {
255       Files.deleteIfExists(file.file().toPath());
256     } catch (IOException e) {
257       throw new StorageException(e);
258     }
259   }
260
261   @Override
262   public String toString() {
263     return MoreObjects.toStringHelper(this)
264         .add("id", descriptor.id())
265         .add("version", descriptor.version())
266         .add("index", index())
267         .toString();
268   }
269 }