Add ShutDown message to RaftActor to transfer leader
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / LeadershipTransferIntegrationTest.java
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/LeadershipTransferIntegrationTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/LeadershipTransferIntegrationTest.java
new file mode 100644 (file)
index 0000000..3cc330a
--- /dev/null
@@ -0,0 +1,199 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import static akka.pattern.Patterns.ask;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
+import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.pattern.Patterns;
+import akka.testkit.TestActorRef;
+import akka.util.Timeout;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.concurrent.TimeUnit;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
+import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
+import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
+import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Tests leadership transfer end-to-end.
+ *
+ * @author Thomas Pantelis
+ */
+public class LeadershipTransferIntegrationTest extends AbstractRaftActorIntegrationTest {
+
+    private TestActorRef<MessageCollectorActor> leaderNotifierActor;
+    private TestActorRef<MessageCollectorActor> follower1NotifierActor;
+    private TestActorRef<MessageCollectorActor> follower2NotifierActor;
+
+    @Test
+    public void testLeaderTransferOnShutDown() throws Throwable {
+        testLog.info("testLeaderTransferOnShutDown starting");
+
+        createRaftActors();
+
+        sendPayloadWithFollower2Lagging();
+
+        sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1();
+
+        sendShutDown(follower2Actor);
+
+        testLog.info("testLeaderTransferOnShutDown ending");
+    }
+
+    private void sendShutDown(ActorRef actor) throws Exception {
+        testLog.info("sendShutDown for {} starting", actor.path());
+
+        FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
+        Future<Boolean> stopFuture = Patterns.gracefulStop(actor, duration, new Shutdown());
+
+        Boolean stopped = Await.result(stopFuture, duration);
+        assertEquals("Stopped", Boolean.TRUE, stopped);
+
+        testLog.info("sendShutDown for {} ending", actor.path());
+    }
+
+    private void sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1() throws Throwable {
+        testLog.info("sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1 starting");
+
+        clearMessages(leaderNotifierActor);
+        clearMessages(follower1NotifierActor);
+        clearMessages(follower2NotifierActor);
+
+        FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
+        Future<Boolean> stopFuture = Patterns.gracefulStop(leaderActor, duration, new Shutdown());
+
+        assertNullLeaderIdChange(leaderNotifierActor);
+        assertNullLeaderIdChange(follower1NotifierActor);
+        assertNullLeaderIdChange(follower2NotifierActor);
+
+        verifyRaftState(follower1Actor, RaftState.Leader);
+
+        Boolean stopped = Await.result(stopFuture, duration);
+        assertEquals("Stopped", Boolean.TRUE, stopped);
+
+        follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class);
+        ApplyState applyState = expectFirstMatching(follower2CollectorActor, ApplyState.class);
+        assertEquals("Apply sate index", 0, applyState.getReplicatedLogEntry().getIndex());
+
+        testLog.info("sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1 ending");
+    }
+
+    private void sendPayloadWithFollower2Lagging() {
+        testLog.info("sendPayloadWithFollower2Lagging starting");
+
+        follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
+
+        sendPayloadData(leaderActor, "zero");
+
+        expectFirstMatching(leaderCollectorActor, ApplyState.class);
+        expectFirstMatching(follower1CollectorActor, ApplyState.class);
+
+        testLog.info("sendPayloadWithFollower2Lagging ending");
+    }
+
+    private void createRaftActors() {
+        testLog.info("createRaftActors starting");
+
+        follower1NotifierActor = factory.createTestActor(Props.create(MessageCollectorActor.class),
+                factory.generateActorId(follower1Id + "-notifier"));
+        follower1Actor = newTestRaftActor(follower1Id, TestRaftActor.newBuilder().peerAddresses(
+                ImmutableMap.of(leaderId, testActorPath(leaderId), follower2Id, testActorPath(follower2Id))).
+                    config(newFollowerConfigParams()).roleChangeNotifier(follower1NotifierActor));
+
+        follower2NotifierActor = factory.createTestActor(Props.create(MessageCollectorActor.class),
+                factory.generateActorId(follower2Id + "-notifier"));
+        follower2Actor = newTestRaftActor(follower2Id,TestRaftActor.newBuilder().peerAddresses(
+                ImmutableMap.of(leaderId, testActorPath(leaderId), follower1Id, follower1Actor.path().toString())).
+                    config(newFollowerConfigParams()).roleChangeNotifier(follower2NotifierActor));
+
+        peerAddresses = ImmutableMap.<String, String>builder().
+                put(follower1Id, follower1Actor.path().toString()).
+                put(follower2Id, follower2Actor.path().toString()).build();
+
+        leaderConfigParams = newLeaderConfigParams();
+        leaderConfigParams.setElectionTimeoutFactor(3);
+        leaderNotifierActor = factory.createTestActor(Props.create(MessageCollectorActor.class),
+                factory.generateActorId(leaderId + "-notifier"));
+        leaderActor = newTestRaftActor(leaderId, TestRaftActor.newBuilder().peerAddresses(peerAddresses).
+                config(leaderConfigParams).roleChangeNotifier(leaderNotifierActor));
+
+        follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
+        follower2CollectorActor = follower2Actor.underlyingActor().collectorActor();
+        leaderCollectorActor = leaderActor.underlyingActor().collectorActor();
+
+        leaderContext = leaderActor.underlyingActor().getRaftActorContext();
+
+        waitUntilLeader(leaderActor);
+
+        testLog.info("createRaftActors starting");
+    }
+
+    private void verifyRaftState(ActorRef raftActor, final RaftState expState) throws Throwable {
+        Timeout timeout = new Timeout(500, TimeUnit.MILLISECONDS);
+        Throwable lastError = null;
+        Stopwatch sw = Stopwatch.createStarted();
+        while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
+            try {
+                OnDemandRaftState raftState = (OnDemandRaftState)Await.result(ask(raftActor,
+                        GetOnDemandRaftState.INSTANCE, timeout), timeout.duration());
+                assertEquals("getRaftState", expState.toString(), raftState.getRaftState());
+                return;
+            } catch (Exception | AssertionError e) {
+                lastError = e;
+                Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+            }
+        }
+
+        throw lastError;
+    }
+
+    private void assertNullLeaderIdChange(TestActorRef<MessageCollectorActor> notifierActor) {
+        LeaderStateChanged change = expectFirstMatching(notifierActor, LeaderStateChanged.class);
+        assertNull("Expected null leader Id", change.getLeaderId());
+    }
+
+    @Test
+    public void testLeaderTransferAborted() throws Throwable {
+        testLog.info("testLeaderTransferAborted starting");
+
+        createRaftActors();
+
+        follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
+        follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
+
+        sendShutDown(leaderActor);
+
+        testLog.info("testLeaderTransferOnShutDown ending");
+    }
+
+    @Test
+    public void testLeaderTransferSkippedOnShutdownWithNoFollowers() throws Throwable {
+        testLog.info("testLeaderTransferSkippedOnShutdownWithNoFollowers starting");
+
+        leaderActor = newTestRaftActor(leaderId, TestRaftActor.newBuilder().config(newLeaderConfigParams()));
+        waitUntilLeader(leaderActor);
+
+        sendShutDown(leaderActor);
+
+        testLog.info("testLeaderTransferSkippedOnShutdownWithNoFollowers ending");
+    }
+}