import java.util.function.BiFunction;
import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.NonNullByDefault;
+import org.eclipse.jdt.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private final boolean flushOnCommit;
private final @NonNull ByteBufWriter writer;
+ // null when closed
private JournalSegment currentSegment;
private volatile long commitIndex;
this.maxEntrySize = maxEntrySize;
this.indexDensity = indexDensity;
this.flushOnCommit = flushOnCommit;
- open();
+
+ // Load existing log segments from disk.
+ for (var segment : loadSegments()) {
+ segments.put(segment.firstIndex(), segment);
+ }
+ currentSegment = ensureLastSegment();
+
writer = new SegmentedByteBufWriter(this);
}
return openReader(index, SegmentedCommitsByteBufReader::new);
}
- /**
- * Opens the segments.
- */
- private synchronized void open() {
- // Load existing log segments from disk.
- for (var segment : loadSegments()) {
- segments.put(segment.firstIndex(), segment);
- }
- // If a segment doesn't already exist, create an initial segment starting at index 1.
- if (segments.isEmpty()) {
- currentSegment = createSegment(1, 1);
- segments.put(1L, currentSegment);
- } else {
- currentSegment = segments.lastEntry().getValue();
- }
- }
-
/**
* Asserts that the manager is open.
*
}
}
- /**
- * Resets the current segment, creating a new segment if necessary.
- */
- private synchronized void resetCurrentSegment() {
- final var lastSegment = lastSegment();
- if (lastSegment == null) {
- currentSegment = createSegment(1, 1);
- segments.put(1L, currentSegment);
- } else {
- currentSegment = lastSegment;
- }
- }
-
/**
* Resets and returns the first segment in the journal.
*
segments.values().forEach(JournalSegment::delete);
segments.clear();
-
- currentSegment = createSegment(1, index);
- segments.put(index, currentSegment);
- return currentSegment;
+ final var newSegment = createInitialSegment();
+ currentSegment = newSegment;
+ return newSegment;
}
/**
*/
JournalSegment firstSegment() {
assertOpen();
- final var firstEntry = segments.firstEntry();
- return firstEntry != null ? firstEntry.getValue() : nextSegment();
+ return segments.firstEntry().getValue();
}
/**
*/
JournalSegment lastSegment() {
assertOpen();
- final var lastEntry = segments.lastEntry();
- return lastEntry != null ? lastEntry.getValue() : nextSegment();
+ return segments.lastEntry().getValue();
+ }
+
+ /**
+ * Returns the segment following the segment with the given ID.
+ *
+ * @param index The segment index with which to look up the next segment.
+ * @return The next segment for the given index, or {@code null} if no such segment exists
+ */
+ @Nullable JournalSegment tryNextSegment(final long index) {
+ final var higherEntry = segments.higherEntry(index);
+ return higherEntry != null ? higherEntry.getValue() : null;
}
/**
* @return The next segment.
* @throws IllegalStateException if the segment manager is not open
*/
- synchronized JournalSegment nextSegment() {
+ synchronized @NonNull JournalSegment createNextSegment() {
assertOpen();
assertDiskSpace();
+ // FIXME: lastSegment should equal currentSegment. We should be asserting that.
final var index = currentSegment.lastIndex() + 1;
final var lastSegment = lastSegment();
- currentSegment = createSegment(lastSegment != null ? lastSegment.file().segmentId() + 1 : 1, index);
- segments.put(index, currentSegment);
- return currentSegment;
- }
-
- /**
- * Returns the segment following the segment with the given ID.
- *
- * @param index The segment index with which to look up the next segment.
- * @return The next segment for the given index.
- */
- JournalSegment nextSegment(final long index) {
- final var higherEntry = segments.higherEntry(index);
- return higherEntry != null ? higherEntry.getValue() : null;
+ final var nextSegment = createSegment(lastSegment.file().segmentId() + 1, index);
+ segments.put(index, nextSegment);
+ currentSegment = nextSegment;
+ return nextSegment;
}
/**
synchronized void removeSegment(final JournalSegment segment) {
segments.remove(segment.firstIndex());
segment.delete();
- resetCurrentSegment();
+
+ // Reset current segment to last segment
+ currentSegment = ensureLastSegment();
}
/**
* Creates a new segment.
+ *
+ * @param segmentId the segment ID
+ * @param firstIndex index of first entry
+ * @param A new segment
*/
- JournalSegment createSegment(final long id, final long index) {
+ private @NonNull JournalSegment createSegment(final long segmentId, final long firstIndex) {
final JournalSegmentFile file;
try {
file = JournalSegmentFile.createNew(name, directory, JournalSegmentDescriptor.builder()
- .withId(id)
- .withIndex(index)
+ .withId(segmentId)
+ .withIndex(firstIndex)
.withMaxSegmentSize(maxSegmentSize)
// FIXME: propagate maxEntries
.withMaxEntries(Integer.MAX_VALUE)
return segment;
}
+ private @NonNull JournalSegment createInitialSegment() {
+ final var segment = createSegment(1, 1);
+ segments.put(1L, segment);
+ return segment;
+ }
+
+ /**
+ * Make sure there is a last segment and return it.
+ *
+ * @return the last segment
+ */
+ private JournalSegment ensureLastSegment() {
+ final var lastEntry = segments.lastEntry();
+ // if there is no segment, create an initial segment starting at index 1.
+ return lastEntry != null ? lastEntry.getValue() : createInitialSegment();
+ }
+
/**
* Loads all segments from disk.
*
* @return A collection of segments for the log.
*/
- protected Collection<JournalSegment> loadSegments() {
+ private Collection<JournalSegment> loadSegments() {
// Ensure log directories are created.
directory.mkdirs();