Bug 8206: Fix IOException from initiateCaptureSnapshot
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / LeadershipTransferIntegrationTest.java
index 76ef5f6228a37509a067f67c0e5858ca8d5191c2..7ed45956c17f77f16cc6f60493de4097680d31c1 100644 (file)
@@ -8,22 +8,29 @@
 package org.opendaylight.controller.cluster.raft;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
+import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
 
 import akka.actor.ActorRef;
 import akka.actor.Props;
+import akka.actor.Status;
 import akka.pattern.Patterns;
 import akka.testkit.TestActorRef;
 import com.google.common.collect.ImmutableMap;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
+import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning;
 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.messages.RequestLeadership;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
@@ -43,6 +50,7 @@ public class LeadershipTransferIntegrationTest extends AbstractRaftActorIntegrat
     private TestActorRef<MessageCollectorActor> follower3NotifierActor;
     private TestActorRef<TestRaftActor> follower3Actor;
     private ActorRef follower3CollectorActor;
+    private ActorRef requestLeadershipResultCollectorActor;
 
     @Test
     public void testLeaderTransferOnShutDown() throws Exception {
@@ -79,23 +87,37 @@ public class LeadershipTransferIntegrationTest extends AbstractRaftActorIntegrat
         clearMessages(follower2NotifierActor);
         clearMessages(follower3NotifierActor);
 
+        // Simulate a delay for follower2 in receiving the LeaderTransitioning message with null leader id.
+        final TestRaftActor follower2Instance = follower2Actor.underlyingActor();
+        follower2Instance.startDropMessages(LeaderTransitioning.class);
+
         FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
         final Future<Boolean> stopFuture = Patterns.gracefulStop(leaderActor, duration, Shutdown.INSTANCE);
 
-        assertNullLeaderIdChange(leaderNotifierActor);
-        assertNullLeaderIdChange(follower1NotifierActor);
-        assertNullLeaderIdChange(follower2NotifierActor);
-        assertNullLeaderIdChange(follower3NotifierActor);
-
         verifyRaftState(follower1Actor, RaftState.Leader);
 
         Boolean stopped = Await.result(stopFuture, duration);
         assertEquals("Stopped", Boolean.TRUE, stopped);
 
-        follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class);
+        // Re-enable LeaderTransitioning messages to follower2.
+        final LeaderTransitioning leaderTransitioning = expectFirstMatching(follower2CollectorActor,
+                LeaderTransitioning.class);
+        follower2Instance.stopDropMessages(LeaderTransitioning.class);
+
+        follower2Instance.stopDropMessages(AppendEntries.class);
         ApplyState applyState = expectFirstMatching(follower2CollectorActor, ApplyState.class);
         assertEquals("Apply sate index", 0, applyState.getReplicatedLogEntry().getIndex());
 
+        // Now send the LeaderTransitioning to follower2 after it has received AppendEntries from the new leader.
+        follower2Actor.tell(leaderTransitioning, ActorRef.noSender());
+
+        verifyLeaderStateChangedMessages(leaderNotifierActor, null, follower1Id);
+        verifyLeaderStateChangedMessages(follower1NotifierActor, null, follower1Id);
+        // follower2 should only get 1 LeaderStateChanged with the new leaderId - the LeaderTransitioning message
+        // should not generate a LeaderStateChanged with null leaderId since it arrived after the new leaderId was set.
+        verifyLeaderStateChangedMessages(follower2NotifierActor, follower1Id);
+        verifyLeaderStateChangedMessages(follower3NotifierActor, null, follower1Id);
+
         testLog.info("sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1 ending");
     }
 
@@ -166,9 +188,16 @@ public class LeadershipTransferIntegrationTest extends AbstractRaftActorIntegrat
         verifyRaftState(raftActor, rs -> assertEquals("getRaftState", expState.toString(), rs.getRaftState()));
     }
 
-    private static void assertNullLeaderIdChange(TestActorRef<MessageCollectorActor> notifierActor) {
-        LeaderStateChanged change = expectFirstMatching(notifierActor, LeaderStateChanged.class);
-        assertNull("Expected null leader Id", change.getLeaderId());
+    private void verifyLeaderStateChangedMessages(TestActorRef<MessageCollectorActor> notifierActor,
+            String... expLeaderIds) {
+        List<LeaderStateChanged> leaderStateChanges = expectMatching(notifierActor, LeaderStateChanged.class,
+                expLeaderIds.length);
+
+        Collections.reverse(leaderStateChanges);
+        Iterator<LeaderStateChanged> actual = leaderStateChanges.iterator();
+        for (int i = expLeaderIds.length - 1; i >= 0; i--) {
+            assertEquals("getLeaderId", expLeaderIds[i], actual.next().getLeaderId());
+        }
     }
 
     @Test
@@ -199,4 +228,78 @@ public class LeadershipTransferIntegrationTest extends AbstractRaftActorIntegrat
 
         testLog.info("testLeaderTransferSkippedOnShutdownWithNoFollowers ending");
     }
+
+    private void sendFollower2RequestLeadershipTransferToLeader() {
+        testLog.info("sendFollower2RequestLeadershipTransferToLeader starting");
+
+        leaderActor.tell(
+                new RequestLeadership(follower2Id, requestLeadershipResultCollectorActor), ActorRef.noSender());
+
+        testLog.info("sendFollower2RequestLeadershipTransferToLeader ending");
+    }
+
+    private void createRequestLeadershipResultCollectorActor() {
+        testLog.info("createRequestLeadershipResultCollectorActor starting");
+
+        requestLeadershipResultCollectorActor = factory.createActor(MessageCollectorActor.props());
+
+        testLog.info("createRequestLeadershipResultCollectorActor ending");
+    }
+
+    @Test
+    public void testSuccessfulRequestLeadershipTransferToFollower2() {
+        testLog.info("testSuccessfulRequestLeadershipTransferToFollower2 starting");
+
+        createRaftActors();
+        createRequestLeadershipResultCollectorActor();
+
+        sendFollower2RequestLeadershipTransferToLeader();
+
+        verifyRaftState(follower2Actor, RaftState.Leader);
+
+        expectMatching(requestLeadershipResultCollectorActor, Status.Success.class, 1);
+
+        testLog.info("testSuccessfulRequestLeadershipTransferToFollower2 ending");
+    }
+
+    @Test
+    public void testRequestLeadershipTransferToFollower2WithFollower2Lagging() {
+        testLog.info("testRequestLeadershipTransferToFollower2WithFollower2Lagging starting");
+
+        createRaftActors();
+        createRequestLeadershipResultCollectorActor();
+
+        sendPayloadWithFollower2Lagging();
+
+        sendFollower2RequestLeadershipTransferToLeader();
+
+        verifyRaftState(follower1Actor, RaftState.Follower);
+        verifyRaftState(follower2Actor, RaftState.Follower);
+        verifyRaftState(follower3Actor, RaftState.Follower);
+
+        Status.Failure failure = expectFirstMatching(requestLeadershipResultCollectorActor, Status.Failure.class);
+        assertTrue(failure.cause() instanceof LeadershipTransferFailedException);
+
+        testLog.info("testRequestLeadershipTransferToFollower2WithFollower2Lagging ending");
+    }
+
+    @Test
+    public void testRequestLeadershipTransferToFollower2WithFollower2Shutdown() throws Exception {
+        testLog.info("testRequestLeadershipTransferToFollower2WithFollower2Shutdown starting");
+
+        createRaftActors();
+        createRequestLeadershipResultCollectorActor();
+
+        sendShutDown(follower2Actor);
+
+        sendFollower2RequestLeadershipTransferToLeader();
+
+        verifyRaftState(follower1Actor, RaftState.Follower);
+        verifyRaftState(follower3Actor, RaftState.Follower);
+
+        Status.Failure failure = expectFirstMatching(requestLeadershipResultCollectorActor, Status.Failure.class);
+        assertTrue(failure.cause() instanceof LeadershipTransferFailedException);
+
+        testLog.info("testRequestLeadershipTransferToFollower2WithFollower2Shutdown ending");
+    }
 }