- private static final Logger LOG = LoggerFactory.getLogger(SegmentedJournal.class);
- private static final int SEGMENT_BUFFER_FACTOR = 3;
-
- private final String name;
- private final StorageLevel storageLevel;
- private final File directory;
- private final JournalSerializer<E> serializer;
- private final int maxSegmentSize;
- private final int maxEntrySize;
- private final int maxEntriesPerSegment;
- private final double indexDensity;
- private final boolean flushOnCommit;
- private final SegmentedJournalWriter<E> writer;
- private volatile long commitIndex;
-
- private final ConcurrentNavigableMap<Long, JournalSegment> segments = new ConcurrentSkipListMap<>();
- private final Collection<SegmentedJournalReader<?>> readers = ConcurrentHashMap.newKeySet();
- private JournalSegment currentSegment;
-
- private volatile boolean open = true;
-
- public SegmentedJournal(
- String name,
- StorageLevel storageLevel,
- File directory,
- JournalSerdes namespace,
- int maxSegmentSize,
- int maxEntrySize,
- int maxEntriesPerSegment,
- double indexDensity,
- boolean flushOnCommit) {
- this.name = requireNonNull(name, "name cannot be null");
- this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null");
- this.directory = requireNonNull(directory, "directory cannot be null");
- this.serializer = JournalSerializer.wrap(requireNonNull(namespace, "namespace cannot be null"));
- this.maxSegmentSize = maxSegmentSize;
- this.maxEntrySize = maxEntrySize;
- this.maxEntriesPerSegment = maxEntriesPerSegment;
- this.indexDensity = indexDensity;
- this.flushOnCommit = flushOnCommit;
- open();
- this.writer = new SegmentedJournalWriter<>(this);
- }
-
- /**
- * Returns the segment file name prefix.
- *
- * @return The segment file name prefix.
- */
- public String name() {
- return name;
- }
-
- /**
- * Returns the storage directory.
- * <p>
- * The storage directory is the directory to which all segments write files. Segment files for multiple logs may be
- * stored in the storage directory, and files for each log instance will be identified by the {@code prefix} provided
- * when the log is opened.
- *
- * @return The storage directory.
- */
- public File directory() {
- return directory;
- }
-
- /**
- * Returns the storage level.
- * <p>
- * The storage level dictates how entries within individual journal segments should be stored.
- *
- * @return The storage level.
- */
- public StorageLevel storageLevel() {
- return storageLevel;
- }
-
- /**
- * Returns the maximum journal segment size.
- * <p>
- * The maximum segment size dictates the maximum size any segment in a segment may consume in bytes.
- *
- * @return The maximum segment size in bytes.
- */
- public int maxSegmentSize() {
- return maxSegmentSize;
- }
-
- /**
- * Returns the maximum journal entry size.
- * <p>
- * The maximum entry size dictates the maximum size any entry in the segment may consume in bytes.
- *
- * @return the maximum entry size in bytes
- */
- public int maxEntrySize() {
- return maxEntrySize;
- }
-
- /**
- * Returns the maximum number of entries per segment.
- * <p>
- * The maximum entries per segment dictates the maximum number of entries that are allowed to be stored in any segment
- * in a journal.
- *
- * @return The maximum number of entries per segment.
- * @deprecated since 3.0.2
- */
- @Deprecated
- public int maxEntriesPerSegment() {
- return maxEntriesPerSegment;
- }
-
- /**
- * Returns the collection of journal segments.
- *
- * @return the collection of journal segments
- */
- public Collection<JournalSegment> segments() {
- return segments.values();
- }
-
- /**
- * Returns the collection of journal segments with indexes greater than the given index.
- *
- * @param index the starting index
- * @return the journal segments starting with indexes greater than or equal to the given index
- */
- public Collection<JournalSegment> segments(long index) {
- return segments.tailMap(index).values();
- }
-
- /**
- * Returns serializer instance.
- *
- * @return serializer instance
- */
- JournalSerializer<E> serializer() {
- return serializer;
- }
-
- /**
- * Returns the total size of the journal.
- *
- * @return the total size of the journal
- */
- public long size() {
- return segments.values().stream()
- .mapToLong(segment -> {
- try {
- return segment.file().size();
- } catch (IOException e) {
- throw new StorageException(e);
- }
- })
- .sum();
- }
-
- @Override
- public JournalWriter<E> writer() {
- return writer;
- }
-
- @Override
- public JournalReader<E> openReader(long index) {
- return openReader(index, JournalReader.Mode.ALL);
- }
-
- /**
- * Opens a new Raft log reader with the given reader mode.
- *
- * @param index The index from which to begin reading entries.
- * @param mode The mode in which to read entries.
- * @return The Raft log reader.
- */
- @Override
- public JournalReader<E> openReader(long index, JournalReader.Mode mode) {
- final var segment = getSegment(index);
- final var reader = switch (mode) {
- case ALL -> new SegmentedJournalReader<>(this, segment);
- case COMMITS -> new CommitsSegmentJournalReader<>(this, segment);
- };
-
- // Forward reader to specified index
- long next = reader.getNextIndex();
- while (index > next && reader.tryAdvance()) {
- next = reader.getNextIndex();
- }
-
- readers.add(reader);
- return reader;
- }
-
- /**
- * Opens the segments.
- */
- private synchronized void open() {
- // Load existing log segments from disk.
- for (var segment : loadSegments()) {
- segments.put(segment.firstIndex(), segment);
- }
-
- // If a segment doesn't already exist, create an initial segment starting at index 1.
- if (!segments.isEmpty()) {
- currentSegment = segments.lastEntry().getValue();
- } else {
- currentSegment = createSegment(1, 1);
- segments.put(1L, currentSegment);
- }
- }
-
- /**
- * Asserts that the manager is open.
- *
- * @throws IllegalStateException if the segment manager is not open
- */
- private void assertOpen() {
- checkState(currentSegment != null, "journal not open");
- }
-
- /**
- * Asserts that enough disk space is available to allocate a new segment.
- */
- private void assertDiskSpace() {
- if (directory().getUsableSpace() < maxSegmentSize() * SEGMENT_BUFFER_FACTOR) {
- throw new StorageException.OutOfDiskSpace("Not enough space to allocate a new journal segment");
- }
- }
-
- /**
- * Resets the current segment, creating a new segment if necessary.
- */
- private synchronized void resetCurrentSegment() {
- final var lastSegment = getLastSegment();
- if (lastSegment == null) {
- currentSegment = createSegment(1, 1);
- segments.put(1L, currentSegment);
- } else {
- currentSegment = lastSegment;
- }
- }
-
- /**
- * Resets and returns the first segment in the journal.
- *
- * @param index the starting index of the journal
- * @return the first segment
- */
- JournalSegment resetSegments(long index) {
- assertOpen();
-
- // If the index already equals the first segment index, skip the reset.
- final var firstSegment = getFirstSegment();
- if (index == firstSegment.firstIndex()) {
- return firstSegment;