From: Moiz Raja Date: Wed, 18 Feb 2015 22:38:50 +0000 (+0000) Subject: Merge "Bug-2692:Real snapshots should use the replicatedToAllIndex for clearing in... X-Git-Tag: release/lithium~545 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=commitdiff_plain;h=2da365966737c83347ef7b84cc155246e10f1593;hp=97307fbbbee1f9bdb1409ac962386779dc4e93bf;p=controller.git Merge "Bug-2692:Real snapshots should use the replicatedToAllIndex for clearing in-mem log" --- diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java index d1c3fefee8..e2aa16918e 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java @@ -41,7 +41,7 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { } protected int adjustedIndex(long logEntryIndex) { - if(snapshotIndex < 0){ + if (snapshotIndex < 0) { return (int) logEntryIndex; } return (int) (logEntryIndex - (snapshotIndex + 1)); @@ -134,6 +134,11 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { return journal.size(); } + @Override + public int dataSize() { + return dataSize; + } + @Override public boolean isPresent(long logEntryIndex) { if (logEntryIndex > lastIndex()) { @@ -200,6 +205,11 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { previousSnapshotIndex = -1; previousSnapshotTerm = -1; dataSize = 0; + // need to recalc the datasize based on the entries left after precommit. + for(ReplicatedLogEntry logEntry : journal) { + dataSize += logEntry.size(); + } + } @Override diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 1f446c72f1..353d0b4a24 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -573,7 +573,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { /** * This method is called during recovery to reconstruct the state of the actor. * - * @param snapshot A snapshot of the state of the actor + * @param snapshotBytes A snapshot of the state of the actor */ protected abstract void applyRecoverySnapshot(byte[] snapshotBytes); @@ -668,12 +668,21 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { LOG.info("{}: Persisting of snapshot done:{}", persistenceId(), sn.getLogMessage()); - //be greedy and remove entries from in-mem journal which are in the snapshot - // and update snapshotIndex and snapshotTerm without waiting for the success, + long dataThreshold = Runtime.getRuntime().totalMemory() * + getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100; + if (context.getReplicatedLog().dataSize() > dataThreshold) { + // if memory is less, clear the log based on lastApplied. + // this could/should only happen if one of the followers is down + // as normally we keep removing from the log when its replicated to all. + context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getLastAppliedIndex(), + captureSnapshot.getLastAppliedTerm()); - context.getReplicatedLog().snapshotPreCommit( - captureSnapshot.getLastAppliedIndex(), - captureSnapshot.getLastAppliedTerm()); + } else { + // clear the log based on replicatedToAllIndex + context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getReplicatedToAllIndex(), + captureSnapshot.getReplicatedToAllTerm()); + } + getCurrentBehavior().setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex()); LOG.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " + "and term:{}", persistenceId(), captureSnapshot.getLastAppliedIndex(), @@ -717,13 +726,14 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { // FIXME: Maybe this should be done after the command is saved journal.subList(adjustedIndex , journal.size()).clear(); - persistence().persist(new DeleteEntries(adjustedIndex), new Procedure(){ + persistence().persist(new DeleteEntries(adjustedIndex), new Procedure() { - @Override public void apply(DeleteEntries param) - throws Exception { + @Override + public void apply(DeleteEntries param) + throws Exception { //FIXME : Doing nothing for now dataSize = 0; - for(ReplicatedLogEntry entry : journal){ + for (ReplicatedLogEntry entry : journal) { dataSize += entry.size(); } } @@ -735,11 +745,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { appendAndPersist(replicatedLogEntry, null); } - @Override - public int dataSize() { - return dataSize; - } - public void appendAndPersist( final ReplicatedLogEntry replicatedLogEntry, final Procedure callback) { @@ -766,7 +771,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { long dataSizeForCheck = dataSize; dataSizeSinceLastSnapshot += logEntrySize; - long journalSize = lastIndex()+1; + long journalSize = lastIndex() + 1; if(!hasFollowers()) { // When we do not have followers we do not maintain an in-memory log @@ -817,12 +822,15 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } // send a CaptureSnapshot to self to make the expensive operation async. - getSelf().tell(new CaptureSnapshot( - lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm), + long replicatedToAllIndex = getCurrentBehavior().getReplicatedToAllIndex(); + ReplicatedLogEntry replicatedToAllEntry = context.getReplicatedLog().get(replicatedToAllIndex); + getSelf().tell(new CaptureSnapshot(lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm, + (replicatedToAllEntry != null ? replicatedToAllEntry.getIndex() : -1), + (replicatedToAllEntry != null ? replicatedToAllEntry.getTerm() : -1)), null); context.setSnapshotCaptureInitiated(true); } - if(callback != null){ + if (callback != null){ callback.apply(replicatedLogEntry); } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java index 80b7ad90d0..82d0839bee 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java @@ -145,14 +145,14 @@ public interface ReplicatedLog { * sets snapshot term * @param snapshotTerm */ - public void setSnapshotTerm(long snapshotTerm); + void setSnapshotTerm(long snapshotTerm); /** * Clears the journal entries with startIndex(inclusive) and endIndex (exclusive) * @param startIndex * @param endIndex */ - public void clear(int startIndex, int endIndex); + void clear(int startIndex, int endIndex); /** * Handles all the bookkeeping in order to perform a rollback in the @@ -160,20 +160,21 @@ public interface ReplicatedLog { * @param snapshotCapturedIndex * @param snapshotCapturedTerm */ - public void snapshotPreCommit(long snapshotCapturedIndex, long snapshotCapturedTerm); + void snapshotPreCommit(long snapshotCapturedIndex, long snapshotCapturedTerm); /** * Sets the Replicated log to state after snapshot success. */ - public void snapshotCommit(); + void snapshotCommit(); /** * Restores the replicated log to a state in the event of a save snapshot failure */ - public void snapshotRollback(); + void snapshotRollback(); /** * Size of the data in the log (in bytes) */ - public int dataSize(); + int dataSize(); + } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java index d4dd3350f3..a96b1e435c 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java @@ -14,19 +14,23 @@ public class CaptureSnapshot { private long lastIndex; private long lastTerm; private boolean installSnapshotInitiated; + private long replicatedToAllIndex; + private long replicatedToAllTerm; public CaptureSnapshot(long lastIndex, long lastTerm, - long lastAppliedIndex, long lastAppliedTerm) { - this(lastIndex, lastTerm, lastAppliedIndex, lastAppliedTerm, false); + long lastAppliedIndex, long lastAppliedTerm, long replicatedToAllIndex, long replicatedToAllTerm) { + this(lastIndex, lastTerm, lastAppliedIndex, lastAppliedTerm, replicatedToAllIndex , replicatedToAllTerm, false); } public CaptureSnapshot(long lastIndex, long lastTerm,long lastAppliedIndex, - long lastAppliedTerm, boolean installSnapshotInitiated) { + long lastAppliedTerm, long replicatedToAllIndex, long replicatedToAllTerm, boolean installSnapshotInitiated) { this.lastIndex = lastIndex; this.lastTerm = lastTerm; this.lastAppliedIndex = lastAppliedIndex; this.lastAppliedTerm = lastAppliedTerm; this.installSnapshotInitiated = installSnapshotInitiated; + this.replicatedToAllIndex = replicatedToAllIndex; + this.replicatedToAllTerm = replicatedToAllTerm; } public long getLastAppliedIndex() { @@ -48,4 +52,12 @@ public class CaptureSnapshot { public boolean isInstallSnapshotInitiated() { return installSnapshotInitiated; } + + public long getReplicatedToAllIndex() { + return replicatedToAllIndex; + } + + public long getReplicatedToAllTerm() { + return replicatedToAllTerm; + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index 9e4f3b46c4..94c38f6108 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -91,8 +91,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private Optional snapshot; - private long replicatedToAllIndex = -1; - public AbstractLeader(RaftActorContext context) { super(context); @@ -236,15 +234,16 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } private void purgeInMemoryLog() { - //find the lowest index across followers which has been replicated to all. -1 if there are no followers. + //find the lowest index across followers which has been replicated to all. + // lastApplied if there are no followers, so that we keep clearing the log for single-node // we would delete the in-mem log from that index on, in-order to minimize mem usage // we would also share this info thru AE with the followers so that they can delete their log entries as well. - long minReplicatedToAllIndex = followerToLog.isEmpty() ? -1 : Long.MAX_VALUE; + long minReplicatedToAllIndex = followerToLog.isEmpty() ? context.getLastApplied() : Long.MAX_VALUE; for (FollowerLogInformation info : followerToLog.values()) { minReplicatedToAllIndex = Math.min(minReplicatedToAllIndex, info.getMatchIndex()); } - replicatedToAllIndex = fakeSnapshot(minReplicatedToAllIndex, replicatedToAllIndex); + super.performSnapshotWithoutCapture(minReplicatedToAllIndex); } @Override @@ -472,7 +471,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { sendAppendEntriesToFollower(followerActor, followerNextIndex, entries, followerId); } else if (isFollowerActive && followerNextIndex >= 0 && - leaderLastIndex >= followerNextIndex) { + leaderLastIndex > followerNextIndex && !context.isSnapshotCaptureInitiated()) { // if the followers next index is not present in the leaders log, and // if the follower is just not starting and if leader's index is more than followers index // then snapshot should be sent @@ -507,7 +506,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(), prevLogIndex(followerNextIndex), prevLogTerm(followerNextIndex), entries, - context.getCommitIndex(), replicatedToAllIndex); + context.getCommitIndex(), super.getReplicatedToAllIndex()); if(!entries.isEmpty()) { LOG.debug("{}: Sending AppendEntries to follower {}: {}", context.getId(), followerId, @@ -518,7 +517,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } /** - * /** * Install Snapshot works as follows * 1. Leader initiates the capture snapshot by sending a CaptureSnapshot message to actor * 2. RaftActor on receipt of the CaptureSnapshotReply (from Shard), stores the received snapshot in the replicated log @@ -533,10 +531,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { * @param followerNextIndex */ private void initiateCaptureSnapshot(String followerId, long followerNextIndex) { - if(LOG.isDebugEnabled()) { - LOG.debug("{}: initiateCaptureSnapshot, followers {}", context.getId(), followerToLog.keySet()); - } - if (!context.getReplicatedLog().isPresent(followerNextIndex) && context.getReplicatedLog().isInSnapshot(followerNextIndex)) { @@ -557,15 +551,18 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { if (lastAppliedEntry != null) { lastAppliedIndex = lastAppliedEntry.getIndex(); lastAppliedTerm = lastAppliedEntry.getTerm(); - } else if (context.getReplicatedLog().getSnapshotIndex() > -1) { + } else if (context.getReplicatedLog().getSnapshotIndex() > -1) { lastAppliedIndex = context.getReplicatedLog().getSnapshotIndex(); lastAppliedTerm = context.getReplicatedLog().getSnapshotTerm(); } boolean isInstallSnapshotInitiated = true; - actor().tell(new CaptureSnapshot(lastIndex(), lastTerm(), - lastAppliedIndex, lastAppliedTerm, isInstallSnapshotInitiated), - actor()); + long replicatedToAllIndex = super.getReplicatedToAllIndex(); + ReplicatedLogEntry replicatedToAllEntry = context.getReplicatedLog().get(replicatedToAllIndex); + actor().tell(new CaptureSnapshot(lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm, + (replicatedToAllEntry != null ? replicatedToAllEntry.getIndex() : -1), + (replicatedToAllEntry != null ? replicatedToAllEntry.getTerm() : -1), + isInstallSnapshotInitiated), actor()); context.setSnapshotCaptureInitiated(true); } } @@ -605,8 +602,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { context.getReplicatedLog().getSnapshotIndex(), context.getReplicatedLog().getSnapshotTerm(), nextSnapshotChunk, - followerToSnapshot.incrementChunkIndex(), - followerToSnapshot.getTotalChunks(), + followerToSnapshot.incrementChunkIndex(), + followerToSnapshot.getTotalChunks(), Optional.of(followerToSnapshot.getLastChunkHashCode()) ).toSerializable(), actor() diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java index 075b2873e4..8f433d529a 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java @@ -58,12 +58,23 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { */ protected String leaderId = null; + private long replicatedToAllIndex = -1; protected AbstractRaftActorBehavior(RaftActorContext context) { this.context = context; this.LOG = context.getLogger(); } + @Override + public void setReplicatedToAllIndex(long replicatedToAllIndex) { + this.replicatedToAllIndex = replicatedToAllIndex; + } + + @Override + public long getReplicatedToAllIndex() { + return replicatedToAllIndex; + } + /** * Derived classes should not directly handle AppendEntries messages it * should let the base class handle it first. Once the base class handles @@ -423,17 +434,25 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { } - protected long fakeSnapshot(final long minReplicatedToAllIndex, final long currentReplicatedIndex) { + /** + * Performs a snapshot with no capture on the replicated log. + * It clears the log from the supplied index or last-applied-1 which ever is minimum. + * + * @param snapshotCapturedIndex + */ + protected void performSnapshotWithoutCapture(final long snapshotCapturedIndex) { // we would want to keep the lastApplied as its used while capturing snapshots - long tempMin = Math.min(minReplicatedToAllIndex, - (context.getLastApplied() > -1 ? context.getLastApplied() - 1 : -1)); + long lastApplied = context.getLastApplied(); + long tempMin = Math.min(snapshotCapturedIndex, (lastApplied > -1 ? lastApplied - 1 : -1)); if (tempMin > -1 && context.getReplicatedLog().isPresent(tempMin)) { - context.getReplicatedLog().snapshotPreCommit(tempMin, context.getTermInformation().getCurrentTerm()); + //use the term of the temp-min, since we check for isPresent, entry will not be null + ReplicatedLogEntry entry = context.getReplicatedLog().get(tempMin); + context.getReplicatedLog().snapshotPreCommit(tempMin, entry.getTerm()); context.getReplicatedLog().snapshotCommit(); - return tempMin; + setReplicatedToAllIndex(tempMin); } - return currentReplicatedIndex; } + } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java index 8a0788702d..675543f2d1 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java @@ -255,7 +255,7 @@ public class Follower extends AbstractRaftActorBehavior { lastIndex(), lastTerm()), actor()); if (!context.isSnapshotCaptureInitiated()) { - fakeSnapshot(appendEntries.getReplicatedToAllIndex(), appendEntries.getReplicatedToAllIndex()); + super.performSnapshotWithoutCapture(appendEntries.getReplicatedToAllIndex()); } return this; diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/RaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/RaftActorBehavior.java index 064cd8b88c..b766e0ce39 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/RaftActorBehavior.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/RaftActorBehavior.java @@ -50,4 +50,16 @@ public interface RaftActorBehavior extends AutoCloseable{ * @return */ String getLeaderId(); + + /** + * setting the index of the log entry which is replicated to all nodes + * @param replicatedToAllIndex + */ + void setReplicatedToAllIndex(long replicatedToAllIndex); + + /** + * getting the index of the log entry which is replicated to all nodes + * @return + */ + long getReplicatedToAllIndex(); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java index ffd8edfbe1..885c3ab109 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java @@ -20,6 +20,7 @@ import org.junit.Before; import org.junit.Test; import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload; import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockReplicatedLogEntry; + /** * */ @@ -130,11 +131,17 @@ public class AbstractReplicatedLogImplTest { @Test public void testSnapshotPreCommit() { + //add 4 more entries replicatedLogImpl.append(new MockReplicatedLogEntry(2, 4, new MockPayload("E"))); replicatedLogImpl.append(new MockReplicatedLogEntry(2, 5, new MockPayload("F"))); replicatedLogImpl.append(new MockReplicatedLogEntry(3, 6, new MockPayload("G"))); replicatedLogImpl.append(new MockReplicatedLogEntry(3, 7, new MockPayload("H"))); + //sending negative values should not cause any changes + replicatedLogImpl.snapshotPreCommit(-1, -1); + assertEquals(8, replicatedLogImpl.size()); + assertEquals(-1, replicatedLogImpl.getSnapshotIndex()); + replicatedLogImpl.snapshotPreCommit(4, 3); assertEquals(3, replicatedLogImpl.size()); assertEquals(4, replicatedLogImpl.getSnapshotIndex()); @@ -152,7 +159,31 @@ public class AbstractReplicatedLogImplTest { assertEquals(0, replicatedLogImpl.size()); assertEquals(7, replicatedLogImpl.getSnapshotIndex()); + } + + @Test + public void testIsPresent() { + assertTrue(replicatedLogImpl.isPresent(0)); + assertTrue(replicatedLogImpl.isPresent(1)); + assertTrue(replicatedLogImpl.isPresent(2)); + assertTrue(replicatedLogImpl.isPresent(3)); + + replicatedLogImpl.append(new MockReplicatedLogEntry(2, 4, new MockPayload("D"))); + replicatedLogImpl.snapshotPreCommit(3, 2); //snapshot on 3 + replicatedLogImpl.snapshotCommit(); + + assertFalse(replicatedLogImpl.isPresent(0)); + assertFalse(replicatedLogImpl.isPresent(1)); + assertFalse(replicatedLogImpl.isPresent(2)); + assertFalse(replicatedLogImpl.isPresent(3)); + assertTrue(replicatedLogImpl.isPresent(4)); + + replicatedLogImpl.snapshotPreCommit(4, 2); //snapshot on 4 + replicatedLogImpl.snapshotCommit(); + assertFalse(replicatedLogImpl.isPresent(4)); + replicatedLogImpl.append(new MockReplicatedLogEntry(2, 5, new MockPayload("D"))); + assertTrue(replicatedLogImpl.isPresent(5)); } // create a snapshot for test diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java index 59046fd779..1cd8550be7 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java @@ -691,7 +691,7 @@ public class RaftActorTest extends AbstractActorTest { new MockRaftActorContext.MockPayload("C"), new MockRaftActorContext.MockPayload("D"))); - mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, -1, 1)); + mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1,-1, 1, -1, 1)); RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext(); @@ -737,7 +737,8 @@ public class RaftActorTest extends AbstractActorTest { RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext(); mockRaftActor.setCurrentBehavior(new Follower(raftActorContext)); - mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, 2, 1)); + long replicatedToAllIndex = 1; + mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, 2, 1, replicatedToAllIndex, 1)); verify(mockRaftActor.delegate).createSnapshot(); @@ -749,13 +750,16 @@ public class RaftActorTest extends AbstractActorTest { verify(dataPersistenceProvider).deleteMessages(100); - assertEquals(2, mockRaftActor.getReplicatedLog().size()); + assertEquals(3, mockRaftActor.getReplicatedLog().size()); + assertEquals(1, mockRaftActor.getCurrentBehavior().getReplicatedToAllIndex()); + assertNotNull(mockRaftActor.getReplicatedLog().get(2)); assertNotNull(mockRaftActor.getReplicatedLog().get(3)); assertNotNull(mockRaftActor.getReplicatedLog().get(4)); // Index 2 will not be in the log because it was removed due to snapshotting - assertNull(mockRaftActor.getReplicatedLog().get(2)); + assertNull(mockRaftActor.getReplicatedLog().get(1)); + assertNull(mockRaftActor.getReplicatedLog().get(0)); } }; @@ -870,7 +874,7 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.setCurrentBehavior(new Leader(raftActorContext)); - mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, -1, 1)); + mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, -1, 1, -1, 1)); mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); @@ -962,7 +966,8 @@ public class RaftActorTest extends AbstractActorTest { assertEquals(8, leaderActor.getReplicatedLog().size()); - leaderActor.onReceiveCommand(new CaptureSnapshot(6, 1, 4, 1)); + leaderActor.onReceiveCommand(new CaptureSnapshot(6, 1, 4, 1, 4, 1)); + leaderActor.getRaftActorContext().setSnapshotCaptureInitiated(true); verify(leaderActor.delegate).createSnapshot(); @@ -1040,19 +1045,21 @@ public class RaftActorTest extends AbstractActorTest { followerActor.waitForInitializeBehaviorComplete(); - // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot + Follower follower = new Follower(followerActor.getRaftActorContext()); followerActor.setCurrentBehavior(follower); assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state()); + // create 6 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder(); followerActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 6, 1).build()); - // log as indices 0-5 + // log has indices 0-5 assertEquals(6, followerActor.getReplicatedLog().size()); //snapshot on 4 - followerActor.onReceiveCommand(new CaptureSnapshot(5, 1, 4, 1)); + followerActor.onReceiveCommand(new CaptureSnapshot(5, 1, 4, 1, 4, 1)); + followerActor.getRaftActorContext().setSnapshotCaptureInitiated(true); verify(followerActor.delegate).createSnapshot(); @@ -1090,7 +1097,7 @@ public class RaftActorTest extends AbstractActorTest { followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); assertFalse(followerActor.getRaftActorContext().isSnapshotCaptureInitiated()); - // capture snapshot reply should remove the snapshotted entries only + // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log assertEquals(7, followerActor.getReplicatedLog().lastIndex()); @@ -1150,16 +1157,22 @@ public class RaftActorTest extends AbstractActorTest { // create 5 entries in the log MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder(); leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(5, 10, 1).build()); + //set the snapshot index to 4 , 0 to 4 are snapshotted leaderActor.getRaftActorContext().getReplicatedLog().setSnapshotIndex(4); + //setting replicatedToAllIndex = 9, for the log to clear + leader.setReplicatedToAllIndex(9); assertEquals(5, leaderActor.getReplicatedLog().size()); + assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1)); assertEquals(5, leaderActor.getReplicatedLog().size()); + assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); // set the 2nd follower nextIndex to 1 which has been snapshotted leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1)); assertEquals(5, leaderActor.getReplicatedLog().size()); + assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); // simulate a real snapshot leaderActor.onReceiveCommand(new SendHeartBeat()); @@ -1182,7 +1195,7 @@ public class RaftActorTest extends AbstractActorTest { leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated()); - assertEquals("Real snapshot didn't clear the log till lastApplied", 0, leaderActor.getReplicatedLog().size()); + assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size()); //reply from a slow follower after should not raise errors leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1)); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java index 42a7911be3..c133c0615f 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java @@ -1,8 +1,12 @@ package org.opendaylight.controller.cluster.raft.behaviors; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import akka.actor.ActorRef; import akka.actor.Props; import akka.testkit.JavaTestKit; +import java.util.ArrayList; +import java.util.List; import org.junit.Test; import org.opendaylight.controller.cluster.raft.AbstractActorTest; import org.opendaylight.controller.cluster.raft.MockRaftActorContext; @@ -17,12 +21,6 @@ import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.controller.cluster.raft.utils.DoNothingActor; -import java.util.ArrayList; -import java.util.List; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest { private final ActorRef behaviorActor = getSystem().actorOf(Props.create( @@ -302,38 +300,52 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest { } @Test - public void testFakeSnapshots() { + public void testPerformSnapshot() { MockRaftActorContext context = new MockRaftActorContext("test", getSystem(), behaviorActor); - AbstractRaftActorBehavior behavior = new Leader(context); - context.getTermInformation().update(1, "leader"); + AbstractRaftActorBehavior abstractBehavior = (AbstractRaftActorBehavior) createBehavior(context); + if (abstractBehavior instanceof Candidate) { + return; + } - //entry with 1 index=0 entry with replicatedToAllIndex = 0, does not do anything, returns the + context.getTermInformation().update(1, "test"); + + //log has 1 entry with replicatedToAllIndex = 0, does not do anything, returns the context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build()); context.setLastApplied(0); - assertEquals(-1, behavior.fakeSnapshot(0, -1)); + abstractBehavior.performSnapshotWithoutCapture(0); + assertEquals(-1, abstractBehavior.getReplicatedToAllIndex()); assertEquals(1, context.getReplicatedLog().size()); //2 entries, lastApplied still 0, no purging. - context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0,2,1).build()); + context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build()); context.setLastApplied(0); - assertEquals(-1, behavior.fakeSnapshot(0, -1)); + abstractBehavior.performSnapshotWithoutCapture(0); + assertEquals(-1, abstractBehavior.getReplicatedToAllIndex()); assertEquals(2, context.getReplicatedLog().size()); //2 entries, lastApplied still 0, no purging. - context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0,2,1).build()); + context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build()); context.setLastApplied(1); - assertEquals(0, behavior.fakeSnapshot(0, -1)); + abstractBehavior.performSnapshotWithoutCapture(0); + assertEquals(0, abstractBehavior.getReplicatedToAllIndex()); assertEquals(1, context.getReplicatedLog().size()); //5 entries, lastApplied =2 and replicatedIndex = 3, but since we want to keep the lastapplied, indices 0 and 1 will only get purged - context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0,5,1).build()); + context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 5, 1).build()); context.setLastApplied(2); - assertEquals(1, behavior.fakeSnapshot(3, 1)); + abstractBehavior.performSnapshotWithoutCapture(3); + assertEquals(1, abstractBehavior.getReplicatedToAllIndex()); assertEquals(3, context.getReplicatedLog().size()); - + // scenario where Last applied > Replicated to all index (becoz of a slow follower) + context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build()); + context.setLastApplied(2); + abstractBehavior.performSnapshotWithoutCapture(1); + assertEquals(1, abstractBehavior.getReplicatedToAllIndex()); + assertEquals(1, context.getReplicatedLog().size()); } + protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm( ActorRef actorRef, RaftRPC rpc) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index 94b9698abf..4b0651a48e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -1467,7 +1467,7 @@ public class ShardTest extends AbstractActorTest { NormalizedNode expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build()); - CaptureSnapshot capture = new CaptureSnapshot(-1, -1, -1, -1); + CaptureSnapshot capture = new CaptureSnapshot(-1, -1, -1, -1, -1, -1); shard.tell(capture, getRef()); assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));