import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
+import org.eclipse.jdt.annotation.NonNull;
+import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/
final class JournalSegment {
+ /**
+ * Encapsulation of a {@link JournalSegment}'s state;
+ */
+ sealed interface State {
+ // Marker interface
+ }
+
+ /**
+ * Journal segment is active, i.e. there is a associated with it.
+ */
+ @NonNullByDefault
+ record Active(FileAccess access, JournalSegmentWriter writer) implements State {
+ Active {
+ requireNonNull(access);
+ requireNonNull(writer);
+ }
+
+ Inactive deactivate() {
+ final var inactive = new Inactive(writer.currentPosition());
+ access.close();
+ return inactive;
+ }
+ }
+
+ /**
+ * Journal segment is inactive, i.e. there is no writer associated with it.
+ */
+ @NonNullByDefault
+ record Inactive(int position) implements State {
+ Active activate(final JournalSegment segment) throws IOException {
+ final var access = segment.file.newAccess(segment.storageLevel, segment.maxEntrySize);
+ return new Active(access, new JournalSegmentWriter(access.newFileWriter(), segment, segment.journalIndex,
+ this));
+ }
+ }
+
private static final Logger LOG = LoggerFactory.getLogger(JournalSegment.class);
private final Set<JournalSegmentReader> readers = ConcurrentHashMap.newKeySet();
private final AtomicInteger references = new AtomicInteger();
- private final JournalSegmentFile file;
- private final StorageLevel storageLevel;
+ private final @NonNull JournalSegmentFile file;
+ private final @NonNull StorageLevel storageLevel;
+ private final @NonNull JournalIndex journalIndex;
private final int maxEntrySize;
- private final JournalIndex journalIndex;
- private JournalSegmentWriter writer;
+ private State state;
private boolean open = true;
JournalSegment(
this.file = requireNonNull(file);
this.storageLevel = requireNonNull(storageLevel);
this.maxEntrySize = maxEntrySize;
- journalIndex = new SparseJournalIndex(indexDensity);
- final var fileWriter = switch (storageLevel) {
- case DISK -> new DiskFileWriter(file, maxEntrySize);
- case MAPPED -> new MappedFileWriter(file, maxEntrySize);
- };
+ journalIndex = new SparseJournalIndex(indexDensity);
- // Traverse all entries and push them to index -- thus reconstructing both last index and current position
- writer = new JournalSegmentWriter(fileWriter, this, maxEntrySize, journalIndex,
- indexEntries(fileWriter, this, maxEntrySize, journalIndex, Long.MAX_VALUE, null))
- // relinquish mapped memory
- .toFileChannel();
+ try (var tmpAccess = file.newAccess(storageLevel, maxEntrySize)) {
+ final var fileReader = tmpAccess.newFileReader();
+ state = new Inactive(indexEntries(fileReader, this, maxEntrySize, journalIndex, Long.MAX_VALUE, null));
+ } catch (IOException e) {
+ throw new StorageException(e);
+ }
}
/**
/**
* Acquires a reference to the log segment.
*/
- private void acquire() {
- if (references.getAndIncrement() == 0 && storageLevel == StorageLevel.MAPPED) {
- writer = writer.toMapped();
+ private Active acquire() {
+ return references.getAndIncrement() == 0 ? activate() : (Active) state;
+ }
+
+ private Active activate() {
+ final Active ret;
+ try {
+ ret = ((Inactive) state).activate(this);
+ } catch (IOException e) {
+ throw new StorageException(e);
}
+ state = ret;
+ return ret;
}
/**
*/
private void release() {
if (references.decrementAndGet() == 0) {
- if (storageLevel == StorageLevel.MAPPED) {
- writer = writer.toFileChannel();
- }
+ state = ((Active) state).deactivate();
if (!open) {
finishClose();
}
*/
JournalSegmentWriter acquireWriter() {
checkOpen();
- acquire();
-
- return writer;
+ return acquire().writer();
}
/**
*/
JournalSegmentReader createReader() {
checkOpen();
- acquire();
- final var buffer = writer.buffer();
- final var fileReader = buffer != null ? new MappedFileReader(file, buffer)
- : new DiskFileReader(file, maxEntrySize);
- final var reader = new JournalSegmentReader(this, fileReader, maxEntrySize);
+ final var reader = new JournalSegmentReader(this, acquire().access().newFileReader(), maxEntrySize);
reader.setPosition(JournalSegmentDescriptor.BYTES);
readers.add(reader);
return reader;
}
private void finishClose() {
- writer.close();
try {
file.close();
} catch (IOException e) {
.toString();
}
- static int indexEntries(final FileWriter fileWriter, final JournalSegment segment, final int maxEntrySize,
- final JournalIndex journalIndex, final long maxNextIndex, final @Nullable Position start) {
+ static int indexEntries(final FileWriter fileWriter, final JournalSegment segment, final JournalIndex journalIndex,
+ final long maxNextIndex, final @Nullable Position start) {
// acquire ownership of cache and make sure reader does not see anything we've done once we're done
final var fileReader = fileWriter.reader();
try {
- return indexEntries(fileReader, segment, maxEntrySize, journalIndex, maxNextIndex, start);
+ return indexEntries(fileReader, segment, fileWriter.maxEntrySize(), journalIndex, maxNextIndex, start);
} finally {
// Make sure reader does not see anything we've done
fileReader.invalidateCache();