X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FLeaderTest.java;h=41278799ed934ae95c9b7d3dee4f974892c4462e;hp=8529e1926b31ebb88548653594ec867af202b6ea;hb=d3e310b940b60f6590f0e94a576aece95a055942;hpb=224aa4f574c63576961dc9dc37e075e2e5096a5a diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java index 8529e1926b..41278799ed 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ -10,6 +10,8 @@ package org.opendaylight.controller.cluster.raft.behaviors; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -23,13 +25,13 @@ import akka.testkit.TestActorRef; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.Uninterruptibles; import com.google.protobuf.ByteString; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; import org.junit.After; -import org.junit.Assert; import org.junit.Test; import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl; import org.opendaylight.controller.cluster.raft.FollowerLogInformation; @@ -40,31 +42,30 @@ import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.RaftVersions; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry; -import org.opendaylight.controller.cluster.raft.SerializationUtils; import org.opendaylight.controller.cluster.raft.Snapshot; import org.opendaylight.controller.cluster.raft.VotingState; -import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; -import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; -import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot; +import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot; import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply; import org.opendaylight.controller.cluster.raft.messages.RaftRPC; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; +import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy; import org.opendaylight.controller.cluster.raft.policy.RaftPolicy; import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; +import org.opendaylight.yangtools.concepts.Identifier; import scala.concurrent.duration.FiniteDuration; -public class LeaderTest extends AbstractLeaderTest { +public class LeaderTest extends AbstractLeaderTest { static final String FOLLOWER_ID = "follower"; public static final String LEADER_ID = "leader"; @@ -94,10 +95,8 @@ public class LeaderTest extends AbstractLeaderTest { leader = new Leader(createActorContext()); - // handle message should return the Leader state when it receives an - // unknown message - RaftActorBehavior behavior = leader.handleMessage(followerActor, "foo"); - Assert.assertTrue(behavior instanceof Leader); + // handle message should null when it receives an unknown message + assertNull(leader.handleMessage(followerActor, "foo")); } @Test @@ -105,6 +104,7 @@ public class LeaderTest extends AbstractLeaderTest { logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers"); MockRaftActorContext actorContext = createActorContextWithFollower(); + actorContext.setCommitIndex(-1); short payloadVersion = (short)5; actorContext.setPayloadVersion(payloadVersion); @@ -112,6 +112,7 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.getTermInformation().update(term, ""); leader = new Leader(actorContext); + actorContext.setCurrentBehavior(leader); // Leader should send an immediate heartbeat with no entries as follower is inactive. long lastIndex = actorContext.getReplicatedLog().lastIndex(); @@ -133,7 +134,7 @@ public class LeaderTest extends AbstractLeaderTest { Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams(). getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS); - leader.handleMessage(leaderActor, new SendHeartBeat()); + leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE); appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex()); @@ -199,6 +200,7 @@ public class LeaderTest extends AbstractLeaderTest { logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry"); MockRaftActorContext actorContext = createActorContextWithFollower(); + actorContext.setCommitIndex(-1); // The raft context is initialized with a couple log entries. However the commitIndex // is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't @@ -208,6 +210,7 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.getTermInformation().update(newTerm, ""); leader = new Leader(actorContext); + actorContext.setCurrentBehavior(leader); // Leader will send an immediate heartbeat - ignore it. MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); @@ -407,7 +410,7 @@ public class LeaderTest extends AbstractLeaderTest { // Wait slightly longer than heartbeat duration Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS); - leader.handleMessage(leaderActor, new SendHeartBeat()); + leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE); List allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class); assertEquals("The number of append entries collected should be 2", 2, allMessages.size()); @@ -449,7 +452,7 @@ public class LeaderTest extends AbstractLeaderTest { for(int i=0;i<3;i++) { Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS); - leader.handleMessage(leaderActor, new SendHeartBeat()); + leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE); } List allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class); @@ -485,7 +488,7 @@ public class LeaderTest extends AbstractLeaderTest { followerActor.underlyingActor().clear(); Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS); - leader.handleMessage(leaderActor, new SendHeartBeat()); + leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE); sendReplicate(actorContext, lastIndex+1); List allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class); @@ -513,8 +516,8 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.getReplicatedLog().append(newEntry); - RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, - new Replicate(leaderActor, "state-id", newEntry)); + final Identifier id = new MockIdentifier("state-id"); + RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new Replicate(leaderActor, id, newEntry)); // State should not change assertTrue(raftBehavior instanceof Leader); @@ -535,7 +538,7 @@ public class LeaderTest extends AbstractLeaderTest { ApplyState last = applyStateList.get((int) newLogIndex - 1); assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData()); - assertEquals("getIdentifier", "state-id", last.getIdentifier()); + assertEquals("getIdentifier", id, last.getIdentifier()); } @Test @@ -581,7 +584,8 @@ public class LeaderTest extends AbstractLeaderTest { ByteString bs = toByteString(leadersSnapshot); leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm)); - FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs); + LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(bs, + actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName()); leader.setFollowerSnapshot(FOLLOWER_ID, fts); //send first chunk and no InstallSnapshotReply received yet @@ -591,18 +595,16 @@ public class LeaderTest extends AbstractLeaderTest { Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS); - leader.handleMessage(leaderActor, new SendHeartBeat()); + leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE); - AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); - - AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto); + AppendEntries ae = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty()); //InstallSnapshotReply received fts.markSendStatus(true); - leader.handleMessage(leaderActor, new SendHeartBeat()); + leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE); InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); @@ -651,7 +653,7 @@ public class LeaderTest extends AbstractLeaderTest { // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex RaftActorBehavior raftBehavior = leader.handleMessage( - leaderActor, new Replicate(null, "state-id", entry)); + leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry)); assertTrue(raftBehavior instanceof Leader); @@ -696,7 +698,7 @@ public class LeaderTest extends AbstractLeaderTest { //update follower timestamp leader.markFollowerActive(FOLLOWER_ID); - leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry)); + leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry)); assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing()); @@ -709,9 +711,9 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals(2, cs.getLastTerm()); // if an initiate is started again when first is in progress, it shouldnt initiate Capture - leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry)); + leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry)); - Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot()); + assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot()); } @Test @@ -770,10 +772,10 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals(4, cs.getLastIndex()); assertEquals(2, cs.getLastTerm()); - // if an initiate is started again when first is in progress, it shouldnt initiate Capture - leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry)); + // if an initiate is started again when first is in progress, it should not initiate Capture + leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry)); - Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot()); + assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot()); } @@ -894,6 +896,7 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.setCommitIndex(commitIndex); leader = new Leader(actorContext); + actorContext.setCurrentBehavior(leader); leader.getFollower(FOLLOWER_ID).setMatchIndex(-1); leader.getFollower(FOLLOWER_ID).setNextIndex(0); @@ -915,7 +918,8 @@ public class LeaderTest extends AbstractLeaderTest { ByteString bs = toByteString(leadersSnapshot); leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm)); - FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs); + LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(bs, + actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName()); leader.setFollowerSnapshot(FOLLOWER_ID, fts); while(!fts.isLastChunk(fts.getChunkIndex())) { fts.getNextChunk(); @@ -962,6 +966,7 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.setCommitIndex(commitIndex); leader = new Leader(actorContext); + actorContext.setCurrentBehavior(leader); leader.getFollower(FOLLOWER_ID).setMatchIndex(-1); leader.getFollower(FOLLOWER_ID).setNextIndex(0); @@ -1010,7 +1015,7 @@ public class LeaderTest extends AbstractLeaderTest { installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class); - Assert.assertNull(installSnapshot); + assertNull(installSnapshot); } @@ -1070,7 +1075,7 @@ public class LeaderTest extends AbstractLeaderTest { Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS); - leader.handleMessage(leaderActor, new SendHeartBeat()); + leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE); installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); @@ -1124,9 +1129,10 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals(1, installSnapshot.getChunkIndex()); assertEquals(3, installSnapshot.getTotalChunks()); - assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue()); + assertEquals(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE, + installSnapshot.getLastChunkHashCode().get().intValue()); - int hashCode = installSnapshot.getData().hashCode(); + int hashCode = Arrays.hashCode(installSnapshot.getData()); followerActor.underlyingActor().clear(); @@ -1163,7 +1169,8 @@ public class LeaderTest extends AbstractLeaderTest { ByteString bs = toByteString(leadersSnapshot); byte[] barray = bs.toByteArray(); - FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs); + LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(bs, + actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName()); leader.setFollowerSnapshot(FOLLOWER_ID, fts); assertEquals(bs.size(), barray.length); @@ -1177,8 +1184,8 @@ public class LeaderTest extends AbstractLeaderTest { j = barray.length; } - ByteString chunk = fts.getNextChunk(); - assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size()); + byte[] chunk = fts.getNextChunk(); + assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.length); assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex()); fts.markSendStatus(true); @@ -1190,8 +1197,8 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks()); } - @Override protected RaftActorBehavior createBehavior( - RaftActorContext actorContext) { + @Override + protected Leader createBehavior(final RaftActorContext actorContext) { return new Leader(actorContext); } @@ -1241,6 +1248,7 @@ public class LeaderTest extends AbstractLeaderTest { Follower follower = new Follower(followerActorContext); followerActor.underlyingActor().setBehavior(follower); + followerActorContext.setCurrentBehavior(follower); Map peerAddresses = new HashMap<>(); peerAddresses.put(FOLLOWER_ID, followerActor.path().toString()); @@ -1264,6 +1272,7 @@ public class LeaderTest extends AbstractLeaderTest { followerActorContext.setCommitIndex(1); leader = new Leader(leaderActorContext); + leaderActorContext.setCurrentBehavior(leader); AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); @@ -1295,6 +1304,7 @@ public class LeaderTest extends AbstractLeaderTest { Follower follower = new Follower(followerActorContext); followerActor.underlyingActor().setBehavior(follower); + followerActorContext.setCurrentBehavior(follower); Map leaderPeerAddresses = new HashMap<>(); leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString()); @@ -1340,7 +1350,7 @@ public class LeaderTest extends AbstractLeaderTest { Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS); - leader.handleMessage(leaderActor, new SendHeartBeat()); + leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE); appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); @@ -1406,7 +1416,7 @@ public class LeaderTest extends AbstractLeaderTest { appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit()); - assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex()); + assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex()); assertEquals("Log entries size", 2, appendEntries.getEntries().size()); assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex()); @@ -1462,6 +1472,7 @@ public class LeaderTest extends AbstractLeaderTest { Follower follower = new Follower(followerActorContext); followerActor.underlyingActor().setBehavior(follower); + followerActorContext.setCurrentBehavior(follower); leader = new Leader(leaderActorContext); @@ -1477,6 +1488,7 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex()); leaderActor.underlyingActor().setBehavior(leader); + leaderActorContext.setCurrentBehavior(leader); leader.handleMessage(followerActor, appendEntriesReply); @@ -1541,6 +1553,7 @@ public class LeaderTest extends AbstractLeaderTest { Follower follower = new Follower(followerActorContext); followerActor.underlyingActor().setBehavior(follower); + followerActorContext.setCurrentBehavior(follower); leader = new Leader(leaderActorContext); @@ -1556,6 +1569,7 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex()); leaderActor.underlyingActor().setBehavior(leader); + leaderActorContext.setCurrentBehavior(leader); leader.handleMessage(followerActor, appendEntriesReply); @@ -1738,6 +1752,7 @@ public class LeaderTest extends AbstractLeaderTest { Follower follower = new Follower(followerActorContext); followerActor.underlyingActor().setBehavior(follower); + followerActorContext.setCurrentBehavior(follower); leader = new Leader(leaderActorContext); @@ -1753,6 +1768,7 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex()); leaderActor.underlyingActor().setBehavior(leader); + leaderActorContext.setCurrentBehavior(leader); leader.handleMessage(followerActor, appendEntriesReply); @@ -1773,7 +1789,7 @@ public class LeaderTest extends AbstractLeaderTest { appendEntries = appendEntriesList.get(1); assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit()); - assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex()); + assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex()); assertEquals("Log entries size", 2, appendEntries.getEntries().size()); assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex()); @@ -1818,8 +1834,27 @@ public class LeaderTest extends AbstractLeaderTest { MockRaftActorContext leaderActorContext = createActorContext(); leader = new Leader(leaderActorContext); - RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck()); - Assert.assertTrue(behavior instanceof Leader); + RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK); + assertTrue(behavior instanceof Leader); + } + + @Test + public void testIsolatedLeaderCheckNoVotingFollowers() { + logStart("testIsolatedLeaderCheckNoVotingFollowers"); + + MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader(); + Follower follower = new Follower(followerActorContext); + followerActor.underlyingActor().setBehavior(follower); + + MockRaftActorContext leaderActorContext = createActorContextWithFollower(); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( + new FiniteDuration(1000, TimeUnit.SECONDS)); + leaderActorContext.getPeerInfo(FOLLOWER_ID).setVotingState(VotingState.NON_VOTING); + + leader = new Leader(leaderActorContext); + leader.getFollower(FOLLOWER_ID).markFollowerActive(); + RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK); + assertTrue("Expected Leader", behavior instanceof Leader); } private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy){ @@ -1839,9 +1874,8 @@ public class LeaderTest extends AbstractLeaderTest { leader.markFollowerActive("follower-1"); leader.markFollowerActive("follower-2"); - RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck()); - Assert.assertTrue("Behavior not instance of Leader when all followers are active", - behavior instanceof Leader); + RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK); + assertTrue("Behavior not instance of Leader when all followers are active", behavior instanceof Leader); // kill 1 follower and verify if that got killed final JavaTestKit probe = new JavaTestKit(getSystem()); @@ -1852,9 +1886,8 @@ public class LeaderTest extends AbstractLeaderTest { leader.markFollowerInActive("follower-1"); leader.markFollowerActive("follower-2"); - behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck()); - Assert.assertTrue("Behavior not instance of Leader when majority of followers are active", - behavior instanceof Leader); + behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK); + assertTrue("Behavior not instance of Leader when majority of followers are active", behavior instanceof Leader); // kill 2nd follower and leader should change to Isolated leader followerActor2.tell(PoisonPill.getInstance(), null); @@ -1864,7 +1897,7 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals(termMsg2.getActor(), followerActor2); leader.markFollowerInActive("follower-2"); - return leader.handleMessage(leaderActor, new IsolatedLeaderCheck()); + return leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK); } @Test @@ -1873,7 +1906,7 @@ public class LeaderTest extends AbstractLeaderTest { RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE); - Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive", + assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive", behavior instanceof IsolatedLeader); } @@ -1883,7 +1916,7 @@ public class LeaderTest extends AbstractLeaderTest { RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true)); - Assert.assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled", + assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled", behavior instanceof Leader); } @@ -1956,6 +1989,7 @@ public class LeaderTest extends AbstractLeaderTest { new FiniteDuration(1000, TimeUnit.SECONDS)); leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); + leaderActorContext.setCommitIndex(-1); String nonVotingFollowerId = "nonvoting-follower"; TestActorRef nonVotingFollowerActor = actorFactory.createTestActor( @@ -1964,6 +1998,7 @@ public class LeaderTest extends AbstractLeaderTest { leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(), VotingState.NON_VOTING); leader = new Leader(leaderActorContext); + leaderActorContext.setCurrentBehavior(leader); // Ignore initial heartbeats MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); @@ -2020,6 +2055,7 @@ public class LeaderTest extends AbstractLeaderTest { leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); leader = new Leader(leaderActorContext); + leaderActorContext.setCurrentBehavior(leader); // Initial heartbeat MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); @@ -2044,7 +2080,7 @@ public class LeaderTest extends AbstractLeaderTest { MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2); // Leader should force an election timeout - MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class); + MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class); verify(mockTransferCohort).transferComplete(); } @@ -2059,6 +2095,7 @@ public class LeaderTest extends AbstractLeaderTest { leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); leader = new Leader(leaderActorContext); + leaderActorContext.setCurrentBehavior(leader); // Initial heartbeat MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); @@ -2076,7 +2113,7 @@ public class LeaderTest extends AbstractLeaderTest { MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); // Leader should force an election timeout - MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class); + MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class); verify(mockTransferCohort).transferComplete(); } @@ -2090,6 +2127,7 @@ public class LeaderTest extends AbstractLeaderTest { new FiniteDuration(200, TimeUnit.MILLISECONDS)); leader = new Leader(leaderActorContext); + leaderActorContext.setCurrentBehavior(leader); // Initial heartbeat MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); @@ -2107,12 +2145,12 @@ public class LeaderTest extends AbstractLeaderTest { Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams(). getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS); - leader.handleMessage(leaderActor, new SendHeartBeat()); + leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE); MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0)); // Leader should force an election timeout - MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class); + MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class); verify(mockTransferCohort).transferComplete(); } @@ -2128,6 +2166,7 @@ public class LeaderTest extends AbstractLeaderTest { leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); leader = new Leader(leaderActorContext); + leaderActorContext.setCurrentBehavior(leader); // Initial heartbeat MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); @@ -2148,7 +2187,7 @@ public class LeaderTest extends AbstractLeaderTest { for(int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) { Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams(). getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS); - leader.handleMessage(leaderActor, new SendHeartBeat()); + leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE); } verify(mockTransferCohort).abortTransfer(); @@ -2157,7 +2196,7 @@ public class LeaderTest extends AbstractLeaderTest { } @Override - protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext, + protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext, ActorRef actorRef, RaftRPC rpc) throws Exception { super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc); assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());