X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FReplicatedLogImpl.java;h=7e1cb69afff8124d8cd794b3d68aec0f019d1e1b;hb=b8c6400766f7324dd57d059bd48e435569fe1a27;hp=ab1e23bb94d8a7a9790d3319a8ba977bef1baa36;hpb=214ba02ca4400d88e494fa27a44c30531f68968e;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java index ab1e23bb94..7e1cb69aff 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java @@ -8,10 +8,10 @@ package org.opendaylight.controller.cluster.raft; import akka.japi.Procedure; +import com.google.common.base.Preconditions; import java.util.Collections; import java.util.List; import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries; -import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; /** * Implementation of ReplicatedLog used by the RaftActor. @@ -19,36 +19,32 @@ import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; class ReplicatedLogImpl extends AbstractReplicatedLogImpl { private static final int DATA_SIZE_DIVIDER = 5; - private long dataSizeSinceLastSnapshot = 0L; - private final RaftActorContext context; - private final RaftActorBehavior currentBehavior; - private final Procedure deleteProcedure = new Procedure() { @Override - public void apply(DeleteEntries notUsed) { + public void apply(final DeleteEntries notUsed) { } }; - static ReplicatedLog newInstance(Snapshot snapshot, RaftActorContext context, - RaftActorBehavior currentBehavior) { - return new ReplicatedLogImpl(snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(), - snapshot.getUnAppliedEntries(), context, currentBehavior); + private final RaftActorContext context; + private long dataSizeSinceLastSnapshot = 0L; + + private ReplicatedLogImpl(final long snapshotIndex, final long snapshotTerm, final List unAppliedEntries, + final RaftActorContext context) { + super(snapshotIndex, snapshotTerm, unAppliedEntries); + this.context = Preconditions.checkNotNull(context); } - static ReplicatedLog newInstance(RaftActorContext context, RaftActorBehavior currentBehavior) { - return new ReplicatedLogImpl(-1L, -1L, Collections.emptyList(), context, - currentBehavior); + static ReplicatedLog newInstance(final Snapshot snapshot, final RaftActorContext context) { + return new ReplicatedLogImpl(snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(), + snapshot.getUnAppliedEntries(), context); } - private ReplicatedLogImpl(long snapshotIndex, long snapshotTerm, List unAppliedEntries, - RaftActorContext context, RaftActorBehavior currentBehavior) { - super(snapshotIndex, snapshotTerm, unAppliedEntries); - this.context = context; - this.currentBehavior = currentBehavior; + static ReplicatedLog newInstance(final RaftActorContext context) { + return new ReplicatedLogImpl(-1L, -1L, Collections.emptyList(), context); } @Override - public void removeFromAndPersist(long logEntryIndex) { + public void removeFromAndPersist(final long logEntryIndex) { // FIXME: Maybe this should be done after the command is saved long adjustedIndex = removeFrom(logEntryIndex); if(adjustedIndex >= 0) { @@ -62,26 +58,21 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl { } @Override - public void captureSnapshotIfReady(ReplicatedLogEntry replicatedLogEntry) { - long journalSize = replicatedLogEntry.getIndex() + 1; - long dataThreshold = context.getTotalMemory() * - context.getConfigParams().getSnapshotDataThresholdPercentage() / 100; - - if ((journalSize % context.getConfigParams().getSnapshotBatchCount() == 0 - || getDataSizeForSnapshotCheck() > dataThreshold)) { + public void captureSnapshotIfReady(final ReplicatedLogEntry replicatedLogEntry) { + final ConfigParams config = context.getConfigParams(); + final long journalSize = replicatedLogEntry.getIndex() + 1; + final long dataThreshold = context.getTotalMemory() * config.getSnapshotDataThresholdPercentage() / 100; + if (journalSize % config.getSnapshotBatchCount() == 0 || getDataSizeForSnapshotCheck() > dataThreshold) { boolean started = context.getSnapshotManager().capture(replicatedLogEntry, - currentBehavior.getReplicatedToAllIndex()); - if (started) { - if (!context.hasFollowers()) { - dataSizeSinceLastSnapshot = 0; - } + context.getCurrentBehavior().getReplicatedToAllIndex()); + if (started && !context.hasFollowers()) { + dataSizeSinceLastSnapshot = 0; } } } private long getDataSizeForSnapshotCheck() { - long dataSizeForCheck = dataSize(); if (!context.hasFollowers()) { // When we do not have followers we do not maintain an in-memory log // due to this the journalSize will never become anything close to the @@ -94,18 +85,17 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl { // need of doing a snapshot just for the sake of freeing up memory we adjust // the real size of data by the DATA_SIZE_DIVIDER so that we do not snapshot as often // as if we were maintaining a real snapshot - dataSizeForCheck = dataSizeSinceLastSnapshot / DATA_SIZE_DIVIDER; + return dataSizeSinceLastSnapshot / DATA_SIZE_DIVIDER; + } else { + return dataSize(); } - return dataSizeForCheck; } @Override public void appendAndPersist(final ReplicatedLogEntry replicatedLogEntry, final Procedure callback) { - if(context.getLogger().isDebugEnabled()) { - context.getLogger().debug("{}: Append log entry and persist {} ", context.getId(), replicatedLogEntry); - } + context.getLogger().debug("{}: Append log entry and persist {} ", context.getId(), replicatedLogEntry); // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs append(replicatedLogEntry); @@ -118,14 +108,14 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl { context.getPersistenceProvider().persist(replicatedLogEntry, new Procedure() { @Override - public void apply(ReplicatedLogEntry evt) throws Exception { - context.getLogger().debug("{}: persist complete {}", context.getId(), replicatedLogEntry); + public void apply(final ReplicatedLogEntry param) throws Exception { + context.getLogger().debug("{}: persist complete {}", context.getId(), param); - int logEntrySize = replicatedLogEntry.size(); + int logEntrySize = param.size(); dataSizeSinceLastSnapshot += logEntrySize; - if (callback != null){ - callback.apply(replicatedLogEntry); + if (callback != null) { + callback.apply(param); } } }