X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FLeaderTest.java;h=9d8144c4396fc195541706306debb21bf8060296;hb=24ace09aacc620fd9768e0a7004e802f9385bcfc;hp=8529e1926b31ebb88548653594ec867af202b6ea;hpb=224aa4f574c63576961dc9dc37e075e2e5096a5a;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java index 8529e1926b..9d8144c439 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ -23,6 +23,7 @@ import akka.testkit.TestActorRef; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.Uninterruptibles; import com.google.protobuf.ByteString; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -47,7 +48,6 @@ import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntrie import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; -import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; @@ -64,7 +64,7 @@ import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorAc import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import scala.concurrent.duration.FiniteDuration; -public class LeaderTest extends AbstractLeaderTest { +public class LeaderTest extends AbstractLeaderTest { static final String FOLLOWER_ID = "follower"; public static final String LEADER_ID = "leader"; @@ -112,6 +112,7 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.getTermInformation().update(term, ""); leader = new Leader(actorContext); + actorContext.setCurrentBehavior(leader); // Leader should send an immediate heartbeat with no entries as follower is inactive. long lastIndex = actorContext.getReplicatedLog().lastIndex(); @@ -133,7 +134,7 @@ public class LeaderTest extends AbstractLeaderTest { Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams(). getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS); - leader.handleMessage(leaderActor, new SendHeartBeat()); + leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE); appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex()); @@ -208,6 +209,7 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.getTermInformation().update(newTerm, ""); leader = new Leader(actorContext); + actorContext.setCurrentBehavior(leader); // Leader will send an immediate heartbeat - ignore it. MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); @@ -407,7 +409,7 @@ public class LeaderTest extends AbstractLeaderTest { // Wait slightly longer than heartbeat duration Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS); - leader.handleMessage(leaderActor, new SendHeartBeat()); + leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE); List allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class); assertEquals("The number of append entries collected should be 2", 2, allMessages.size()); @@ -449,7 +451,7 @@ public class LeaderTest extends AbstractLeaderTest { for(int i=0;i<3;i++) { Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS); - leader.handleMessage(leaderActor, new SendHeartBeat()); + leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE); } List allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class); @@ -485,7 +487,7 @@ public class LeaderTest extends AbstractLeaderTest { followerActor.underlyingActor().clear(); Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS); - leader.handleMessage(leaderActor, new SendHeartBeat()); + leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE); sendReplicate(actorContext, lastIndex+1); List allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class); @@ -591,7 +593,7 @@ public class LeaderTest extends AbstractLeaderTest { Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS); - leader.handleMessage(leaderActor, new SendHeartBeat()); + leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE); AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); @@ -602,7 +604,7 @@ public class LeaderTest extends AbstractLeaderTest { //InstallSnapshotReply received fts.markSendStatus(true); - leader.handleMessage(leaderActor, new SendHeartBeat()); + leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE); InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); @@ -894,6 +896,7 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.setCommitIndex(commitIndex); leader = new Leader(actorContext); + actorContext.setCurrentBehavior(leader); leader.getFollower(FOLLOWER_ID).setMatchIndex(-1); leader.getFollower(FOLLOWER_ID).setNextIndex(0); @@ -962,6 +965,7 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.setCommitIndex(commitIndex); leader = new Leader(actorContext); + actorContext.setCurrentBehavior(leader); leader.getFollower(FOLLOWER_ID).setMatchIndex(-1); leader.getFollower(FOLLOWER_ID).setNextIndex(0); @@ -1070,7 +1074,7 @@ public class LeaderTest extends AbstractLeaderTest { Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS); - leader.handleMessage(leaderActor, new SendHeartBeat()); + leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE); installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); @@ -1126,7 +1130,7 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals(3, installSnapshot.getTotalChunks()); assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue()); - int hashCode = installSnapshot.getData().hashCode(); + int hashCode = Arrays.hashCode(installSnapshot.getData()); followerActor.underlyingActor().clear(); @@ -1177,8 +1181,8 @@ public class LeaderTest extends AbstractLeaderTest { j = barray.length; } - ByteString chunk = fts.getNextChunk(); - assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size()); + byte[] chunk = fts.getNextChunk(); + assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.length); assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex()); fts.markSendStatus(true); @@ -1190,8 +1194,8 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks()); } - @Override protected RaftActorBehavior createBehavior( - RaftActorContext actorContext) { + @Override + protected Leader createBehavior(final RaftActorContext actorContext) { return new Leader(actorContext); } @@ -1241,6 +1245,7 @@ public class LeaderTest extends AbstractLeaderTest { Follower follower = new Follower(followerActorContext); followerActor.underlyingActor().setBehavior(follower); + followerActorContext.setCurrentBehavior(follower); Map peerAddresses = new HashMap<>(); peerAddresses.put(FOLLOWER_ID, followerActor.path().toString()); @@ -1264,6 +1269,7 @@ public class LeaderTest extends AbstractLeaderTest { followerActorContext.setCommitIndex(1); leader = new Leader(leaderActorContext); + leaderActorContext.setCurrentBehavior(leader); AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); @@ -1295,6 +1301,7 @@ public class LeaderTest extends AbstractLeaderTest { Follower follower = new Follower(followerActorContext); followerActor.underlyingActor().setBehavior(follower); + followerActorContext.setCurrentBehavior(follower); Map leaderPeerAddresses = new HashMap<>(); leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString()); @@ -1340,7 +1347,7 @@ public class LeaderTest extends AbstractLeaderTest { Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS); - leader.handleMessage(leaderActor, new SendHeartBeat()); + leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE); appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); @@ -1462,6 +1469,7 @@ public class LeaderTest extends AbstractLeaderTest { Follower follower = new Follower(followerActorContext); followerActor.underlyingActor().setBehavior(follower); + followerActorContext.setCurrentBehavior(follower); leader = new Leader(leaderActorContext); @@ -1477,6 +1485,7 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex()); leaderActor.underlyingActor().setBehavior(leader); + leaderActorContext.setCurrentBehavior(leader); leader.handleMessage(followerActor, appendEntriesReply); @@ -1541,6 +1550,7 @@ public class LeaderTest extends AbstractLeaderTest { Follower follower = new Follower(followerActorContext); followerActor.underlyingActor().setBehavior(follower); + followerActorContext.setCurrentBehavior(follower); leader = new Leader(leaderActorContext); @@ -1556,6 +1566,7 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex()); leaderActor.underlyingActor().setBehavior(leader); + leaderActorContext.setCurrentBehavior(leader); leader.handleMessage(followerActor, appendEntriesReply); @@ -1738,6 +1749,7 @@ public class LeaderTest extends AbstractLeaderTest { Follower follower = new Follower(followerActorContext); followerActor.underlyingActor().setBehavior(follower); + followerActorContext.setCurrentBehavior(follower); leader = new Leader(leaderActorContext); @@ -1753,6 +1765,7 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex()); leaderActor.underlyingActor().setBehavior(leader); + leaderActorContext.setCurrentBehavior(leader); leader.handleMessage(followerActor, appendEntriesReply); @@ -1818,7 +1831,7 @@ public class LeaderTest extends AbstractLeaderTest { MockRaftActorContext leaderActorContext = createActorContext(); leader = new Leader(leaderActorContext); - RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck()); + RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK); Assert.assertTrue(behavior instanceof Leader); } @@ -1839,7 +1852,7 @@ public class LeaderTest extends AbstractLeaderTest { leader.markFollowerActive("follower-1"); leader.markFollowerActive("follower-2"); - RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck()); + RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK); Assert.assertTrue("Behavior not instance of Leader when all followers are active", behavior instanceof Leader); @@ -1852,7 +1865,7 @@ public class LeaderTest extends AbstractLeaderTest { leader.markFollowerInActive("follower-1"); leader.markFollowerActive("follower-2"); - behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck()); + behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK); Assert.assertTrue("Behavior not instance of Leader when majority of followers are active", behavior instanceof Leader); @@ -1864,7 +1877,7 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals(termMsg2.getActor(), followerActor2); leader.markFollowerInActive("follower-2"); - return leader.handleMessage(leaderActor, new IsolatedLeaderCheck()); + return leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK); } @Test @@ -1964,6 +1977,7 @@ public class LeaderTest extends AbstractLeaderTest { leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(), VotingState.NON_VOTING); leader = new Leader(leaderActorContext); + leaderActorContext.setCurrentBehavior(leader); // Ignore initial heartbeats MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); @@ -2020,6 +2034,7 @@ public class LeaderTest extends AbstractLeaderTest { leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); leader = new Leader(leaderActorContext); + leaderActorContext.setCurrentBehavior(leader); // Initial heartbeat MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); @@ -2059,6 +2074,7 @@ public class LeaderTest extends AbstractLeaderTest { leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); leader = new Leader(leaderActorContext); + leaderActorContext.setCurrentBehavior(leader); // Initial heartbeat MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); @@ -2090,6 +2106,7 @@ public class LeaderTest extends AbstractLeaderTest { new FiniteDuration(200, TimeUnit.MILLISECONDS)); leader = new Leader(leaderActorContext); + leaderActorContext.setCurrentBehavior(leader); // Initial heartbeat MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); @@ -2107,7 +2124,7 @@ public class LeaderTest extends AbstractLeaderTest { Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams(). getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS); - leader.handleMessage(leaderActor, new SendHeartBeat()); + leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE); MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0)); @@ -2128,6 +2145,7 @@ public class LeaderTest extends AbstractLeaderTest { leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); leader = new Leader(leaderActorContext); + leaderActorContext.setCurrentBehavior(leader); // Initial heartbeat MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); @@ -2148,7 +2166,7 @@ public class LeaderTest extends AbstractLeaderTest { for(int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) { Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams(). getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS); - leader.handleMessage(leaderActor, new SendHeartBeat()); + leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE); } verify(mockTransferCohort).abortTransfer(); @@ -2157,7 +2175,7 @@ public class LeaderTest extends AbstractLeaderTest { } @Override - protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext, + protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext, ActorRef actorRef, RaftRPC rpc) throws Exception { super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc); assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());