X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FReplicatedLogImpl.java;h=45b0b3898e20cc3b3a5acbe523944bbfc0ee6c6a;hb=refs%2Fchanges%2F11%2F80211%2F6;hp=dfaed9ba38c33fabfde2c2a8b894646ac1eedd2a;hpb=bef65394c7f540b601ce4bd360d7d7648f289bd1;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java index dfaed9ba38..45b0b3898e 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java @@ -7,31 +7,28 @@ */ package org.opendaylight.controller.cluster.raft; +import static java.util.Objects.requireNonNull; + import akka.japi.Procedure; -import com.google.common.base.Preconditions; import java.util.Collections; import java.util.List; -import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries; +import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot; /** * Implementation of ReplicatedLog used by the RaftActor. */ -class ReplicatedLogImpl extends AbstractReplicatedLogImpl { +final class ReplicatedLogImpl extends AbstractReplicatedLogImpl { private static final int DATA_SIZE_DIVIDER = 5; - private final Procedure deleteProcedure = new Procedure() { - @Override - public void apply(final DeleteEntries notUsed) { - } - }; - private final RaftActorContext context; private long dataSizeSinceLastSnapshot = 0L; - private ReplicatedLogImpl(final long snapshotIndex, final long snapshotTerm, final List unAppliedEntries, + private ReplicatedLogImpl(final long snapshotIndex, final long snapshotTerm, + final List unAppliedEntries, final RaftActorContext context) { super(snapshotIndex, snapshotTerm, unAppliedEntries, context.getId()); - this.context = Preconditions.checkNotNull(context); + this.context = requireNonNull(context); } static ReplicatedLog newInstance(final Snapshot snapshot, final RaftActorContext context) { @@ -47,8 +44,8 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl { public boolean removeFromAndPersist(final long logEntryIndex) { // FIXME: Maybe this should be done after the command is saved long adjustedIndex = removeFrom(logEntryIndex); - if(adjustedIndex >= 0) { - context.getPersistenceProvider().persist(new DeleteEntries(adjustedIndex), deleteProcedure); + if (adjustedIndex >= 0) { + context.getPersistenceProvider().persist(new DeleteEntries(adjustedIndex), NoopProcedure.instance()); return true; } @@ -56,17 +53,17 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl { } @Override - public void appendAndPersist(final ReplicatedLogEntry replicatedLogEntry) { - appendAndPersist(replicatedLogEntry, null); + public boolean shouldCaptureSnapshot(final long logIndex) { + final ConfigParams config = context.getConfigParams(); + final long journalSize = logIndex + 1; + final long dataThreshold = context.getTotalMemory() * config.getSnapshotDataThresholdPercentage() / 100; + + return journalSize % config.getSnapshotBatchCount() == 0 || getDataSizeForSnapshotCheck() > dataThreshold; } @Override public void captureSnapshotIfReady(final ReplicatedLogEntry replicatedLogEntry) { - final ConfigParams config = context.getConfigParams(); - final long journalSize = replicatedLogEntry.getIndex() + 1; - final long dataThreshold = context.getTotalMemory() * config.getSnapshotDataThresholdPercentage() / 100; - - if (journalSize % config.getSnapshotBatchCount() == 0 || getDataSizeForSnapshotCheck() > dataThreshold) { + if (shouldCaptureSnapshot(replicatedLogEntry.getIndex())) { boolean started = context.getSnapshotManager().capture(replicatedLogEntry, context.getCurrentBehavior().getReplicatedToAllIndex()); if (started && !context.hasFollowers()) { @@ -95,35 +92,31 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl { } @Override - public void appendAndPersist(final ReplicatedLogEntry replicatedLogEntry, - final Procedure callback) { + public boolean appendAndPersist(final ReplicatedLogEntry replicatedLogEntry, + final Procedure callback, final boolean doAsync) { context.getLogger().debug("{}: Append log entry and persist {} ", context.getId(), replicatedLogEntry); - // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs - if(!append(replicatedLogEntry)) { - return; + if (!append(replicatedLogEntry)) { + return false; } - // When persisting events with persist it is guaranteed that the - // persistent actor will not receive further commands between the - // persist call and the execution(s) of the associated event - // handler. This also holds for multiple persist calls in context - // of a single command. - context.getPersistenceProvider().persist(replicatedLogEntry, - new Procedure() { - @Override - public void apply(final ReplicatedLogEntry param) throws Exception { - context.getLogger().debug("{}: persist complete {}", context.getId(), param); - - int logEntrySize = param.size(); - dataSizeSinceLastSnapshot += logEntrySize; - - if (callback != null) { - callback.apply(param); - } - } + Procedure persistCallback = persistedLogEntry -> { + context.getLogger().debug("{}: persist complete {}", context.getId(), persistedLogEntry); + + dataSizeSinceLastSnapshot += persistedLogEntry.size(); + + if (callback != null) { + callback.apply(persistedLogEntry); } - ); + }; + + if (doAsync) { + context.getPersistenceProvider().persistAsync(replicatedLogEntry, persistCallback); + } else { + context.getPersistenceProvider().persist(replicatedLogEntry, persistCallback); + } + + return true; } -} \ No newline at end of file +}