/*
* Copyright 2017-2022 Open Networking Foundation and others. All rights reserved.
+ * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
*/
package io.atomix.storage.journal;
-import static com.google.common.base.Verify.verifyNotNull;
+import static java.util.Objects.requireNonNull;
/**
- * Raft log writer.
+ * A {@link JournalWriter} backed by a {@link ByteBufWriter}.
*/
final class SegmentedJournalWriter<E> implements JournalWriter<E> {
- private final SegmentedJournal<E> journal;
- private JournalSegment currentSegment;
- private JournalSegmentWriter currentWriter;
+ private final ByteBufMapper<E> mapper;
+ private final ByteBufWriter writer;
- SegmentedJournalWriter(SegmentedJournal<E> journal) {
- this.journal = journal;
- this.currentSegment = journal.getLastSegment();
- this.currentWriter = currentSegment.acquireWriter();
- }
-
- @Override
- public long getLastIndex() {
- return currentWriter.getLastIndex();
- }
-
- @Override
- public long getNextIndex() {
- return currentWriter.getNextIndex();
- }
-
- @Override
- public void reset(long index) {
- if (index > currentSegment.firstIndex()) {
- currentSegment.releaseWriter();
- currentSegment = journal.resetSegments(index);
- currentWriter = currentSegment.acquireWriter();
- } else {
- truncate(index - 1);
+ SegmentedJournalWriter(final ByteBufWriter writer, final ByteBufMapper<E> mapper) {
+ this.writer = requireNonNull(writer);
+ this.mapper = requireNonNull(mapper);
}
- journal.resetHead(index);
- }
- @Override
- public void commit(long index) {
- if (index > journal.getCommitIndex()) {
- journal.setCommitIndex(index);
- if (journal.isFlushOnCommit()) {
- flush();
- }
+ @Override
+ public long getLastIndex() {
+ return writer.lastIndex();
}
- }
- @Override
- public <T extends E> Indexed<T> append(T entry) {
- final var bytes = journal.serializer().serialize(entry);
- var index = currentWriter.append(bytes);
- if (index != null) {
- return new Indexed<>(index, entry, bytes.readableBytes());
+ @Override
+ public long getNextIndex() {
+ return writer.nextIndex();
}
- // Slow path: we do not have enough capacity
- currentWriter.flush();
- currentSegment.releaseWriter();
- currentSegment = journal.getNextSegment();
- currentWriter = currentSegment.acquireWriter();
- final var newIndex = verifyNotNull(currentWriter.append(bytes));
- return new Indexed<>(newIndex, entry, bytes.readableBytes());
- }
-
- @Override
- public void truncate(long index) {
- if (index < journal.getCommitIndex()) {
- throw new IndexOutOfBoundsException("Cannot truncate committed index: " + index);
+ @Override
+ public void reset(final long index) {
+ writer.reset(index);
}
- // Delete all segments with first indexes greater than the given index.
- while (index < currentSegment.firstIndex() && currentSegment != journal.getFirstSegment()) {
- currentSegment.releaseWriter();
- journal.removeSegment(currentSegment);
- currentSegment = journal.getLastSegment();
- currentWriter = currentSegment.acquireWriter();
+ @Override
+ public void commit(final long index) {
+ writer.commit(index);
}
- // Truncate the current index.
- currentWriter.truncate(index);
+ @Override
+ public <T extends E> Indexed<T> append(final T entry) {
+ final var buf = mapper.objectToBytes(entry);
+ return new Indexed<>(writer.append(buf), entry, buf.readableBytes());
+ }
- // Reset segment readers.
- journal.resetTail(index + 1);
- }
+ @Override
+ public void truncate(final long index) {
+ writer.truncate(index);
+ }
- @Override
- public void flush() {
- currentWriter.flush();
- }
+ @Override
+ public void flush() {
+ writer.flush();
+ }
}