Do not expose descriptor from JournalSegmentFile
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / JournalSegment.java
1 /*
2  * Copyright 2017-2022 Open Networking Foundation and others.  All rights reserved.
3  * Copyright (c) 2024 PANTHEON.tech, s.r.o.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 package io.atomix.storage.journal;
18
19 import static java.util.Objects.requireNonNull;
20
21 import com.google.common.base.MoreObjects;
22 import io.atomix.storage.journal.index.JournalIndex;
23 import io.atomix.storage.journal.index.Position;
24 import io.atomix.storage.journal.index.SparseJournalIndex;
25 import java.io.IOException;
26 import java.nio.file.Files;
27 import java.util.Set;
28 import java.util.concurrent.ConcurrentHashMap;
29 import java.util.concurrent.atomic.AtomicInteger;
30 import org.eclipse.jdt.annotation.Nullable;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33
34 /**
35  * Log segment.
36  *
37  * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
38  */
39 final class JournalSegment {
40   private static final Logger LOG = LoggerFactory.getLogger(JournalSegment.class);
41
42   private final JournalSegmentFile file;
43   private final StorageLevel storageLevel;
44   private final int maxEntrySize;
45   private final JournalIndex journalIndex;
46   private final Set<JournalSegmentReader> readers = ConcurrentHashMap.newKeySet();
47   private final AtomicInteger references = new AtomicInteger();
48
49   private JournalSegmentWriter writer;
50   private boolean open = true;
51
52   JournalSegment(
53       final JournalSegmentFile file,
54       final StorageLevel storageLevel,
55       final int maxEntrySize,
56       final double indexDensity) {
57     this.file = requireNonNull(file);
58     this.storageLevel = requireNonNull(storageLevel);
59     this.maxEntrySize = maxEntrySize;
60     journalIndex = new SparseJournalIndex(indexDensity);
61
62     final var fileWriter = switch (storageLevel) {
63         case DISK -> new DiskFileWriter(file, maxEntrySize);
64         case MAPPED -> new MappedFileWriter(file, maxEntrySize);
65     };
66     writer = new JournalSegmentWriter(fileWriter, this, maxEntrySize, journalIndex)
67         // relinquish mapped memory
68         .toFileChannel();
69   }
70
71   /**
72    * Returns the segment's starting index.
73    *
74    * @return The segment's starting index.
75    */
76   long firstIndex() {
77     return file.firstIndex();
78   }
79
80   /**
81    * Returns the last index in the segment.
82    *
83    * @return The last index in the segment.
84    */
85   long lastIndex() {
86     return writer.getLastIndex();
87   }
88
89   /**
90    * Returns the segment file.
91    *
92    * @return The segment file.
93    */
94   JournalSegmentFile file() {
95     return file;
96   }
97
98   /**
99    * Looks up the position of the given index.
100    *
101    * @param index the index to lookup
102    * @return the position of the given index or a lesser index, or {@code null}
103    */
104   @Nullable Position lookup(final long index) {
105     return journalIndex.lookup(index);
106   }
107
108   /**
109    * Acquires a reference to the log segment.
110    */
111   private void acquire() {
112     if (references.getAndIncrement() == 0 && storageLevel == StorageLevel.MAPPED) {
113       writer = writer.toMapped();
114     }
115   }
116
117   /**
118    * Releases a reference to the log segment.
119    */
120   private void release() {
121     if (references.decrementAndGet() == 0) {
122       if (storageLevel == StorageLevel.MAPPED) {
123         writer = writer.toFileChannel();
124       }
125       if (!open) {
126         finishClose();
127       }
128     }
129   }
130
131   /**
132    * Acquires a reference to the segment writer.
133    *
134    * @return The segment writer.
135    */
136   JournalSegmentWriter acquireWriter() {
137     checkOpen();
138     acquire();
139
140     return writer;
141   }
142
143   /**
144    * Releases the reference to the segment writer.
145    */
146   void releaseWriter() {
147       release();
148   }
149
150   /**
151    * Creates a new segment reader.
152    *
153    * @return A new segment reader.
154    */
155   JournalSegmentReader createReader() {
156     checkOpen();
157     acquire();
158
159     final var buffer = writer.buffer();
160     final var fileReader = buffer != null ? new MappedFileReader(file, buffer) : new DiskFileReader(file, maxEntrySize);
161     final var reader = new JournalSegmentReader(this, fileReader, maxEntrySize);
162     reader.setPosition(JournalSegmentDescriptor.BYTES);
163     readers.add(reader);
164     return reader;
165   }
166
167   /**
168    * Closes a segment reader.
169    *
170    * @param reader the closed segment reader
171    */
172   void closeReader(final JournalSegmentReader reader) {
173     if (readers.remove(reader)) {
174       release();
175     }
176   }
177
178   /**
179    * Checks whether the segment is open.
180    */
181   private void checkOpen() {
182     if (!open) {
183       throw new IllegalStateException("Segment not open");
184     }
185   }
186
187   /**
188    * Returns a boolean indicating whether the segment is open.
189    *
190    * @return indicates whether the segment is open
191    */
192   boolean isOpen() {
193     return open;
194   }
195
196   /**
197    * Closes the segment.
198    */
199   void close() {
200     if (!open) {
201       return;
202     }
203
204     LOG.debug("Closing segment: {}", this);
205     open = false;
206     readers.forEach(JournalSegmentReader::close);
207     if (references.get() == 0) {
208       finishClose();
209     }
210   }
211
212   private void finishClose() {
213     writer.close();
214     try {
215       file.close();
216     } catch (IOException e) {
217       throw new StorageException(e);
218     }
219   }
220
221   /**
222    * Deletes the segment.
223    */
224   void delete() {
225     close();
226     LOG.debug("Deleting segment: {}", this);
227     try {
228       Files.deleteIfExists(file.path());
229     } catch (IOException e) {
230       throw new StorageException(e);
231     }
232   }
233
234   @Override
235   public String toString() {
236     return MoreObjects.toStringHelper(this)
237       .add("id", file.segmentId())
238       .add("version", file.version())
239       .add("index", file.firstIndex())
240       .toString();
241   }
242 }