Bug 2187: Return OK reply after AddServer persist
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorServerConfigurationSupportTest.java
index 3d6f7f414ce7b1f1fc03e876fb7b35912656f23a..3e04e4c1bb396000f3010b507cea6aecc33610c1 100644 (file)
@@ -8,8 +8,10 @@
 package org.opendaylight.controller.cluster.raft;
 
 import static org.junit.Assert.assertEquals;
+import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.assertNoneMatching;
 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
+import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.actor.UntypedActor;
@@ -133,12 +135,14 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         clearMessages(followerActor);
 
         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
+        TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
 
         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
 
         // Leader should install snapshot - capture and verify ApplySnapshot contents
 
         ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
+        @SuppressWarnings("unchecked")
         List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
         assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
 
@@ -148,6 +152,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
 
         // Verify ServerConfigurationPayload entry in leader's log
 
+        expectFirstMatching(leaderCollectorActor, ApplyState.class);
         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
         assertEquals("Leader journal last index", 3, leaderActorContext.getReplicatedLog().lastIndex());
         assertEquals("Leader commit index", 3, leaderActorContext.getCommitIndex());
@@ -157,10 +162,12 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
 
         // Verify ServerConfigurationPayload entry in both followers
 
+        expectFirstMatching(followerActor, ApplyState.class);
         assertEquals("Follower journal last index", 3, followerActorContext.getReplicatedLog().lastIndex());
         verifyServerConfigurationPayloadEntry(followerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
                 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
 
+        expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
         assertEquals("New follower journal last index", 3, newFollowerActorContext.getReplicatedLog().lastIndex());
         verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
                 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
@@ -211,11 +218,14 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
 
+        TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
+
         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
 
         // Leader should install snapshot - capture and verify ApplySnapshot contents
 
         ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
+        @SuppressWarnings("unchecked")
         List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
         assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
 
@@ -225,6 +235,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
 
         // Verify ServerConfigurationPayload entry in leader's log
 
+        expectFirstMatching(leaderCollectorActor, ApplyState.class);
         assertEquals("Leader journal last index", 2, leaderActorContext.getReplicatedLog().lastIndex());
         assertEquals("Leader commit index", 2, leaderActorContext.getCommitIndex());
         assertEquals("Leader last applied index", 2, leaderActorContext.getLastApplied());
@@ -246,9 +257,6 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
     @Test
     public void testAddServersAsNonVoting() throws Exception {
         RaftActorContext initialActorContext = new MockRaftActorContext();
-        initialActorContext.setCommitIndex(-1);
-        initialActorContext.setLastApplied(-1);
-        initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
 
         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
@@ -258,6 +266,8 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
 
+        TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
+
         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), false), testKit.getRef());
 
         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
@@ -266,6 +276,8 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
 
         // Verify ServerConfigurationPayload entry in leader's log
 
+        expectFirstMatching(leaderCollectorActor, ApplyState.class);
+
         assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
         assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
         assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
@@ -283,10 +295,12 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
 
         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
 
-        MessageCollectorActor.assertNoneMatching(newFollowerCollectorActor, InstallSnapshot.class, 500);
+        assertNoneMatching(newFollowerCollectorActor, InstallSnapshot.class, 500);
 
         // Add another non-voting server.
 
+        clearMessages(leaderCollectorActor);
+
         RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
         Follower newFollower2 = new Follower(follower2ActorContext);
         followerActor.underlyingActor().setBehavior(newFollower2);
@@ -297,6 +311,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
 
+        expectFirstMatching(leaderCollectorActor, ApplyState.class);
         assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
         assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
         assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
@@ -307,9 +322,6 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
     @Test
     public void testAddServerWithOperationInProgress() throws Exception {
         RaftActorContext initialActorContext = new MockRaftActorContext();
-        initialActorContext.setCommitIndex(-1);
-        initialActorContext.setLastApplied(-1);
-        initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
 
         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
@@ -319,6 +331,8 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
 
+        TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
+
         RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
         Follower newFollower2 = new Follower(follower2ActorContext);
         followerActor.underlyingActor().setBehavior(newFollower2);
@@ -349,6 +363,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
 
         // Verify ServerConfigurationPayload entries in leader's log
 
+        expectMatching(leaderCollectorActor, ApplyState.class, 2);
         assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
         assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
         assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
@@ -357,8 +372,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
 
         // Verify ServerConfigurationPayload entry in the new follower
 
-        MessageCollectorActor.expectMatching(newFollowerCollectorActor, ApplyState.class, 2);
-
+        expectMatching(newFollowerCollectorActor, ApplyState.class, 2);
         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID2),
                newFollowerActorContext.getPeerIds());
     }
@@ -366,9 +380,6 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
     @Test
     public void testAddServerWithPriorSnapshotInProgress() throws Exception {
         RaftActorContext initialActorContext = new MockRaftActorContext();
-        initialActorContext.setCommitIndex(-1);
-        initialActorContext.setLastApplied(-1);
-        initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
 
         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
@@ -378,10 +389,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
 
-        TestActorRef<MessageCollectorActor> leaderCollectorActor = actorFactory.createTestActor(
-                MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                actorFactory.generateActorId(LEADER_ID + "Collector"));
-        leaderRaftActor.setCollectorActor(leaderCollectorActor);
+        TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
 
         // Drop commit message for now to delay snapshot completion
         leaderRaftActor.setDropMessageOfType(String.class);
@@ -403,6 +411,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
 
         // Verify ServerConfigurationPayload entry in leader's log
 
+        expectFirstMatching(leaderCollectorActor, ApplyState.class);
         assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
         assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
         assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
@@ -413,9 +422,6 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
     @Test
     public void testAddServerWithPriorSnapshotCompleteTimeout() throws Exception {
         RaftActorContext initialActorContext = new MockRaftActorContext();
-        initialActorContext.setCommitIndex(-1);
-        initialActorContext.setLastApplied(-1);
-        initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
 
         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
@@ -427,11 +433,6 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
 
         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
 
-        TestActorRef<MessageCollectorActor> leaderCollectorActor = actorFactory.createTestActor(
-                MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                actorFactory.generateActorId(LEADER_ID + "Collector"));
-        leaderRaftActor.setCollectorActor(leaderCollectorActor);
-
         // Drop commit message so the snapshot doesn't complete.
         leaderRaftActor.setDropMessageOfType(String.class);
 
@@ -448,9 +449,6 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
     @Test
     public void testAddServerWithLeaderChangeBeforePriorSnapshotComplete() throws Exception {
         RaftActorContext initialActorContext = new MockRaftActorContext();
-        initialActorContext.setCommitIndex(-1);
-        initialActorContext.setLastApplied(-1);
-        initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
 
         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
@@ -461,10 +459,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
 
-        TestActorRef<MessageCollectorActor> leaderCollectorActor = actorFactory.createTestActor(
-                MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                actorFactory.generateActorId(LEADER_ID + "Collector"));
-        leaderRaftActor.setCollectorActor(leaderCollectorActor);
+        TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
 
         // Drop the commit message so the snapshot doesn't complete yet.
         leaderRaftActor.setDropMessageOfType(String.class);
@@ -498,9 +493,6 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
     @Test
     public void testAddServerWithLeaderChangeDuringInstallSnapshot() throws Exception {
         RaftActorContext initialActorContext = new MockRaftActorContext();
-        initialActorContext.setCommitIndex(-1);
-        initialActorContext.setLastApplied(-1);
-        initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
 
         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
@@ -512,10 +504,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
 
         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(8);
 
-        TestActorRef<MessageCollectorActor> leaderCollectorActor = actorFactory.createTestActor(
-                MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                actorFactory.generateActorId(LEADER_ID + "Collector"));
-        leaderRaftActor.setCollectorActor(leaderCollectorActor);
+        TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
 
         // Drop the UnInitializedFollowerSnapshotReply to delay it.
         leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
@@ -544,9 +533,6 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
     @Test
     public void testAddServerWithInstallSnapshotTimeout() throws Exception {
         RaftActorContext initialActorContext = new MockRaftActorContext();
-        initialActorContext.setCommitIndex(-1);
-        initialActorContext.setLastApplied(-1);
-        initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
 
         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
@@ -588,6 +574,32 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
     }
 
+    @Test
+    public void testAddServerWithNoConsensusReached() {
+        RaftActorContext initialActorContext = new MockRaftActorContext();
+
+        TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
+                MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
+                        initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
+                actorFactory.generateActorId(LEADER_ID));
+
+        MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
+        RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
+
+        newFollowerRaftActor.underlyingActor().setDropMessageOfType(AppendEntries.class);
+
+        leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
+
+        AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
+        assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
+        assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
+
+        // Verify ServerConfigurationPayload entry in leader's log
+
+        verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
+                votingServer(NEW_SERVER_ID));
+    }
+
     @Test
     public void testAddServerForwardedToLeader() {
         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
@@ -610,6 +622,21 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         expectFirstMatching(leaderActor, AddServer.class);
     }
 
+    @Test
+    public void testOnApplyState() {
+        RaftActorServerConfigurationSupport support = new RaftActorServerConfigurationSupport(new MockRaftActorContext());
+
+        ReplicatedLogEntry serverConfigEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
+                new ServerConfigurationPayload(Collections.<ServerInfo>emptyList()));
+        boolean handled = support.handleMessage(new ApplyState(null, null, serverConfigEntry), null, ActorRef.noSender());
+        assertEquals("Message handled", true, handled);
+
+        ReplicatedLogEntry nonServerConfigEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
+                new MockRaftActorContext.MockPayload("1"));
+        handled = support.handleMessage(new ApplyState(null, null, nonServerConfigEntry), null, ActorRef.noSender());
+        assertEquals("Message handled", false, handled);
+    }
+
     private ServerInfo votingServer(String id) {
         return new ServerInfo(id, true);
     }
@@ -618,6 +645,14 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         return new ServerInfo(id, false);
     }
 
+    private TestActorRef<MessageCollectorActor> newLeaderCollectorActor(MockLeaderRaftActor leaderRaftActor) {
+        TestActorRef<MessageCollectorActor> leaderCollectorActor = actorFactory.createTestActor(
+                MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                actorFactory.generateActorId(LEADER_ID + "Collector"));
+        leaderRaftActor.setCollectorActor(leaderCollectorActor);
+        return leaderCollectorActor;
+    }
+
     private static void verifyServerConfigurationPayloadEntry(ReplicatedLog log, ServerInfo... expected) {
         ReplicatedLogEntry logEntry = log.get(log.lastIndex());
         assertEquals("Last log entry payload class", ServerConfigurationPayload.class, logEntry.getData().getClass());