X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FLeaderTest.java;h=ee6e5776e15161c693ebd8aab83a6895f501353f;hp=ba0bd0f29c96237ab758487b719eac959d115edc;hb=a37aef6c01f720b935535b11cf9d7689ceea9470;hpb=8eaba1eb027b02f8b36480721055dc99c6700e85 diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java index ba0bd0f29c..ee6e5776e1 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ -9,10 +9,10 @@ import akka.actor.Props; import akka.actor.Terminated; import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; -import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.Uninterruptibles; import com.google.protobuf.ByteString; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -25,8 +25,10 @@ import org.opendaylight.controller.cluster.raft.FollowerLogInformation; import org.opendaylight.controller.cluster.raft.MockRaftActorContext; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; +import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry; import org.opendaylight.controller.cluster.raft.SerializationUtils; +import org.opendaylight.controller.cluster.raft.Snapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; @@ -57,6 +59,7 @@ public class LeaderTest extends AbstractLeaderTest { Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower")); private Leader leader; + private final short payloadVersion = 5; @Override @After @@ -85,6 +88,8 @@ public class LeaderTest extends AbstractLeaderTest { logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers"); MockRaftActorContext actorContext = createActorContextWithFollower(); + short payloadVersion = (short)5; + actorContext.setPayloadVersion(payloadVersion); long term = 1; actorContext.getTermInformation().update(term, ""); @@ -98,10 +103,11 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex()); assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm()); assertEquals("Entries size", 0, appendEntries.getEntries().size()); + assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion()); // The follower would normally reply - simulate that explicitly here. leader.handleMessage(followerActor, new AppendEntriesReply( - FOLLOWER_ID, term, true, lastIndex - 1, term)); + FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0)); assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive()); followerActor.underlyingActor().clear(); @@ -118,6 +124,7 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals("Entries size", 1, appendEntries.getEntries().size()); assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex()); assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm()); + assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion()); } @@ -146,16 +153,50 @@ public class LeaderTest extends AbstractLeaderTest { // The follower would normally reply - simulate that explicitly here. long lastIndex = actorContext.getReplicatedLog().lastIndex(); leader.handleMessage(followerActor, new AppendEntriesReply( - FOLLOWER_ID, term, true, lastIndex, term)); + FOLLOWER_ID, term, true, lastIndex, term, (short)0)); assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive()); followerActor.underlyingActor().clear(); - MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo"); - MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry( - 1, lastIndex + 1, payload); - actorContext.getReplicatedLog().append(newEntry); - RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex+1); + RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1); + + // State should not change + assertTrue(raftBehavior instanceof Leader); + + AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex()); + assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm()); + assertEquals("Entries size", 1, appendEntries.getEntries().size()); + assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex()); + assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm()); + assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString()); + assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex()); + } + + @Test + public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception { + logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus"); + + MockRaftActorContext actorContext = createActorContextWithFollower(); + actorContext.setRaftPolicy(createRaftPolicy(true, true)); + + long term = 1; + actorContext.getTermInformation().update(term, ""); + + leader = new Leader(actorContext); + + // Leader will send an immediate heartbeat - ignore it. + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + + // The follower would normally reply - simulate that explicitly here. + long lastIndex = actorContext.getReplicatedLog().lastIndex(); + leader.handleMessage(followerActor, new AppendEntriesReply( + FOLLOWER_ID, term, true, lastIndex, term, (short) 0)); + assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive()); + + followerActor.underlyingActor().clear(); + + RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1); // State should not change assertTrue(raftBehavior instanceof Leader); @@ -166,7 +207,8 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals("Entries size", 1, appendEntries.getEntries().size()); assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex()); assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm()); - assertEquals("Entry payload", payload, appendEntries.getEntries().get(0).getData()); + assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString()); + assertEquals("Commit Index", lastIndex+1, actorContext.getCommitIndex()); } @Test @@ -192,7 +234,7 @@ public class LeaderTest extends AbstractLeaderTest { // The follower would normally reply - simulate that explicitly here. long lastIndex = actorContext.getReplicatedLog().lastIndex(); leader.handleMessage(followerActor, new AppendEntriesReply( - FOLLOWER_ID, term, true, lastIndex, term)); + FOLLOWER_ID, term, true, lastIndex, term, (short)0)); assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive()); followerActor.underlyingActor().clear(); @@ -232,7 +274,7 @@ public class LeaderTest extends AbstractLeaderTest { // The follower would normally reply - simulate that explicitly here. long lastIndex = actorContext.getReplicatedLog().lastIndex(); leader.handleMessage(followerActor, new AppendEntriesReply( - FOLLOWER_ID, term, true, lastIndex, term)); + FOLLOWER_ID, term, true, lastIndex, term, (short)0)); assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive()); followerActor.underlyingActor().clear(); @@ -240,7 +282,7 @@ public class LeaderTest extends AbstractLeaderTest { for(int i=0;i<3;i++) { sendReplicate(actorContext, lastIndex+i+1); leader.handleMessage(followerActor, new AppendEntriesReply( - FOLLOWER_ID, term, true, lastIndex + i + 1, term)); + FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0)); } @@ -282,7 +324,7 @@ public class LeaderTest extends AbstractLeaderTest { // The follower would normally reply - simulate that explicitly here. long lastIndex = actorContext.getReplicatedLog().lastIndex(); leader.handleMessage(followerActor, new AppendEntriesReply( - FOLLOWER_ID, term, true, lastIndex, term)); + FOLLOWER_ID, term, true, lastIndex, term, (short)0)); assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive()); followerActor.underlyingActor().clear(); @@ -327,7 +369,7 @@ public class LeaderTest extends AbstractLeaderTest { // The follower would normally reply - simulate that explicitly here. long lastIndex = actorContext.getReplicatedLog().lastIndex(); leader.handleMessage(followerActor, new AppendEntriesReply( - FOLLOWER_ID, term, true, lastIndex, term)); + FOLLOWER_ID, term, true, lastIndex, term, (short)0)); assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive()); followerActor.underlyingActor().clear(); @@ -364,7 +406,7 @@ public class LeaderTest extends AbstractLeaderTest { // The follower would normally reply - simulate that explicitly here. long lastIndex = actorContext.getReplicatedLog().lastIndex(); leader.handleMessage(followerActor, new AppendEntriesReply( - FOLLOWER_ID, term, true, lastIndex, term)); + FOLLOWER_ID, term, true, lastIndex, term, (short)0)); assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive()); followerActor.underlyingActor().clear(); @@ -437,8 +479,8 @@ public class LeaderTest extends AbstractLeaderTest { //clears leaders log actorContext.getReplicatedLog().removeFrom(0); - final int followersLastIndex = 2; - final int snapshotIndex = 3; + final int commitIndex = 3; + final int snapshotIndex = 2; final int newEntryIndex = 4; final int snapshotTerm = 1; final int currentTerm = 2; @@ -446,12 +488,15 @@ public class LeaderTest extends AbstractLeaderTest { // set the snapshot variables in replicatedlog actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex); actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm); - actorContext.setCommitIndex(followersLastIndex); + actorContext.setCommitIndex(commitIndex); //set follower timeout to 2 mins, helps during debugging actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10)); leader = new Leader(actorContext); + leader.getFollower(FOLLOWER_ID).setMatchIndex(-1); + leader.getFollower(FOLLOWER_ID).setNextIndex(0); + // new entry ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm, @@ -461,7 +506,8 @@ public class LeaderTest extends AbstractLeaderTest { leader.markFollowerActive(FOLLOWER_ID); ByteString bs = toByteString(leadersSnapshot); - leader.setSnapshot(Optional.of(bs)); + leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.emptyList(), + commitIndex, snapshotTerm, commitIndex, snapshotTerm)); FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs); leader.setFollowerSnapshot(FOLLOWER_ID, fts); @@ -487,7 +533,7 @@ public class LeaderTest extends AbstractLeaderTest { InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); - assertEquals(snapshotIndex, is.getLastIncludedIndex()); + assertEquals(commitIndex, is.getLastIncludedIndex()); } @Test @@ -536,7 +582,7 @@ public class LeaderTest extends AbstractLeaderTest { assertTrue(raftBehavior instanceof Leader); - MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class); + assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing()); } @Test @@ -571,7 +617,7 @@ public class LeaderTest extends AbstractLeaderTest { MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); // set the snapshot as absent and check if capture-snapshot is invoked. - leader.setSnapshot(Optional.absent()); + leader.setSnapshot(null); // new entry ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm, @@ -584,7 +630,9 @@ public class LeaderTest extends AbstractLeaderTest { leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry)); - CaptureSnapshot cs = MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class); + assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing()); + + CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot(); assertTrue(cs.isInstallSnapshotInitiated()); assertEquals(3, cs.getLastAppliedIndex()); @@ -595,8 +643,7 @@ public class LeaderTest extends AbstractLeaderTest { // if an initiate is started again when first is in progress, it shouldnt initiate Capture leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry)); - List captureSnapshots = MessageCollectorActor.getAllMatching(leaderActor, CaptureSnapshot.class); - assertEquals("CaptureSnapshot should not get invoked when initiate is in progress", 1, captureSnapshots.size()); + Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot()); } @Test @@ -613,8 +660,8 @@ public class LeaderTest extends AbstractLeaderTest { //clears leaders log actorContext.getReplicatedLog().removeFrom(0); - final int followersLastIndex = 2; - final int snapshotIndex = 3; + final int lastAppliedIndex = 3; + final int snapshotIndex = 2; final int snapshotTerm = 1; final int currentTerm = 2; @@ -622,15 +669,22 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex); actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm); actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); - actorContext.setCommitIndex(followersLastIndex); + actorContext.setCommitIndex(lastAppliedIndex); + actorContext.setLastApplied(lastAppliedIndex); leader = new Leader(actorContext); - // Ignore initial heartbeat. + // Initial heartbeat. MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); - RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, - new SendInstallSnapshot(toByteString(leadersSnapshot))); + leader.getFollower(FOLLOWER_ID).setMatchIndex(-1); + leader.getFollower(FOLLOWER_ID).setNextIndex(0); + + Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(), + Collections.emptyList(), + lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm); + + RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot)); assertTrue(raftBehavior instanceof Leader); @@ -639,7 +693,7 @@ public class LeaderTest extends AbstractLeaderTest { InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); assertNotNull(installSnapshot.getData()); - assertEquals(snapshotIndex, installSnapshot.getLastIncludedIndex()); + assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex()); assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm()); assertEquals(currentTerm, installSnapshot.getTerm()); @@ -651,15 +705,18 @@ public class LeaderTest extends AbstractLeaderTest { MockRaftActorContext actorContext = createActorContextWithFollower(); - final int followersLastIndex = 2; - final int snapshotIndex = 3; + final int commitIndex = 3; + final int snapshotIndex = 2; final int snapshotTerm = 1; final int currentTerm = 2; - actorContext.setCommitIndex(followersLastIndex); + actorContext.setCommitIndex(commitIndex); leader = new Leader(actorContext); + leader.getFollower(FOLLOWER_ID).setMatchIndex(-1); + leader.getFollower(FOLLOWER_ID).setNextIndex(0); + // Ignore initial heartbeat. MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); @@ -675,7 +732,8 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); ByteString bs = toByteString(leadersSnapshot); - leader.setSnapshot(Optional.of(bs)); + leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.emptyList(), + commitIndex, snapshotTerm, commitIndex, snapshotTerm)); FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs); leader.setFollowerSnapshot(FOLLOWER_ID, fts); while(!fts.isLastChunk(fts.getChunkIndex())) { @@ -695,9 +753,8 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals(1, leader.followerLogSize()); FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID); assertNotNull(fli); - assertEquals(snapshotIndex, fli.getMatchIndex()); - assertEquals(snapshotIndex, fli.getMatchIndex()); - assertEquals(snapshotIndex + 1, fli.getNextIndex()); + assertEquals(commitIndex, fli.getMatchIndex()); + assertEquals(commitIndex + 1, fli.getNextIndex()); } @Test @@ -706,8 +763,8 @@ public class LeaderTest extends AbstractLeaderTest { MockRaftActorContext actorContext = createActorContextWithFollower(); - final int followersLastIndex = 2; - final int snapshotIndex = 3; + final int commitIndex = 3; + final int snapshotIndex = 2; final int snapshotTerm = 1; final int currentTerm = 2; @@ -721,10 +778,13 @@ public class LeaderTest extends AbstractLeaderTest { configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS)); actorContext.setConfigParams(configParams); - actorContext.setCommitIndex(followersLastIndex); + actorContext.setCommitIndex(commitIndex); leader = new Leader(actorContext); + leader.getFollower(FOLLOWER_ID).setMatchIndex(-1); + leader.getFollower(FOLLOWER_ID).setNextIndex(0); + Map leadersSnapshot = new HashMap<>(); leadersSnapshot.put("1", "A"); leadersSnapshot.put("2", "B"); @@ -736,9 +796,11 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); ByteString bs = toByteString(leadersSnapshot); - leader.setSnapshot(Optional.of(bs)); + Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.emptyList(), + commitIndex, snapshotTerm, commitIndex, snapshotTerm); + leader.setSnapshot(snapshot); - leader.handleMessage(leaderActor, new SendInstallSnapshot(bs)); + leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot)); InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); @@ -777,8 +839,8 @@ public class LeaderTest extends AbstractLeaderTest { MockRaftActorContext actorContext = createActorContextWithFollower(); - final int followersLastIndex = 2; - final int snapshotIndex = 3; + final int commitIndex = 3; + final int snapshotIndex = 2; final int snapshotTerm = 1; final int currentTerm = 2; @@ -789,10 +851,13 @@ public class LeaderTest extends AbstractLeaderTest { } }); - actorContext.setCommitIndex(followersLastIndex); + actorContext.setCommitIndex(commitIndex); leader = new Leader(actorContext); + leader.getFollower(FOLLOWER_ID).setMatchIndex(-1); + leader.getFollower(FOLLOWER_ID).setNextIndex(0); + Map leadersSnapshot = new HashMap<>(); leadersSnapshot.put("1", "A"); leadersSnapshot.put("2", "B"); @@ -804,10 +869,12 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); ByteString bs = toByteString(leadersSnapshot); - leader.setSnapshot(Optional.of(bs)); + Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.emptyList(), + commitIndex, snapshotTerm, commitIndex, snapshotTerm); + leader.setSnapshot(snapshot); Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); - leader.handleMessage(leaderActor, new SendInstallSnapshot(bs)); + leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot)); InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); @@ -836,8 +903,8 @@ public class LeaderTest extends AbstractLeaderTest { MockRaftActorContext actorContext = createActorContextWithFollower(); - final int followersLastIndex = 2; - final int snapshotIndex = 3; + final int commitIndex = 3; + final int snapshotIndex = 2; final int snapshotTerm = 1; final int currentTerm = 2; @@ -848,10 +915,13 @@ public class LeaderTest extends AbstractLeaderTest { } }); - actorContext.setCommitIndex(followersLastIndex); + actorContext.setCommitIndex(commitIndex); leader = new Leader(actorContext); + leader.getFollower(FOLLOWER_ID).setMatchIndex(-1); + leader.getFollower(FOLLOWER_ID).setNextIndex(0); + Map leadersSnapshot = new HashMap<>(); leadersSnapshot.put("1", "A"); leadersSnapshot.put("2", "B"); @@ -863,9 +933,11 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); ByteString bs = toByteString(leadersSnapshot); - leader.setSnapshot(Optional.of(bs)); + Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.emptyList(), + commitIndex, snapshotTerm, commitIndex, snapshotTerm); + leader.setSnapshot(snapshot); - leader.handleMessage(leaderActor, new SendInstallSnapshot(bs)); + leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot)); InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); @@ -965,9 +1037,19 @@ public class LeaderTest extends AbstractLeaderTest { configParams.setElectionTimeoutFactor(100000); MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef); context.setConfigParams(configParams); + context.setPayloadVersion(payloadVersion); return context; } + private MockRaftActorContext createFollowerActorContextWithLeader() { + MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor); + DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl(); + followerConfig.setElectionTimeoutFactor(10000); + followerActorContext.setConfigParams(followerConfig); + followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString())); + return followerActorContext; + } + @Test public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception { logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex"); @@ -1096,26 +1178,242 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testHandleAppendEntriesReplyFailure(){ - logStart("testHandleAppendEntriesReplyFailure"); + public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader(){ + logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader"); MockRaftActorContext leaderActorContext = createActorContextWithFollower(); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( + new FiniteDuration(1000, TimeUnit.SECONDS)); + + leaderActorContext.setReplicatedLog( + new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build()); + long leaderCommitIndex = 2; + leaderActorContext.setCommitIndex(leaderCommitIndex); + leaderActorContext.setLastApplied(leaderCommitIndex); + + ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1); + ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2); + + MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader(); + + followerActorContext.setReplicatedLog( + new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build()); + followerActorContext.setCommitIndex(0); + followerActorContext.setLastApplied(0); + + Follower follower = new Follower(followerActorContext); + followerActor.underlyingActor().setBehavior(follower); leader = new Leader(leaderActorContext); - // Send initial heartbeat reply with last index. - leader.handleAppendEntriesReply(followerActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 10, 1)); + AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + + MessageCollectorActor.clearMessages(followerActor); + MessageCollectorActor.clearMessages(leaderActor); + + // Verify initial AppendEntries sent with the leader's current commit index. + assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit()); + assertEquals("Log entries size", 0, appendEntries.getEntries().size()); + assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex()); + + leaderActor.underlyingActor().setBehavior(leader); + + leader.handleMessage(followerActor, appendEntriesReply); + + MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1); + appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + + assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit()); + assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex()); + assertEquals("Log entries size", 2, appendEntries.getEntries().size()); + + assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex()); + assertEquals("First entry data", leadersSecondLogEntry.getData(), + appendEntries.getEntries().get(0).getData()); + assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex()); + assertEquals("Second entry data", leadersThirdLogEntry.getData(), + appendEntries.getEntries().get(1).getData()); FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID); - assertEquals("getNextIndex", 11, followerInfo.getNextIndex()); + assertEquals("getNextIndex", 3, followerInfo.getNextIndex()); - AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, false, 10, 1); + List applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2); - RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply); + ApplyState applyState = applyStateList.get(0); + assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex()); + assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm()); + assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(), + applyState.getReplicatedLogEntry().getData()); + + applyState = applyStateList.get(1); + assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex()); + assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm()); + assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(), + applyState.getReplicatedLogEntry().getData()); + + assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex()); + assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex()); + } - assertEquals(RaftState.Leader, raftActorBehavior.state()); + @Test + public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() { + logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty"); + + MockRaftActorContext leaderActorContext = createActorContextWithFollower(); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( + new FiniteDuration(1000, TimeUnit.SECONDS)); + + leaderActorContext.setReplicatedLog( + new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build()); + long leaderCommitIndex = 1; + leaderActorContext.setCommitIndex(leaderCommitIndex); + leaderActorContext.setLastApplied(leaderCommitIndex); + + ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0); + ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1); + + MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader(); + + followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); + followerActorContext.setCommitIndex(-1); + followerActorContext.setLastApplied(-1); + + Follower follower = new Follower(followerActorContext); + followerActor.underlyingActor().setBehavior(follower); + + leader = new Leader(leaderActorContext); + + AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + + MessageCollectorActor.clearMessages(followerActor); + MessageCollectorActor.clearMessages(leaderActor); + + // Verify initial AppendEntries sent with the leader's current commit index. + assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit()); + assertEquals("Log entries size", 0, appendEntries.getEntries().size()); + assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex()); + + leaderActor.underlyingActor().setBehavior(leader); + + leader.handleMessage(followerActor, appendEntriesReply); + + MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1); + appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + + assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit()); + assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex()); + assertEquals("Log entries size", 2, appendEntries.getEntries().size()); + + assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex()); + assertEquals("First entry data", leadersFirstLogEntry.getData(), + appendEntries.getEntries().get(0).getData()); + assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex()); + assertEquals("Second entry data", leadersSecondLogEntry.getData(), + appendEntries.getEntries().get(1).getData()); + + FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID); + assertEquals("getNextIndex", 2, followerInfo.getNextIndex()); + + List applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2); + + ApplyState applyState = applyStateList.get(0); + assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex()); + assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm()); + assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(), + applyState.getReplicatedLogEntry().getData()); + + applyState = applyStateList.get(1); + assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex()); + assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm()); + assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(), + applyState.getReplicatedLogEntry().getData()); + + assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex()); + assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex()); + } + + @Test + public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent(){ + logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent"); + + MockRaftActorContext leaderActorContext = createActorContextWithFollower(); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( + new FiniteDuration(1000, TimeUnit.SECONDS)); - assertEquals("getNextIndex", 10, followerInfo.getNextIndex()); + leaderActorContext.setReplicatedLog( + new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build()); + long leaderCommitIndex = 1; + leaderActorContext.setCommitIndex(leaderCommitIndex); + leaderActorContext.setLastApplied(leaderCommitIndex); + + ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0); + ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1); + + MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader(); + + followerActorContext.setReplicatedLog( + new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build()); + followerActorContext.setCommitIndex(-1); + followerActorContext.setLastApplied(-1); + + Follower follower = new Follower(followerActorContext); + followerActor.underlyingActor().setBehavior(follower); + + leader = new Leader(leaderActorContext); + + AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + + MessageCollectorActor.clearMessages(followerActor); + MessageCollectorActor.clearMessages(leaderActor); + + // Verify initial AppendEntries sent with the leader's current commit index. + assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit()); + assertEquals("Log entries size", 0, appendEntries.getEntries().size()); + assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex()); + + leaderActor.underlyingActor().setBehavior(leader); + + leader.handleMessage(followerActor, appendEntriesReply); + + MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1); + appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + + assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit()); + assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex()); + assertEquals("Log entries size", 2, appendEntries.getEntries().size()); + + assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex()); + assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm()); + assertEquals("First entry data", leadersFirstLogEntry.getData(), + appendEntries.getEntries().get(0).getData()); + assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex()); + assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm()); + assertEquals("Second entry data", leadersSecondLogEntry.getData(), + appendEntries.getEntries().get(1).getData()); + + FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID); + assertEquals("getNextIndex", 2, followerInfo.getNextIndex()); + + List applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2); + + ApplyState applyState = applyStateList.get(0); + assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex()); + assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm()); + assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(), + applyState.getReplicatedLogEntry().getData()); + + applyState = applyStateList.get(1); + assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex()); + assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm()); + assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(), + applyState.getReplicatedLogEntry().getData()); + + assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex()); + assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex()); + assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm()); } @Test @@ -1133,7 +1431,10 @@ public class LeaderTest extends AbstractLeaderTest { leader = new Leader(leaderActorContext); - AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1); + assertEquals(payloadVersion, leader.getLeaderPayloadVersion()); + + short payloadVersion = 5; + AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion); RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply); @@ -1156,6 +1457,9 @@ public class LeaderTest extends AbstractLeaderTest { ApplyState applyState = applyStateList.get(0); assertEquals(2, applyState.getReplicatedLogEntry().getIndex()); + + FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID); + assertEquals(payloadVersion, followerInfo.getPayloadVersion()); } @Test @@ -1166,13 +1470,95 @@ public class LeaderTest extends AbstractLeaderTest { leader = new Leader(leaderActorContext); - AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1); + AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0); RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply); assertEquals(RaftState.Leader, raftActorBehavior.state()); } + @Test + public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() { + logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded"); + + MockRaftActorContext leaderActorContext = createActorContextWithFollower(); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( + new FiniteDuration(1000, TimeUnit.SECONDS)); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnaphotChunkSize(2); + + leaderActorContext.setReplicatedLog( + new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build()); + long leaderCommitIndex = 3; + leaderActorContext.setCommitIndex(leaderCommitIndex); + leaderActorContext.setLastApplied(leaderCommitIndex); + + ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0); + ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1); + ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2); + ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3); + + MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader(); + + followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); + followerActorContext.setCommitIndex(-1); + followerActorContext.setLastApplied(-1); + + Follower follower = new Follower(followerActorContext); + followerActor.underlyingActor().setBehavior(follower); + + leader = new Leader(leaderActorContext); + + AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + + MessageCollectorActor.clearMessages(followerActor); + MessageCollectorActor.clearMessages(leaderActor); + + // Verify initial AppendEntries sent with the leader's current commit index. + assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit()); + assertEquals("Log entries size", 0, appendEntries.getEntries().size()); + assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex()); + + leaderActor.underlyingActor().setBehavior(leader); + + leader.handleMessage(followerActor, appendEntriesReply); + + List appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2); + MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2); + + appendEntries = appendEntriesList.get(0); + assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit()); + assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex()); + assertEquals("Log entries size", 2, appendEntries.getEntries().size()); + + assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex()); + assertEquals("First entry data", leadersFirstLogEntry.getData(), + appendEntries.getEntries().get(0).getData()); + assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex()); + assertEquals("Second entry data", leadersSecondLogEntry.getData(), + appendEntries.getEntries().get(1).getData()); + + appendEntries = appendEntriesList.get(1); + assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit()); + assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex()); + assertEquals("Log entries size", 2, appendEntries.getEntries().size()); + + assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex()); + assertEquals("First entry data", leadersThirdLogEntry.getData(), + appendEntries.getEntries().get(0).getData()); + assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex()); + assertEquals("Second entry data", leadersFourthLogEntry.getData(), + appendEntries.getEntries().get(1).getData()); + + FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID); + assertEquals("getNextIndex", 4, followerInfo.getNextIndex()); + + MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4); + + assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex()); + assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex()); + } + @Test public void testHandleRequestVoteReply(){ logStart("testHandleRequestVoteReply"); @@ -1255,83 +1641,6 @@ public class LeaderTest extends AbstractLeaderTest { }}; } - - @Test - public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception { - logStart("testAppendEntryCallAtEndofAppendEntryReply"); - - MockRaftActorContext leaderActorContext = createActorContextWithFollower(); - - DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); - //configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS)); - configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS)); - - leaderActorContext.setConfigParams(configParams); - - MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor); - - followerActorContext.setConfigParams(configParams); - followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString())); - - Follower follower = new Follower(followerActorContext); - followerActor.underlyingActor().setBehavior(follower); - - leaderActorContext.getReplicatedLog().removeFrom(0); - leaderActorContext.setCommitIndex(-1); - leaderActorContext.setLastApplied(-1); - - followerActorContext.getReplicatedLog().removeFrom(0); - followerActorContext.setCommitIndex(-1); - followerActorContext.setLastApplied(-1); - - leader = new Leader(leaderActorContext); - - AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching( - leaderActor, AppendEntriesReply.class); - - leader.handleMessage(followerActor, appendEntriesReply); - - // Clear initial heartbeat messages - - leaderActor.underlyingActor().clear(); - followerActor.underlyingActor().clear(); - - // create 3 entries - leaderActorContext.setReplicatedLog( - new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build()); - leaderActorContext.setCommitIndex(1); - leaderActorContext.setLastApplied(1); - - Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(), - TimeUnit.MILLISECONDS); - - leader.handleMessage(leaderActor, new SendHeartBeat()); - - AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); - - // Should send first log entry - assertEquals(1, appendEntries.getLeaderCommit()); - assertEquals(0, appendEntries.getEntries().get(0).getIndex()); - assertEquals(-1, appendEntries.getPrevLogIndex()); - - appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); - - assertEquals(1, appendEntriesReply.getLogLastTerm()); - assertEquals(0, appendEntriesReply.getLogLastIndex()); - - followerActor.underlyingActor().clear(); - - leader.handleAppendEntriesReply(followerActor, appendEntriesReply); - - appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); - - // Should send second log entry - assertEquals(1, appendEntries.getLeaderCommit()); - assertEquals(1, appendEntries.getEntries().get(0).getIndex()); - - follower.close(); - } - @Test public void testLaggingFollowerStarvation() throws Exception { logStart("testLaggingFollowerStarvation"); @@ -1372,7 +1681,7 @@ public class LeaderTest extends AbstractLeaderTest { for(int i=1;i<6;i++) { // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733) - RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1)); + RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0)); assertTrue(newBehavior == leader); Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); }