X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FLeadershipTransferIntegrationTest.java;h=e99215ddbaa8cee74d457b2ef1f55e098653a8f4;hp=e1b162fcf1f27d12dda6e19e516d08839998ecb6;hb=2fd1fa721510a30f58b3bc277deb05fce58badd6;hpb=a615daaacf68376104e89e31b6e40f6636bb2a1e diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/LeadershipTransferIntegrationTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/LeadershipTransferIntegrationTest.java index e1b162fcf1..e99215ddba 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/LeadershipTransferIntegrationTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/LeadershipTransferIntegrationTest.java @@ -7,27 +7,35 @@ */ package org.opendaylight.controller.cluster.raft; -import static akka.pattern.Patterns.ask; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages; import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching; +import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching; + import akka.actor.ActorRef; -import akka.actor.Props; +import akka.actor.Status; import akka.pattern.Patterns; import akka.testkit.TestActorRef; -import akka.util.Timeout; -import com.google.common.base.Stopwatch; +import akka.testkit.javadsl.TestKit; import com.google.common.collect.ImmutableMap; -import com.google.common.util.concurrent.Uninterruptibles; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; import java.util.concurrent.TimeUnit; import org.junit.Test; import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; -import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState; -import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; +import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning; import org.opendaylight.controller.cluster.raft.client.messages.Shutdown; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; +import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; +import org.opendaylight.controller.cluster.raft.messages.RequestLeadership; +import org.opendaylight.controller.cluster.raft.persisted.EmptyState; +import org.opendaylight.controller.cluster.raft.persisted.ServerInfo; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot; +import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import scala.concurrent.Await; import scala.concurrent.Future; @@ -41,15 +49,16 @@ import scala.concurrent.duration.FiniteDuration; public class LeadershipTransferIntegrationTest extends AbstractRaftActorIntegrationTest { private final String follower3Id = factory.generateActorId("follower"); - private TestActorRef leaderNotifierActor; - private TestActorRef follower1NotifierActor; - private TestActorRef follower2NotifierActor; - private TestActorRef follower3NotifierActor; + private ActorRef leaderNotifierActor; + private ActorRef follower1NotifierActor; + private ActorRef follower2NotifierActor; + private ActorRef follower3NotifierActor; private TestActorRef follower3Actor; private ActorRef follower3CollectorActor; + private ActorRef requestLeadershipResultCollectorActor; @Test - public void testLeaderTransferOnShutDown() throws Throwable { + public void testLeaderTransferOnShutDown() throws Exception { testLog.info("testLeaderTransferOnShutDown starting"); createRaftActors(); @@ -63,11 +72,11 @@ public class LeadershipTransferIntegrationTest extends AbstractRaftActorIntegrat testLog.info("testLeaderTransferOnShutDown ending"); } - private void sendShutDown(ActorRef actor) throws Exception { + private void sendShutDown(final ActorRef actor) throws Exception { testLog.info("sendShutDown for {} starting", actor.path()); FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS); - Future stopFuture = Patterns.gracefulStop(actor, duration, new Shutdown()); + Future stopFuture = Patterns.gracefulStop(actor, duration, Shutdown.INSTANCE); Boolean stopped = Await.result(stopFuture, duration); assertEquals("Stopped", Boolean.TRUE, stopped); @@ -75,7 +84,7 @@ public class LeadershipTransferIntegrationTest extends AbstractRaftActorIntegrat testLog.info("sendShutDown for {} ending", actor.path()); } - private void sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1() throws Throwable { + private void sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1() throws Exception { testLog.info("sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1 starting"); clearMessages(leaderNotifierActor); @@ -83,23 +92,37 @@ public class LeadershipTransferIntegrationTest extends AbstractRaftActorIntegrat clearMessages(follower2NotifierActor); clearMessages(follower3NotifierActor); - FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS); - Future stopFuture = Patterns.gracefulStop(leaderActor, duration, new Shutdown()); + // Simulate a delay for follower2 in receiving the LeaderTransitioning message with null leader id. + final TestRaftActor follower2Instance = follower2Actor.underlyingActor(); + follower2Instance.startDropMessages(LeaderTransitioning.class); - assertNullLeaderIdChange(leaderNotifierActor); - assertNullLeaderIdChange(follower1NotifierActor); - assertNullLeaderIdChange(follower2NotifierActor); - assertNullLeaderIdChange(follower3NotifierActor); + FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS); + final Future stopFuture = Patterns.gracefulStop(leaderActor, duration, Shutdown.INSTANCE); verifyRaftState(follower1Actor, RaftState.Leader); Boolean stopped = Await.result(stopFuture, duration); assertEquals("Stopped", Boolean.TRUE, stopped); - follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class); + // Re-enable LeaderTransitioning messages to follower2. + final LeaderTransitioning leaderTransitioning = expectFirstMatching(follower2CollectorActor, + LeaderTransitioning.class); + follower2Instance.stopDropMessages(LeaderTransitioning.class); + + follower2Instance.stopDropMessages(AppendEntries.class); ApplyState applyState = expectFirstMatching(follower2CollectorActor, ApplyState.class); assertEquals("Apply sate index", 0, applyState.getReplicatedLogEntry().getIndex()); + // Now send the LeaderTransitioning to follower2 after it has received AppendEntries from the new leader. + follower2Actor.tell(leaderTransitioning, ActorRef.noSender()); + + verifyLeaderStateChangedMessages(leaderNotifierActor, null, follower1Id); + verifyLeaderStateChangedMessages(follower1NotifierActor, null, follower1Id); + // follower2 should only get 1 LeaderStateChanged with the new leaderId - the LeaderTransitioning message + // should not generate a LeaderStateChanged with null leaderId since it arrived after the new leaderId was set. + verifyLeaderStateChangedMessages(follower2NotifierActor, follower1Id); + verifyLeaderStateChangedMessages(follower3NotifierActor, null, follower1Id); + testLog.info("sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1 ending"); } @@ -120,38 +143,48 @@ public class LeadershipTransferIntegrationTest extends AbstractRaftActorIntegrat private void createRaftActors() { testLog.info("createRaftActors starting"); - follower1NotifierActor = factory.createTestActor(Props.create(MessageCollectorActor.class), + final Snapshot snapshot = Snapshot.create(EmptyState.INSTANCE, Collections.emptyList(), -1, -1, -1, -1, + 1, null, new org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload( + Arrays.asList(new ServerInfo(leaderId, true), new ServerInfo(follower1Id, true), + new ServerInfo(follower2Id, true), new ServerInfo(follower3Id, false)))); + + InMemorySnapshotStore.addSnapshot(leaderId, snapshot); + InMemorySnapshotStore.addSnapshot(follower1Id, snapshot); + InMemorySnapshotStore.addSnapshot(follower2Id, snapshot); + InMemorySnapshotStore.addSnapshot(follower3Id, snapshot); + + follower1NotifierActor = factory.createActor(MessageCollectorActor.props(), factory.generateActorId(follower1Id + "-notifier")); follower1Actor = newTestRaftActor(follower1Id, TestRaftActor.newBuilder().peerAddresses( ImmutableMap.of(leaderId, testActorPath(leaderId), follower2Id, testActorPath(follower2Id), - follower3Id, testActorPath(follower3Id))). - config(newFollowerConfigParams()).roleChangeNotifier(follower1NotifierActor)); + follower3Id, testActorPath(follower3Id))) + .config(newFollowerConfigParams()).roleChangeNotifier(follower1NotifierActor)); - follower2NotifierActor = factory.createTestActor(Props.create(MessageCollectorActor.class), + follower2NotifierActor = factory.createActor(MessageCollectorActor.props(), factory.generateActorId(follower2Id + "-notifier")); follower2Actor = newTestRaftActor(follower2Id,TestRaftActor.newBuilder().peerAddresses( ImmutableMap.of(leaderId, testActorPath(leaderId), follower1Id, follower1Actor.path().toString(), - follower3Id, testActorPath(follower3Id))). - config(newFollowerConfigParams()).roleChangeNotifier(follower2NotifierActor)); + follower3Id, testActorPath(follower3Id))) + .config(newFollowerConfigParams()).roleChangeNotifier(follower2NotifierActor)); - follower3NotifierActor = factory.createTestActor(Props.create(MessageCollectorActor.class), + follower3NotifierActor = factory.createActor(MessageCollectorActor.props(), factory.generateActorId(follower3Id + "-notifier")); follower3Actor = newTestRaftActor(follower3Id,TestRaftActor.newBuilder().peerAddresses( ImmutableMap.of(leaderId, testActorPath(leaderId), follower1Id, follower1Actor.path().toString(), - follower2Id, follower2Actor.path().toString())). - config(newFollowerConfigParams()).roleChangeNotifier(follower3NotifierActor)); + follower2Id, follower2Actor.path().toString())) + .config(newFollowerConfigParams()).roleChangeNotifier(follower3NotifierActor)); - peerAddresses = ImmutableMap.builder(). - put(follower1Id, follower1Actor.path().toString()). - put(follower2Id, follower2Actor.path().toString()). - put(follower3Id, follower3Actor.path().toString()).build(); + peerAddresses = ImmutableMap.builder() + .put(follower1Id, follower1Actor.path().toString()) + .put(follower2Id, follower2Actor.path().toString()) + .put(follower3Id, follower3Actor.path().toString()).build(); leaderConfigParams = newLeaderConfigParams(); leaderConfigParams.setElectionTimeoutFactor(3); - leaderNotifierActor = factory.createTestActor(Props.create(MessageCollectorActor.class), + leaderNotifierActor = factory.createActor(MessageCollectorActor.props(), factory.generateActorId(leaderId + "-notifier")); - leaderActor = newTestRaftActor(leaderId, TestRaftActor.newBuilder().peerAddresses(peerAddresses). - config(leaderConfigParams).roleChangeNotifier(leaderNotifierActor)); + leaderActor = newTestRaftActor(leaderId, TestRaftActor.newBuilder().peerAddresses(peerAddresses) + .config(leaderConfigParams).roleChangeNotifier(leaderNotifierActor)); follower1CollectorActor = follower1Actor.underlyingActor().collectorActor(); follower2CollectorActor = follower2Actor.underlyingActor().collectorActor(); @@ -159,45 +192,35 @@ public class LeadershipTransferIntegrationTest extends AbstractRaftActorIntegrat leaderCollectorActor = leaderActor.underlyingActor().collectorActor(); leaderContext = leaderActor.underlyingActor().getRaftActorContext(); - leaderContext.getPeerInfo(follower3Id).setVotingState(VotingState.NON_VOTING); waitUntilLeader(leaderActor); testLog.info("createRaftActors starting"); } - private void verifyRaftState(ActorRef raftActor, final RaftState expState) throws Throwable { - Timeout timeout = new Timeout(500, TimeUnit.MILLISECONDS); - Throwable lastError = null; - Stopwatch sw = Stopwatch.createStarted(); - while(sw.elapsed(TimeUnit.SECONDS) <= 5) { - try { - OnDemandRaftState raftState = (OnDemandRaftState)Await.result(ask(raftActor, - GetOnDemandRaftState.INSTANCE, timeout), timeout.duration()); - assertEquals("getRaftState", expState.toString(), raftState.getRaftState()); - return; - } catch (Exception | AssertionError e) { - lastError = e; - Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); - } - } - - throw lastError; + private static void verifyRaftState(final ActorRef raftActor, final RaftState expState) { + verifyRaftState(raftActor, rs -> assertEquals("getRaftState", expState.toString(), rs.getRaftState())); } - private void assertNullLeaderIdChange(TestActorRef notifierActor) { - LeaderStateChanged change = expectFirstMatching(notifierActor, LeaderStateChanged.class); - assertNull("Expected null leader Id", change.getLeaderId()); + private static void verifyLeaderStateChangedMessages(final ActorRef notifierActor, + final String... expLeaderIds) { + List leaderStateChanges = expectMatching(notifierActor, LeaderStateChanged.class, + expLeaderIds.length); + + Collections.reverse(leaderStateChanges); + Iterator actual = leaderStateChanges.iterator(); + for (int i = expLeaderIds.length - 1; i >= 0; i--) { + assertEquals("getLeaderId", expLeaderIds[i], actual.next().getLeaderId()); + } } @Test - public void testLeaderTransferAborted() throws Throwable { + public void testLeaderTransferAborted() throws Exception { testLog.info("testLeaderTransferAborted starting"); createRaftActors(); - follower1Actor.underlyingActor().startDropMessages(AppendEntries.class); - follower2Actor.underlyingActor().startDropMessages(AppendEntries.class); + leaderActor.underlyingActor().startDropMessages(AppendEntriesReply.class); sendShutDown(leaderActor); @@ -209,7 +232,7 @@ public class LeadershipTransferIntegrationTest extends AbstractRaftActorIntegrat } @Test - public void testLeaderTransferSkippedOnShutdownWithNoFollowers() throws Throwable { + public void testLeaderTransferSkippedOnShutdownWithNoFollowers() throws Exception { testLog.info("testLeaderTransferSkippedOnShutdownWithNoFollowers starting"); leaderActor = newTestRaftActor(leaderId, TestRaftActor.newBuilder().config(newLeaderConfigParams())); @@ -219,4 +242,99 @@ public class LeadershipTransferIntegrationTest extends AbstractRaftActorIntegrat testLog.info("testLeaderTransferSkippedOnShutdownWithNoFollowers ending"); } + + private void sendFollower2RequestLeadershipTransferToLeader() { + testLog.info("sendFollower2RequestLeadershipTransferToLeader starting"); + + leaderActor.tell( + new RequestLeadership(follower2Id, requestLeadershipResultCollectorActor), ActorRef.noSender()); + + testLog.info("sendFollower2RequestLeadershipTransferToLeader ending"); + } + + private void createRequestLeadershipResultCollectorActor() { + testLog.info("createRequestLeadershipResultCollectorActor starting"); + + requestLeadershipResultCollectorActor = factory.createActor(MessageCollectorActor.props()); + + testLog.info("createRequestLeadershipResultCollectorActor ending"); + } + + @Test + public void testSuccessfulRequestLeadershipTransferToFollower2() { + testLog.info("testSuccessfulRequestLeadershipTransferToFollower2 starting"); + + createRaftActors(); + createRequestLeadershipResultCollectorActor(); + + sendFollower2RequestLeadershipTransferToLeader(); + + verifyRaftState(follower2Actor, RaftState.Leader); + + expectMatching(requestLeadershipResultCollectorActor, Status.Success.class, 1); + + testLog.info("testSuccessfulRequestLeadershipTransferToFollower2 ending"); + } + + @Test + public void testRequestLeadershipTransferToFollower2WithFollower2Lagging() { + testLog.info("testRequestLeadershipTransferToFollower2WithFollower2Lagging starting"); + + createRaftActors(); + createRequestLeadershipResultCollectorActor(); + + sendPayloadWithFollower2Lagging(); + + sendFollower2RequestLeadershipTransferToLeader(); + + verifyRaftState(follower1Actor, RaftState.Follower); + verifyRaftState(follower2Actor, RaftState.Follower); + verifyRaftState(follower3Actor, RaftState.Follower); + + Status.Failure failure = expectFirstMatching(requestLeadershipResultCollectorActor, Status.Failure.class); + assertTrue(failure.cause() instanceof LeadershipTransferFailedException); + + testLog.info("testRequestLeadershipTransferToFollower2WithFollower2Lagging ending"); + } + + + @Test + public void testRequestLeadershipTransferToFollower2WithFollower2Shutdown() throws Exception { + testLog.info("testRequestLeadershipTransferToFollower2WithFollower2Shutdown starting"); + + createRaftActors(); + createRequestLeadershipResultCollectorActor(); + + sendShutDown(follower2Actor); + + sendFollower2RequestLeadershipTransferToLeader(); + + verifyRaftState(follower1Actor, RaftState.Follower); + verifyRaftState(follower3Actor, RaftState.Follower); + + Status.Failure failure = expectFirstMatching(requestLeadershipResultCollectorActor, Status.Failure.class); + assertTrue(failure.cause() instanceof LeadershipTransferFailedException); + + testLog.info("testRequestLeadershipTransferToFollower2WithFollower2Shutdown ending"); + } + + @Test + public void testRequestLeadershipTransferToFollower2WithOtherFollowersDown() { + testLog.info("testRequestLeadershipTransferToFollower2WithOtherFollowersDown starting"); + + createRaftActors(); + createRequestLeadershipResultCollectorActor(); + + factory.killActor(follower1Actor, new TestKit(getSystem())); + factory.killActor(follower3Actor, new TestKit(getSystem())); + + sendFollower2RequestLeadershipTransferToLeader(); + + expectFirstMatching(requestLeadershipResultCollectorActor, Status.Success.class); + + verifyRaftState(follower2Actor, RaftState.Leader); + verifyRaftState(leaderActor, RaftState.Follower); + + testLog.info("testRequestLeadershipTransferToFollower2WithOtherFollowersDown ending"); + } }