From a93bcbe711f66ef6ec7bc97972f108859c87a11e Mon Sep 17 00:00:00 2001 From: Moiz Raja Date: Mon, 3 Aug 2015 17:05:14 -0700 Subject: [PATCH] BUG 2185 : Follower should request forceInstallSnapshot in out-of-sync scenario When the Follower detects that it has more entries in it's log than the Leader it might be an indication that the Follower was previously a Leader and therefore it has additional entries in it's log which are missing in the Leader. When the RaftPolicy is set to allow commits before consensus this could also mean that the state now has more data than should be present in there. In this scenario Follower requests the Leader to InstallSnapshot. Change-Id: I517af148c3933f798ceb87ff88c77c396590881f Signed-off-by: Moiz Raja (cherry picked from commit 8c3896a99a221d3411f88f3a53c7d9fe6b89734e) --- .../raft/behaviors/AbstractLeader.java | 53 +++++--- .../cluster/raft/behaviors/Follower.java | 17 ++- .../raft/messages/AppendEntriesReply.java | 16 ++- .../cluster/raft/behaviors/FollowerTest.java | 46 +++++++ .../cluster/raft/behaviors/LeaderTest.java | 118 +++++++++++++++++- 5 files changed, 219 insertions(+), 31 deletions(-) diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index 85f2f153ab..5789895997 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -196,7 +196,15 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { long followerLastLogIndex = appendEntriesReply.getLogLastIndex(); ReplicatedLogEntry followersLastLogEntry = context.getReplicatedLog().get(followerLastLogIndex); - if(followerLastLogIndex < 0 || (followersLastLogEntry != null && + if(appendEntriesReply.isForceInstallSnapshot()) { + // Reset the followers match and next index. This is to signal that this follower has nothing + // in common with this Leader and so would require a snapshot to be installed + followerLogInformation.setMatchIndex(-1); + followerLogInformation.setNextIndex(-1); + + // Force initiate a snapshot capture + initiateCaptureSnapshot(followerId); + } else if(followerLastLogIndex < 0 || (followersLastLogEntry != null && followersLastLogEntry.getTerm() == appendEntriesReply.getLogLastTerm())) { // The follower's log is empty or the last entry is present in the leader's journal // and the terms match so the follower is just behind the leader's journal from @@ -539,7 +547,9 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // Send heartbeat to follower whenever install snapshot is initiated. sendAppendEntries = true; - initiateCaptureSnapshot(followerId, followerNextIndex); + if (canInstallSnapshot(followerNextIndex)) { + initiateCaptureSnapshot(followerId); + } } else if(sendHeartbeat) { // we send an AppendEntries, even if the follower is inactive @@ -583,27 +593,32 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { * 6. If another follower requires a snapshot and a snapshot has been collected (via CaptureSnapshotReply) * then send the existing snapshot in chunks to the follower. * @param followerId - * @param followerNextIndex */ - private void initiateCaptureSnapshot(String followerId, long followerNextIndex) { - if (!context.getReplicatedLog().isPresent(followerNextIndex) && - context.getReplicatedLog().isInSnapshot(followerNextIndex)) { - - if (snapshot.isPresent()) { - // if a snapshot is present in the memory, most likely another install is in progress - // no need to capture snapshot. - // This could happen if another follower needs an install when one is going on. - final ActorSelection followerActor = context.getPeerActorSelection(followerId); - sendSnapshotChunk(followerActor, followerId); + private void initiateCaptureSnapshot(String followerId) { + if (snapshot.isPresent()) { + // if a snapshot is present in the memory, most likely another install is in progress + // no need to capture snapshot. + // This could happen if another follower needs an install when one is going on. + final ActorSelection followerActor = context.getPeerActorSelection(followerId); + sendSnapshotChunk(followerActor, followerId); - } else { - context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(), - this.getReplicatedToAllIndex(), followerId); - } + } else { + context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(), + this.getReplicatedToAllIndex(), followerId); } } + private boolean canInstallSnapshot(long nextIndex){ + // If the follower's nextIndex is -1 then we might as well send it a snapshot + // Otherwise send it a snapshot only if the nextIndex is not present in the log but is present + // in the snapshot + return (nextIndex == -1 || + (!context.getReplicatedLog().isPresent(nextIndex) + && context.getReplicatedLog().isInSnapshot(nextIndex))); + + } + private void sendInstallSnapshot() { LOG.debug("{}: sendInstallSnapshot", logName()); @@ -612,9 +627,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { if (followerActor != null) { long nextIndex = e.getValue().getNextIndex(); - - if (!context.getReplicatedLog().isPresent(nextIndex) && - context.getReplicatedLog().isInSnapshot(nextIndex)) { + if (canInstallSnapshot(nextIndex)) { sendSnapshotChunk(followerActor, e.getKey()); } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java index 0cd6fbab52..974ec47585 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java @@ -147,7 +147,7 @@ public class Follower extends AbstractRaftActorBehavior { int addEntriesFrom = 0; if (context.getReplicatedLog().size() > 0) { - // Find the entry up until which the one that is not in the follower's log + // Find the entry up until the one that is not in the follower's log for (int i = 0;i < appendEntries.getEntries().size(); i++, addEntriesFrom++) { ReplicatedLogEntry matchEntry = appendEntries.getEntries().get(i); ReplicatedLogEntry newEntry = context.getReplicatedLog().get(matchEntry.getIndex()); @@ -161,12 +161,19 @@ public class Follower extends AbstractRaftActorBehavior { continue; } - LOG.debug("{}: Removing entries from log starting at {}", logName(), + if(!context.getRaftPolicy().applyModificationToStateBeforeConsensus()) { + + LOG.debug("{}: Removing entries from log starting at {}", logName(), matchEntry.getIndex()); - // Entries do not match so remove all subsequent entries - context.getReplicatedLog().removeFromAndPersist(matchEntry.getIndex()); - break; + // Entries do not match so remove all subsequent entries + context.getReplicatedLog().removeFromAndPersist(matchEntry.getIndex()); + break; + } else { + sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex, + lastTerm(), context.getPayloadVersion(), true), actor()); + return this; + } } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesReply.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesReply.java index 990605b288..521a4512c7 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesReply.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesReply.java @@ -31,8 +31,15 @@ public class AppendEntriesReply extends AbstractRaftRPC { private final short payloadVersion; + private final boolean forceInstallSnapshot; + public AppendEntriesReply(String followerId, long term, boolean success, long logLastIndex, long logLastTerm, short payloadVersion) { + this(followerId, term, success, logLastIndex, logLastTerm, payloadVersion, false); + } + + public AppendEntriesReply(String followerId, long term, boolean success, long logLastIndex, long logLastTerm, + short payloadVersion, boolean forceInstallSnapshot) { super(term); this.followerId = followerId; @@ -40,8 +47,10 @@ public class AppendEntriesReply extends AbstractRaftRPC { this.logLastIndex = logLastIndex; this.logLastTerm = logLastTerm; this.payloadVersion = payloadVersion; + this.forceInstallSnapshot = forceInstallSnapshot; } + @Override public long getTerm() { return term; @@ -72,7 +81,12 @@ public class AppendEntriesReply extends AbstractRaftRPC { StringBuilder builder = new StringBuilder(); builder.append("AppendEntriesReply [success=").append(success).append(", logLastIndex=").append(logLastIndex) .append(", logLastTerm=").append(logLastTerm).append(", followerId=").append(followerId) - .append(", payloadVersion=").append(payloadVersion).append("]"); + .append(", payloadVersion=").append(", forceInstallSnapshot=").append(forceInstallSnapshot) + .append(payloadVersion).append("]"); return builder.toString(); } + + public boolean isForceInstallSnapshot() { + return forceInstallSnapshot; + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java index 42e35d5767..f189e2d2ee 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java @@ -599,6 +599,44 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { expectAndVerifyAppendEntriesReply(2, true, context.getId(), 2, 3); } + @Test + public void testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot() { + logStart("testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot"); + + MockRaftActorContext context = createActorContext(); + + // First set the receivers term to lower number + context.getTermInformation().update(1, "test"); + + // Prepare the receivers log + MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog(); + log.append(newReplicatedLogEntry(1, 0, "zero")); + log.append(newReplicatedLogEntry(1, 1, "one")); + log.append(newReplicatedLogEntry(1, 2, "two")); + + context.setReplicatedLog(log); + + // Prepare the entries to be sent with AppendEntries + List entries = new ArrayList<>(); + entries.add(newReplicatedLogEntry(2, 2, "two-1")); + entries.add(newReplicatedLogEntry(2, 3, "three")); + + // Send appendEntries with the same term as was set on the receiver + // before the new behavior was created (1 in this case) + // This will not work for a Candidate because as soon as a Candidate + // is created it increments the term + AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0); + + context.setRaftPolicy(createRaftPolicy(false, true)); + follower = createBehavior(context); + + RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries); + + Assert.assertSame(follower, newBehavior); + + expectAndVerifyAppendEntriesReply(2, false, context.getId(), 1, 2, true); + } + @Test public void testHandleAppendEntriesPreviousLogEntryMissing(){ logStart("testHandleAppendEntriesPreviousLogEntryMissing"); @@ -957,6 +995,12 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess, String expFollowerId, long expLogLastTerm, long expLogLastIndex) { + expectAndVerifyAppendEntriesReply(expTerm, expSuccess, expFollowerId, expLogLastTerm, expLogLastIndex, false); + } + + private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess, + String expFollowerId, long expLogLastTerm, long expLogLastIndex, + boolean expForceInstallSnapshot) { AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); @@ -967,8 +1011,10 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { assertEquals("getLogLastTerm", expLogLastTerm, reply.getLogLastTerm()); assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex()); assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion()); + assertEquals("isForceInstallSnapshot", expForceInstallSnapshot, reply.isForceInstallSnapshot()); } + private ReplicatedLogEntry newReplicatedLogEntry(long term, long index, String data) { return new MockRaftActorContext.MockReplicatedLogEntry(term, index, new MockRaftActorContext.MockPayload(data)); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java index 9b877a7cdc..a391fdd342 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ -601,11 +601,6 @@ public class LeaderTest extends AbstractLeaderTest { MockRaftActorContext actorContext = createActorContextWithFollower(); - Map leadersSnapshot = new HashMap<>(); - leadersSnapshot.put("1", "A"); - leadersSnapshot.put("2", "B"); - leadersSnapshot.put("3", "C"); - //clears leaders log actorContext.getReplicatedLog().removeFrom(0); @@ -656,6 +651,69 @@ public class LeaderTest extends AbstractLeaderTest { Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot()); } + @Test + public void testInitiateForceInstallSnapshot() throws Exception { + logStart("testInitiateForceInstallSnapshot"); + + MockRaftActorContext actorContext = createActorContextWithFollower(); + + final int followersLastIndex = 2; + final int snapshotIndex = -1; + final int newEntryIndex = 4; + final int snapshotTerm = -1; + final int currentTerm = 2; + + // set the snapshot variables in replicatedlog + actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex); + actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm); + actorContext.setLastApplied(3); + actorContext.setCommitIndex(followersLastIndex); + + actorContext.getReplicatedLog().removeFrom(0); + + leader = new Leader(actorContext); + + // Leader will send an immediate heartbeat - ignore it. + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + + // set the snapshot as absent and check if capture-snapshot is invoked. + leader.setSnapshot(null); + + for(int i=0;i<4;i++) { + actorContext.getReplicatedLog().append(new ReplicatedLogImplEntry(i, 1, + new MockRaftActorContext.MockPayload("X" + i))); + } + + // new entry + ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm, + new MockRaftActorContext.MockPayload("D")); + + actorContext.getReplicatedLog().append(entry); + + //update follower timestamp + leader.markFollowerActive(FOLLOWER_ID); + + // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets + // installed with a SendInstallSnapshot + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 100, 1, (short) 1, true)); + + assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing()); + + CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot(); + + assertTrue(cs.isInstallSnapshotInitiated()); + assertEquals(3, cs.getLastAppliedIndex()); + assertEquals(1, cs.getLastAppliedTerm()); + assertEquals(4, cs.getLastIndex()); + assertEquals(2, cs.getLastTerm()); + + // if an initiate is started again when first is in progress, it shouldnt initiate Capture + leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry)); + + Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot()); + } + + @Test public void testInstallSnapshot() throws Exception { logStart("testInstallSnapshot"); @@ -709,6 +767,56 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals(currentTerm, installSnapshot.getTerm()); } + @Test + public void testForceInstallSnapshot() throws Exception { + logStart("testForceInstallSnapshot"); + + MockRaftActorContext actorContext = createActorContextWithFollower(); + + Map leadersSnapshot = new HashMap<>(); + leadersSnapshot.put("1", "A"); + leadersSnapshot.put("2", "B"); + leadersSnapshot.put("3", "C"); + + final int lastAppliedIndex = 3; + final int snapshotIndex = -1; + final int snapshotTerm = -1; + final int currentTerm = 2; + + // set the snapshot variables in replicatedlog + actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex); + actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm); + actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); + actorContext.setCommitIndex(lastAppliedIndex); + actorContext.setLastApplied(lastAppliedIndex); + + leader = new Leader(actorContext); + + // Initial heartbeat. + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + + leader.getFollower(FOLLOWER_ID).setMatchIndex(-1); + leader.getFollower(FOLLOWER_ID).setNextIndex(-1); + + Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(), + Collections.emptyList(), + lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm); + + RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot)); + + assertTrue(raftBehavior instanceof Leader); + + // check if installsnapshot gets called with the correct values. + + InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); + + assertNotNull(installSnapshot.getData()); + assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex()); + assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm()); + + assertEquals(currentTerm, installSnapshot.getTerm()); + } + @Test public void testHandleInstallSnapshotReplyLastChunk() throws Exception { logStart("testHandleInstallSnapshotReplyLastChunk"); -- 2.36.6