X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FReplicatedLogImpl.java;h=90042907a5f79e2d13d590af19d3782a8a269548;hb=3676d1686706dbee6656e86a23c4bdb516d5267b;hp=5a77b9aea3ac4b008af17e760ac5a0f53349220e;hpb=f39ec0eea4ce3f2a9be935887097a7e974adf5e0;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java index 5a77b9aea3..90042907a5 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java @@ -8,11 +8,10 @@ package org.opendaylight.controller.cluster.raft; import akka.japi.Procedure; +import com.google.common.base.Preconditions; import java.util.Collections; import java.util.List; -import org.opendaylight.controller.cluster.DataPersistenceProvider; -import org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries; -import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; +import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries; /** * Implementation of ReplicatedLog used by the RaftActor. @@ -22,41 +21,34 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl { private long dataSizeSinceLastSnapshot = 0L; private final RaftActorContext context; - private final DataPersistenceProvider persistence; - private final RaftActorBehavior currentBehavior; private final Procedure deleteProcedure = new Procedure() { @Override - public void apply(DeleteEntries param) { + public void apply(final DeleteEntries notUsed) { } }; - static ReplicatedLog newInstance(Snapshot snapshot, RaftActorContext context, - DataPersistenceProvider persistence, RaftActorBehavior currentBehavior) { + static ReplicatedLog newInstance(final Snapshot snapshot, final RaftActorContext context) { return new ReplicatedLogImpl(snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(), - snapshot.getUnAppliedEntries(), context, persistence, currentBehavior); + snapshot.getUnAppliedEntries(), context); } - static ReplicatedLog newInstance(RaftActorContext context, - DataPersistenceProvider persistence, RaftActorBehavior currentBehavior) { - return new ReplicatedLogImpl(-1L, -1L, Collections.emptyList(), context, - persistence, currentBehavior); + static ReplicatedLog newInstance(final RaftActorContext context) { + return new ReplicatedLogImpl(-1L, -1L, Collections.emptyList(), context); } - private ReplicatedLogImpl(long snapshotIndex, long snapshotTerm, List unAppliedEntries, - RaftActorContext context, DataPersistenceProvider persistence, RaftActorBehavior currentBehavior) { + private ReplicatedLogImpl(final long snapshotIndex, final long snapshotTerm, final List unAppliedEntries, + final RaftActorContext context) { super(snapshotIndex, snapshotTerm, unAppliedEntries); - this.context = context; - this.persistence = persistence; - this.currentBehavior = currentBehavior; + this.context = Preconditions.checkNotNull(context); } @Override - public void removeFromAndPersist(long logEntryIndex) { + public void removeFromAndPersist(final long logEntryIndex) { // FIXME: Maybe this should be done after the command is saved long adjustedIndex = removeFrom(logEntryIndex); if(adjustedIndex >= 0) { - persistence.persist(new DeleteEntries((int)adjustedIndex), deleteProcedure); + context.getPersistenceProvider().persist(new DeleteEntries(adjustedIndex), deleteProcedure); } } @@ -65,11 +57,49 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl { appendAndPersist(replicatedLogEntry, null); } + @Override + public void captureSnapshotIfReady(final ReplicatedLogEntry replicatedLogEntry) { + long journalSize = replicatedLogEntry.getIndex() + 1; + long dataThreshold = context.getTotalMemory() * + context.getConfigParams().getSnapshotDataThresholdPercentage() / 100; + + if ((journalSize % context.getConfigParams().getSnapshotBatchCount() == 0 + || getDataSizeForSnapshotCheck() > dataThreshold)) { + + boolean started = context.getSnapshotManager().capture(replicatedLogEntry, + context.getCurrentBehavior().getReplicatedToAllIndex()); + if (started) { + if (!context.hasFollowers()) { + dataSizeSinceLastSnapshot = 0; + } + } + } + } + + private long getDataSizeForSnapshotCheck() { + long dataSizeForCheck = dataSize(); + if (!context.hasFollowers()) { + // When we do not have followers we do not maintain an in-memory log + // due to this the journalSize will never become anything close to the + // snapshot batch count. In fact will mostly be 1. + // Similarly since the journal's dataSize depends on the entries in the + // journal the journal's dataSize will never reach a value close to the + // memory threshold. + // By maintaining the dataSize outside the journal we are tracking essentially + // what we have written to the disk however since we no longer are in + // need of doing a snapshot just for the sake of freeing up memory we adjust + // the real size of data by the DATA_SIZE_DIVIDER so that we do not snapshot as often + // as if we were maintaining a real snapshot + dataSizeForCheck = dataSizeSinceLastSnapshot / DATA_SIZE_DIVIDER; + } + return dataSizeForCheck; + } + @Override public void appendAndPersist(final ReplicatedLogEntry replicatedLogEntry, final Procedure callback) { - if(context.getLogger().isDebugEnabled()) { + if (context.getLogger().isDebugEnabled()) { context.getLogger().debug("{}: Append log entry and persist {} ", context.getId(), replicatedLogEntry); } @@ -81,47 +111,17 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl { // persist call and the execution(s) of the associated event // handler. This also holds for multiple persist calls in context // of a single command. - persistence.persist(replicatedLogEntry, + context.getPersistenceProvider().persist(replicatedLogEntry, new Procedure() { @Override - public void apply(ReplicatedLogEntry evt) throws Exception { - int logEntrySize = replicatedLogEntry.size(); - - long dataSizeForCheck = dataSize(); + public void apply(final ReplicatedLogEntry param) throws Exception { + context.getLogger().debug("{}: persist complete {}", context.getId(), param); + int logEntrySize = param.size(); dataSizeSinceLastSnapshot += logEntrySize; - if (!context.hasFollowers()) { - // When we do not have followers we do not maintain an in-memory log - // due to this the journalSize will never become anything close to the - // snapshot batch count. In fact will mostly be 1. - // Similarly since the journal's dataSize depends on the entries in the - // journal the journal's dataSize will never reach a value close to the - // memory threshold. - // By maintaining the dataSize outside the journal we are tracking essentially - // what we have written to the disk however since we no longer are in - // need of doing a snapshot just for the sake of freeing up memory we adjust - // the real size of data by the DATA_SIZE_DIVIDER so that we do not snapshot as often - // as if we were maintaining a real snapshot - dataSizeForCheck = dataSizeSinceLastSnapshot / DATA_SIZE_DIVIDER; - } - long journalSize = replicatedLogEntry.getIndex() + 1; - long dataThreshold = context.getTotalMemory() * - context.getConfigParams().getSnapshotDataThresholdPercentage() / 100; - - if ((journalSize % context.getConfigParams().getSnapshotBatchCount() == 0 - || dataSizeForCheck > dataThreshold)) { - - boolean started = context.getSnapshotManager().capture(replicatedLogEntry, - currentBehavior.getReplicatedToAllIndex()); - - if(started){ - dataSizeSinceLastSnapshot = 0; - } - } - - if (callback != null){ - callback.apply(replicatedLogEntry); + if (callback != null) { + callback.apply(param); } } }