From 6bfcbfc5158130c4255b861cb02dfaa68c52aa63 Mon Sep 17 00:00:00 2001 From: Kamal Rameshan Date: Tue, 10 Feb 2015 23:24:55 -0800 Subject: [PATCH] Bug-2692:Real snapshots should use the replicatedToAllIndex for clearing in-mem log Similar to fake snapshots, real snapshots should also be using the replicatedToAllIndex. replicatedToAllIndex is part of the RaftActorBehavior. A performSnapshotWithoutCapture helper method is part of the AbstractRaftActorBehavior. On CaptureSnapshotReply, we clear the log based on replicatedToAllIndex and the corresponding term. Many of the existing tests seem to test out this changes and some were added and enhanced. Bug-2715: Fix in-mem log cleanup for an Inactive follower In case if one of the follower is down, rely on the memory usage to clear the log. This is done in CaptureSnapshotReply Change-Id: Ifb3ca19691f20735e3f84b968a7adf01398c20e0 Signed-off-by: Kamal Rameshan --- .../raft/AbstractReplicatedLogImpl.java | 12 ++++- .../controller/cluster/raft/RaftActor.java | 46 ++++++++++-------- .../cluster/raft/ReplicatedLog.java | 13 ++--- .../raft/base/messages/CaptureSnapshot.java | 18 +++++-- .../raft/behaviors/AbstractLeader.java | 33 ++++++------- .../behaviors/AbstractRaftActorBehavior.java | 31 +++++++++--- .../cluster/raft/behaviors/Follower.java | 2 +- .../raft/behaviors/RaftActorBehavior.java | 12 +++++ .../raft/AbstractReplicatedLogImplTest.java | 31 ++++++++++++ .../cluster/raft/RaftActorTest.java | 35 +++++++++----- .../AbstractRaftActorBehaviorTest.java | 48 ++++++++++++------- .../cluster/datastore/ShardTest.java | 2 +- 12 files changed, 199 insertions(+), 84 deletions(-) diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java index d1c3fefee8..e2aa16918e 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java @@ -41,7 +41,7 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { } protected int adjustedIndex(long logEntryIndex) { - if(snapshotIndex < 0){ + if (snapshotIndex < 0) { return (int) logEntryIndex; } return (int) (logEntryIndex - (snapshotIndex + 1)); @@ -134,6 +134,11 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { return journal.size(); } + @Override + public int dataSize() { + return dataSize; + } + @Override public boolean isPresent(long logEntryIndex) { if (logEntryIndex > lastIndex()) { @@ -200,6 +205,11 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { previousSnapshotIndex = -1; previousSnapshotTerm = -1; dataSize = 0; + // need to recalc the datasize based on the entries left after precommit. + for(ReplicatedLogEntry logEntry : journal) { + dataSize += logEntry.size(); + } + } @Override diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 1f446c72f1..353d0b4a24 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -573,7 +573,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { /** * This method is called during recovery to reconstruct the state of the actor. * - * @param snapshot A snapshot of the state of the actor + * @param snapshotBytes A snapshot of the state of the actor */ protected abstract void applyRecoverySnapshot(byte[] snapshotBytes); @@ -668,12 +668,21 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { LOG.info("{}: Persisting of snapshot done:{}", persistenceId(), sn.getLogMessage()); - //be greedy and remove entries from in-mem journal which are in the snapshot - // and update snapshotIndex and snapshotTerm without waiting for the success, + long dataThreshold = Runtime.getRuntime().totalMemory() * + getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100; + if (context.getReplicatedLog().dataSize() > dataThreshold) { + // if memory is less, clear the log based on lastApplied. + // this could/should only happen if one of the followers is down + // as normally we keep removing from the log when its replicated to all. + context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getLastAppliedIndex(), + captureSnapshot.getLastAppliedTerm()); - context.getReplicatedLog().snapshotPreCommit( - captureSnapshot.getLastAppliedIndex(), - captureSnapshot.getLastAppliedTerm()); + } else { + // clear the log based on replicatedToAllIndex + context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getReplicatedToAllIndex(), + captureSnapshot.getReplicatedToAllTerm()); + } + getCurrentBehavior().setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex()); LOG.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " + "and term:{}", persistenceId(), captureSnapshot.getLastAppliedIndex(), @@ -717,13 +726,14 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { // FIXME: Maybe this should be done after the command is saved journal.subList(adjustedIndex , journal.size()).clear(); - persistence().persist(new DeleteEntries(adjustedIndex), new Procedure(){ + persistence().persist(new DeleteEntries(adjustedIndex), new Procedure() { - @Override public void apply(DeleteEntries param) - throws Exception { + @Override + public void apply(DeleteEntries param) + throws Exception { //FIXME : Doing nothing for now dataSize = 0; - for(ReplicatedLogEntry entry : journal){ + for (ReplicatedLogEntry entry : journal) { dataSize += entry.size(); } } @@ -735,11 +745,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { appendAndPersist(replicatedLogEntry, null); } - @Override - public int dataSize() { - return dataSize; - } - public void appendAndPersist( final ReplicatedLogEntry replicatedLogEntry, final Procedure callback) { @@ -766,7 +771,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { long dataSizeForCheck = dataSize; dataSizeSinceLastSnapshot += logEntrySize; - long journalSize = lastIndex()+1; + long journalSize = lastIndex() + 1; if(!hasFollowers()) { // When we do not have followers we do not maintain an in-memory log @@ -817,12 +822,15 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } // send a CaptureSnapshot to self to make the expensive operation async. - getSelf().tell(new CaptureSnapshot( - lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm), + long replicatedToAllIndex = getCurrentBehavior().getReplicatedToAllIndex(); + ReplicatedLogEntry replicatedToAllEntry = context.getReplicatedLog().get(replicatedToAllIndex); + getSelf().tell(new CaptureSnapshot(lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm, + (replicatedToAllEntry != null ? replicatedToAllEntry.getIndex() : -1), + (replicatedToAllEntry != null ? replicatedToAllEntry.getTerm() : -1)), null); context.setSnapshotCaptureInitiated(true); } - if(callback != null){ + if (callback != null){ callback.apply(replicatedLogEntry); } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java index 80b7ad90d0..82d0839bee 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java @@ -145,14 +145,14 @@ public interface ReplicatedLog { * sets snapshot term * @param snapshotTerm */ - public void setSnapshotTerm(long snapshotTerm); + void setSnapshotTerm(long snapshotTerm); /** * Clears the journal entries with startIndex(inclusive) and endIndex (exclusive) * @param startIndex * @param endIndex */ - public void clear(int startIndex, int endIndex); + void clear(int startIndex, int endIndex); /** * Handles all the bookkeeping in order to perform a rollback in the @@ -160,20 +160,21 @@ public interface ReplicatedLog { * @param snapshotCapturedIndex * @param snapshotCapturedTerm */ - public void snapshotPreCommit(long snapshotCapturedIndex, long snapshotCapturedTerm); + void snapshotPreCommit(long snapshotCapturedIndex, long snapshotCapturedTerm); /** * Sets the Replicated log to state after snapshot success. */ - public void snapshotCommit(); + void snapshotCommit(); /** * Restores the replicated log to a state in the event of a save snapshot failure */ - public void snapshotRollback(); + void snapshotRollback(); /** * Size of the data in the log (in bytes) */ - public int dataSize(); + int dataSize(); + } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java index d4dd3350f3..a96b1e435c 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java @@ -14,19 +14,23 @@ public class CaptureSnapshot { private long lastIndex; private long lastTerm; private boolean installSnapshotInitiated; + private long replicatedToAllIndex; + private long replicatedToAllTerm; public CaptureSnapshot(long lastIndex, long lastTerm, - long lastAppliedIndex, long lastAppliedTerm) { - this(lastIndex, lastTerm, lastAppliedIndex, lastAppliedTerm, false); + long lastAppliedIndex, long lastAppliedTerm, long replicatedToAllIndex, long replicatedToAllTerm) { + this(lastIndex, lastTerm, lastAppliedIndex, lastAppliedTerm, replicatedToAllIndex , replicatedToAllTerm, false); } public CaptureSnapshot(long lastIndex, long lastTerm,long lastAppliedIndex, - long lastAppliedTerm, boolean installSnapshotInitiated) { + long lastAppliedTerm, long replicatedToAllIndex, long replicatedToAllTerm, boolean installSnapshotInitiated) { this.lastIndex = lastIndex; this.lastTerm = lastTerm; this.lastAppliedIndex = lastAppliedIndex; this.lastAppliedTerm = lastAppliedTerm; this.installSnapshotInitiated = installSnapshotInitiated; + this.replicatedToAllIndex = replicatedToAllIndex; + this.replicatedToAllTerm = replicatedToAllTerm; } public long getLastAppliedIndex() { @@ -48,4 +52,12 @@ public class CaptureSnapshot { public boolean isInstallSnapshotInitiated() { return installSnapshotInitiated; } + + public long getReplicatedToAllIndex() { + return replicatedToAllIndex; + } + + public long getReplicatedToAllTerm() { + return replicatedToAllTerm; + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index 9e4f3b46c4..94c38f6108 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -91,8 +91,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private Optional snapshot; - private long replicatedToAllIndex = -1; - public AbstractLeader(RaftActorContext context) { super(context); @@ -236,15 +234,16 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } private void purgeInMemoryLog() { - //find the lowest index across followers which has been replicated to all. -1 if there are no followers. + //find the lowest index across followers which has been replicated to all. + // lastApplied if there are no followers, so that we keep clearing the log for single-node // we would delete the in-mem log from that index on, in-order to minimize mem usage // we would also share this info thru AE with the followers so that they can delete their log entries as well. - long minReplicatedToAllIndex = followerToLog.isEmpty() ? -1 : Long.MAX_VALUE; + long minReplicatedToAllIndex = followerToLog.isEmpty() ? context.getLastApplied() : Long.MAX_VALUE; for (FollowerLogInformation info : followerToLog.values()) { minReplicatedToAllIndex = Math.min(minReplicatedToAllIndex, info.getMatchIndex()); } - replicatedToAllIndex = fakeSnapshot(minReplicatedToAllIndex, replicatedToAllIndex); + super.performSnapshotWithoutCapture(minReplicatedToAllIndex); } @Override @@ -472,7 +471,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { sendAppendEntriesToFollower(followerActor, followerNextIndex, entries, followerId); } else if (isFollowerActive && followerNextIndex >= 0 && - leaderLastIndex >= followerNextIndex) { + leaderLastIndex > followerNextIndex && !context.isSnapshotCaptureInitiated()) { // if the followers next index is not present in the leaders log, and // if the follower is just not starting and if leader's index is more than followers index // then snapshot should be sent @@ -507,7 +506,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(), prevLogIndex(followerNextIndex), prevLogTerm(followerNextIndex), entries, - context.getCommitIndex(), replicatedToAllIndex); + context.getCommitIndex(), super.getReplicatedToAllIndex()); if(!entries.isEmpty()) { LOG.debug("{}: Sending AppendEntries to follower {}: {}", context.getId(), followerId, @@ -518,7 +517,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } /** - * /** * Install Snapshot works as follows * 1. Leader initiates the capture snapshot by sending a CaptureSnapshot message to actor * 2. RaftActor on receipt of the CaptureSnapshotReply (from Shard), stores the received snapshot in the replicated log @@ -533,10 +531,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { * @param followerNextIndex */ private void initiateCaptureSnapshot(String followerId, long followerNextIndex) { - if(LOG.isDebugEnabled()) { - LOG.debug("{}: initiateCaptureSnapshot, followers {}", context.getId(), followerToLog.keySet()); - } - if (!context.getReplicatedLog().isPresent(followerNextIndex) && context.getReplicatedLog().isInSnapshot(followerNextIndex)) { @@ -557,15 +551,18 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { if (lastAppliedEntry != null) { lastAppliedIndex = lastAppliedEntry.getIndex(); lastAppliedTerm = lastAppliedEntry.getTerm(); - } else if (context.getReplicatedLog().getSnapshotIndex() > -1) { + } else if (context.getReplicatedLog().getSnapshotIndex() > -1) { lastAppliedIndex = context.getReplicatedLog().getSnapshotIndex(); lastAppliedTerm = context.getReplicatedLog().getSnapshotTerm(); } boolean isInstallSnapshotInitiated = true; - actor().tell(new CaptureSnapshot(lastIndex(), lastTerm(), - lastAppliedIndex, lastAppliedTerm, isInstallSnapshotInitiated), - actor()); + long replicatedToAllIndex = super.getReplicatedToAllIndex(); + ReplicatedLogEntry replicatedToAllEntry = context.getReplicatedLog().get(replicatedToAllIndex); + actor().tell(new CaptureSnapshot(lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm, + (replicatedToAllEntry != null ? replicatedToAllEntry.getIndex() : -1), + (replicatedToAllEntry != null ? replicatedToAllEntry.getTerm() : -1), + isInstallSnapshotInitiated), actor()); context.setSnapshotCaptureInitiated(true); } } @@ -605,8 +602,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { context.getReplicatedLog().getSnapshotIndex(), context.getReplicatedLog().getSnapshotTerm(), nextSnapshotChunk, - followerToSnapshot.incrementChunkIndex(), - followerToSnapshot.getTotalChunks(), + followerToSnapshot.incrementChunkIndex(), + followerToSnapshot.getTotalChunks(), Optional.of(followerToSnapshot.getLastChunkHashCode()) ).toSerializable(), actor() diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java index 075b2873e4..8f433d529a 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java @@ -58,12 +58,23 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { */ protected String leaderId = null; + private long replicatedToAllIndex = -1; protected AbstractRaftActorBehavior(RaftActorContext context) { this.context = context; this.LOG = context.getLogger(); } + @Override + public void setReplicatedToAllIndex(long replicatedToAllIndex) { + this.replicatedToAllIndex = replicatedToAllIndex; + } + + @Override + public long getReplicatedToAllIndex() { + return replicatedToAllIndex; + } + /** * Derived classes should not directly handle AppendEntries messages it * should let the base class handle it first. Once the base class handles @@ -423,17 +434,25 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { } - protected long fakeSnapshot(final long minReplicatedToAllIndex, final long currentReplicatedIndex) { + /** + * Performs a snapshot with no capture on the replicated log. + * It clears the log from the supplied index or last-applied-1 which ever is minimum. + * + * @param snapshotCapturedIndex + */ + protected void performSnapshotWithoutCapture(final long snapshotCapturedIndex) { // we would want to keep the lastApplied as its used while capturing snapshots - long tempMin = Math.min(minReplicatedToAllIndex, - (context.getLastApplied() > -1 ? context.getLastApplied() - 1 : -1)); + long lastApplied = context.getLastApplied(); + long tempMin = Math.min(snapshotCapturedIndex, (lastApplied > -1 ? lastApplied - 1 : -1)); if (tempMin > -1 && context.getReplicatedLog().isPresent(tempMin)) { - context.getReplicatedLog().snapshotPreCommit(tempMin, context.getTermInformation().getCurrentTerm()); + //use the term of the temp-min, since we check for isPresent, entry will not be null + ReplicatedLogEntry entry = context.getReplicatedLog().get(tempMin); + context.getReplicatedLog().snapshotPreCommit(tempMin, entry.getTerm()); context.getReplicatedLog().snapshotCommit(); - return tempMin; + setReplicatedToAllIndex(tempMin); } - return currentReplicatedIndex; } + } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java index 8a0788702d..675543f2d1 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java @@ -255,7 +255,7 @@ public class Follower extends AbstractRaftActorBehavior { lastIndex(), lastTerm()), actor()); if (!context.isSnapshotCaptureInitiated()) { - fakeSnapshot(appendEntries.getReplicatedToAllIndex(), appendEntries.getReplicatedToAllIndex()); + super.performSnapshotWithoutCapture(appendEntries.getReplicatedToAllIndex()); } return this; diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/RaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/RaftActorBehavior.java index 064cd8b88c..b766e0ce39 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/RaftActorBehavior.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/RaftActorBehavior.java @@ -50,4 +50,16 @@ public interface RaftActorBehavior extends AutoCloseable{ * @return */ String getLeaderId(); + + /** + * setting the index of the log entry which is replicated to all nodes + * @param replicatedToAllIndex + */ + void setReplicatedToAllIndex(long replicatedToAllIndex); + + /** + * getting the index of the log entry which is replicated to all nodes + * @return + */ + long getReplicatedToAllIndex(); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java index ffd8edfbe1..885c3ab109 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java @@ -20,6 +20,7 @@ import org.junit.Before; import org.junit.Test; import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload; import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockReplicatedLogEntry; + /** * */ @@ -130,11 +131,17 @@ public class AbstractReplicatedLogImplTest { @Test public void testSnapshotPreCommit() { + //add 4 more entries replicatedLogImpl.append(new MockReplicatedLogEntry(2, 4, new MockPayload("E"))); replicatedLogImpl.append(new MockReplicatedLogEntry(2, 5, new MockPayload("F"))); replicatedLogImpl.append(new MockReplicatedLogEntry(3, 6, new MockPayload("G"))); replicatedLogImpl.append(new MockReplicatedLogEntry(3, 7, new MockPayload("H"))); + //sending negative values should not cause any changes + replicatedLogImpl.snapshotPreCommit(-1, -1); + assertEquals(8, replicatedLogImpl.size()); + assertEquals(-1, replicatedLogImpl.getSnapshotIndex()); + replicatedLogImpl.snapshotPreCommit(4, 3); assertEquals(3, replicatedLogImpl.size()); assertEquals(4, replicatedLogImpl.getSnapshotIndex()); @@ -152,7 +159,31 @@ public class AbstractReplicatedLogImplTest { assertEquals(0, replicatedLogImpl.size()); assertEquals(7, replicatedLogImpl.getSnapshotIndex()); + } + + @Test + public void testIsPresent() { + assertTrue(replicatedLogImpl.isPresent(0)); + assertTrue(replicatedLogImpl.isPresent(1)); + assertTrue(replicatedLogImpl.isPresent(2)); + assertTrue(replicatedLogImpl.isPresent(3)); + + replicatedLogImpl.append(new MockReplicatedLogEntry(2, 4, new MockPayload("D"))); + replicatedLogImpl.snapshotPreCommit(3, 2); //snapshot on 3 + replicatedLogImpl.snapshotCommit(); + + assertFalse(replicatedLogImpl.isPresent(0)); + assertFalse(replicatedLogImpl.isPresent(1)); + assertFalse(replicatedLogImpl.isPresent(2)); + assertFalse(replicatedLogImpl.isPresent(3)); + assertTrue(replicatedLogImpl.isPresent(4)); + + replicatedLogImpl.snapshotPreCommit(4, 2); //snapshot on 4 + replicatedLogImpl.snapshotCommit(); + assertFalse(replicatedLogImpl.isPresent(4)); + replicatedLogImpl.append(new MockReplicatedLogEntry(2, 5, new MockPayload("D"))); + assertTrue(replicatedLogImpl.isPresent(5)); } // create a snapshot for test diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java index 59046fd779..1cd8550be7 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java @@ -691,7 +691,7 @@ public class RaftActorTest extends AbstractActorTest { new MockRaftActorContext.MockPayload("C"), new MockRaftActorContext.MockPayload("D"))); - mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, -1, 1)); + mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1,-1, 1, -1, 1)); RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext(); @@ -737,7 +737,8 @@ public class RaftActorTest extends AbstractActorTest { RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext(); mockRaftActor.setCurrentBehavior(new Follower(raftActorContext)); - mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, 2, 1)); + long replicatedToAllIndex = 1; + mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, 2, 1, replicatedToAllIndex, 1)); verify(mockRaftActor.delegate).createSnapshot(); @@ -749,13 +750,16 @@ public class RaftActorTest extends AbstractActorTest { verify(dataPersistenceProvider).deleteMessages(100); - assertEquals(2, mockRaftActor.getReplicatedLog().size()); + assertEquals(3, mockRaftActor.getReplicatedLog().size()); + assertEquals(1, mockRaftActor.getCurrentBehavior().getReplicatedToAllIndex()); + assertNotNull(mockRaftActor.getReplicatedLog().get(2)); assertNotNull(mockRaftActor.getReplicatedLog().get(3)); assertNotNull(mockRaftActor.getReplicatedLog().get(4)); // Index 2 will not be in the log because it was removed due to snapshotting - assertNull(mockRaftActor.getReplicatedLog().get(2)); + assertNull(mockRaftActor.getReplicatedLog().get(1)); + assertNull(mockRaftActor.getReplicatedLog().get(0)); } }; @@ -870,7 +874,7 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.setCurrentBehavior(new Leader(raftActorContext)); - mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, -1, 1)); + mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, -1, 1, -1, 1)); mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); @@ -962,7 +966,8 @@ public class RaftActorTest extends AbstractActorTest { assertEquals(8, leaderActor.getReplicatedLog().size()); - leaderActor.onReceiveCommand(new CaptureSnapshot(6, 1, 4, 1)); + leaderActor.onReceiveCommand(new CaptureSnapshot(6, 1, 4, 1, 4, 1)); + leaderActor.getRaftActorContext().setSnapshotCaptureInitiated(true); verify(leaderActor.delegate).createSnapshot(); @@ -1040,19 +1045,21 @@ public class RaftActorTest extends AbstractActorTest { followerActor.waitForInitializeBehaviorComplete(); - // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot + Follower follower = new Follower(followerActor.getRaftActorContext()); followerActor.setCurrentBehavior(follower); assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state()); + // create 6 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder(); followerActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 6, 1).build()); - // log as indices 0-5 + // log has indices 0-5 assertEquals(6, followerActor.getReplicatedLog().size()); //snapshot on 4 - followerActor.onReceiveCommand(new CaptureSnapshot(5, 1, 4, 1)); + followerActor.onReceiveCommand(new CaptureSnapshot(5, 1, 4, 1, 4, 1)); + followerActor.getRaftActorContext().setSnapshotCaptureInitiated(true); verify(followerActor.delegate).createSnapshot(); @@ -1090,7 +1097,7 @@ public class RaftActorTest extends AbstractActorTest { followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); assertFalse(followerActor.getRaftActorContext().isSnapshotCaptureInitiated()); - // capture snapshot reply should remove the snapshotted entries only + // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log assertEquals(7, followerActor.getReplicatedLog().lastIndex()); @@ -1150,16 +1157,22 @@ public class RaftActorTest extends AbstractActorTest { // create 5 entries in the log MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder(); leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(5, 10, 1).build()); + //set the snapshot index to 4 , 0 to 4 are snapshotted leaderActor.getRaftActorContext().getReplicatedLog().setSnapshotIndex(4); + //setting replicatedToAllIndex = 9, for the log to clear + leader.setReplicatedToAllIndex(9); assertEquals(5, leaderActor.getReplicatedLog().size()); + assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1)); assertEquals(5, leaderActor.getReplicatedLog().size()); + assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); // set the 2nd follower nextIndex to 1 which has been snapshotted leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1)); assertEquals(5, leaderActor.getReplicatedLog().size()); + assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); // simulate a real snapshot leaderActor.onReceiveCommand(new SendHeartBeat()); @@ -1182,7 +1195,7 @@ public class RaftActorTest extends AbstractActorTest { leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated()); - assertEquals("Real snapshot didn't clear the log till lastApplied", 0, leaderActor.getReplicatedLog().size()); + assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size()); //reply from a slow follower after should not raise errors leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1)); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java index 42a7911be3..c133c0615f 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java @@ -1,8 +1,12 @@ package org.opendaylight.controller.cluster.raft.behaviors; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import akka.actor.ActorRef; import akka.actor.Props; import akka.testkit.JavaTestKit; +import java.util.ArrayList; +import java.util.List; import org.junit.Test; import org.opendaylight.controller.cluster.raft.AbstractActorTest; import org.opendaylight.controller.cluster.raft.MockRaftActorContext; @@ -17,12 +21,6 @@ import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.controller.cluster.raft.utils.DoNothingActor; -import java.util.ArrayList; -import java.util.List; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest { private final ActorRef behaviorActor = getSystem().actorOf(Props.create( @@ -302,38 +300,52 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest { } @Test - public void testFakeSnapshots() { + public void testPerformSnapshot() { MockRaftActorContext context = new MockRaftActorContext("test", getSystem(), behaviorActor); - AbstractRaftActorBehavior behavior = new Leader(context); - context.getTermInformation().update(1, "leader"); + AbstractRaftActorBehavior abstractBehavior = (AbstractRaftActorBehavior) createBehavior(context); + if (abstractBehavior instanceof Candidate) { + return; + } - //entry with 1 index=0 entry with replicatedToAllIndex = 0, does not do anything, returns the + context.getTermInformation().update(1, "test"); + + //log has 1 entry with replicatedToAllIndex = 0, does not do anything, returns the context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build()); context.setLastApplied(0); - assertEquals(-1, behavior.fakeSnapshot(0, -1)); + abstractBehavior.performSnapshotWithoutCapture(0); + assertEquals(-1, abstractBehavior.getReplicatedToAllIndex()); assertEquals(1, context.getReplicatedLog().size()); //2 entries, lastApplied still 0, no purging. - context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0,2,1).build()); + context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build()); context.setLastApplied(0); - assertEquals(-1, behavior.fakeSnapshot(0, -1)); + abstractBehavior.performSnapshotWithoutCapture(0); + assertEquals(-1, abstractBehavior.getReplicatedToAllIndex()); assertEquals(2, context.getReplicatedLog().size()); //2 entries, lastApplied still 0, no purging. - context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0,2,1).build()); + context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build()); context.setLastApplied(1); - assertEquals(0, behavior.fakeSnapshot(0, -1)); + abstractBehavior.performSnapshotWithoutCapture(0); + assertEquals(0, abstractBehavior.getReplicatedToAllIndex()); assertEquals(1, context.getReplicatedLog().size()); //5 entries, lastApplied =2 and replicatedIndex = 3, but since we want to keep the lastapplied, indices 0 and 1 will only get purged - context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0,5,1).build()); + context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 5, 1).build()); context.setLastApplied(2); - assertEquals(1, behavior.fakeSnapshot(3, 1)); + abstractBehavior.performSnapshotWithoutCapture(3); + assertEquals(1, abstractBehavior.getReplicatedToAllIndex()); assertEquals(3, context.getReplicatedLog().size()); - + // scenario where Last applied > Replicated to all index (becoz of a slow follower) + context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build()); + context.setLastApplied(2); + abstractBehavior.performSnapshotWithoutCapture(1); + assertEquals(1, abstractBehavior.getReplicatedToAllIndex()); + assertEquals(1, context.getReplicatedLog().size()); } + protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm( ActorRef actorRef, RaftRPC rpc) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index 94b9698abf..4b0651a48e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -1467,7 +1467,7 @@ public class ShardTest extends AbstractActorTest { NormalizedNode expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build()); - CaptureSnapshot capture = new CaptureSnapshot(-1, -1, -1, -1); + CaptureSnapshot capture = new CaptureSnapshot(-1, -1, -1, -1, -1, -1); shard.tell(capture, getRef()); assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS)); -- 2.36.6