From f7bec652039ce01319095c51741bf870c383ee09 Mon Sep 17 00:00:00 2001 From: Moiz Raja Date: Tue, 3 Mar 2015 10:11:41 -0800 Subject: [PATCH] Use SnapshotManager Change-Id: I0d5d5aee12afb2b1f89fb7e4113c4fa5cc334dd3 Signed-off-by: Moiz Raja --- .../controller/cluster/raft/RaftActor.java | 171 ++++-------------- .../cluster/raft/RaftActorContext.java | 4 +- .../cluster/raft/RaftActorContextImpl.java | 21 ++- .../cluster/raft/SnapshotManager.java | 91 +++++++--- .../cluster/raft/SnapshotState.java | 15 +- .../raft/behaviors/AbstractLeader.java | 40 +--- .../behaviors/AbstractRaftActorBehavior.java | 27 +-- .../cluster/raft/behaviors/Follower.java | 2 +- .../cluster/raft/MockRaftActorContext.java | 13 +- .../cluster/raft/RaftActorTest.java | 48 +++-- .../cluster/raft/SnapshotManagerTest.java | 93 ++++++---- .../cluster/raft/behaviors/LeaderTest.java | 2 + .../cluster/datastore/ShardTest.java | 62 ++++--- 13 files changed, 269 insertions(+), 320 deletions(-) diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index aa72485187..19804111ec 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -22,7 +22,6 @@ import com.google.common.base.Optional; import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; -import com.google.protobuf.ByteString; import java.io.Serializable; import java.util.Collection; import java.util.List; @@ -40,7 +39,6 @@ import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; -import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader; import org.opendaylight.controller.cluster.raft.behaviors.AbstractRaftActorBehavior; import org.opendaylight.controller.cluster.raft.behaviors.Follower; @@ -104,6 +102,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { public void apply(ApplyJournalEntries param) throws Exception { } }; + private static final String COMMIT_SNAPSHOT = "commit_snapshot"; protected final Logger LOG = LoggerFactory.getLogger(getClass()); @@ -119,13 +118,13 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { */ private final RaftActorContextImpl context; + private final Procedure createSnapshotProcedure = new CreateSnapshotProcedure(); + /** * The in-memory journal */ private ReplicatedLogImpl replicatedLog = new ReplicatedLogImpl(); - private CaptureSnapshot captureSnapshot = null; - private Stopwatch recoveryTimer; private int currentRecoveryBatchCount; @@ -379,26 +378,19 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { LOG.error("{}: SaveSnapshotFailure received for snapshot Cause:", persistenceId(), saveSnapshotFailure.cause()); - context.getReplicatedLog().snapshotRollback(); - - LOG.info("{}: Replicated Log rollbacked. Snapshot will be attempted in the next cycle." + - "snapshotIndex:{}, snapshotTerm:{}, log-size:{}", persistenceId(), - context.getReplicatedLog().getSnapshotIndex(), - context.getReplicatedLog().getSnapshotTerm(), - context.getReplicatedLog().size()); + context.getSnapshotManager().rollback(); } else if (message instanceof CaptureSnapshot) { LOG.debug("{}: CaptureSnapshot received by actor: {}", persistenceId(), message); - if(captureSnapshot == null) { - captureSnapshot = (CaptureSnapshot)message; - createSnapshot(); - } + context.getSnapshotManager().create(createSnapshotProcedure); - } else if (message instanceof CaptureSnapshotReply){ + } else if (message instanceof CaptureSnapshotReply) { handleCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot()); } else if(message instanceof GetOnDemandRaftState) { onGetOnDemandRaftStats(); + } else if (message.equals(COMMIT_SNAPSHOT)) { + commitSnapshot(-1); } else { reusableBehaviorStateHolder.init(currentBehavior); @@ -416,7 +408,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { .currentTerm(context.getTermInformation().getCurrentTerm()) .inMemoryJournalDataSize(replicatedLog.dataSize()) .inMemoryJournalLogSize(replicatedLog.size()) - .isSnapshotCaptureInitiated(context.isSnapshotCaptureInitiated()) + .isSnapshotCaptureInitiated(context.getSnapshotManager().isCapturing()) .lastApplied(context.getLastApplied()) .lastIndex(replicatedLog.lastIndex()) .lastTerm(replicatedLog.lastTerm()) @@ -515,15 +507,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { // the state to durable storage self().tell(new ApplyJournalEntries(replicatedLogEntry.getIndex()), self()); - // Check if the "real" snapshot capture has been initiated. If no then do the fake snapshot - if(!context.isSnapshotCaptureInitiated()){ - raftContext.getReplicatedLog().snapshotPreCommit(raftContext.getLastApplied(), - raftContext.getTermInformation().getCurrentTerm()); - raftContext.getReplicatedLog().snapshotCommit(); - } else { - LOG.debug("{}: Skipping fake snapshotting for {} because real snapshotting is in progress", - persistenceId(), getId()); - } + context.getSnapshotManager().trimLog(context.getLastApplied(), currentBehavior); + } else if (clientActor != null) { // Send message for replication currentBehavior.handleMessage(getSelf(), @@ -621,10 +606,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } protected void commitSnapshot(long sequenceNumber) { - context.getReplicatedLog().snapshotCommit(); - - // TODO: Not sure if we want to be this aggressive with trimming stuff - trimPersistentData(sequenceNumber); + context.getSnapshotManager().commit(persistence(), sequenceNumber); } /** @@ -747,67 +729,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { private void handleCaptureSnapshotReply(byte[] snapshotBytes) { LOG.debug("{}: CaptureSnapshotReply received by actor: snapshot size {}", persistenceId(), snapshotBytes.length); - // create a snapshot object from the state provided and save it - // when snapshot is saved async, SaveSnapshotSuccess is raised. - - Snapshot sn = Snapshot.create(snapshotBytes, - context.getReplicatedLog().getFrom(captureSnapshot.getLastAppliedIndex() + 1), - captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(), - captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm()); - - persistence().saveSnapshot(sn); - - LOG.info("{}: Persisting of snapshot done:{}", persistenceId(), sn.getLogMessage()); - - long dataThreshold = getTotalMemory() * - getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100; - if (context.getReplicatedLog().dataSize() > dataThreshold) { - - if(LOG.isDebugEnabled()) { - LOG.debug("{}: dataSize {} exceeds dataThreshold {} - doing snapshotPreCommit with index {}", - persistenceId(), context.getReplicatedLog().dataSize(), dataThreshold, - captureSnapshot.getLastAppliedIndex()); - } - - // if memory is less, clear the log based on lastApplied. - // this could/should only happen if one of the followers is down - // as normally we keep removing from the log when its replicated to all. - context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getLastAppliedIndex(), - captureSnapshot.getLastAppliedTerm()); - - // Don't reset replicatedToAllIndex to -1 as this may prevent us from trimming the log after an - // install snapshot to a follower. - if(captureSnapshot.getReplicatedToAllIndex() >= 0) { - getCurrentBehavior().setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex()); - } - } else if(captureSnapshot.getReplicatedToAllIndex() != -1){ - // clear the log based on replicatedToAllIndex - context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getReplicatedToAllIndex(), - captureSnapshot.getReplicatedToAllTerm()); - - getCurrentBehavior().setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex()); - } else { - // The replicatedToAllIndex was not found in the log - // This means that replicatedToAllIndex never moved beyond -1 or that it is already in the snapshot. - // In this scenario we may need to save the snapshot to the akka persistence - // snapshot for recovery but we do not need to do the replicated log trimming. - context.getReplicatedLog().snapshotPreCommit(replicatedLog.getSnapshotIndex(), - replicatedLog.getSnapshotTerm()); - } - - - LOG.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex: {} " + - "and term: {}", persistenceId(), replicatedLog.getSnapshotIndex(), - replicatedLog.getSnapshotTerm()); - - if (isLeader() && captureSnapshot.isInstallSnapshotInitiated()) { - // this would be call straight to the leader and won't initiate in serialization - currentBehavior.handleMessage(getSelf(), new SendInstallSnapshot( - ByteString.copyFrom(snapshotBytes))); - } - - captureSnapshot = null; - context.setSnapshotCaptureInitiated(false); + context.getSnapshotManager().persist(persistence(), snapshotBytes, currentBehavior, getTotalMemory()); } protected long getTotalMemory() { @@ -819,9 +741,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } private class ReplicatedLogImpl extends AbstractReplicatedLogImpl { - private static final int DATA_SIZE_DIVIDER = 5; - private long dataSizeSinceLastSnapshot = 0; + private long dataSizeSinceLastSnapshot = 0L; + public ReplicatedLogImpl(Snapshot snapshot) { super(snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(), @@ -887,9 +809,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { long dataSizeForCheck = dataSize; dataSizeSinceLastSnapshot += logEntrySize; - long journalSize = lastIndex() + 1; - if(!hasFollowers()) { + if (!hasFollowers()) { // When we do not have followers we do not maintain an in-memory log // due to this the journalSize will never become anything close to the // snapshot batch count. In fact will mostly be 1. @@ -903,51 +824,22 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { // as if we were maintaining a real snapshot dataSizeForCheck = dataSizeSinceLastSnapshot / DATA_SIZE_DIVIDER; } - + long journalSize = replicatedLogEntry.getIndex() + 1; long dataThreshold = getTotalMemory() * - getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100; - - // when a snaphsot is being taken, captureSnapshot != null - if (!context.isSnapshotCaptureInitiated() && - ( journalSize % context.getConfigParams().getSnapshotBatchCount() == 0 || - dataSizeForCheck > dataThreshold)) { - - dataSizeSinceLastSnapshot = 0; + context.getConfigParams().getSnapshotDataThresholdPercentage() / 100; - LOG.info("{}: Initiating Snapshot Capture, journalSize = {}, dataSizeForCheck = {}," + - " dataThreshold = {}", persistenceId(), journalSize, dataSizeForCheck, dataThreshold); + if ((journalSize % context.getConfigParams().getSnapshotBatchCount() == 0 + || dataSizeForCheck > dataThreshold)) { - long lastAppliedIndex = -1; - long lastAppliedTerm = -1; + boolean started = context.getSnapshotManager().capture(replicatedLogEntry, + currentBehavior.getReplicatedToAllIndex()); - ReplicatedLogEntry lastAppliedEntry = get(context.getLastApplied()); - if (!hasFollowers()) { - lastAppliedIndex = replicatedLogEntry.getIndex(); - lastAppliedTerm = replicatedLogEntry.getTerm(); - } else if (lastAppliedEntry != null) { - lastAppliedIndex = lastAppliedEntry.getIndex(); - lastAppliedTerm = lastAppliedEntry.getTerm(); + if(started){ + dataSizeSinceLastSnapshot = 0; } - if(LOG.isDebugEnabled()) { - LOG.debug("{}: Snapshot Capture logSize: {}", persistenceId(), journal.size()); - LOG.debug("{}: Snapshot Capture lastApplied:{} ", - persistenceId(), context.getLastApplied()); - LOG.debug("{}: Snapshot Capture lastAppliedIndex:{}", persistenceId(), - lastAppliedIndex); - LOG.debug("{}: Snapshot Capture lastAppliedTerm:{}", persistenceId(), - lastAppliedTerm); - } - - // send a CaptureSnapshot to self to make the expensive operation async. - long replicatedToAllIndex = getCurrentBehavior().getReplicatedToAllIndex(); - ReplicatedLogEntry replicatedToAllEntry = context.getReplicatedLog().get(replicatedToAllIndex); - getSelf().tell(new CaptureSnapshot(lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm, - (replicatedToAllEntry != null ? replicatedToAllEntry.getIndex() : -1), - (replicatedToAllEntry != null ? replicatedToAllEntry.getTerm() : -1)), - null); - context.setSnapshotCaptureInitiated(true); } + if (callback != null){ callback.apply(replicatedLogEntry); } @@ -1051,7 +943,18 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { @Override public void saveSnapshot(Object o) { // Make saving Snapshot successful - commitSnapshot(-1L); + // Committing the snapshot here would end up calling commit in the creating state which would + // be a state violation. That's why now we send a message to commit the snapshot. + self().tell(COMMIT_SNAPSHOT, self()); + } + } + + + private class CreateSnapshotProcedure implements Procedure { + + @Override + public void apply(Void aVoid) throws Exception { + createSnapshot(); } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java index 9d391a1588..2e7eb5eb3a 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java @@ -166,8 +166,6 @@ public interface RaftActorContext { */ ConfigParams getConfigParams(); - void setSnapshotCaptureInitiated(boolean snapshotCaptureInitiated); - - boolean isSnapshotCaptureInitiated(); + SnapshotManager getSnapshotManager(); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java index 6fc5e4369b..eb059d60fb 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java @@ -41,6 +41,10 @@ public class RaftActorContextImpl implements RaftActorContext { private boolean snapshotCaptureInitiated; + // Snapshot manager will need to be created on demand as it needs raft actor context which cannot + // be passed to it in the constructor + private SnapshotManager snapshotManager; + public RaftActorContextImpl(ActorRef actor, UntypedActorContext context, String id, ElectionTerm termInformation, long commitIndex, @@ -134,16 +138,6 @@ public class RaftActorContextImpl implements RaftActorContext { return configParams; } - @Override - public void setSnapshotCaptureInitiated(boolean snapshotCaptureInitiated) { - this.snapshotCaptureInitiated = snapshotCaptureInitiated; - } - - @Override - public boolean isSnapshotCaptureInitiated() { - return snapshotCaptureInitiated; - } - @Override public void addToPeers(String name, String address) { peerAddresses.put(name, address); } @@ -166,4 +160,11 @@ public class RaftActorContextImpl implements RaftActorContext { peerAddresses.put(peerId, peerAddress); } + + public SnapshotManager getSnapshotManager() { + if(snapshotManager == null){ + snapshotManager = new SnapshotManager(this, LOG); + } + return snapshotManager; + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java index cb38e82ac3..432d678491 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java @@ -47,13 +47,13 @@ public class SnapshotManager implements SnapshotState { } @Override - public void captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) { - currentState.captureToInstall(lastLogEntry, replicatedToAllIndex); + public boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) { + return currentState.captureToInstall(lastLogEntry, replicatedToAllIndex, targetFollower); } @Override - public void capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) { - currentState.capture(lastLogEntry, replicatedToAllIndex); + public boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) { + return currentState.capture(lastLogEntry, replicatedToAllIndex); } @Override @@ -62,8 +62,9 @@ public class SnapshotManager implements SnapshotState { } @Override - public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes, RaftActorBehavior currentBehavior) { - currentState.persist(persistenceProvider, snapshotBytes, currentBehavior); + public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes, + RaftActorBehavior currentBehavior, long totalMemory) { + currentState.persist(persistenceProvider, snapshotBytes, currentBehavior, totalMemory); } @Override @@ -77,8 +78,8 @@ public class SnapshotManager implements SnapshotState { } @Override - public long trimLog(long desiredTrimIndex) { - return currentState.trimLog(desiredTrimIndex); + public long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior) { + return currentState.trimLog(desiredTrimIndex, currentBehavior); } private boolean hasFollowers(){ @@ -97,13 +98,15 @@ public class SnapshotManager implements SnapshotState { } @Override - public void capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) { + public boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) { LOG.debug("capture should not be called in state {}", this); + return false; } @Override - public void captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) { + public boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) { LOG.debug("captureToInstall should not be called in state {}", this); + return false; } @Override @@ -112,7 +115,8 @@ public class SnapshotManager implements SnapshotState { } @Override - public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes, RaftActorBehavior currentBehavior) { + public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes, + RaftActorBehavior currentBehavior, long totalMemory) { LOG.debug("persist should not be called in state {}", this); } @@ -127,17 +131,22 @@ public class SnapshotManager implements SnapshotState { } @Override - public long trimLog(long desiredTrimIndex) { + public long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior) { LOG.debug("trimLog should not be called in state {}", this); return -1; } - protected long doTrimLog(long desiredTrimIndex){ + protected long doTrimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior){ // we would want to keep the lastApplied as its used while capturing snapshots long lastApplied = context.getLastApplied(); long tempMin = Math.min(desiredTrimIndex, (lastApplied > -1 ? lastApplied - 1 : -1)); - if (tempMin > -1 && context.getReplicatedLog().isPresent(tempMin)) { + if(LOG.isTraceEnabled()) { + LOG.trace("{}: performSnapshotWithoutCapture: desiredTrimIndex: {}, lastApplied: {}, tempMin: {}", + persistenceId(), desiredTrimIndex, lastApplied, tempMin); + } + + if (tempMin > -1 && context.getReplicatedLog().isPresent(tempMin)) { LOG.debug("{}: fakeSnapshot purging log to {} for term {}", persistenceId(), tempMin, context.getTermInformation().getCurrentTerm()); @@ -146,15 +155,21 @@ public class SnapshotManager implements SnapshotState { context.getReplicatedLog().snapshotPreCommit(tempMin, entry.getTerm()); context.getReplicatedLog().snapshotCommit(); return tempMin; + } else if(tempMin > currentBehavior.getReplicatedToAllIndex()) { + // It's possible a follower was lagging and an install snapshot advanced its match index past + // the current replicatedToAllIndex. Since the follower is now caught up we should advance the + // replicatedToAllIndex (to tempMin). The fact that tempMin wasn't found in the log is likely + // due to a previous snapshot triggered by the memory threshold exceeded, in that case we + // trim the log to the last applied index even if previous entries weren't replicated to all followers. + currentBehavior.setReplicatedToAllIndex(tempMin); } - return -1; } } private class Idle extends AbstractSnapshotState { - private void capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, boolean toInstall) { + private boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) { TermInformationReader lastAppliedTermInfoReader = lastAppliedTermInformationReader.init(context.getReplicatedLog(), context.getLastApplied(), lastLogEntry, hasFollowers()); @@ -171,24 +186,30 @@ public class SnapshotManager implements SnapshotState { // send a CaptureSnapshot to self to make the expensive operation async. captureSnapshot = new CaptureSnapshot(lastLogEntry.getIndex(), lastLogEntry.getTerm(), lastAppliedIndex, lastAppliedTerm, - newReplicatedToAllIndex, newReplicatedToAllTerm, toInstall); + newReplicatedToAllIndex, newReplicatedToAllTerm, targetFollower!=null); SnapshotManager.this.currentState = CAPTURING; - LOG.info("{}: Initiating snapshot capture {}: {}", persistenceId(), toInstall ? "to install" : "", - captureSnapshot); + if(targetFollower != null){ + LOG.info("{}: Initiating snapshot capture {}", persistenceId(), captureSnapshot); + } else { + LOG.info("{}: Initiating snapshot capture {} to install on {}", + persistenceId(), captureSnapshot, targetFollower); + } context.getActor().tell(captureSnapshot, context.getActor()); + + return true; } @Override - public void capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) { - capture(lastLogEntry, replicatedToAllIndex, false); + public boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) { + return capture(lastLogEntry, replicatedToAllIndex, null); } @Override - public void captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) { - capture(lastLogEntry, replicatedToAllIndex, true); + public boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) { + return capture(lastLogEntry, replicatedToAllIndex, targetFollower); } @Override @@ -197,8 +218,8 @@ public class SnapshotManager implements SnapshotState { } @Override - public long trimLog(long desiredTrimIndex) { - return doTrimLog(desiredTrimIndex); + public long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior) { + return doTrimLog(desiredTrimIndex, currentBehavior); } } @@ -235,7 +256,7 @@ public class SnapshotManager implements SnapshotState { @Override public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes, - RaftActorBehavior currentBehavior) { + RaftActorBehavior currentBehavior, long totalMemory) { // create a snapshot object from the state provided and save it // when snapshot is saved async, SaveSnapshotSuccess is raised. @@ -248,16 +269,28 @@ public class SnapshotManager implements SnapshotState { LOG.info("{}: Persisting of snapshot done:{}", persistenceId(), sn.getLogMessage()); - long dataThreshold = Runtime.getRuntime().totalMemory() * + long dataThreshold = totalMemory * context.getConfigParams().getSnapshotDataThresholdPercentage() / 100; if (context.getReplicatedLog().dataSize() > dataThreshold) { + + if(LOG.isDebugEnabled()) { + LOG.debug("{}: dataSize {} exceeds dataThreshold {} - doing snapshotPreCommit with index {}", + persistenceId(), context.getReplicatedLog().dataSize(), dataThreshold, + captureSnapshot.getLastAppliedIndex()); + } + // if memory is less, clear the log based on lastApplied. // this could/should only happen if one of the followers is down // as normally we keep removing from the log when its replicated to all. context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm()); - currentBehavior.setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex()); + // Don't reset replicatedToAllIndex to -1 as this may prevent us from trimming the log after an + // install snapshot to a follower. + if(captureSnapshot.getReplicatedToAllIndex() >= 0) { + currentBehavior.setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex()); + } + } else if(captureSnapshot.getReplicatedToAllIndex() != -1){ // clear the log based on replicatedToAllIndex context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getReplicatedToAllIndex(), @@ -350,7 +383,7 @@ public class SnapshotManager implements SnapshotState { } else if (entry != null) { index = entry.getIndex(); term = entry.getTerm(); - } else if(originalIndex == log.getSnapshotIndex()){ + } else if(log.getSnapshotIndex() > -1){ index = log.getSnapshotIndex(); term = log.getSnapshotTerm(); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java index 2ff30ec53b..9a9bf1c774 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java @@ -24,16 +24,21 @@ public interface SnapshotState { * * @param lastLogEntry the last entry in the replicated log * @param replicatedToAllIndex the current replicatedToAllIndex + * + * @return true if capture was started */ - void capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex); + boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex); /** * Initiate capture snapshot for the purposing of installing that snapshot * * @param lastLogEntry * @param replicatedToAllIndex + * @param targetFollower + * + * @return true if capture was started */ - void captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex); + boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower); /** * Create the snapshot @@ -48,8 +53,10 @@ public interface SnapshotState { * @param persistenceProvider * @param snapshotBytes * @param currentBehavior + * @param totalMemory */ - void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes, RaftActorBehavior currentBehavior); + void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes, RaftActorBehavior currentBehavior + ,long totalMemory); /** * Commit the snapshot by trimming the log @@ -70,5 +77,5 @@ public interface SnapshotState { * @param desiredTrimIndex * @return the actual trim index */ - long trimLog(long desiredTrimIndex); + long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index a63c62fa30..2c433f9007 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -33,7 +33,6 @@ import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; -import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; @@ -235,7 +234,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { applyLogToStateMachine(context.getCommitIndex()); } - if (!context.isSnapshotCaptureInitiated()) { + if (!context.getSnapshotManager().isCapturing()) { purgeInMemoryLog(); } @@ -388,7 +387,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { followerToSnapshot.markSendStatus(false); } - if (wasLastChunk && !context.isSnapshotCaptureInitiated()) { + if (wasLastChunk && !context.getSnapshotManager().isCapturing()) { // Since the follower is now caught up try to purge the log. purgeInMemoryLog(); } else if (!wasLastChunk && followerToSnapshot.canSendNextChunk()) { @@ -491,7 +490,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { sendAppendEntries = true; } } else if (isFollowerActive && followerNextIndex >= 0 && - leaderLastIndex > followerNextIndex && !context.isSnapshotCaptureInitiated()) { + leaderLastIndex > followerNextIndex && !context.getSnapshotManager().isCapturing()) { // if the followers next index is not present in the leaders log, and // if the follower is just not starting and if leader's index is more than followers index // then snapshot should be sent @@ -562,37 +561,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { final ActorSelection followerActor = context.getPeerActorSelection(followerId); sendSnapshotChunk(followerActor, followerId); - } else if (!context.isSnapshotCaptureInitiated()) { - ReplicatedLogEntry lastAppliedEntry = context.getReplicatedLog().get(context.getLastApplied()); - long lastAppliedIndex = -1; - long lastAppliedTerm = -1; - - if (lastAppliedEntry != null) { - lastAppliedIndex = lastAppliedEntry.getIndex(); - lastAppliedTerm = lastAppliedEntry.getTerm(); - } else if (context.getReplicatedLog().getSnapshotIndex() > -1) { - lastAppliedIndex = context.getReplicatedLog().getSnapshotIndex(); - lastAppliedTerm = context.getReplicatedLog().getSnapshotTerm(); - } - - boolean isInstallSnapshotInitiated = true; - long replicatedToAllIndex = super.getReplicatedToAllIndex(); - ReplicatedLogEntry replicatedToAllEntry = context.getReplicatedLog().get(replicatedToAllIndex); - - CaptureSnapshot captureSnapshot = new CaptureSnapshot( - lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm, - (replicatedToAllEntry != null ? replicatedToAllEntry.getIndex() : -1), - (replicatedToAllEntry != null ? replicatedToAllEntry.getTerm() : -1), - isInstallSnapshotInitiated); - - if(LOG.isDebugEnabled()) { - LOG.debug("{}: Initiating install snapshot to follower {}: {}", logName(), followerId, - captureSnapshot); - } - - actor().tell(captureSnapshot, actor()); - context.setSnapshotCaptureInitiated(true); + } else { + context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(), + this.getReplicatedToAllIndex(), followerId); } } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java index a1bcf8541c..2a3653ee91 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java @@ -460,31 +460,10 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { * @param snapshotCapturedIndex */ protected void performSnapshotWithoutCapture(final long snapshotCapturedIndex) { - // we would want to keep the lastApplied as its used while capturing snapshots - long lastApplied = context.getLastApplied(); - long tempMin = Math.min(snapshotCapturedIndex, (lastApplied > -1 ? lastApplied - 1 : -1)); + long actualIndex = context.getSnapshotManager().trimLog(snapshotCapturedIndex, this); - if(LOG.isTraceEnabled()) { - LOG.trace("{}: performSnapshotWithoutCapture: snapshotCapturedIndex: {}, lastApplied: {}, tempMin: {}", - logName, snapshotCapturedIndex, lastApplied, tempMin); - } - - if (tempMin > -1 && context.getReplicatedLog().isPresent(tempMin)) { - LOG.debug("{}: fakeSnapshot purging log to {} for term {}", logName(), tempMin, - context.getTermInformation().getCurrentTerm()); - - //use the term of the temp-min, since we check for isPresent, entry will not be null - ReplicatedLogEntry entry = context.getReplicatedLog().get(tempMin); - context.getReplicatedLog().snapshotPreCommit(tempMin, entry.getTerm()); - context.getReplicatedLog().snapshotCommit(); - setReplicatedToAllIndex(tempMin); - } else if(tempMin > getReplicatedToAllIndex()) { - // It's possible a follower was lagging and an install snapshot advanced its match index past - // the current replicatedToAllIndex. Since the follower is now caught up we should advance the - // replicatedToAllIndex (to tempMin). The fact that tempMin wasn't found in the log is likely - // due to a previous snapshot triggered by the memory threshold exceeded, in that case we - // trim the log to the last applied index even if previous entries weren't replicated to all followers. - setReplicatedToAllIndex(tempMin); + if(actualIndex != -1){ + setReplicatedToAllIndex(actualIndex); } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java index bdd459ecff..6a29a348b8 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java @@ -255,7 +255,7 @@ public class Follower extends AbstractRaftActorBehavior { sender.tell(reply, actor()); - if (!context.isSnapshotCaptureInitiated()) { + if (!context.getSnapshotManager().isCapturing()) { super.performSnapshotWithoutCapture(appendEntries.getReplicatedToAllIndex()); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java index 1cc7b5f576..53cca23741 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java @@ -35,6 +35,7 @@ public class MockRaftActorContext implements RaftActorContext { private Map peerAddresses = new HashMap<>(); private ConfigParams configParams; private boolean snapshotCaptureInitiated; + private SnapshotManager snapshotManager; public MockRaftActorContext(){ electionTerm = new ElectionTerm() { @@ -191,13 +192,11 @@ public class MockRaftActorContext implements RaftActorContext { } @Override - public void setSnapshotCaptureInitiated(boolean snapshotCaptureInitiated) { - this.snapshotCaptureInitiated = snapshotCaptureInitiated; - } - - @Override - public boolean isSnapshotCaptureInitiated() { - return snapshotCaptureInitiated; + public SnapshotManager getSnapshotManager() { + if(this.snapshotManager == null){ + this.snapshotManager = new SnapshotManager(this, getLogger()); + } + return this.snapshotManager; } public void setConfigParams(ConfigParams configParams) { diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java index 34932c7249..b93b73958b 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java @@ -60,7 +60,6 @@ import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntrie import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; -import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; import org.opendaylight.controller.cluster.raft.behaviors.Follower; @@ -738,10 +737,12 @@ public class RaftActorTest extends AbstractActorTest { new MockRaftActorContext.MockPayload("C"), new MockRaftActorContext.MockPayload("D"))); - mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1,-1, 1, -1, 1)); - RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext(); + raftActorContext.getSnapshotManager().capture( + new MockRaftActorContext.MockReplicatedLogEntry(1, -1, + new MockRaftActorContext.MockPayload("D")), -1); + mockRaftActor.setCurrentBehavior(new Leader(raftActorContext)); mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); @@ -770,12 +771,13 @@ public class RaftActorTest extends AbstractActorTest { MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); mockRaftActor.waitForInitializeBehaviorComplete(); + MockRaftActorContext.MockReplicatedLogEntry lastEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 4, mock(Payload.class)); mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class))); mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class))); mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 2, mock(Payload.class))); mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 3, mock(Payload.class))); - mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 4, mock(Payload.class))); + mockRaftActor.getReplicatedLog().append(lastEntry); ByteString snapshotBytes = fromObject(Arrays.asList( new MockRaftActorContext.MockPayload("A"), @@ -787,7 +789,8 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.setCurrentBehavior(new Follower(raftActorContext)); long replicatedToAllIndex = 1; - mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, 2, 1, replicatedToAllIndex, 1)); + + mockRaftActor.getRaftActorContext().getSnapshotManager().capture(lastEntry, replicatedToAllIndex); verify(mockRaftActor.delegate).createSnapshot(); @@ -929,7 +932,9 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.setCurrentBehavior(new Leader(raftActorContext)); - mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, -1, 1, -1, 1)); + raftActorContext.getSnapshotManager().capture( + new MockRaftActorContext.MockReplicatedLogEntry(1, 1, + new MockRaftActorContext.MockPayload("D")), 1); mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); @@ -1059,9 +1064,10 @@ public class RaftActorTest extends AbstractActorTest { assertEquals(8, leaderActor.getReplicatedLog().size()); - leaderActor.onReceiveCommand(new CaptureSnapshot(6, 1, 4, 1, 4, 1)); + leaderActor.getRaftActorContext().getSnapshotManager() + .capture(new MockRaftActorContext.MockReplicatedLogEntry(1, 6, + new MockRaftActorContext.MockPayload("x")), 4); - leaderActor.getRaftActorContext().setSnapshotCaptureInitiated(true); verify(leaderActor.delegate).createSnapshot(); assertEquals(8, leaderActor.getReplicatedLog().size()); @@ -1087,8 +1093,14 @@ public class RaftActorTest extends AbstractActorTest { new MockRaftActorContext.MockPayload("foo-2"), new MockRaftActorContext.MockPayload("foo-3"), new MockRaftActorContext.MockPayload("foo-4"))); - leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); - assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated()); + + leaderActor.getRaftActorContext().getSnapshotManager().persist(new NonPersistentProvider() + , snapshotBytes.toByteArray(), leader, Runtime.getRuntime().totalMemory()); + + assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing()); + + // The commit is needed to complete the snapshot creation process + leaderActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentProvider(), -1); // capture snapshot reply should remove the snapshotted entries only assertEquals(3, leaderActor.getReplicatedLog().size()); @@ -1151,9 +1163,10 @@ public class RaftActorTest extends AbstractActorTest { assertEquals(6, followerActor.getReplicatedLog().size()); //snapshot on 4 - followerActor.onReceiveCommand(new CaptureSnapshot(5, 1, 4, 1, 4, 1)); + followerActor.getRaftActorContext().getSnapshotManager().capture( + new MockRaftActorContext.MockReplicatedLogEntry(1, 5, + new MockRaftActorContext.MockPayload("D")), 4); - followerActor.getRaftActorContext().setSnapshotCaptureInitiated(true); verify(followerActor.delegate).createSnapshot(); assertEquals(6, followerActor.getReplicatedLog().size()); @@ -1188,7 +1201,10 @@ public class RaftActorTest extends AbstractActorTest { new MockRaftActorContext.MockPayload("foo-3"), new MockRaftActorContext.MockPayload("foo-4"))); followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); - assertFalse(followerActor.getRaftActorContext().isSnapshotCaptureInitiated()); + assertFalse(followerActor.getRaftActorContext().getSnapshotManager().isCapturing()); + + // The commit is needed to complete the snapshot creation process + followerActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentProvider(), -1); // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log @@ -1286,7 +1302,7 @@ public class RaftActorTest extends AbstractActorTest { new MockRaftActorContext.MockPayload("foo-3"), new MockRaftActorContext.MockPayload("foo-4"))); leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); - assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated()); + assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing()); assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size()); @@ -1370,7 +1386,7 @@ public class RaftActorTest extends AbstractActorTest { // Trimming log in this scenario is a no-op assertEquals(-1, leaderActor.getReplicatedLog().getSnapshotIndex()); - assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated()); + assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing()); assertEquals(-1, leader.getReplicatedToAllIndex()); }}; @@ -1413,7 +1429,7 @@ public class RaftActorTest extends AbstractActorTest { // Trimming log in this scenario is a no-op assertEquals(3, leaderActor.getReplicatedLog().getSnapshotIndex()); - assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated()); + assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing()); assertEquals(3, leader.getReplicatedToAllIndex()); }}; diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java index 90272fec98..3d75edb5bd 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java @@ -1,5 +1,6 @@ package org.opendaylight.controller.cluster.raft; +import static junit.framework.TestCase.assertFalse; import static junit.framework.TestCase.assertTrue; import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.any; @@ -7,6 +8,7 @@ import static org.mockito.Matchers.anyLong; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.reset; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import akka.actor.ActorRef; @@ -21,7 +23,6 @@ import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; import org.mockito.Mock; -import org.mockito.Mockito; import org.mockito.MockitoAnnotations; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; @@ -90,7 +91,7 @@ public class SnapshotManagerTest extends AbstractActorTest { // Force capturing toInstall = true snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, - new MockRaftActorContext.MockPayload()), 0); + new MockRaftActorContext.MockPayload()), 0, "follower-1"); assertEquals(true, snapshotManager.isCapturing()); @@ -112,9 +113,11 @@ public class SnapshotManagerTest extends AbstractActorTest { @Test public void testCapture(){ - snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9, + boolean capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9, new MockRaftActorContext.MockPayload()), 9); + assertTrue(capture); + assertEquals(true, snapshotManager.isCapturing()); CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(actorRef, CaptureSnapshot.class); @@ -136,17 +139,21 @@ public class SnapshotManagerTest extends AbstractActorTest { @Test public void testIllegalCapture() throws Exception { - snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9, + boolean capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9, new MockRaftActorContext.MockPayload()), 9); + assertTrue(capture); + List allMatching = MessageCollectorActor.getAllMatching(actorRef, CaptureSnapshot.class); assertEquals(1, allMatching.size()); // This will not cause snapshot capture to start again - snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9, + capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9, new MockRaftActorContext.MockPayload()), 9); + assertFalse(capture); + allMatching = MessageCollectorActor.getAllMatching(actorRef, CaptureSnapshot.class); assertEquals(1, allMatching.size()); @@ -165,7 +172,8 @@ public class SnapshotManagerTest extends AbstractActorTest { snapshotManager.create(mockProcedure); byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10}; - snapshotManager.persist(mockDataPersistenceProvider, bytes, mockRaftActorBehavior); + snapshotManager.persist(mockDataPersistenceProvider, bytes, mockRaftActorBehavior + , Runtime.getRuntime().totalMemory()); ArgumentCaptor snapshotArgumentCaptor = ArgumentCaptor.forClass(Snapshot.class); verify(mockDataPersistenceProvider).saveSnapshot(snapshotArgumentCaptor.capture()); @@ -225,9 +233,10 @@ public class SnapshotManagerTest extends AbstractActorTest { verify(mockProcedure, times(1)).apply(null); - snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior); + snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior + , Runtime.getRuntime().totalMemory()); - Mockito.reset(mockProcedure); + reset(mockProcedure); snapshotManager.create(mockProcedure); @@ -249,7 +258,8 @@ public class SnapshotManagerTest extends AbstractActorTest { snapshotManager.create(mockProcedure); - snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior); + snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior + , Runtime.getRuntime().totalMemory()); verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class)); @@ -269,13 +279,12 @@ public class SnapshotManagerTest extends AbstractActorTest { snapshotManager.create(mockProcedure); - snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior); + snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior + , Runtime.getRuntime().totalMemory()); verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class)); verify(mockReplicatedLog).snapshotPreCommit(9L, 6L); - - verify(mockRaftActorBehavior).setReplicatedToAllIndex(-1); } @Test @@ -283,14 +292,17 @@ public class SnapshotManagerTest extends AbstractActorTest { doReturn(Integer.MAX_VALUE).when(mockReplicatedLog).dataSize(); // when replicatedToAllIndex = -1 - snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9, - new MockRaftActorContext.MockPayload()), -1); + boolean capture = snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9, + new MockRaftActorContext.MockPayload()), -1, "follower-1"); + + assertTrue(capture); snapshotManager.create(mockProcedure); byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10}; - snapshotManager.persist(mockDataPersistenceProvider, bytes, mockRaftActorBehavior); + snapshotManager.persist(mockDataPersistenceProvider, bytes, mockRaftActorBehavior + , Runtime.getRuntime().totalMemory()); verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class)); @@ -308,7 +320,8 @@ public class SnapshotManagerTest extends AbstractActorTest { @Test public void testCallingPersistWithoutCaptureWillDoNothing(){ - snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior); + snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior + , Runtime.getRuntime().totalMemory()); verify(mockDataPersistenceProvider, never()).saveSnapshot(any(Snapshot.class)); @@ -322,13 +335,15 @@ public class SnapshotManagerTest extends AbstractActorTest { // when replicatedToAllIndex = -1 snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9, - new MockRaftActorContext.MockPayload()), -1); + new MockRaftActorContext.MockPayload()), -1, "follower-1"); snapshotManager.create(mockProcedure); - snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior); + snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior + , Runtime.getRuntime().totalMemory()); - snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior); + snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior + , Runtime.getRuntime().totalMemory()); verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class)); @@ -341,11 +356,12 @@ public class SnapshotManagerTest extends AbstractActorTest { public void testCommit(){ // when replicatedToAllIndex = -1 snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9, - new MockRaftActorContext.MockPayload()), -1); + new MockRaftActorContext.MockPayload()), -1, "follower-1"); snapshotManager.create(mockProcedure); - snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior); + snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior + , Runtime.getRuntime().totalMemory()); snapshotManager.commit(mockDataPersistenceProvider, 100L); @@ -366,7 +382,7 @@ public class SnapshotManagerTest extends AbstractActorTest { public void testCommitBeforePersist(){ // when replicatedToAllIndex = -1 snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9, - new MockRaftActorContext.MockPayload()), -1); + new MockRaftActorContext.MockPayload()), -1, "follower-1"); snapshotManager.commit(mockDataPersistenceProvider, 100L); @@ -394,11 +410,12 @@ public class SnapshotManagerTest extends AbstractActorTest { public void testCallingCommitMultipleTimesCausesNoHarm(){ // when replicatedToAllIndex = -1 snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9, - new MockRaftActorContext.MockPayload()), -1); + new MockRaftActorContext.MockPayload()), -1, "follower-1"); snapshotManager.create(mockProcedure); - snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior); + snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior + , Runtime.getRuntime().totalMemory()); snapshotManager.commit(mockDataPersistenceProvider, 100L); @@ -415,11 +432,12 @@ public class SnapshotManagerTest extends AbstractActorTest { public void testRollback(){ // when replicatedToAllIndex = -1 snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9, - new MockRaftActorContext.MockPayload()), -1); + new MockRaftActorContext.MockPayload()), -1, "follower-1"); snapshotManager.create(mockProcedure); - snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior); + snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior + , Runtime.getRuntime().totalMemory()); snapshotManager.rollback(); @@ -431,7 +449,7 @@ public class SnapshotManagerTest extends AbstractActorTest { public void testRollbackBeforePersist(){ // when replicatedToAllIndex = -1 snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9, - new MockRaftActorContext.MockPayload()), -1); + new MockRaftActorContext.MockPayload()), -1, "follower-1"); snapshotManager.rollback(); @@ -449,11 +467,12 @@ public class SnapshotManagerTest extends AbstractActorTest { public void testCallingRollbackMultipleTimesCausesNoHarm(){ // when replicatedToAllIndex = -1 snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9, - new MockRaftActorContext.MockPayload()), -1); + new MockRaftActorContext.MockPayload()), -1, "follower-1"); snapshotManager.create(mockProcedure); - snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior); + snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior + , Runtime.getRuntime().totalMemory()); snapshotManager.rollback(); @@ -473,7 +492,7 @@ public class SnapshotManagerTest extends AbstractActorTest { doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10); doReturn(5L).when(replicatedLogEntry).getTerm(); - snapshotManager.trimLog(10); + snapshotManager.trimLog(10, mockRaftActorBehavior); verify(mockReplicatedLog).snapshotPreCommit(10, 5); verify(mockReplicatedLog).snapshotCommit(); @@ -481,9 +500,11 @@ public class SnapshotManagerTest extends AbstractActorTest { @Test public void testTrimLogAfterCapture(){ - snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9, + boolean capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9, new MockRaftActorContext.MockPayload()), 9); + assertTrue(capture); + assertEquals(true, snapshotManager.isCapturing()); ElectionTerm mockElectionTerm = mock(ElectionTerm.class); @@ -495,7 +516,7 @@ public class SnapshotManagerTest extends AbstractActorTest { doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10); doReturn(5L).when(replicatedLogEntry).getTerm(); - snapshotManager.trimLog(10); + snapshotManager.trimLog(10, mockRaftActorBehavior); verify(mockReplicatedLog, never()).snapshotPreCommit(anyLong(), anyLong()); verify(mockReplicatedLog, never()).snapshotCommit(); @@ -504,8 +525,10 @@ public class SnapshotManagerTest extends AbstractActorTest { @Test public void testTrimLogAfterCaptureToInstall(){ - snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(1,9, - new MockRaftActorContext.MockPayload()), 9); + boolean capture = snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(1,9, + new MockRaftActorContext.MockPayload()), 9, "follower-1"); + + assertTrue(capture); assertEquals(true, snapshotManager.isCapturing()); @@ -518,7 +541,7 @@ public class SnapshotManagerTest extends AbstractActorTest { doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10); doReturn(5L).when(replicatedLogEntry).getTerm(); - snapshotManager.trimLog(10); + snapshotManager.trimLog(10, mockRaftActorBehavior); verify(mockReplicatedLog, never()).snapshotPreCommit(10, 5); verify(mockReplicatedLog, never()).snapshotCommit(); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java index 383ebefd36..1dda2791c0 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ -524,6 +524,8 @@ public class LeaderTest extends AbstractLeaderTest { new ReplicatedLogImplEntry(newEntryIndex, currentTerm, new MockRaftActorContext.MockPayload("D")); + actorContext.getReplicatedLog().append(entry); + //update follower timestamp leader.markFollowerActive(FOLLOWER_ID); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index adc7f4706c..aef25827e7 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -41,6 +41,7 @@ import java.util.concurrent.atomic.AtomicReference; import org.junit.Test; import org.mockito.InOrder; import org.opendaylight.controller.cluster.DataPersistenceProvider; +import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction; import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; @@ -69,13 +70,13 @@ import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListene import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply; +import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry; import org.opendaylight.controller.cluster.raft.Snapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; -import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; @@ -99,6 +100,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild; import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; @@ -1421,31 +1423,44 @@ public class ShardTest extends AbstractShardTest { dataStoreContextBuilder.persistent(persistent); + + new ShardTestKit(getSystem()) {{ final AtomicReference latch = new AtomicReference<>(new CountDownLatch(1)); - Creator creator = new Creator() { - @Override - public Shard create() throws Exception { - return new Shard(shardID, Collections.emptyMap(), - newDatastoreContext(), SCHEMA_CONTEXT) { - DelegatingPersistentDataProvider delegating; + class TestShard extends Shard { - @Override - protected DataPersistenceProvider persistence() { - if(delegating == null) { - delegating = new DelegatingPersistentDataProvider(super.persistence()); - } + protected TestShard(ShardIdentifier name, Map peerAddresses, + DatastoreContext datastoreContext, SchemaContext schemaContext) { + super(name, peerAddresses, datastoreContext, schemaContext); + } - return delegating; - } + DelegatingPersistentDataProvider delegating; - @Override - protected void commitSnapshot(final long sequenceNumber) { - super.commitSnapshot(sequenceNumber); - latch.get().countDown(); - } - }; + protected DataPersistenceProvider persistence() { + if(delegating == null) { + delegating = new DelegatingPersistentDataProvider(super.persistence()); + } + return delegating; + } + + @Override + protected void commitSnapshot(final long sequenceNumber) { + super.commitSnapshot(sequenceNumber); + latch.get().countDown(); + } + + @Override + public RaftActorContext getRaftActorContext() { + return super.getRaftActorContext(); + } + } + + Creator creator = new Creator() { + @Override + public Shard create() throws Exception { + return new TestShard(shardID, Collections.emptyMap(), + newDatastoreContext(), SCHEMA_CONTEXT); } }; @@ -1458,8 +1473,9 @@ public class ShardTest extends AbstractShardTest { NormalizedNode expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build()); - CaptureSnapshot capture = new CaptureSnapshot(-1, -1, -1, -1, -1, -1); - shard.tell(capture, getRef()); + // Trigger creation of a snapshot by ensuring + RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext(); + raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1); assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS)); @@ -1471,7 +1487,7 @@ public class ShardTest extends AbstractShardTest { latch.set(new CountDownLatch(1)); savedSnapshot.set(null); - shard.tell(capture, getRef()); + raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1); assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS)); -- 2.36.6