import com.google.common.base.Stopwatch;
import io.atomix.storage.journal.Indexed;
import io.atomix.storage.journal.JournalSerdes;
+import io.atomix.storage.journal.SegmentedByteBufJournal;
import io.atomix.storage.journal.SegmentedJournal;
import io.atomix.storage.journal.StorageLevel;
import java.io.File;
import java.util.function.Consumer;
import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
import org.opendaylight.controller.cluster.reporting.MetricsReporter;
+import org.opendaylight.controller.raft.journal.FromByteBufMapper;
+import org.opendaylight.controller.raft.journal.ToByteBufMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Future;
// Completes the per-message promise at {@code index} with the given failure cause.
// NOTE(review): failure is signalled by fulfilling the promise with Optional.of(cause) —
// confirm the consumer treats a present Optional as "write failed".
void setFailure(final int index, final Exception cause) {
results.get(index).success(Optional.of(cause));
-
}
// Intentionally a no-op: per the visible setFailure() counterpart, only failures are
// recorded here — presumably success is the promise's default outcome; TODO confirm.
void setSuccess(final int index) {
}
}
- private final ArrayDeque<WrittenMessages> unflushedWrites = new ArrayDeque<>();
+ // Pairs a written-but-unflushed batch with the stopwatch started when its write began
+ // and the number of entries written, so metrics can be recorded when it is flushed.
+ private record UnflushedWrite(WrittenMessages message, Stopwatch start, long count) {
+ UnflushedWrite {
+ requireNonNull(message);
+ requireNonNull(start);
+ }
+ }
+
+ // Writes accumulated since the last flush, in write order.
+ private final ArrayDeque<UnflushedWrite> unflushedWrites = new ArrayDeque<>();
// Started when unflushedWrites transitions from empty to non-empty; reset on flush.
private final Stopwatch unflushedDuration = Stopwatch.createUnstarted();
// Flush threshold: once this many bytes are pending, a flush is forced.
private final long maxUnflushedBytes;
}
@Override
- void onWrittenMessages(final WrittenMessages message) {
+ void onWrittenMessages(final WrittenMessages message, final Stopwatch started, final long count) {
// Batching strategy: queue the write and only flush once enough bytes accumulate.
// Start the pending-duration clock when the queue goes from empty to non-empty.
boolean first = unflushedWrites.isEmpty();
if (first) {
unflushedDuration.start();
}
- unflushedWrites.addLast(message);
+ unflushedWrites.addLast(new UnflushedWrite(message, started, count));
unflushedBytes = unflushedBytes + message.writtenBytes;
// Force a flush once the accumulated byte count crosses the configured threshold.
if (unflushedBytes >= maxUnflushedBytes) {
LOG.debug("{}: reached {} unflushed journal bytes", persistenceId(), unflushedBytes);
flushJournal(unflushedBytes, unsyncedSize);
// NOTE(review): sw is not referenced in the visible part of this method — presumably
// used for flush-duration metrics below this hunk; verify it is not dead.
final var sw = Stopwatch.createStarted();
// Complete every queued write, recording each one's metrics with its own stopwatch/count.
- unflushedWrites.forEach(WrittenMessages::complete);
+ unflushedWrites.forEach(write -> completeWriteMessages(write.message, write.start, write.count));
unflushedWrites.clear();
unflushedBytes = 0;
unflushedDuration.reset();
}
@Override
- void onWrittenMessages(final WrittenMessages message) {
+ void onWrittenMessages(final WrittenMessages message, final Stopwatch started, final long count) {
// Immediate strategy: flush this single batch right away, then record metrics and
// complete the message promises via the shared completeWriteMessages() helper.
flushJournal(message.writtenBytes, 1);
- message.complete();
+ completeWriteMessages(message, started, count);
}
@Override
}
private static final Logger LOG = LoggerFactory.getLogger(SegmentedJournalActor.class);
- private static final JournalSerdes DELETE_NAMESPACE = JournalSerdes.builder()
- .register(LongEntrySerdes.LONG_ENTRY_SERDES, Long.class)
- .build();
// Segment size for the small "delete" journal that tracks the last deleted sequence number.
private static final int DELETE_SEGMENT_SIZE = 64 * 1024;
+ // Directional byte-buf mappers for the delete journal's Long entries; both are derived
+ // from a single JournalSerdes instance in the static initializer below.
+ private static final FromByteBufMapper<Long> READ_MAPPER;
+ private static final ToByteBufMapper<Long> WRITE_MAPPER;
+
+ static {
+ // Build the serdes once; it is only needed to produce the two mappers.
+ final var namespace = JournalSerdes.builder()
+ .register(LongEntrySerdes.LONG_ENTRY_SERDES, Long.class)
+ .build();
+
+ READ_MAPPER = namespace.toReadMapper();
+ WRITE_MAPPER = namespace.toWriteMapper();
+ }
private final String persistenceId;
private final StorageLevel storage;
// Writes a batch of messages to the data journal, then hands metrics/completion off to the
// strategy-specific onWrittenMessages() (batching or immediate flush).
private void handleWriteMessages(final WriteMessages message) {
ensureOpen();
- final var sw = Stopwatch.createStarted();
+ final var started = Stopwatch.createStarted();
// Capture the pre-write sequence number so the entry count for this batch can be derived.
final long start = dataJournal.lastWrittenSequenceNr();
final var writtenMessages = dataJournal.handleWriteMessages(message);
- sw.stop();
- batchWriteTime.update(sw.elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
- messageWriteCount.mark(dataJournal.lastWrittenSequenceNr() - start);
+ // The stopwatch is passed along still running; completeWriteMessages() stops it when the
+ // write is eventually flushed and completed.
+ onWrittenMessages(writtenMessages, started, dataJournal.lastWrittenSequenceNr() - start);
+ }
+ // Stops the batch stopwatch, records write metrics, and completes the message promises.
+ // NOTE(review): in the batching path the stopwatch keeps running while the write sits in
+ // the unflushed queue, so batchWriteTime includes queueing time — confirm this is intended.
+ final void completeWriteMessages(final WrittenMessages message, final Stopwatch started, final long count) {
+ batchWriteTime.update(started.stop().elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
+ messageWriteCount.mark(count);
// log message after statistics are updated
- LOG.debug("{}: write of {} bytes completed in {}", persistenceId, writtenMessages.writtenBytes, sw);
- onWrittenMessages(writtenMessages);
+ LOG.debug("{}: write of {} bytes completed in {}", persistenceId, message.writtenBytes, started);
+ message.complete();
}
/**
 * Handle a check of written messages.
 *
 * @param message Messages which were written
+ * @param started Stopwatch started when the write started
+ * @param count number of writes
 */
// Implemented by the batching and immediate-flush strategies above; implementations must
// eventually invoke completeWriteMessages(message, started, count).
- abstract void onWrittenMessages(WrittenMessages message);
+ abstract void onWrittenMessages(WrittenMessages message, Stopwatch started, long count);
// Fallback for unrecognized actor messages: log at error level and drop the message.
private void handleUnknown(final Object message) {
LOG.error("{}: Received unknown message {}", persistenceId, message);
}
final var sw = Stopwatch.createStarted();
- deleteJournal = SegmentedJournal.<Long>builder().withDirectory(directory).withName("delete")
- .withNamespace(DELETE_NAMESPACE).withMaxSegmentSize(DELETE_SEGMENT_SIZE).build();
- final var lastEntry = deleteJournal.writer().getLastEntry();
- lastDelete = lastEntry == null ? 0 : lastEntry.entry();
+ // Open the delete journal through the byte-buf API, mapping Long entries via the
+ // statically-derived READ_MAPPER/WRITE_MAPPER pair.
+ deleteJournal = new SegmentedJournal<>(SegmentedByteBufJournal.builder()
+ .withDirectory(directory)
+ .withName("delete")
+ .withMaxSegmentSize(DELETE_SEGMENT_SIZE)
+ .build(), READ_MAPPER, WRITE_MAPPER);
+ // Recover the last recorded delete marker: a null from tryNext() means the journal has no
+ // entry at lastIndex(), i.e. nothing has been deleted yet.
+ // NOTE(review): the reader obtained from openReader() is not visibly closed — confirm
+ // readers need no explicit close, or that cleanup happens outside this hunk.
+ final var lastDeleteRecovered = deleteJournal.openReader(deleteJournal.lastIndex())
+ .tryNext((index, value, length) -> value);
+ lastDelete = lastDeleteRecovered == null ? 0 : lastDeleteRecovered;
dataJournal = new DataJournalV0(persistenceId, messageSize, context().system(), storage, directory,
maxEntrySize, maxSegmentSize);