X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FLeaderTest.java;h=04ae7ffe8ce77f4e6634f1a672013d79b9d01764;hb=refs%2Fchanges%2F22%2F65622%2F11;hp=c3d33e12ec41ae744079bf864a4ab7d769f632f8;hpb=8049fd4d06da0f4616180e46fbbe95f98cf698ea;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java index c3d33e12ec..04ae7ffe8c 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ -23,8 +23,8 @@ import akka.actor.ActorRef; import akka.actor.PoisonPill; import akka.actor.Props; import akka.actor.Terminated; -import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; +import akka.testkit.javadsl.TestKit; import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; import com.google.common.io.ByteSource; @@ -39,8 +39,11 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.lang3.SerializationUtils; import org.junit.After; import org.junit.Test; +import org.opendaylight.controller.cluster.messaging.MessageSlice; +import org.opendaylight.controller.cluster.messaging.MessageSliceReply; import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl; import org.opendaylight.controller.cluster.raft.FollowerLogInformation; import org.opendaylight.controller.cluster.raft.MockRaftActorContext; @@ -70,6 +73,7 @@ import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEnt import org.opendaylight.controller.cluster.raft.persisted.Snapshot; import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy; import org.opendaylight.controller.cluster.raft.policy.RaftPolicy; +import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import org.opendaylight.yangtools.concepts.Identifier; @@ -91,7 +95,7 @@ public class LeaderTest extends AbstractLeaderTest { @Override @After - public void tearDown() throws Exception { + public void tearDown() { if (leader != null) { leader.close(); } @@ -155,12 +159,15 @@ public class LeaderTest extends AbstractLeaderTest { } - private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index) { + private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long index) { return sendReplicate(actorContext, 1, index); } private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index) { - MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo"); + return sendReplicate(actorContext, term, index, new MockRaftActorContext.MockPayload("foo")); + } + + private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index, Payload payload) { SimpleReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(index, term, payload); actorContext.getReplicatedLog().append(newEntry); return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry, true)); @@ -585,7 +592,7 @@ public class LeaderTest extends AbstractLeaderTest { leader.markFollowerActive(FOLLOWER_ID); ByteString bs = toByteString(leadersSnapshot); - leader.setSnapshot(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()), + leader.setSnapshotHolder(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()), Collections.emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, -1, null, null), ByteSource.wrap(bs.toByteArray()))); LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState( @@ -692,7 +699,7 @@ public class LeaderTest extends AbstractLeaderTest { MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); // set the snapshot as absent and check if capture-snapshot is invoked. - leader.setSnapshot(null); + leader.setSnapshotHolder(null); // new entry SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm, @@ -741,7 +748,7 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.getReplicatedLog().removeFrom(0); AtomicReference> installSnapshotStream = new AtomicReference<>(); - actorContext.setCreateSnapshotProcedure(out -> installSnapshotStream.set(out)); + actorContext.setCreateSnapshotProcedure(installSnapshotStream::set); leader = new Leader(actorContext); actorContext.setCurrentBehavior(leader); @@ -750,7 +757,7 @@ public class LeaderTest extends AbstractLeaderTest { MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); // set the snapshot as absent and check if capture-snapshot is invoked. - leader.setSnapshot(null); + leader.setSnapshotHolder(null); for (int i = 0; i < 4; i++) { actorContext.getReplicatedLog().append(new SimpleReplicatedLogEntry(i, 1, @@ -946,7 +953,7 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); ByteString bs = toByteString(leadersSnapshot); - leader.setSnapshot(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()), + leader.setSnapshotHolder(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()), Collections.emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, -1, null, null), ByteSource.wrap(bs.toByteArray()))); LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState( @@ -1233,11 +1240,11 @@ public class LeaderTest extends AbstractLeaderTest { } @Override - protected MockRaftActorContext createActorContext(ActorRef actorRef) { + protected MockRaftActorContext createActorContext(final ActorRef actorRef) { return createActorContext(LEADER_ID, actorRef); } - private MockRaftActorContext createActorContext(String id, ActorRef actorRef) { + private MockRaftActorContext createActorContext(final String id, final ActorRef actorRef) { DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS)); configParams.setElectionTimeoutFactor(100000); @@ -1888,7 +1895,7 @@ public class LeaderTest extends AbstractLeaderTest { assertTrue("Expected Leader", newBehavior instanceof Leader); } - private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy) { + private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(final RaftPolicy raftPolicy) { ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1"); ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2"); @@ -1909,7 +1916,7 @@ public class LeaderTest extends AbstractLeaderTest { assertTrue("Behavior not instance of Leader when all followers are active", newBehavior instanceof Leader); // kill 1 follower and verify if that got killed - final JavaTestKit probe = new JavaTestKit(getSystem()); + final TestKit probe = new TestKit(getSystem()); probe.watch(followerActor1); followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender()); final Terminated termMsg1 = probe.expectMsgClass(Terminated.class); @@ -2022,8 +2029,8 @@ public class LeaderTest extends AbstractLeaderTest { leaderActorContext.setLastApplied(-1); String nonVotingFollowerId = "nonvoting-follower"; - TestActorRef nonVotingFollowerActor = actorFactory.createTestActor( - Props.create(MessageCollectorActor.class), actorFactory.generateActorId(nonVotingFollowerId)); + ActorRef nonVotingFollowerActor = actorFactory.createActor( + MessageCollectorActor.props(), actorFactory.generateActorId(nonVotingFollowerId)); leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(), VotingState.NON_VOTING); @@ -2230,9 +2237,148 @@ public class LeaderTest extends AbstractLeaderTest { MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100); } + @Test + public void testReplicationWithPayloadSizeThatExceedsThreshold() { + logStart("testReplicationWithPayloadSizeThatExceedsThreshold"); + + final int serializedSize = SerializationUtils.serialize(new AppendEntries(1, LEADER_ID, -1, -1, + Arrays.asList(new SimpleReplicatedLogEntry(0, 1, + new MockRaftActorContext.MockPayload("large"))), 0, -1, (short)0)).length; + final MockRaftActorContext.MockPayload largePayload = + new MockRaftActorContext.MockPayload("large", serializedSize); + + MockRaftActorContext leaderActorContext = createActorContextWithFollower(); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( + new FiniteDuration(300, TimeUnit.MILLISECONDS)); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(serializedSize - 50); + leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); + leaderActorContext.setCommitIndex(-1); + leaderActorContext.setLastApplied(-1); + + leader = new Leader(leaderActorContext); + leaderActorContext.setCurrentBehavior(leader); + + // Send initial heartbeat reply so follower is marked active + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, -1, true, -1, -1, (short)0)); + MessageCollectorActor.clearMessages(followerActor); + + // Send normal payload first to prime commit index. + final long term = leaderActorContext.getTermInformation().getCurrentTerm(); + sendReplicate(leaderActorContext, term, 0); + + AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + assertEquals("Entries size", 1, appendEntries.getEntries().size()); + assertEquals("Entry getIndex", 0, appendEntries.getEntries().get(0).getIndex()); + + leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, 0, term, (short)0)); + assertEquals("getCommitIndex", 0, leaderActorContext.getCommitIndex()); + MessageCollectorActor.clearMessages(followerActor); + + // Now send a large payload that exceeds the maximum size for a single AppendEntries - it should be sliced. + sendReplicate(leaderActorContext, term, 1, largePayload); + + MessageSlice messageSlice = MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class); + assertEquals("getSliceIndex", 1, messageSlice.getSliceIndex()); + assertEquals("getTotalSlices", 2, messageSlice.getTotalSlices()); + + final Identifier slicingId = messageSlice.getIdentifier(); + + appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex()); + assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm()); + assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit()); + assertEquals("Entries size", 0, appendEntries.getEntries().size()); + MessageCollectorActor.clearMessages(followerActor); + + // Initiate a heartbeat - it should send an empty AppendEntries since slicing is in progress. + + // Sleep for the heartbeat interval so AppendEntries is sent. + Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams() + .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS); + + leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE); + + appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit()); + assertEquals("Entries size", 0, appendEntries.getEntries().size()); + MessageCollectorActor.clearMessages(followerActor); + + // Simulate the MessageSliceReply's and AppendEntriesReply from the follower. + + leader.handleMessage(followerActor, MessageSliceReply.success(slicingId, 1, followerActor)); + messageSlice = MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class); + assertEquals("getSliceIndex", 2, messageSlice.getSliceIndex()); + + leader.handleMessage(followerActor, MessageSliceReply.success(slicingId, 2, followerActor)); + + leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, 1, term, (short)0)); + + MessageCollectorActor.clearMessages(followerActor); + + // Send another normal payload. + + sendReplicate(leaderActorContext, term, 2); + + appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + assertEquals("Entries size", 1, appendEntries.getEntries().size()); + assertEquals("Entry getIndex", 2, appendEntries.getEntries().get(0).getIndex()); + assertEquals("getLeaderCommit", 1, appendEntries.getLeaderCommit()); + } + + @Test + public void testLargePayloadSlicingExpiration() { + logStart("testLargePayloadSlicingExpiration"); + + MockRaftActorContext leaderActorContext = createActorContextWithFollower(); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( + new FiniteDuration(100, TimeUnit.MILLISECONDS)); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(10); + leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); + leaderActorContext.setCommitIndex(-1); + leaderActorContext.setLastApplied(-1); + + final long term = leaderActorContext.getTermInformation().getCurrentTerm(); + leader = new Leader(leaderActorContext); + leaderActorContext.setCurrentBehavior(leader); + + // Send initial heartbeat reply so follower is marked active + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, -1, true, -1, -1, (short)0)); + MessageCollectorActor.clearMessages(followerActor); + + sendReplicate(leaderActorContext, term, 0, new MockRaftActorContext.MockPayload("large", + leaderActorContext.getConfigParams().getSnapshotChunkSize() + 1)); + MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class); + + // Sleep for at least 3 * election timeout so the slicing state expires. + Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams() + .getElectionTimeOutInterval().toMillis() * 3 + 50, TimeUnit.MILLISECONDS); + MessageCollectorActor.clearMessages(followerActor); + + leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE); + + AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit()); + assertEquals("Entries size", 0, appendEntries.getEntries().size()); + + MessageCollectorActor.assertNoneMatching(followerActor, MessageSlice.class, 300); + MessageCollectorActor.clearMessages(followerActor); + + // Send an AppendEntriesReply - this should restart the slicing. + + Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams() + .getHeartBeatInterval().toMillis() + 50, TimeUnit.MILLISECONDS); + + leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, -1, term, (short)0)); + + MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class); + } + @Override - protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext, - ActorRef actorRef, RaftRPC rpc) throws Exception { + protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(final MockRaftActorContext actorContext, + final ActorRef actorRef, final RaftRPC rpc) { super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc); assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor()); } @@ -2242,8 +2388,7 @@ public class LeaderTest extends AbstractLeaderTest { private final long electionTimeOutIntervalMillis; private final int snapshotChunkSize; - MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) { - super(); + MockConfigParamsImpl(final long electionTimeOutIntervalMillis, final int snapshotChunkSize) { this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis; this.snapshotChunkSize = snapshotChunkSize; }