Similar to fake snapshots, real snapshots should also be using the replicatedToAllIndex.
replicatedToAllIndex is part of the RaftActorBehavior.
A performSnapshotWithoutCapture helper method is part of the AbstractRaftActorBehavior.
On CaptureSnapshotReply, we clear the log based on replicatedToAllIndex and the corresponding term.
Many of the existing tests seem to test out this changes and some were added and enhanced.
Bug-2715: Fix in-mem log cleanup for an Inactive follower
In case if one of the follower is down, rely on the memory usage to clear the log.
This is done in CaptureSnapshotReply
Change-Id: Ifb3ca19691f20735e3f84b968a7adf01398c20e0
Signed-off-by: Kamal Rameshan <kramesha@cisco.com>
}
protected int adjustedIndex(long logEntryIndex) {
- if(snapshotIndex < 0){
+ if (snapshotIndex < 0) {
return (int) logEntryIndex;
}
return (int) (logEntryIndex - (snapshotIndex + 1));
return journal.size();
}
+ @Override
+ public int dataSize() {
+ return dataSize;
+ }
+
@Override
public boolean isPresent(long logEntryIndex) {
if (logEntryIndex > lastIndex()) {
previousSnapshotIndex = -1;
previousSnapshotTerm = -1;
dataSize = 0;
+ // need to recalc the datasize based on the entries left after precommit.
+ for(ReplicatedLogEntry logEntry : journal) {
+ dataSize += logEntry.size();
+ }
+
}
@Override
/**
* This method is called during recovery to reconstruct the state of the actor.
*
- * @param snapshot A snapshot of the state of the actor
+ * @param snapshotBytes A snapshot of the state of the actor
*/
protected abstract void applyRecoverySnapshot(byte[] snapshotBytes);
LOG.info("{}: Persisting of snapshot done:{}", persistenceId(), sn.getLogMessage());
- //be greedy and remove entries from in-mem journal which are in the snapshot
- // and update snapshotIndex and snapshotTerm without waiting for the success,
+ long dataThreshold = Runtime.getRuntime().totalMemory() *
+ getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100;
+ if (context.getReplicatedLog().dataSize() > dataThreshold) {
+ // if memory is less, clear the log based on lastApplied.
+ // this could/should only happen if one of the followers is down
+ // as normally we keep removing from the log when its replicated to all.
+ context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getLastAppliedIndex(),
+ captureSnapshot.getLastAppliedTerm());
- context.getReplicatedLog().snapshotPreCommit(
- captureSnapshot.getLastAppliedIndex(),
- captureSnapshot.getLastAppliedTerm());
+ } else {
+ // clear the log based on replicatedToAllIndex
+ context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getReplicatedToAllIndex(),
+ captureSnapshot.getReplicatedToAllTerm());
+ }
+ getCurrentBehavior().setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
LOG.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " +
"and term:{}", persistenceId(), captureSnapshot.getLastAppliedIndex(),
// FIXME: Maybe this should be done after the command is saved
journal.subList(adjustedIndex , journal.size()).clear();
- persistence().persist(new DeleteEntries(adjustedIndex), new Procedure<DeleteEntries>(){
+ persistence().persist(new DeleteEntries(adjustedIndex), new Procedure<DeleteEntries>() {
- @Override public void apply(DeleteEntries param)
- throws Exception {
+ @Override
+ public void apply(DeleteEntries param)
+ throws Exception {
//FIXME : Doing nothing for now
dataSize = 0;
- for(ReplicatedLogEntry entry : journal){
+ for (ReplicatedLogEntry entry : journal) {
dataSize += entry.size();
}
}
appendAndPersist(replicatedLogEntry, null);
}
- @Override
- public int dataSize() {
- return dataSize;
- }
-
public void appendAndPersist(
final ReplicatedLogEntry replicatedLogEntry,
final Procedure<ReplicatedLogEntry> callback) {
long dataSizeForCheck = dataSize;
dataSizeSinceLastSnapshot += logEntrySize;
- long journalSize = lastIndex()+1;
+ long journalSize = lastIndex() + 1;
if(!hasFollowers()) {
// When we do not have followers we do not maintain an in-memory log
}
// send a CaptureSnapshot to self to make the expensive operation async.
- getSelf().tell(new CaptureSnapshot(
- lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm),
+ long replicatedToAllIndex = getCurrentBehavior().getReplicatedToAllIndex();
+ ReplicatedLogEntry replicatedToAllEntry = context.getReplicatedLog().get(replicatedToAllIndex);
+ getSelf().tell(new CaptureSnapshot(lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm,
+ (replicatedToAllEntry != null ? replicatedToAllEntry.getIndex() : -1),
+ (replicatedToAllEntry != null ? replicatedToAllEntry.getTerm() : -1)),
null);
context.setSnapshotCaptureInitiated(true);
}
- if(callback != null){
+ if (callback != null){
callback.apply(replicatedLogEntry);
}
}
* sets snapshot term
* @param snapshotTerm
*/
- public void setSnapshotTerm(long snapshotTerm);
+ void setSnapshotTerm(long snapshotTerm);
/**
* Clears the journal entries with startIndex(inclusive) and endIndex (exclusive)
* @param startIndex
* @param endIndex
*/
- public void clear(int startIndex, int endIndex);
+ void clear(int startIndex, int endIndex);
/**
* Handles all the bookkeeping in order to perform a rollback in the
* @param snapshotCapturedIndex
* @param snapshotCapturedTerm
*/
- public void snapshotPreCommit(long snapshotCapturedIndex, long snapshotCapturedTerm);
+ void snapshotPreCommit(long snapshotCapturedIndex, long snapshotCapturedTerm);
/**
* Sets the Replicated log to state after snapshot success.
*/
- public void snapshotCommit();
+ void snapshotCommit();
/**
* Restores the replicated log to a state in the event of a save snapshot failure
*/
- public void snapshotRollback();
+ void snapshotRollback();
/**
* Size of the data in the log (in bytes)
*/
- public int dataSize();
+ int dataSize();
+
}
private long lastIndex;
private long lastTerm;
private boolean installSnapshotInitiated;
+ private long replicatedToAllIndex;
+ private long replicatedToAllTerm;
public CaptureSnapshot(long lastIndex, long lastTerm,
- long lastAppliedIndex, long lastAppliedTerm) {
- this(lastIndex, lastTerm, lastAppliedIndex, lastAppliedTerm, false);
+ long lastAppliedIndex, long lastAppliedTerm, long replicatedToAllIndex, long replicatedToAllTerm) {
+ this(lastIndex, lastTerm, lastAppliedIndex, lastAppliedTerm, replicatedToAllIndex , replicatedToAllTerm, false);
}
public CaptureSnapshot(long lastIndex, long lastTerm,long lastAppliedIndex,
- long lastAppliedTerm, boolean installSnapshotInitiated) {
+ long lastAppliedTerm, long replicatedToAllIndex, long replicatedToAllTerm, boolean installSnapshotInitiated) {
this.lastIndex = lastIndex;
this.lastTerm = lastTerm;
this.lastAppliedIndex = lastAppliedIndex;
this.lastAppliedTerm = lastAppliedTerm;
this.installSnapshotInitiated = installSnapshotInitiated;
+ this.replicatedToAllIndex = replicatedToAllIndex;
+ this.replicatedToAllTerm = replicatedToAllTerm;
}
public long getLastAppliedIndex() {
public boolean isInstallSnapshotInitiated() {
return installSnapshotInitiated;
}
+
+ public long getReplicatedToAllIndex() {
+ return replicatedToAllIndex;
+ }
+
+ public long getReplicatedToAllTerm() {
+ return replicatedToAllTerm;
+ }
}
private Optional<ByteString> snapshot;
- private long replicatedToAllIndex = -1;
-
public AbstractLeader(RaftActorContext context) {
super(context);
}
private void purgeInMemoryLog() {
- //find the lowest index across followers which has been replicated to all. -1 if there are no followers.
+ //find the lowest index across followers which has been replicated to all.
+ // lastApplied if there are no followers, so that we keep clearing the log for single-node
// we would delete the in-mem log from that index on, in-order to minimize mem usage
// we would also share this info thru AE with the followers so that they can delete their log entries as well.
- long minReplicatedToAllIndex = followerToLog.isEmpty() ? -1 : Long.MAX_VALUE;
+ long minReplicatedToAllIndex = followerToLog.isEmpty() ? context.getLastApplied() : Long.MAX_VALUE;
for (FollowerLogInformation info : followerToLog.values()) {
minReplicatedToAllIndex = Math.min(minReplicatedToAllIndex, info.getMatchIndex());
}
- replicatedToAllIndex = fakeSnapshot(minReplicatedToAllIndex, replicatedToAllIndex);
+ super.performSnapshotWithoutCapture(minReplicatedToAllIndex);
}
@Override
sendAppendEntriesToFollower(followerActor, followerNextIndex, entries, followerId);
} else if (isFollowerActive && followerNextIndex >= 0 &&
- leaderLastIndex >= followerNextIndex) {
+ leaderLastIndex > followerNextIndex && !context.isSnapshotCaptureInitiated()) {
// if the followers next index is not present in the leaders log, and
// if the follower is just not starting and if leader's index is more than followers index
// then snapshot should be sent
AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(),
prevLogIndex(followerNextIndex),
prevLogTerm(followerNextIndex), entries,
- context.getCommitIndex(), replicatedToAllIndex);
+ context.getCommitIndex(), super.getReplicatedToAllIndex());
if(!entries.isEmpty()) {
LOG.debug("{}: Sending AppendEntries to follower {}: {}", context.getId(), followerId,
}
/**
- * /**
* Install Snapshot works as follows
* 1. Leader initiates the capture snapshot by sending a CaptureSnapshot message to actor
* 2. RaftActor on receipt of the CaptureSnapshotReply (from Shard), stores the received snapshot in the replicated log
* @param followerNextIndex
*/
private void initiateCaptureSnapshot(String followerId, long followerNextIndex) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: initiateCaptureSnapshot, followers {}", context.getId(), followerToLog.keySet());
- }
-
if (!context.getReplicatedLog().isPresent(followerNextIndex) &&
context.getReplicatedLog().isInSnapshot(followerNextIndex)) {
if (lastAppliedEntry != null) {
lastAppliedIndex = lastAppliedEntry.getIndex();
lastAppliedTerm = lastAppliedEntry.getTerm();
- } else if (context.getReplicatedLog().getSnapshotIndex() > -1) {
+ } else if (context.getReplicatedLog().getSnapshotIndex() > -1) {
lastAppliedIndex = context.getReplicatedLog().getSnapshotIndex();
lastAppliedTerm = context.getReplicatedLog().getSnapshotTerm();
}
boolean isInstallSnapshotInitiated = true;
- actor().tell(new CaptureSnapshot(lastIndex(), lastTerm(),
- lastAppliedIndex, lastAppliedTerm, isInstallSnapshotInitiated),
- actor());
+ long replicatedToAllIndex = super.getReplicatedToAllIndex();
+ ReplicatedLogEntry replicatedToAllEntry = context.getReplicatedLog().get(replicatedToAllIndex);
+ actor().tell(new CaptureSnapshot(lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm,
+ (replicatedToAllEntry != null ? replicatedToAllEntry.getIndex() : -1),
+ (replicatedToAllEntry != null ? replicatedToAllEntry.getTerm() : -1),
+ isInstallSnapshotInitiated), actor());
context.setSnapshotCaptureInitiated(true);
}
}
context.getReplicatedLog().getSnapshotIndex(),
context.getReplicatedLog().getSnapshotTerm(),
nextSnapshotChunk,
- followerToSnapshot.incrementChunkIndex(),
- followerToSnapshot.getTotalChunks(),
+ followerToSnapshot.incrementChunkIndex(),
+ followerToSnapshot.getTotalChunks(),
Optional.of(followerToSnapshot.getLastChunkHashCode())
).toSerializable(),
actor()
*/
protected String leaderId = null;
+ private long replicatedToAllIndex = -1;
protected AbstractRaftActorBehavior(RaftActorContext context) {
this.context = context;
this.LOG = context.getLogger();
}
+ @Override
+ public void setReplicatedToAllIndex(long replicatedToAllIndex) {
+ this.replicatedToAllIndex = replicatedToAllIndex;
+ }
+
+ @Override
+ public long getReplicatedToAllIndex() {
+ return replicatedToAllIndex;
+ }
+
/**
* Derived classes should not directly handle AppendEntries messages it
* should let the base class handle it first. Once the base class handles
}
- protected long fakeSnapshot(final long minReplicatedToAllIndex, final long currentReplicatedIndex) {
+ /**
+ * Performs a snapshot with no capture on the replicated log.
+ * It clears the log from the supplied index or last-applied-1 which ever is minimum.
+ *
+ * @param snapshotCapturedIndex
+ */
+ protected void performSnapshotWithoutCapture(final long snapshotCapturedIndex) {
// we would want to keep the lastApplied as its used while capturing snapshots
- long tempMin = Math.min(minReplicatedToAllIndex,
- (context.getLastApplied() > -1 ? context.getLastApplied() - 1 : -1));
+ long lastApplied = context.getLastApplied();
+ long tempMin = Math.min(snapshotCapturedIndex, (lastApplied > -1 ? lastApplied - 1 : -1));
if (tempMin > -1 && context.getReplicatedLog().isPresent(tempMin)) {
- context.getReplicatedLog().snapshotPreCommit(tempMin, context.getTermInformation().getCurrentTerm());
+ //use the term of the temp-min, since we check for isPresent, entry will not be null
+ ReplicatedLogEntry entry = context.getReplicatedLog().get(tempMin);
+ context.getReplicatedLog().snapshotPreCommit(tempMin, entry.getTerm());
context.getReplicatedLog().snapshotCommit();
- return tempMin;
+ setReplicatedToAllIndex(tempMin);
}
- return currentReplicatedIndex;
}
+
}
lastIndex(), lastTerm()), actor());
if (!context.isSnapshotCaptureInitiated()) {
- fakeSnapshot(appendEntries.getReplicatedToAllIndex(), appendEntries.getReplicatedToAllIndex());
+ super.performSnapshotWithoutCapture(appendEntries.getReplicatedToAllIndex());
}
return this;
* @return
*/
String getLeaderId();
+
+ /**
+ * setting the index of the log entry which is replicated to all nodes
+ * @param replicatedToAllIndex
+ */
+ void setReplicatedToAllIndex(long replicatedToAllIndex);
+
+ /**
+ * getting the index of the log entry which is replicated to all nodes
+ * @return
+ */
+ long getReplicatedToAllIndex();
}
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockReplicatedLogEntry;
+
/**
*
*/
@Test
public void testSnapshotPreCommit() {
+ //add 4 more entries
replicatedLogImpl.append(new MockReplicatedLogEntry(2, 4, new MockPayload("E")));
replicatedLogImpl.append(new MockReplicatedLogEntry(2, 5, new MockPayload("F")));
replicatedLogImpl.append(new MockReplicatedLogEntry(3, 6, new MockPayload("G")));
replicatedLogImpl.append(new MockReplicatedLogEntry(3, 7, new MockPayload("H")));
+ //sending negative values should not cause any changes
+ replicatedLogImpl.snapshotPreCommit(-1, -1);
+ assertEquals(8, replicatedLogImpl.size());
+ assertEquals(-1, replicatedLogImpl.getSnapshotIndex());
+
replicatedLogImpl.snapshotPreCommit(4, 3);
assertEquals(3, replicatedLogImpl.size());
assertEquals(4, replicatedLogImpl.getSnapshotIndex());
assertEquals(0, replicatedLogImpl.size());
assertEquals(7, replicatedLogImpl.getSnapshotIndex());
+ }
+
+ @Test
+ public void testIsPresent() {
+ assertTrue(replicatedLogImpl.isPresent(0));
+ assertTrue(replicatedLogImpl.isPresent(1));
+ assertTrue(replicatedLogImpl.isPresent(2));
+ assertTrue(replicatedLogImpl.isPresent(3));
+
+ replicatedLogImpl.append(new MockReplicatedLogEntry(2, 4, new MockPayload("D")));
+ replicatedLogImpl.snapshotPreCommit(3, 2); //snapshot on 3
+ replicatedLogImpl.snapshotCommit();
+
+ assertFalse(replicatedLogImpl.isPresent(0));
+ assertFalse(replicatedLogImpl.isPresent(1));
+ assertFalse(replicatedLogImpl.isPresent(2));
+ assertFalse(replicatedLogImpl.isPresent(3));
+ assertTrue(replicatedLogImpl.isPresent(4));
+
+ replicatedLogImpl.snapshotPreCommit(4, 2); //snapshot on 4
+ replicatedLogImpl.snapshotCommit();
+ assertFalse(replicatedLogImpl.isPresent(4));
+ replicatedLogImpl.append(new MockReplicatedLogEntry(2, 5, new MockPayload("D")));
+ assertTrue(replicatedLogImpl.isPresent(5));
}
// create a snapshot for test
new MockRaftActorContext.MockPayload("C"),
new MockRaftActorContext.MockPayload("D")));
- mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, -1, 1));
+ mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1,-1, 1, -1, 1));
RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
mockRaftActor.setCurrentBehavior(new Follower(raftActorContext));
- mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, 2, 1));
+ long replicatedToAllIndex = 1;
+ mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, 2, 1, replicatedToAllIndex, 1));
verify(mockRaftActor.delegate).createSnapshot();
verify(dataPersistenceProvider).deleteMessages(100);
- assertEquals(2, mockRaftActor.getReplicatedLog().size());
+ assertEquals(3, mockRaftActor.getReplicatedLog().size());
+ assertEquals(1, mockRaftActor.getCurrentBehavior().getReplicatedToAllIndex());
+ assertNotNull(mockRaftActor.getReplicatedLog().get(2));
assertNotNull(mockRaftActor.getReplicatedLog().get(3));
assertNotNull(mockRaftActor.getReplicatedLog().get(4));
// Index 2 will not be in the log because it was removed due to snapshotting
- assertNull(mockRaftActor.getReplicatedLog().get(2));
+ assertNull(mockRaftActor.getReplicatedLog().get(1));
+ assertNull(mockRaftActor.getReplicatedLog().get(0));
}
};
mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
- mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, -1, 1));
+ mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, -1, 1, -1, 1));
mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
assertEquals(8, leaderActor.getReplicatedLog().size());
- leaderActor.onReceiveCommand(new CaptureSnapshot(6, 1, 4, 1));
+ leaderActor.onReceiveCommand(new CaptureSnapshot(6, 1, 4, 1, 4, 1));
+
leaderActor.getRaftActorContext().setSnapshotCaptureInitiated(true);
verify(leaderActor.delegate).createSnapshot();
followerActor.waitForInitializeBehaviorComplete();
- // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
+
Follower follower = new Follower(followerActor.getRaftActorContext());
followerActor.setCurrentBehavior(follower);
assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
+ // create 6 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
followerActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 6, 1).build());
- // log as indices 0-5
+ // log has indices 0-5
assertEquals(6, followerActor.getReplicatedLog().size());
//snapshot on 4
- followerActor.onReceiveCommand(new CaptureSnapshot(5, 1, 4, 1));
+ followerActor.onReceiveCommand(new CaptureSnapshot(5, 1, 4, 1, 4, 1));
+
followerActor.getRaftActorContext().setSnapshotCaptureInitiated(true);
verify(followerActor.delegate).createSnapshot();
followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
assertFalse(followerActor.getRaftActorContext().isSnapshotCaptureInitiated());
- // capture snapshot reply should remove the snapshotted entries only
+ // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex
assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
assertEquals(7, followerActor.getReplicatedLog().lastIndex());
// create 5 entries in the log
MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(5, 10, 1).build());
+
//set the snapshot index to 4 , 0 to 4 are snapshotted
leaderActor.getRaftActorContext().getReplicatedLog().setSnapshotIndex(4);
+ //setting replicatedToAllIndex = 9, for the log to clear
+ leader.setReplicatedToAllIndex(9);
assertEquals(5, leaderActor.getReplicatedLog().size());
+ assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1));
assertEquals(5, leaderActor.getReplicatedLog().size());
+ assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
// set the 2nd follower nextIndex to 1 which has been snapshotted
leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1));
assertEquals(5, leaderActor.getReplicatedLog().size());
+ assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
// simulate a real snapshot
leaderActor.onReceiveCommand(new SendHeartBeat());
leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
- assertEquals("Real snapshot didn't clear the log till lastApplied", 0, leaderActor.getReplicatedLog().size());
+ assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size());
//reply from a slow follower after should not raise errors
leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1));
package org.opendaylight.controller.cluster.raft.behaviors;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.testkit.JavaTestKit;
+import java.util.ArrayList;
+import java.util.List;
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.AbstractActorTest;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
private final ActorRef behaviorActor = getSystem().actorOf(Props.create(
}
@Test
- public void testFakeSnapshots() {
+ public void testPerformSnapshot() {
MockRaftActorContext context = new MockRaftActorContext("test", getSystem(), behaviorActor);
- AbstractRaftActorBehavior behavior = new Leader(context);
- context.getTermInformation().update(1, "leader");
+ AbstractRaftActorBehavior abstractBehavior = (AbstractRaftActorBehavior) createBehavior(context);
+ if (abstractBehavior instanceof Candidate) {
+ return;
+ }
- //entry with 1 index=0 entry with replicatedToAllIndex = 0, does not do anything, returns the
+ context.getTermInformation().update(1, "test");
+
+ //log has 1 entry with replicatedToAllIndex = 0, does not do anything, returns the
context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
context.setLastApplied(0);
- assertEquals(-1, behavior.fakeSnapshot(0, -1));
+ abstractBehavior.performSnapshotWithoutCapture(0);
+ assertEquals(-1, abstractBehavior.getReplicatedToAllIndex());
assertEquals(1, context.getReplicatedLog().size());
//2 entries, lastApplied still 0, no purging.
- context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0,2,1).build());
+ context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
context.setLastApplied(0);
- assertEquals(-1, behavior.fakeSnapshot(0, -1));
+ abstractBehavior.performSnapshotWithoutCapture(0);
+ assertEquals(-1, abstractBehavior.getReplicatedToAllIndex());
assertEquals(2, context.getReplicatedLog().size());
//2 entries, lastApplied still 0, no purging.
- context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0,2,1).build());
+ context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
context.setLastApplied(1);
- assertEquals(0, behavior.fakeSnapshot(0, -1));
+ abstractBehavior.performSnapshotWithoutCapture(0);
+ assertEquals(0, abstractBehavior.getReplicatedToAllIndex());
assertEquals(1, context.getReplicatedLog().size());
//5 entries, lastApplied =2 and replicatedIndex = 3, but since we want to keep the lastapplied, indices 0 and 1 will only get purged
- context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0,5,1).build());
+ context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 5, 1).build());
context.setLastApplied(2);
- assertEquals(1, behavior.fakeSnapshot(3, 1));
+ abstractBehavior.performSnapshotWithoutCapture(3);
+ assertEquals(1, abstractBehavior.getReplicatedToAllIndex());
assertEquals(3, context.getReplicatedLog().size());
-
+ // scenario where Last applied > Replicated to all index (becoz of a slow follower)
+ context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+ context.setLastApplied(2);
+ abstractBehavior.performSnapshotWithoutCapture(1);
+ assertEquals(1, abstractBehavior.getReplicatedToAllIndex());
+ assertEquals(1, context.getReplicatedLog().size());
}
+
protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(
ActorRef actorRef, RaftRPC rpc) {
NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
- CaptureSnapshot capture = new CaptureSnapshot(-1, -1, -1, -1);
+ CaptureSnapshot capture = new CaptureSnapshot(-1, -1, -1, -1, -1, -1);
shard.tell(capture, getRef());
assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));