X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FSnapshotManager.java;h=4c3fb109add982c9f2d5590d524d20dfbf7ea7b7;hb=4c0cc4831e7ecc2d5f745a42ba7a390361432fd2;hp=f1881f5b0f228a62ee4e6e45a03d07c4b0a5e368;hpb=55e018bfad0c70b773641142d6fbf009cd67fda4;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java index f1881f5b0f..4c3fb109ad 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java @@ -11,7 +11,6 @@ package org.opendaylight.controller.cluster.raft; import akka.japi.Procedure; import akka.persistence.SnapshotSelectionCriteria; import com.google.common.annotations.VisibleForTesting; -import com.google.protobuf.ByteString; import java.util.List; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; @@ -115,7 +114,7 @@ public class SnapshotManager implements SnapshotState { @Override public boolean isCapturing() { - return false; + return true; } @Override @@ -189,6 +188,11 @@ public class SnapshotManager implements SnapshotState { private class Idle extends AbstractSnapshotState { + @Override + public boolean isCapturing() { + return false; + } + private boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) { TermInformationReader lastAppliedTermInfoReader = lastAppliedTermInformationReader.init(context.getReplicatedLog(), context.getLastApplied(), @@ -207,8 +211,18 @@ public class SnapshotManager implements SnapshotState { List unAppliedEntries = context.getReplicatedLog().getFrom(lastAppliedIndex + 1); - captureSnapshot = new CaptureSnapshot(lastLogEntry.getIndex(), - lastLogEntry.getTerm(), lastAppliedIndex, lastAppliedTerm, + long lastLogEntryIndex = lastAppliedIndex; + long lastLogEntryTerm = lastAppliedTerm; + if(lastLogEntry != null) { + lastLogEntryIndex = lastLogEntry.getIndex(); + lastLogEntryTerm = lastLogEntry.getTerm(); + } else { + LOG.warn("Capturing Snapshot : lastLogEntry is null. Using lastAppliedIndex {} and lastAppliedTerm {} instead.", + lastAppliedIndex, lastAppliedTerm); + } + + captureSnapshot = new CaptureSnapshot(lastLogEntryIndex, + lastLogEntryTerm, lastAppliedIndex, lastAppliedTerm, newReplicatedToAllIndex, newReplicatedToAllTerm, unAppliedEntries, targetFollower != null); if(captureSnapshot.isInstallSnapshotInitiated()) { @@ -271,38 +285,44 @@ public class SnapshotManager implements SnapshotState { private class Creating extends AbstractSnapshotState { - @Override - public boolean isCapturing() { - return true; - } - @Override public void persist(byte[] snapshotBytes, RaftActorBehavior currentBehavior, long totalMemory) { // create a snapshot object from the state provided and save it // when snapshot is saved async, SaveSnapshotSuccess is raised. - Snapshot sn = Snapshot.create(snapshotBytes, + Snapshot snapshot = Snapshot.create(snapshotBytes, captureSnapshot.getUnAppliedEntries(), captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(), captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm()); - context.getPersistenceProvider().saveSnapshot(sn); + context.getPersistenceProvider().saveSnapshot(snapshot); - LOG.info("{}: Persisting of snapshot done:{}", persistenceId(), sn.getLogMessage()); + LOG.info("{}: Persisting of snapshot done: {}", persistenceId(), snapshot.getLogMessage()); long dataThreshold = totalMemory * context.getConfigParams().getSnapshotDataThresholdPercentage() / 100; - if (context.getReplicatedLog().dataSize() > dataThreshold) { + boolean dataSizeThresholdExceeded = context.getReplicatedLog().dataSize() > dataThreshold; + boolean logSizeExceededSnapshotBatchCount = + context.getReplicatedLog().size() >= context.getConfigParams().getSnapshotBatchCount(); +LOG.debug("Log size: {}, getSnapshotBatchCount: {}",context.getReplicatedLog().size(),context.getConfigParams().getSnapshotBatchCount()); + if (dataSizeThresholdExceeded || logSizeExceededSnapshotBatchCount) { if(LOG.isDebugEnabled()) { - LOG.debug("{}: dataSize {} exceeds dataThreshold {} - doing snapshotPreCommit with index {}", - persistenceId(), context.getReplicatedLog().dataSize(), dataThreshold, - captureSnapshot.getLastAppliedIndex()); + if(dataSizeThresholdExceeded) { + LOG.debug("{}: log data size {} exceeds the memory threshold {} - doing snapshotPreCommit with index {}", + context.getId(), context.getReplicatedLog().dataSize(), dataThreshold, + captureSnapshot.getLastAppliedIndex()); + } else { + LOG.debug("{}: log size {} exceeds the snapshot batch count {} - doing snapshotPreCommit with index {}", + context.getId(), context.getReplicatedLog().size(), + context.getConfigParams().getSnapshotBatchCount(), captureSnapshot.getLastAppliedIndex()); + } } - // if memory is less, clear the log based on lastApplied. - // this could/should only happen if one of the followers is down - // as normally we keep removing from the log when its replicated to all. + // We either exceeded the memory threshold or the log size exceeded the snapshot batch + // count so, to keep the log memory footprint in check, clear the log based on lastApplied. + // This could/should only happen if one of the followers is down as normally we keep + // removing from the log as entries are replicated to all. context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm()); @@ -327,15 +347,14 @@ public class SnapshotManager implements SnapshotState { context.getReplicatedLog().getSnapshotTerm()); } - LOG.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " + - "and term:{}", persistenceId(), captureSnapshot.getLastAppliedIndex(), - captureSnapshot.getLastAppliedTerm()); + LOG.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex: {} " + + "and term: {}", context.getId(), context.getReplicatedLog().getSnapshotIndex(), + context.getReplicatedLog().getSnapshotTerm()); if (context.getId().equals(currentBehavior.getLeaderId()) && captureSnapshot.isInstallSnapshotInitiated()) { // this would be call straight to the leader and won't initiate in serialization - currentBehavior.handleMessage(context.getActor(), new SendInstallSnapshot( - ByteString.copyFrom(snapshotBytes))); + currentBehavior.handleMessage(context.getActor(), new SendInstallSnapshot(snapshot)); } captureSnapshot = null;