*/
package org.opendaylight.controller.akka.segjournal;
-import static com.google.common.base.Verify.verify;
import static com.google.common.base.Verify.verifyNotNull;
import static java.util.Objects.requireNonNull;
import io.atomix.storage.StorageLevel;
import io.atomix.storage.journal.Indexed;
import io.atomix.storage.journal.SegmentedJournal;
-import io.atomix.storage.journal.SegmentedJournalReader;
import io.atomix.storage.journal.SegmentedJournalWriter;
import io.atomix.utils.serializer.Namespace;
import java.io.File;
-import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
-import org.opendaylight.controller.akka.segjournal.DataJournalEntry.FromPersistence;
-import org.opendaylight.controller.akka.segjournal.DataJournalEntry.ToPersistence;
import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
import org.opendaylight.controller.cluster.reporting.MetricsReporter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.collection.Iterator;
-import scala.collection.SeqLike;
import scala.concurrent.Future;
import scala.concurrent.Promise;
}
}
- private static final class ReplayMessages extends AsyncMessage<Void> {
+ static final class ReplayMessages extends AsyncMessage<Void> {
private final long fromSequenceNr;
- private final long toSequenceNr;
- private final long max;
- private final Consumer<PersistentRepr> replayCallback;
+ final long toSequenceNr;
+ final long max;
+ final Consumer<PersistentRepr> replayCallback;
ReplayMessages(final long fromSequenceNr,
final long toSequenceNr, final long max, final Consumer<PersistentRepr> replayCallback) {
return promise.future();
}
+ int size() {
+ return requests.size();
+ }
+
+ AtomicWrite getRequest(final int index) {
+ return requests.get(index);
+ }
+
+ void setFailure(final int index, final Exception cause) {
+ results.get(index).success(Optional.of(cause));
+
+ }
+
+ void setSuccess(final int index) {
+ results.get(index).success(Optional.empty());
+ }
+
@Override
public String toString() {
return MoreObjects.toStringHelper(this).add("requests", requests).toString();
// Tracks the size distribution of messages
private Histogram messageSize;
- private SegmentedJournal<DataJournalEntry> dataJournal;
+ private DataJournal dataJournal;
private SegmentedJournal<Long> deleteJournal;
private long lastDelete;
- // Tracks largest message size we have observed either during recovery or during write
- private int largestObservedSize;
-
SegmentedJournalActor(final String persistenceId, final File directory, final StorageLevel storage,
final int maxEntrySize, final int maxSegmentSize) {
this.persistenceId = requireNonNull(persistenceId);
ensureOpen();
LOG.debug("{}: delete messages {}", persistenceId, message);
- final long to = Long.min(dataJournal.writer().getLastIndex(), message.toSequenceNr);
+ final long to = Long.min(dataJournal.lastWrittenIndex(), message.toSequenceNr);
LOG.debug("{}: adjusted delete to {}", persistenceId, to);
if (lastDelete < to) {
final SegmentedJournalWriter<Long> deleteWriter = deleteJournal.writer();
final Indexed<Long> entry = deleteWriter.append(lastDelete);
deleteWriter.commit(entry.index());
- dataJournal.writer().commit(lastDelete);
+ dataJournal.commitTo(lastDelete);
LOG.debug("{}: compaction started", persistenceId);
- dataJournal.compact(lastDelete + 1);
+ dataJournal.compactTo(lastDelete + 1);
deleteJournal.compact(entry.index());
LOG.debug("{}: compaction finished", persistenceId);
} else {
message.promise.success(null);
}
- @SuppressWarnings("checkstyle:illegalCatch")
private void handleReadHighestSequenceNr(final ReadHighestSequenceNr message) {
LOG.debug("{}: looking for highest sequence on {}", persistenceId, message);
final Long sequence;
if (directory.isDirectory()) {
ensureOpen();
- sequence = dataJournal.writer().getLastIndex();
+ sequence = dataJournal.lastWrittenIndex();
} else {
sequence = 0L;
}
final long from = Long.max(lastDelete + 1, message.fromSequenceNr);
LOG.debug("{}: adjusted fromSequenceNr to {}", persistenceId, from);
- try (SegmentedJournalReader<DataJournalEntry> reader = dataJournal.openReader(from)) {
- int count = 0;
- while (reader.hasNext() && count < message.max) {
- final Indexed<DataJournalEntry> next = reader.next();
- if (next.index() > message.toSequenceNr) {
- break;
- }
-
- LOG.trace("{}: replay {}", persistenceId, next);
- updateLargestSize(next.size());
- final DataJournalEntry entry = next.entry();
- verify(entry instanceof FromPersistence, "Unexpected entry %s", entry);
-
- final PersistentRepr repr = ((FromPersistence) entry).toRepr(persistenceId, next.index());
- LOG.debug("{}: replaying {}", persistenceId, repr);
- message.replayCallback.accept(repr);
- count++;
- }
- LOG.debug("{}: successfully replayed {} entries", persistenceId, count);
- } catch (Exception e) {
- LOG.warn("{}: failed to replay messages for {}", persistenceId, message, e);
- message.promise.failure(e);
- } finally {
- message.promise.success(null);
- }
+ dataJournal.handleReplayMessages(message, from);
}
- @SuppressWarnings("checkstyle:illegalCatch")
private void handleWriteMessages(final WriteMessages message) {
ensureOpen();
- final SegmentedJournalWriter<DataJournalEntry> writer = dataJournal.writer();
final long startTicks = System.nanoTime();
- final int count = message.requests.size();
- final long start = writer.getLastIndex();
-
- for (int i = 0; i < count; ++i) {
- final long mark = writer.getLastIndex();
- try {
- writeRequest(writer, message.requests.get(i));
- } catch (Exception e) {
- LOG.warn("{}: failed to write out request", persistenceId, e);
- message.results.get(i).success(Optional.of(e));
- writer.truncate(mark);
- continue;
- }
-
- message.results.get(i).success(Optional.empty());
- }
- writer.flush();
- batchWriteTime.update(System.nanoTime() - startTicks, TimeUnit.NANOSECONDS);
- messageWriteCount.mark(writer.getLastIndex() - start);
- }
+ final long start = dataJournal.lastWrittenIndex();
- private void writeRequest(final SegmentedJournalWriter<DataJournalEntry> writer, final AtomicWrite request) {
- // Cast is needed for Eclipse because of https://bugs.eclipse.org/bugs/show_bug.cgi?id=468276
- final Iterator<PersistentRepr> it = ((SeqLike<PersistentRepr, ?>) request.payload()).iterator();
- while (it.hasNext()) {
- final PersistentRepr repr = it.next();
- final Object payload = repr.payload();
- if (!(payload instanceof Serializable)) {
- throw new UnsupportedOperationException("Non-serializable payload encountered " + payload.getClass());
- }
-
- final int size = writer.append(new ToPersistence(repr)).size();
- messageSize.update(size);
- updateLargestSize(size);
- }
+ dataJournal.handleWriteMessages(message);
+
+ batchWriteTime.update(System.nanoTime() - startTicks, TimeUnit.NANOSECONDS);
+ messageWriteCount.mark(dataJournal.lastWrittenIndex() - start);
}
private void handleUnknown(final Object message) {
LOG.error("{}: Received unknown message {}", persistenceId, message);
}
- private void updateLargestSize(final int size) {
- if (size > largestObservedSize) {
- largestObservedSize = size;
- }
- }
-
private void ensureOpen() {
if (dataJournal != null) {
verifyNotNull(deleteJournal);
final Indexed<Long> lastEntry = deleteJournal.writer().getLastEntry();
lastDelete = lastEntry == null ? 0 : lastEntry.entry();
- dataJournal = SegmentedJournal.<DataJournalEntry>builder()
- .withStorageLevel(storage).withDirectory(directory).withName("data")
- .withNamespace(Namespace.builder()
- .register(new DataJournalEntrySerializer(context().system()),
- FromPersistence.class, ToPersistence.class)
- .build())
- .withMaxEntrySize(maxEntrySize).withMaxSegmentSize(maxSegmentSize)
- .build();
- final SegmentedJournalWriter<DataJournalEntry> writer = dataJournal.writer();
- writer.commit(lastDelete);
- LOG.debug("{}: journal open with last index {}, deleted to {}", persistenceId, writer.getLastIndex(),
+ dataJournal = new DataJournalV0(persistenceId, messageSize, context().system(), storage, directory,
+ maxEntrySize, maxSegmentSize);
+ dataJournal.commitTo(lastDelete);
+ LOG.debug("{}: journal open with last index {}, deleted to {}", persistenceId, dataJournal.lastWrittenIndex(),
lastDelete);
}
}