import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
-import com.google.protobuf.ByteString;
import java.io.Serializable;
import java.util.Collection;
import java.util.List;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
-import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
import org.opendaylight.controller.cluster.raft.behaviors.AbstractRaftActorBehavior;
import org.opendaylight.controller.cluster.raft.behaviors.Follower;
public void apply(ApplyJournalEntries param) throws Exception {
}
};
+ private static final String COMMIT_SNAPSHOT = "commit_snapshot";
protected final Logger LOG = LoggerFactory.getLogger(getClass());
*/
private final RaftActorContextImpl context;
+ private final Procedure<Void> createSnapshotProcedure = new CreateSnapshotProcedure();
+
/**
* The in-memory journal
*/
private ReplicatedLogImpl replicatedLog = new ReplicatedLogImpl();
- private CaptureSnapshot captureSnapshot = null;
-
private Stopwatch recoveryTimer;
private int currentRecoveryBatchCount;
LOG.error("{}: SaveSnapshotFailure received for snapshot Cause:",
persistenceId(), saveSnapshotFailure.cause());
- context.getReplicatedLog().snapshotRollback();
-
- LOG.info("{}: Replicated Log rollbacked. Snapshot will be attempted in the next cycle." +
- "snapshotIndex:{}, snapshotTerm:{}, log-size:{}", persistenceId(),
- context.getReplicatedLog().getSnapshotIndex(),
- context.getReplicatedLog().getSnapshotTerm(),
- context.getReplicatedLog().size());
+ context.getSnapshotManager().rollback();
} else if (message instanceof CaptureSnapshot) {
LOG.debug("{}: CaptureSnapshot received by actor: {}", persistenceId(), message);
- if(captureSnapshot == null) {
- captureSnapshot = (CaptureSnapshot)message;
- createSnapshot();
- }
+ context.getSnapshotManager().create(createSnapshotProcedure);
- } else if (message instanceof CaptureSnapshotReply){
+ } else if (message instanceof CaptureSnapshotReply) {
handleCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
} else if(message instanceof GetOnDemandRaftState) {
onGetOnDemandRaftStats();
+ } else if (message.equals(COMMIT_SNAPSHOT)) {
+ commitSnapshot(-1);
} else {
reusableBehaviorStateHolder.init(currentBehavior);
.currentTerm(context.getTermInformation().getCurrentTerm())
.inMemoryJournalDataSize(replicatedLog.dataSize())
.inMemoryJournalLogSize(replicatedLog.size())
- .isSnapshotCaptureInitiated(context.isSnapshotCaptureInitiated())
+ .isSnapshotCaptureInitiated(context.getSnapshotManager().isCapturing())
.lastApplied(context.getLastApplied())
.lastIndex(replicatedLog.lastIndex())
.lastTerm(replicatedLog.lastTerm())
// the state to durable storage
self().tell(new ApplyJournalEntries(replicatedLogEntry.getIndex()), self());
- // Check if the "real" snapshot capture has been initiated. If no then do the fake snapshot
- if(!context.isSnapshotCaptureInitiated()){
- raftContext.getReplicatedLog().snapshotPreCommit(raftContext.getLastApplied(),
- raftContext.getTermInformation().getCurrentTerm());
- raftContext.getReplicatedLog().snapshotCommit();
- } else {
- LOG.debug("{}: Skipping fake snapshotting for {} because real snapshotting is in progress",
- persistenceId(), getId());
- }
+ context.getSnapshotManager().trimLog(context.getLastApplied(), currentBehavior);
+
} else if (clientActor != null) {
// Send message for replication
currentBehavior.handleMessage(getSelf(),
}
protected void commitSnapshot(long sequenceNumber) {
- context.getReplicatedLog().snapshotCommit();
-
- // TODO: Not sure if we want to be this aggressive with trimming stuff
- trimPersistentData(sequenceNumber);
+ context.getSnapshotManager().commit(persistence(), sequenceNumber);
}
/**
private void handleCaptureSnapshotReply(byte[] snapshotBytes) {
LOG.debug("{}: CaptureSnapshotReply received by actor: snapshot size {}", persistenceId(), snapshotBytes.length);
- // create a snapshot object from the state provided and save it
- // when snapshot is saved async, SaveSnapshotSuccess is raised.
-
- Snapshot sn = Snapshot.create(snapshotBytes,
- context.getReplicatedLog().getFrom(captureSnapshot.getLastAppliedIndex() + 1),
- captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
- captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm());
-
- persistence().saveSnapshot(sn);
-
- LOG.info("{}: Persisting of snapshot done:{}", persistenceId(), sn.getLogMessage());
-
- long dataThreshold = getTotalMemory() *
- getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100;
- if (context.getReplicatedLog().dataSize() > dataThreshold) {
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: dataSize {} exceeds dataThreshold {} - doing snapshotPreCommit with index {}",
- persistenceId(), context.getReplicatedLog().dataSize(), dataThreshold,
- captureSnapshot.getLastAppliedIndex());
- }
-
- // if memory is less, clear the log based on lastApplied.
- // this could/should only happen if one of the followers is down
- // as normally we keep removing from the log when its replicated to all.
- context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getLastAppliedIndex(),
- captureSnapshot.getLastAppliedTerm());
-
- // Don't reset replicatedToAllIndex to -1 as this may prevent us from trimming the log after an
- // install snapshot to a follower.
- if(captureSnapshot.getReplicatedToAllIndex() >= 0) {
- getCurrentBehavior().setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
- }
- } else if(captureSnapshot.getReplicatedToAllIndex() != -1){
- // clear the log based on replicatedToAllIndex
- context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getReplicatedToAllIndex(),
- captureSnapshot.getReplicatedToAllTerm());
-
- getCurrentBehavior().setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
- } else {
- // The replicatedToAllIndex was not found in the log
- // This means that replicatedToAllIndex never moved beyond -1 or that it is already in the snapshot.
- // In this scenario we may need to save the snapshot to the akka persistence
- // snapshot for recovery but we do not need to do the replicated log trimming.
- context.getReplicatedLog().snapshotPreCommit(replicatedLog.getSnapshotIndex(),
- replicatedLog.getSnapshotTerm());
- }
-
-
- LOG.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex: {} " +
- "and term: {}", persistenceId(), replicatedLog.getSnapshotIndex(),
- replicatedLog.getSnapshotTerm());
-
- if (isLeader() && captureSnapshot.isInstallSnapshotInitiated()) {
- // this would be call straight to the leader and won't initiate in serialization
- currentBehavior.handleMessage(getSelf(), new SendInstallSnapshot(
- ByteString.copyFrom(snapshotBytes)));
- }
-
- captureSnapshot = null;
- context.setSnapshotCaptureInitiated(false);
+ context.getSnapshotManager().persist(persistence(), snapshotBytes, currentBehavior, getTotalMemory());
}
protected long getTotalMemory() {
}
private class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
-
private static final int DATA_SIZE_DIVIDER = 5;
- private long dataSizeSinceLastSnapshot = 0;
+ private long dataSizeSinceLastSnapshot = 0L;
+
public ReplicatedLogImpl(Snapshot snapshot) {
super(snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(),
long dataSizeForCheck = dataSize;
dataSizeSinceLastSnapshot += logEntrySize;
- long journalSize = lastIndex() + 1;
- if(!hasFollowers()) {
+ if (!hasFollowers()) {
// When we do not have followers we do not maintain an in-memory log
// due to this the journalSize will never become anything close to the
// snapshot batch count. In fact will mostly be 1.
// as if we were maintaining a real snapshot
dataSizeForCheck = dataSizeSinceLastSnapshot / DATA_SIZE_DIVIDER;
}
-
+ long journalSize = replicatedLogEntry.getIndex() + 1;
long dataThreshold = getTotalMemory() *
- getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100;
-
- // when a snaphsot is being taken, captureSnapshot != null
- if (!context.isSnapshotCaptureInitiated() &&
- ( journalSize % context.getConfigParams().getSnapshotBatchCount() == 0 ||
- dataSizeForCheck > dataThreshold)) {
-
- dataSizeSinceLastSnapshot = 0;
+ context.getConfigParams().getSnapshotDataThresholdPercentage() / 100;
- LOG.info("{}: Initiating Snapshot Capture, journalSize = {}, dataSizeForCheck = {}," +
- " dataThreshold = {}", persistenceId(), journalSize, dataSizeForCheck, dataThreshold);
+ if ((journalSize % context.getConfigParams().getSnapshotBatchCount() == 0
+ || dataSizeForCheck > dataThreshold)) {
- long lastAppliedIndex = -1;
- long lastAppliedTerm = -1;
+ boolean started = context.getSnapshotManager().capture(replicatedLogEntry,
+ currentBehavior.getReplicatedToAllIndex());
- ReplicatedLogEntry lastAppliedEntry = get(context.getLastApplied());
- if (!hasFollowers()) {
- lastAppliedIndex = replicatedLogEntry.getIndex();
- lastAppliedTerm = replicatedLogEntry.getTerm();
- } else if (lastAppliedEntry != null) {
- lastAppliedIndex = lastAppliedEntry.getIndex();
- lastAppliedTerm = lastAppliedEntry.getTerm();
+ if(started){
+ dataSizeSinceLastSnapshot = 0;
}
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Snapshot Capture logSize: {}", persistenceId(), journal.size());
- LOG.debug("{}: Snapshot Capture lastApplied:{} ",
- persistenceId(), context.getLastApplied());
- LOG.debug("{}: Snapshot Capture lastAppliedIndex:{}", persistenceId(),
- lastAppliedIndex);
- LOG.debug("{}: Snapshot Capture lastAppliedTerm:{}", persistenceId(),
- lastAppliedTerm);
- }
-
- // send a CaptureSnapshot to self to make the expensive operation async.
- long replicatedToAllIndex = getCurrentBehavior().getReplicatedToAllIndex();
- ReplicatedLogEntry replicatedToAllEntry = context.getReplicatedLog().get(replicatedToAllIndex);
- getSelf().tell(new CaptureSnapshot(lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm,
- (replicatedToAllEntry != null ? replicatedToAllEntry.getIndex() : -1),
- (replicatedToAllEntry != null ? replicatedToAllEntry.getTerm() : -1)),
- null);
- context.setSnapshotCaptureInitiated(true);
}
+
if (callback != null){
callback.apply(replicatedLogEntry);
}
@Override
public void saveSnapshot(Object o) {
// Make saving Snapshot successful
- commitSnapshot(-1L);
+ // Committing the snapshot here would end up calling commit in the creating state which would
+ // be a state violation. That's why now we send a message to commit the snapshot.
+ self().tell(COMMIT_SNAPSHOT, self());
+ }
+ }
+
+
+ private class CreateSnapshotProcedure implements Procedure<Void> {
+
+ @Override
+ public void apply(Void aVoid) throws Exception {
+ createSnapshot();
}
}
*/
ConfigParams getConfigParams();
- void setSnapshotCaptureInitiated(boolean snapshotCaptureInitiated);
-
- boolean isSnapshotCaptureInitiated();
+ SnapshotManager getSnapshotManager();
}
private boolean snapshotCaptureInitiated;
+ // Snapshot manager will need to be created on demand as it needs raft actor context which cannot
+ // be passed to it in the constructor
+ private SnapshotManager snapshotManager;
+
public RaftActorContextImpl(ActorRef actor, UntypedActorContext context,
String id,
ElectionTerm termInformation, long commitIndex,
return configParams;
}
- @Override
- public void setSnapshotCaptureInitiated(boolean snapshotCaptureInitiated) {
- this.snapshotCaptureInitiated = snapshotCaptureInitiated;
- }
-
- @Override
- public boolean isSnapshotCaptureInitiated() {
- return snapshotCaptureInitiated;
- }
-
@Override public void addToPeers(String name, String address) {
peerAddresses.put(name, address);
}
peerAddresses.put(peerId, peerAddress);
}
+
+ public SnapshotManager getSnapshotManager() {
+ if(snapshotManager == null){
+ snapshotManager = new SnapshotManager(this, LOG);
+ }
+ return snapshotManager;
+ }
}
}
@Override
- public void captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
- currentState.captureToInstall(lastLogEntry, replicatedToAllIndex);
+ public boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) {
+ return currentState.captureToInstall(lastLogEntry, replicatedToAllIndex, targetFollower);
}
@Override
- public void capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
- currentState.capture(lastLogEntry, replicatedToAllIndex);
+ public boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
+ return currentState.capture(lastLogEntry, replicatedToAllIndex);
}
@Override
}
@Override
- public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes, RaftActorBehavior currentBehavior) {
- currentState.persist(persistenceProvider, snapshotBytes, currentBehavior);
+ public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes,
+ RaftActorBehavior currentBehavior, long totalMemory) {
+ currentState.persist(persistenceProvider, snapshotBytes, currentBehavior, totalMemory);
}
@Override
}
@Override
- public long trimLog(long desiredTrimIndex) {
- return currentState.trimLog(desiredTrimIndex);
+ public long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior) {
+ return currentState.trimLog(desiredTrimIndex, currentBehavior);
}
private boolean hasFollowers(){
}
@Override
- public void capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
+ public boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
LOG.debug("capture should not be called in state {}", this);
+ return false;
}
@Override
- public void captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
+ public boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) {
LOG.debug("captureToInstall should not be called in state {}", this);
+ return false;
}
@Override
}
@Override
- public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes, RaftActorBehavior currentBehavior) {
+ public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes,
+ RaftActorBehavior currentBehavior, long totalMemory) {
LOG.debug("persist should not be called in state {}", this);
}
}
@Override
- public long trimLog(long desiredTrimIndex) {
+ public long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior) {
LOG.debug("trimLog should not be called in state {}", this);
return -1;
}
- protected long doTrimLog(long desiredTrimIndex){
+ protected long doTrimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior){
// we would want to keep the lastApplied as its used while capturing snapshots
long lastApplied = context.getLastApplied();
long tempMin = Math.min(desiredTrimIndex, (lastApplied > -1 ? lastApplied - 1 : -1));
- if (tempMin > -1 && context.getReplicatedLog().isPresent(tempMin)) {
+ if(LOG.isTraceEnabled()) {
+ LOG.trace("{}: performSnapshotWithoutCapture: desiredTrimIndex: {}, lastApplied: {}, tempMin: {}",
+ persistenceId(), desiredTrimIndex, lastApplied, tempMin);
+ }
+
+ if (tempMin > -1 && context.getReplicatedLog().isPresent(tempMin)) {
LOG.debug("{}: fakeSnapshot purging log to {} for term {}", persistenceId(), tempMin,
context.getTermInformation().getCurrentTerm());
context.getReplicatedLog().snapshotPreCommit(tempMin, entry.getTerm());
context.getReplicatedLog().snapshotCommit();
return tempMin;
+ } else if(tempMin > currentBehavior.getReplicatedToAllIndex()) {
+ // It's possible a follower was lagging and an install snapshot advanced its match index past
+ // the current replicatedToAllIndex. Since the follower is now caught up we should advance the
+ // replicatedToAllIndex (to tempMin). The fact that tempMin wasn't found in the log is likely
+ // due to a previous snapshot triggered by the memory threshold exceeded, in that case we
+ // trim the log to the last applied index even if previous entries weren't replicated to all followers.
+ currentBehavior.setReplicatedToAllIndex(tempMin);
}
-
return -1;
}
}
private class Idle extends AbstractSnapshotState {
- private void capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, boolean toInstall) {
+ private boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) {
TermInformationReader lastAppliedTermInfoReader =
lastAppliedTermInformationReader.init(context.getReplicatedLog(), context.getLastApplied(),
lastLogEntry, hasFollowers());
// send a CaptureSnapshot to self to make the expensive operation async.
captureSnapshot = new CaptureSnapshot(lastLogEntry.getIndex(),
lastLogEntry.getTerm(), lastAppliedIndex, lastAppliedTerm,
- newReplicatedToAllIndex, newReplicatedToAllTerm, toInstall);
+ newReplicatedToAllIndex, newReplicatedToAllTerm, targetFollower!=null);
SnapshotManager.this.currentState = CAPTURING;
- LOG.info("{}: Initiating snapshot capture {}: {}", persistenceId(), toInstall ? "to install" : "",
- captureSnapshot);
+ if(targetFollower != null){
+ LOG.info("{}: Initiating snapshot capture {}", persistenceId(), captureSnapshot);
+ } else {
+ LOG.info("{}: Initiating snapshot capture {} to install on {}",
+ persistenceId(), captureSnapshot, targetFollower);
+ }
context.getActor().tell(captureSnapshot, context.getActor());
+
+ return true;
}
@Override
- public void capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
- capture(lastLogEntry, replicatedToAllIndex, false);
+ public boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
+ return capture(lastLogEntry, replicatedToAllIndex, null);
}
@Override
- public void captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
- capture(lastLogEntry, replicatedToAllIndex, true);
+ public boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) {
+ return capture(lastLogEntry, replicatedToAllIndex, targetFollower);
}
@Override
}
@Override
- public long trimLog(long desiredTrimIndex) {
- return doTrimLog(desiredTrimIndex);
+ public long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior) {
+ return doTrimLog(desiredTrimIndex, currentBehavior);
}
}
@Override
public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes,
- RaftActorBehavior currentBehavior) {
+ RaftActorBehavior currentBehavior, long totalMemory) {
// create a snapshot object from the state provided and save it
// when snapshot is saved async, SaveSnapshotSuccess is raised.
LOG.info("{}: Persisting of snapshot done:{}", persistenceId(), sn.getLogMessage());
- long dataThreshold = Runtime.getRuntime().totalMemory() *
+ long dataThreshold = totalMemory *
context.getConfigParams().getSnapshotDataThresholdPercentage() / 100;
if (context.getReplicatedLog().dataSize() > dataThreshold) {
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{}: dataSize {} exceeds dataThreshold {} - doing snapshotPreCommit with index {}",
+ persistenceId(), context.getReplicatedLog().dataSize(), dataThreshold,
+ captureSnapshot.getLastAppliedIndex());
+ }
+
// if memory is less, clear the log based on lastApplied.
// this could/should only happen if one of the followers is down
// as normally we keep removing from the log when its replicated to all.
context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getLastAppliedIndex(),
captureSnapshot.getLastAppliedTerm());
- currentBehavior.setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
+ // Don't reset replicatedToAllIndex to -1 as this may prevent us from trimming the log after an
+ // install snapshot to a follower.
+ if(captureSnapshot.getReplicatedToAllIndex() >= 0) {
+ currentBehavior.setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
+ }
+
} else if(captureSnapshot.getReplicatedToAllIndex() != -1){
// clear the log based on replicatedToAllIndex
context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getReplicatedToAllIndex(),
} else if (entry != null) {
index = entry.getIndex();
term = entry.getTerm();
- } else if(originalIndex == log.getSnapshotIndex()){
+ } else if(log.getSnapshotIndex() > -1){
index = log.getSnapshotIndex();
term = log.getSnapshotTerm();
}
*
* @param lastLogEntry the last entry in the replicated log
* @param replicatedToAllIndex the current replicatedToAllIndex
+ *
+ * @return true if capture was started
*/
- void capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex);
+ boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex);
/**
* Initiate capture snapshot for the purposing of installing that snapshot
*
* @param lastLogEntry
* @param replicatedToAllIndex
+ * @param targetFollower
+ *
+ * @return true if capture was started
*/
- void captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex);
+ boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower);
/**
* Create the snapshot
* @param persistenceProvider
* @param snapshotBytes
* @param currentBehavior
+ * @param totalMemory
*/
- void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes, RaftActorBehavior currentBehavior);
+ void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes, RaftActorBehavior currentBehavior
+ ,long totalMemory);
/**
* Commit the snapshot by trimming the log
* @param desiredTrimIndex
* @return the actual trim index
*/
- long trimLog(long desiredTrimIndex);
+ long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior);
}
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
applyLogToStateMachine(context.getCommitIndex());
}
- if (!context.isSnapshotCaptureInitiated()) {
+ if (!context.getSnapshotManager().isCapturing()) {
purgeInMemoryLog();
}
followerToSnapshot.markSendStatus(false);
}
- if (wasLastChunk && !context.isSnapshotCaptureInitiated()) {
+ if (wasLastChunk && !context.getSnapshotManager().isCapturing()) {
// Since the follower is now caught up try to purge the log.
purgeInMemoryLog();
} else if (!wasLastChunk && followerToSnapshot.canSendNextChunk()) {
sendAppendEntries = true;
}
} else if (isFollowerActive && followerNextIndex >= 0 &&
- leaderLastIndex > followerNextIndex && !context.isSnapshotCaptureInitiated()) {
+ leaderLastIndex > followerNextIndex && !context.getSnapshotManager().isCapturing()) {
// if the followers next index is not present in the leaders log, and
// if the follower is just not starting and if leader's index is more than followers index
// then snapshot should be sent
final ActorSelection followerActor = context.getPeerActorSelection(followerId);
sendSnapshotChunk(followerActor, followerId);
- } else if (!context.isSnapshotCaptureInitiated()) {
- ReplicatedLogEntry lastAppliedEntry = context.getReplicatedLog().get(context.getLastApplied());
- long lastAppliedIndex = -1;
- long lastAppliedTerm = -1;
-
- if (lastAppliedEntry != null) {
- lastAppliedIndex = lastAppliedEntry.getIndex();
- lastAppliedTerm = lastAppliedEntry.getTerm();
- } else if (context.getReplicatedLog().getSnapshotIndex() > -1) {
- lastAppliedIndex = context.getReplicatedLog().getSnapshotIndex();
- lastAppliedTerm = context.getReplicatedLog().getSnapshotTerm();
- }
-
- boolean isInstallSnapshotInitiated = true;
- long replicatedToAllIndex = super.getReplicatedToAllIndex();
- ReplicatedLogEntry replicatedToAllEntry = context.getReplicatedLog().get(replicatedToAllIndex);
-
- CaptureSnapshot captureSnapshot = new CaptureSnapshot(
- lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm,
- (replicatedToAllEntry != null ? replicatedToAllEntry.getIndex() : -1),
- (replicatedToAllEntry != null ? replicatedToAllEntry.getTerm() : -1),
- isInstallSnapshotInitiated);
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Initiating install snapshot to follower {}: {}", logName(), followerId,
- captureSnapshot);
- }
-
- actor().tell(captureSnapshot, actor());
- context.setSnapshotCaptureInitiated(true);
+ } else {
+ context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(),
+ this.getReplicatedToAllIndex(), followerId);
}
}
}
* @param snapshotCapturedIndex
*/
protected void performSnapshotWithoutCapture(final long snapshotCapturedIndex) {
- // we would want to keep the lastApplied as its used while capturing snapshots
- long lastApplied = context.getLastApplied();
- long tempMin = Math.min(snapshotCapturedIndex, (lastApplied > -1 ? lastApplied - 1 : -1));
+ long actualIndex = context.getSnapshotManager().trimLog(snapshotCapturedIndex, this);
- if(LOG.isTraceEnabled()) {
- LOG.trace("{}: performSnapshotWithoutCapture: snapshotCapturedIndex: {}, lastApplied: {}, tempMin: {}",
- logName, snapshotCapturedIndex, lastApplied, tempMin);
- }
-
- if (tempMin > -1 && context.getReplicatedLog().isPresent(tempMin)) {
- LOG.debug("{}: fakeSnapshot purging log to {} for term {}", logName(), tempMin,
- context.getTermInformation().getCurrentTerm());
-
- //use the term of the temp-min, since we check for isPresent, entry will not be null
- ReplicatedLogEntry entry = context.getReplicatedLog().get(tempMin);
- context.getReplicatedLog().snapshotPreCommit(tempMin, entry.getTerm());
- context.getReplicatedLog().snapshotCommit();
- setReplicatedToAllIndex(tempMin);
- } else if(tempMin > getReplicatedToAllIndex()) {
- // It's possible a follower was lagging and an install snapshot advanced its match index past
- // the current replicatedToAllIndex. Since the follower is now caught up we should advance the
- // replicatedToAllIndex (to tempMin). The fact that tempMin wasn't found in the log is likely
- // due to a previous snapshot triggered by the memory threshold exceeded, in that case we
- // trim the log to the last applied index even if previous entries weren't replicated to all followers.
- setReplicatedToAllIndex(tempMin);
+ if(actualIndex != -1){
+ setReplicatedToAllIndex(actualIndex);
}
}
sender.tell(reply, actor());
- if (!context.isSnapshotCaptureInitiated()) {
+ if (!context.getSnapshotManager().isCapturing()) {
super.performSnapshotWithoutCapture(appendEntries.getReplicatedToAllIndex());
}
private Map<String, String> peerAddresses = new HashMap<>();
private ConfigParams configParams;
private boolean snapshotCaptureInitiated;
+ private SnapshotManager snapshotManager;
public MockRaftActorContext(){
electionTerm = new ElectionTerm() {
}
@Override
- public void setSnapshotCaptureInitiated(boolean snapshotCaptureInitiated) {
- this.snapshotCaptureInitiated = snapshotCaptureInitiated;
- }
-
- @Override
- public boolean isSnapshotCaptureInitiated() {
- return snapshotCaptureInitiated;
+ public SnapshotManager getSnapshotManager() {
+ if(this.snapshotManager == null){
+ this.snapshotManager = new SnapshotManager(this, getLogger());
+ }
+ return this.snapshotManager;
}
public void setConfigParams(ConfigParams configParams) {
import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
import org.opendaylight.controller.cluster.raft.behaviors.Follower;
new MockRaftActorContext.MockPayload("C"),
new MockRaftActorContext.MockPayload("D")));
- mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1,-1, 1, -1, 1));
-
RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
+ raftActorContext.getSnapshotManager().capture(
+ new MockRaftActorContext.MockReplicatedLogEntry(1, -1,
+ new MockRaftActorContext.MockPayload("D")), -1);
+
mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
mockRaftActor.waitForInitializeBehaviorComplete();
+ MockRaftActorContext.MockReplicatedLogEntry lastEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 4, mock(Payload.class));
mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class)));
mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 2, mock(Payload.class)));
mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 3, mock(Payload.class)));
- mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 4, mock(Payload.class)));
+ mockRaftActor.getReplicatedLog().append(lastEntry);
ByteString snapshotBytes = fromObject(Arrays.asList(
new MockRaftActorContext.MockPayload("A"),
mockRaftActor.setCurrentBehavior(new Follower(raftActorContext));
long replicatedToAllIndex = 1;
- mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, 2, 1, replicatedToAllIndex, 1));
+
+ mockRaftActor.getRaftActorContext().getSnapshotManager().capture(lastEntry, replicatedToAllIndex);
verify(mockRaftActor.delegate).createSnapshot();
mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
- mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, -1, 1, -1, 1));
+ raftActorContext.getSnapshotManager().capture(
+ new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
+ new MockRaftActorContext.MockPayload("D")), 1);
mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
assertEquals(8, leaderActor.getReplicatedLog().size());
- leaderActor.onReceiveCommand(new CaptureSnapshot(6, 1, 4, 1, 4, 1));
+ leaderActor.getRaftActorContext().getSnapshotManager()
+ .capture(new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
+ new MockRaftActorContext.MockPayload("x")), 4);
- leaderActor.getRaftActorContext().setSnapshotCaptureInitiated(true);
verify(leaderActor.delegate).createSnapshot();
assertEquals(8, leaderActor.getReplicatedLog().size());
new MockRaftActorContext.MockPayload("foo-2"),
new MockRaftActorContext.MockPayload("foo-3"),
new MockRaftActorContext.MockPayload("foo-4")));
- leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
- assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
+
+ leaderActor.getRaftActorContext().getSnapshotManager().persist(new NonPersistentProvider()
+ , snapshotBytes.toByteArray(), leader, Runtime.getRuntime().totalMemory());
+
+ assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
+
+ // The commit is needed to complete the snapshot creation process
+ leaderActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentProvider(), -1);
// capture snapshot reply should remove the snapshotted entries only
assertEquals(3, leaderActor.getReplicatedLog().size());
assertEquals(6, followerActor.getReplicatedLog().size());
//snapshot on 4
- followerActor.onReceiveCommand(new CaptureSnapshot(5, 1, 4, 1, 4, 1));
+ followerActor.getRaftActorContext().getSnapshotManager().capture(
+ new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
+ new MockRaftActorContext.MockPayload("D")), 4);
- followerActor.getRaftActorContext().setSnapshotCaptureInitiated(true);
verify(followerActor.delegate).createSnapshot();
assertEquals(6, followerActor.getReplicatedLog().size());
new MockRaftActorContext.MockPayload("foo-3"),
new MockRaftActorContext.MockPayload("foo-4")));
followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
- assertFalse(followerActor.getRaftActorContext().isSnapshotCaptureInitiated());
+ assertFalse(followerActor.getRaftActorContext().getSnapshotManager().isCapturing());
+
+ // The commit is needed to complete the snapshot creation process
+ followerActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentProvider(), -1);
// capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex
assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
new MockRaftActorContext.MockPayload("foo-3"),
new MockRaftActorContext.MockPayload("foo-4")));
leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
- assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
+ assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size());
// Trimming log in this scenario is a no-op
assertEquals(-1, leaderActor.getReplicatedLog().getSnapshotIndex());
- assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
+ assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
assertEquals(-1, leader.getReplicatedToAllIndex());
}};
// Trimming log in this scenario is a no-op
assertEquals(3, leaderActor.getReplicatedLog().getSnapshotIndex());
- assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
+ assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
assertEquals(3, leader.getReplicatedToAllIndex());
}};
package org.opendaylight.controller.cluster.raft;
+import static junit.framework.TestCase.assertFalse;
import static junit.framework.TestCase.assertTrue;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import akka.actor.ActorRef;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
-import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
// Force capturing toInstall = true
snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(1, 0,
- new MockRaftActorContext.MockPayload()), 0);
+ new MockRaftActorContext.MockPayload()), 0, "follower-1");
assertEquals(true, snapshotManager.isCapturing());
@Test
public void testCapture(){
- snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
+ boolean capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
new MockRaftActorContext.MockPayload()), 9);
+ assertTrue(capture);
+
assertEquals(true, snapshotManager.isCapturing());
CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(actorRef, CaptureSnapshot.class);
@Test
public void testIllegalCapture() throws Exception {
- snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
+ boolean capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
new MockRaftActorContext.MockPayload()), 9);
+ assertTrue(capture);
+
List<CaptureSnapshot> allMatching = MessageCollectorActor.getAllMatching(actorRef, CaptureSnapshot.class);
assertEquals(1, allMatching.size());
// This will not cause snapshot capture to start again
- snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
+ capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
new MockRaftActorContext.MockPayload()), 9);
+ assertFalse(capture);
+
allMatching = MessageCollectorActor.getAllMatching(actorRef, CaptureSnapshot.class);
assertEquals(1, allMatching.size());
snapshotManager.create(mockProcedure);
byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10};
- snapshotManager.persist(mockDataPersistenceProvider, bytes, mockRaftActorBehavior);
+ snapshotManager.persist(mockDataPersistenceProvider, bytes, mockRaftActorBehavior
+ , Runtime.getRuntime().totalMemory());
ArgumentCaptor<Snapshot> snapshotArgumentCaptor = ArgumentCaptor.forClass(Snapshot.class);
verify(mockDataPersistenceProvider).saveSnapshot(snapshotArgumentCaptor.capture());
verify(mockProcedure, times(1)).apply(null);
- snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior);
+ snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
+ , Runtime.getRuntime().totalMemory());
- Mockito.reset(mockProcedure);
+ reset(mockProcedure);
snapshotManager.create(mockProcedure);
snapshotManager.create(mockProcedure);
- snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior);
+ snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
+ , Runtime.getRuntime().totalMemory());
verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
snapshotManager.create(mockProcedure);
- snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior);
+ snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
+ , Runtime.getRuntime().totalMemory());
verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
verify(mockReplicatedLog).snapshotPreCommit(9L, 6L);
-
- verify(mockRaftActorBehavior).setReplicatedToAllIndex(-1);
}
@Test
doReturn(Integer.MAX_VALUE).when(mockReplicatedLog).dataSize();
// when replicatedToAllIndex = -1
- snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
- new MockRaftActorContext.MockPayload()), -1);
+ boolean capture = snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
+ new MockRaftActorContext.MockPayload()), -1, "follower-1");
+
+ assertTrue(capture);
snapshotManager.create(mockProcedure);
byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10};
- snapshotManager.persist(mockDataPersistenceProvider, bytes, mockRaftActorBehavior);
+ snapshotManager.persist(mockDataPersistenceProvider, bytes, mockRaftActorBehavior
+ , Runtime.getRuntime().totalMemory());
verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
@Test
public void testCallingPersistWithoutCaptureWillDoNothing(){
- snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior);
+ snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
+ , Runtime.getRuntime().totalMemory());
verify(mockDataPersistenceProvider, never()).saveSnapshot(any(Snapshot.class));
// when replicatedToAllIndex = -1
snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
- new MockRaftActorContext.MockPayload()), -1);
+ new MockRaftActorContext.MockPayload()), -1, "follower-1");
snapshotManager.create(mockProcedure);
- snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior);
+ snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
+ , Runtime.getRuntime().totalMemory());
- snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior);
+ snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
+ , Runtime.getRuntime().totalMemory());
verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
public void testCommit(){
// when replicatedToAllIndex = -1
snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
- new MockRaftActorContext.MockPayload()), -1);
+ new MockRaftActorContext.MockPayload()), -1, "follower-1");
snapshotManager.create(mockProcedure);
- snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior);
+ snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
+ , Runtime.getRuntime().totalMemory());
snapshotManager.commit(mockDataPersistenceProvider, 100L);
public void testCommitBeforePersist(){
// when replicatedToAllIndex = -1
snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
- new MockRaftActorContext.MockPayload()), -1);
+ new MockRaftActorContext.MockPayload()), -1, "follower-1");
snapshotManager.commit(mockDataPersistenceProvider, 100L);
public void testCallingCommitMultipleTimesCausesNoHarm(){
// when replicatedToAllIndex = -1
snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
- new MockRaftActorContext.MockPayload()), -1);
+ new MockRaftActorContext.MockPayload()), -1, "follower-1");
snapshotManager.create(mockProcedure);
- snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior);
+ snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
+ , Runtime.getRuntime().totalMemory());
snapshotManager.commit(mockDataPersistenceProvider, 100L);
public void testRollback(){
// when replicatedToAllIndex = -1
snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
- new MockRaftActorContext.MockPayload()), -1);
+ new MockRaftActorContext.MockPayload()), -1, "follower-1");
snapshotManager.create(mockProcedure);
- snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior);
+ snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
+ , Runtime.getRuntime().totalMemory());
snapshotManager.rollback();
public void testRollbackBeforePersist(){
// when replicatedToAllIndex = -1
snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
- new MockRaftActorContext.MockPayload()), -1);
+ new MockRaftActorContext.MockPayload()), -1, "follower-1");
snapshotManager.rollback();
public void testCallingRollbackMultipleTimesCausesNoHarm(){
// when replicatedToAllIndex = -1
snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
- new MockRaftActorContext.MockPayload()), -1);
+ new MockRaftActorContext.MockPayload()), -1, "follower-1");
snapshotManager.create(mockProcedure);
- snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior);
+ snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
+ , Runtime.getRuntime().totalMemory());
snapshotManager.rollback();
doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10);
doReturn(5L).when(replicatedLogEntry).getTerm();
- snapshotManager.trimLog(10);
+ snapshotManager.trimLog(10, mockRaftActorBehavior);
verify(mockReplicatedLog).snapshotPreCommit(10, 5);
verify(mockReplicatedLog).snapshotCommit();
@Test
public void testTrimLogAfterCapture(){
- snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
+ boolean capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
new MockRaftActorContext.MockPayload()), 9);
+ assertTrue(capture);
+
assertEquals(true, snapshotManager.isCapturing());
ElectionTerm mockElectionTerm = mock(ElectionTerm.class);
doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10);
doReturn(5L).when(replicatedLogEntry).getTerm();
- snapshotManager.trimLog(10);
+ snapshotManager.trimLog(10, mockRaftActorBehavior);
verify(mockReplicatedLog, never()).snapshotPreCommit(anyLong(), anyLong());
verify(mockReplicatedLog, never()).snapshotCommit();
@Test
public void testTrimLogAfterCaptureToInstall(){
- snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
- new MockRaftActorContext.MockPayload()), 9);
+ boolean capture = snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
+ new MockRaftActorContext.MockPayload()), 9, "follower-1");
+
+ assertTrue(capture);
assertEquals(true, snapshotManager.isCapturing());
doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10);
doReturn(5L).when(replicatedLogEntry).getTerm();
- snapshotManager.trimLog(10);
+ snapshotManager.trimLog(10, mockRaftActorBehavior);
verify(mockReplicatedLog, never()).snapshotPreCommit(10, 5);
verify(mockReplicatedLog, never()).snapshotCommit();
new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
new MockRaftActorContext.MockPayload("D"));
+ actorContext.getReplicatedLog().append(entry);
+
//update follower timestamp
leader.markFollowerActive(FOLLOWER_ID);
import org.junit.Test;
import org.mockito.InOrder;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
+import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
import org.opendaylight.controller.cluster.raft.Snapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
dataStoreContextBuilder.persistent(persistent);
+
+
new ShardTestKit(getSystem()) {{
final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
- Creator<Shard> creator = new Creator<Shard>() {
- @Override
- public Shard create() throws Exception {
- return new Shard(shardID, Collections.<String,String>emptyMap(),
- newDatastoreContext(), SCHEMA_CONTEXT) {
- DelegatingPersistentDataProvider delegating;
+ class TestShard extends Shard {
- @Override
- protected DataPersistenceProvider persistence() {
- if(delegating == null) {
- delegating = new DelegatingPersistentDataProvider(super.persistence());
- }
+ protected TestShard(ShardIdentifier name, Map<String, String> peerAddresses,
+ DatastoreContext datastoreContext, SchemaContext schemaContext) {
+ super(name, peerAddresses, datastoreContext, schemaContext);
+ }
- return delegating;
- }
+ DelegatingPersistentDataProvider delegating;
- @Override
- protected void commitSnapshot(final long sequenceNumber) {
- super.commitSnapshot(sequenceNumber);
- latch.get().countDown();
- }
- };
+ protected DataPersistenceProvider persistence() {
+ if(delegating == null) {
+ delegating = new DelegatingPersistentDataProvider(super.persistence());
+ }
+ return delegating;
+ }
+
+ @Override
+ protected void commitSnapshot(final long sequenceNumber) {
+ super.commitSnapshot(sequenceNumber);
+ latch.get().countDown();
+ }
+
+ @Override
+ public RaftActorContext getRaftActorContext() {
+ return super.getRaftActorContext();
+ }
+ }
+
+ Creator<Shard> creator = new Creator<Shard>() {
+ @Override
+ public Shard create() throws Exception {
+ return new TestShard(shardID, Collections.<String,String>emptyMap(),
+ newDatastoreContext(), SCHEMA_CONTEXT);
}
};
NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
- CaptureSnapshot capture = new CaptureSnapshot(-1, -1, -1, -1, -1, -1);
- shard.tell(capture, getRef());
+ // Trigger creation of a snapshot by ensuring
+ RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext();
+ raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
latch.set(new CountDownLatch(1));
savedSnapshot.set(null);
- shard.tell(capture, getRef());
+ raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));