- private static final Logger LOG = LoggerFactory.getLogger(SegmentedJournal.class);
- private static final int SEGMENT_BUFFER_FACTOR = 3; // multiple of maxSegmentSize that must be usable on disk before a new segment is allocated
-
- private final String name; // segment file name prefix
- private final StorageLevel storageLevel;
- private final File directory; // directory holding the segment files
- private final JournalSerializer<E> serializer;
- private final int maxSegmentSize; // bytes; each new segment file is pre-sized to this length
- private final int maxEntrySize; // bytes; handed to every JournalSegment
- private final int maxEntriesPerSegment; // written into each new segment descriptor
- private final double indexDensity; // handed to every JournalSegment
- private final boolean flushOnCommit; // NOTE(review): not read in the visible part of the file
- private final SegmentedJournalWriter<E> writer; // created in the constructor, after open()
- private volatile long commitIndex; // NOTE(review): not referenced in the visible part of the file
-
- private final ConcurrentNavigableMap<Long, JournalSegment> segments = new ConcurrentSkipListMap<>(); // keyed by each segment's first index
- private final Collection<SegmentedJournalReader<?>> readers = ConcurrentHashMap.newKeySet(); // readers handed out by openReader()
- private JournalSegment currentSegment; // last segment of the journal; assertOpen() treats null as "journal not open"
-
- private volatile boolean open = true; // NOTE(review): never read in the visible code; assertOpen() checks currentSegment instead
-
- public SegmentedJournal(
- String name,
- StorageLevel storageLevel,
- File directory,
- JournalSerdes namespace,
- int maxSegmentSize,
- int maxEntrySize,
- int maxEntriesPerSegment,
- double indexDensity,
- boolean flushOnCommit) {
- this.name = requireNonNull(name, "name cannot be null");
- this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null");
- this.directory = requireNonNull(directory, "directory cannot be null");
- this.serializer = JournalSerializer.wrap(requireNonNull(namespace, "namespace cannot be null"));
- this.maxSegmentSize = maxSegmentSize;
- this.maxEntrySize = maxEntrySize;
- this.maxEntriesPerSegment = maxEntriesPerSegment;
- this.indexDensity = indexDensity;
- this.flushOnCommit = flushOnCommit;
- open();
- this.writer = new SegmentedJournalWriter<>(this);
- }
-
- /**
-  * Returns the prefix used to name and recognise this journal's segment files.
-  *
-  * @return the segment file name prefix
-  */
- public String name() {
-   return this.name;
- }
-
- /**
-  * Returns the storage directory.
-  * <p>
-  * All segment files of this journal live in this directory. Other journals may share the directory; the files
-  * belonging to this one are recognised by the prefix returned from {@link #name()}.
-  *
-  * @return the storage directory
-  */
- public File directory() {
-   return this.directory;
- }
-
- /**
-  * Returns the storage level.
-  * <p>
-  * The storage level is handed to every journal segment and determines how that segment stores its entries.
-  *
-  * @return the storage level
-  */
- public StorageLevel storageLevel() {
-   return this.storageLevel;
- }
-
- /**
-  * Returns the maximum journal segment size.
-  * <p>
-  * This is the maximum number of bytes any single segment of the journal may occupy; new segment files are
-  * pre-sized to this length.
-  *
-  * @return the maximum segment size in bytes
-  */
- public int maxSegmentSize() {
-   return this.maxSegmentSize;
- }
-
- /**
-  * Returns the maximum journal entry size.
-  * <p>
-  * This is the maximum number of bytes any single entry in a segment may occupy.
-  *
-  * @return the maximum entry size in bytes
-  */
- public int maxEntrySize() {
-   return this.maxEntrySize;
- }
-
- /**
-  * Returns the maximum number of entries per segment.
-  * <p>
-  * This value is recorded in the descriptor of every segment created by this journal.
-  *
-  * @return the maximum number of entries per segment
-  * @deprecated since 3.0.2
-  */
- @Deprecated(since = "3.0.2")
- public int maxEntriesPerSegment() {
-   return maxEntriesPerSegment;
- }
-
- /**
-  * Returns all segments of the journal.
-  * <p>
-  * The result is the value view of the underlying segment map, not a copy.
-  *
-  * @return the collection of journal segments
-  */
- public Collection<JournalSegment> segments() {
-   return this.segments.values();
- }
-
- /**
-  * Returns the journal segments whose first index is greater than or equal to the given index.
-  * <p>
-  * A segment that starts below {@code index} is not part of the result, even if it contains {@code index}.
-  *
-  * @param index the lowest first index to include
-  * @return the journal segments starting at or after the given index
-  */
- public Collection<JournalSegment> segments(long index) {
-   final var tail = segments.tailMap(index);
-   return tail.values();
- }
-
- /**
-  * Returns the serializer this journal uses for its entries.
-  *
-  * @return the entry serializer
-  */
- JournalSerializer<E> serializer() {
-   return this.serializer;
- }
-
- /**
- * Returns the total size of the journal.
- *
- * @return the total size of the journal
- */
- public long size() {
- return segments.values().stream()
- .mapToLong(JournalSegment::size)
- .sum();
- }
-
- /**
-  * Returns the writer created together with this journal.
-  *
-  * @return the journal writer
-  */
- @Override
- public JournalWriter<E> writer() {
-   return this.writer;
- }
-
- /**
-  * Opens a reader positioned at the given index that reads in {@link JournalReader.Mode#ALL} mode.
-  *
-  * @param index the index from which to begin reading entries
-  * @return the journal reader
-  */
- @Override
- public JournalReader<E> openReader(long index) {
-   final var mode = JournalReader.Mode.ALL;
-   return openReader(index, mode);
- }
-
- /**
-  * Opens a new journal reader with the given reader mode.
-  * <p>
-  * The reader starts in the segment looked up for {@code index}, is advanced until its next index reaches
-  * {@code index} or it cannot advance any further, and is then registered with this journal.
-  *
-  * @param index the index from which to begin reading entries
-  * @param mode the mode in which to read entries
-  * @return the journal reader
-  */
- @Override
- public JournalReader<E> openReader(long index, JournalReader.Mode mode) {
-   final var startSegment = getSegment(index);
-   final var reader = switch (mode) {
-     case ALL -> new SegmentedJournalReader<>(this, startSegment);
-     case COMMITS -> new CommitsSegmentJournalReader<>(this, startSegment);
-   };
-
-   // Skip forward to the requested index.
-   while (index > reader.getNextIndex() && reader.tryAdvance()) {
-     // all work happens in the loop condition
-   }
-
-   readers.add(reader);
-   return reader;
- }
-
- /**
-  * Opens the journal: registers every segment found on disk under its descriptor index and selects the last one
-  * as the current segment. When no segment exists, an initial segment with id 1 starting at index 1 is created.
-  */
- private synchronized void open() {
-   // Register the segments that already exist on disk.
-   loadSegments().forEach(loaded -> segments.put(loaded.descriptor().index(), loaded));
-
-   if (segments.isEmpty()) {
-     // Nothing on disk yet: start the journal at index 1.
-     currentSegment = createSegment(1, 1);
-     segments.put(1L, currentSegment);
-     return;
-   }
-   currentSegment = segments.lastEntry().getValue();
- }
-
- /**
- * Asserts that the manager is open.
- *
- * @throws IllegalStateException if the segment manager is not open
- */
- private void assertOpen() {
- checkState(currentSegment != null, "journal not open");
- }
-
- /**
- * Asserts that enough disk space is available to allocate a new segment.
- */
- private void assertDiskSpace() {
- if (directory().getUsableSpace() < maxSegmentSize() * SEGMENT_BUFFER_FACTOR) {
- throw new StorageException.OutOfDiskSpace("Not enough space to allocate a new journal segment");
- }
- }
-
- /**
-  * Points the current segment at the last segment of the journal. When the journal has no segments left, a new
-  * segment with id 1 starting at index 1 is created and registered first.
-  */
- private synchronized void resetCurrentSegment() {
-   final var last = getLastSegment();
-   if (last != null) {
-     currentSegment = last;
-     return;
-   }
-
-   currentSegment = createSegment(1, 1);
-   segments.put(1L, currentSegment);
- }
-
- /**
- * Resets and returns the first segment in the journal.
- *
- * @param index the starting index of the journal
- * @return the first segment
- */
- JournalSegment resetSegments(long index) {
- assertOpen();
-
- // If the index already equals the first segment index, skip the reset.
- JournalSegment firstSegment = getFirstSegment();
- if (index == firstSegment.firstIndex()) {
- return firstSegment;
- }
-
- for (JournalSegment segment : segments.values()) {
- segment.close();
- segment.delete();
- }
- segments.clear();
-
- currentSegment = createSegment(1, index);
- segments.put(index, currentSegment);
- return currentSegment;
- }
-
- /**
- * Returns the first segment in the log.
- *
- * @throws IllegalStateException if the segment manager is not open
- */
- JournalSegment getFirstSegment() {
- assertOpen();
- Map.Entry<Long, JournalSegment> segment = segments.firstEntry();
- return segment != null ? segment.getValue() : null;
- }
-
- /**
- * Returns the last segment in the log.
- *
- * @throws IllegalStateException if the segment manager is not open
- */
- JournalSegment getLastSegment() {
- assertOpen();
- Map.Entry<Long, JournalSegment> segment = segments.lastEntry();
- return segment != null ? segment.getValue() : null;
- }
-
- /**
-  * Creates the next segment, makes it the current segment and returns it.
-  * <p>
-  * The new segment starts right after the last index of the current segment. Its id is one greater than the id
-  * of the last segment, or 1 when there is no last segment.
-  *
-  * @return the newly created segment
-  * @throws IllegalStateException if the journal is not open
-  * @throws StorageException.OutOfDiskSpace if there is not enough usable disk space for another segment
-  */
- synchronized JournalSegment getNextSegment() {
-   assertOpen();
-   assertDiskSpace();
-
-   final var nextIndex = currentSegment.lastIndex() + 1;
-   final var tail = getLastSegment();
-   final long nextId;
-   if (tail == null) {
-     nextId = 1;
-   } else {
-     nextId = tail.descriptor().id() + 1;
-   }
-
-   currentSegment = createSegment(nextId, nextIndex);
-   segments.put(nextIndex, currentSegment);
-   return currentSegment;
- }
-
- /**
- * Returns the segment following the segment with the given ID.
- *
- * @param index The segment index with which to look up the next segment.
- * @return The next segment for the given index.
- */
- JournalSegment getNextSegment(long index) {
- Map.Entry<Long, JournalSegment> nextSegment = segments.higherEntry(index);
- return nextSegment != null ? nextSegment.getValue() : null;
- }
-
- /**
- * Returns the segment for the given index.
- *
- * @param index The index for which to return the segment.
- * @throws IllegalStateException if the segment manager is not open
- */
- synchronized JournalSegment getSegment(long index) {
- assertOpen();
- // Check if the current segment contains the given index first in order to prevent an unnecessary map lookup.
- if (currentSegment != null && index > currentSegment.firstIndex()) {
- return currentSegment;
- }
-
- // If the index is in another segment, get the entry with the next lowest first index.
- Map.Entry<Long, JournalSegment> segment = segments.floorEntry(index);
- if (segment != null) {
- return segment.getValue();
- }
- return getFirstSegment();
- }
-
- /**
- * Removes a segment.
- *
- * @param segment The segment to remove.
- */
- synchronized void removeSegment(JournalSegment segment) {
- segments.remove(segment.firstIndex());
- segment.close();
- segment.delete();
- resetCurrentSegment();
- }
-
- /**
-  * Creates a new segment backed by a freshly written segment file.
-  * <p>
-  * The file is sized to the maximum segment size and the segment descriptor is written into it before the
-  * segment instance is created through {@link #newSegment(JournalSegmentFile, JournalSegmentDescriptor)}.
-  *
-  * @param id the segment id
-  * @param index the first index of the segment
-  * @return the new segment
-  * @throws StorageException if the segment file cannot be written
-  */
- JournalSegment createSegment(long id, long index) {
-   final var file = JournalSegmentFile.createSegmentFile(name, directory, id);
-
-   final var builder = JournalSegmentDescriptor.builder()
-       .withId(id)
-       .withIndex(index)
-       .withMaxSegmentSize(maxSegmentSize)
-       .withMaxEntries(maxEntriesPerSegment)
-       .withUpdated(System.currentTimeMillis());
-   final var segmentDescriptor = builder.build();
-
-   // Size the file first, then write the descriptor.
-   try (var output = new RandomAccessFile(file, "rw")) {
-     output.setLength(maxSegmentSize);
-     output.write(segmentDescriptor.toArray());
-   } catch (IOException e) {
-     throw new StorageException(e);
-   }
-
-   final var created = newSegment(new JournalSegmentFile(file), segmentDescriptor);
-   LOG.debug("Created segment: {}", created);
-   return created;
- }
-
- /**
- * Creates a new segment instance.
- *
- * @param segmentFile The segment file.
- * @param descriptor The segment descriptor.
- * @return The segment instance.
- */
- protected JournalSegment newSegment(JournalSegmentFile segmentFile, JournalSegmentDescriptor descriptor) {
- return new JournalSegment(segmentFile, descriptor, storageLevel, maxEntrySize, indexDensity);
- }
-
- /**
-  * Loads all segments from disk.
-  * <p>
-  * Every file in the storage directory that is recognised as a segment file of this journal is loaded. The
-  * segments are then checked in ascending order of their first index: each segment must start right after the
-  * last index of its predecessor. The first segment that does not, and every segment after it, is closed,
-  * deleted and left out of the result.
-  *
-  * @return a collection of segments for the log, ordered by first index
-  * @throws StorageException if the directory cannot be listed or a segment descriptor cannot be read
-  */
- protected Collection<JournalSegment> loadSegments() {
-   // Ensure log directories are created.
-   directory.mkdirs();
-
-   // listFiles() returns null instead of an empty array when the path is not a directory or an I/O error
-   // occurs; fail with a storage error rather than a NullPointerException in the loop below.
-   final var files = directory.listFiles(File::isFile);
-   if (files == null) {
-     throw new StorageException(new IOException("Failed to list journal directory " + directory));
-   }
-
-   final var segments = new TreeMap<Long, JournalSegment>();
-
-   // Iterate through all files in the log directory.
-   for (var file : files) {
-
-     // If the file looks like a segment file, attempt to load the segment.
-     if (JournalSegmentFile.isSegmentFile(name, file)) {
-       // read the descriptor
-       final JournalSegmentDescriptor descriptor;
-       try {
-         descriptor = JournalSegmentDescriptor.readFrom(file.toPath());
-       } catch (IOException e) {
-         throw new StorageException(e);
-       }
-
-       // Load the segment.
-       final var segmentFile = new JournalSegmentFile(file);
-       final var segment = newSegment(segmentFile, descriptor);
-       LOG.debug("Loaded disk segment: {} ({})", descriptor.id(), file.getName());
-
-       // Add the segment to the segments list.
-       segments.put(segment.firstIndex(), segment);
-     }
-   }
-
-   // Verify that all the segments in the log align with one another.
-   JournalSegment previousSegment = null;
-   boolean corrupted = false;
-   final var iterator = segments.entrySet().iterator();
-   while (iterator.hasNext()) {
-     final var segment = iterator.next().getValue();
-     if (previousSegment != null && previousSegment.lastIndex() != segment.firstIndex() - 1) {
-       LOG.warn("Journal is inconsistent. {} is not aligned with prior segment {}", segment.file().file(),
-           previousSegment.file().file());
-       corrupted = true;
-     }
-     // Once a gap has been seen, this segment and all later ones are discarded.
-     if (corrupted) {
-       segment.close();
-       segment.delete();
-       iterator.remove();
-     }
-     previousSegment = segment;
-   }
-
-   return segments.values();
- }
-
- /**
- * Resets journal readers to the given head.
- *
- * @param index The index at which to reset readers.
- */
- void resetHead(long index) {
- for (var reader : readers) {
- if (reader.getNextIndex() < index) {
- reader.reset(index);
- }