Add getPeerIds to RaftActorContext
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorServerConfigurationSupportTest.java
index 5c9ba13b8beda3e439df5dce505f2d96c747e1be..1b717f147d25819b2de5430f7f98414770fa2464 100644 (file)
@@ -59,6 +59,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
     static final String LEADER_ID = "leader";
     static final String FOLLOWER_ID = "follower";
     static final String NEW_SERVER_ID = "new-server";
+    static final String NEW_SERVER_ID2 = "new-server2";
     private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupportTest.class);
     private static final DataPersistenceProvider NO_PERSISTENCE = new NonPersistentDataProvider();
 
@@ -70,8 +71,8 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
 
     private TestActorRef<MockNewFollowerRaftActor> newFollowerRaftActor;
     private TestActorRef<MessageCollectorActor> newFollowerCollectorActor;
-
     private RaftActorContext newFollowerActorContext;
+
     private final JavaTestKit testKit = new JavaTestKit(getSystem());
 
     @Before
@@ -79,10 +80,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         InMemoryJournal.clear();
         InMemorySnapshotStore.clear();
 
-        DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
-        configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
-        configParams.setElectionTimeoutFactor(100000);
-        configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
+        DefaultConfigParamsImpl configParams = newFollowerConfigParams();
 
         newFollowerCollectorActor = actorFactory.createTestActor(
                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
@@ -90,7 +88,20 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         newFollowerRaftActor = actorFactory.createTestActor(MockNewFollowerRaftActor.props(
                 configParams, newFollowerCollectorActor).withDispatcher(Dispatchers.DefaultDispatcherId()),
                 actorFactory.generateActorId(NEW_SERVER_ID));
-        newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
+
+        try {
+            newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
+        } catch (Exception e) {
+            newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
+        }
+    }
+
+    private DefaultConfigParamsImpl newFollowerConfigParams() {
+        DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+        configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
+        configParams.setElectionTimeoutFactor(100000);
+        configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
+        return configParams;
     }
 
     @After
@@ -150,14 +161,9 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
 
         // Verify new server config was applied in both followers
 
-        assertEquals("Follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID),
-                followerActorContext.getPeerAddresses().keySet());
+        assertEquals("Follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID), followerActorContext.getPeerIds());
 
-        assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, FOLLOWER_ID),
-                newFollowerActorContext.getPeerAddresses().keySet());
-
-        clearMessages(followerActor);
-        clearMessages(newFollowerCollectorActor);
+        assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, FOLLOWER_ID), newFollowerActorContext.getPeerIds());
 
         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
         expectFirstMatching(followerActor, ApplyState.class);
@@ -205,20 +211,17 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
 
         // Verify ServerConfigurationPayload entry in the new follower
 
-        clearMessages(newFollowerCollectorActor);
-
         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
         assertEquals("New follower journal last index", 2, newFollowerActorContext.getReplicatedLog().lastIndex());
         verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), LEADER_ID, NEW_SERVER_ID);
 
         // Verify new server config was applied in the new follower
 
-        assertEquals("New follower peers", Sets.newHashSet(LEADER_ID),
-                newFollowerActorContext.getPeerAddresses().keySet());
+        assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
     }
 
     @Test
-    public void testAddServerAsNonVoting() throws Exception {
+    public void testAddServersAsNonVoting() throws Exception {
         RaftActorContext initialActorContext = new MockRaftActorContext();
         initialActorContext.setCommitIndex(-1);
         initialActorContext.setLastApplied(-1);
@@ -253,10 +256,83 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
 
         // Verify new server config was applied in the new follower
 
-        assertEquals("New follower peers", Sets.newHashSet(LEADER_ID),
-                newFollowerActorContext.getPeerAddresses().keySet());
+        assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
+
+        MessageCollectorActor.assertNoneMatching(newFollowerCollectorActor, InstallSnapshot.class, 500);
+
+        // Add another non-voting server.
+
+        RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
+        Follower newFollower2 = new Follower(follower2ActorContext);
+        followerActor.underlyingActor().setBehavior(newFollower2);
+
+        leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit.getRef());
+
+        addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
+        assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
+        assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
+
+        assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
+        assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
+        assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
+        verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
+                LEADER_ID, NEW_SERVER_ID, NEW_SERVER_ID2);
+    }
+
+    @Test
+    public void testAddServerWithOperationInProgress() throws Exception {
+        RaftActorContext initialActorContext = new MockRaftActorContext();
+        initialActorContext.setCommitIndex(-1);
+        initialActorContext.setLastApplied(-1);
+        initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+
+        TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
+                MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
+                        initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
+                actorFactory.generateActorId(LEADER_ID));
+
+        MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
+        RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
+
+        RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
+        Follower newFollower2 = new Follower(follower2ActorContext);
+        followerActor.underlyingActor().setBehavior(newFollower2);
+
+        MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
+        newFollowerRaftActorInstance.setDropMessageOfType(InstallSnapshot.SERIALIZABLE_CLASS);
+
+        leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
+
+        // Wait for leader's install snapshot and capture it
+
+        Object installSnapshot = expectFirstMatching(newFollowerCollectorActor, InstallSnapshot.class);
+
+        JavaTestKit testKit2 = new JavaTestKit(getSystem());
+        leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit2.getRef());
+
+        newFollowerRaftActorInstance.setDropMessageOfType(null);
+        newFollowerRaftActor.tell(installSnapshot, leaderActor);
+
+        AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
+        assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
+
+        addServerReply = testKit2.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
+        assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
+
+        // Verify ServerConfigurationPayload entries in leader's log
+
+        assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
+        assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
+        assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
+        verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
+                LEADER_ID, NEW_SERVER_ID, NEW_SERVER_ID2);
+
+        // Verify ServerConfigurationPayload entry in the new follower
+
+        MessageCollectorActor.expectMatching(newFollowerCollectorActor, ApplyState.class, 2);
 
-        MessageCollectorActor.assertNoneMatching(newFollowerCollectorActor, InstallSnapshot.SERIALIZABLE_CLASS, 500);
+        assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID2),
+               newFollowerActorContext.getPeerIds());
     }
 
     @Test
@@ -275,13 +351,14 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
 
         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
 
         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
 
         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
         assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
 
-        assertEquals("Leader peers size", 0, leaderActorContext.getPeerAddresses().keySet().size());
+        assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
         assertEquals("Leader followers size", 0,
                 ((AbstractLeader)leaderRaftActor.getCurrentBehavior()).getFollowerIds().size());
     }
@@ -340,6 +417,9 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         RaftActorContext followerActorContext = new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
                 id, termInfo, -1, -1,
                 ImmutableMap.of(LEADER_ID, ""), configParams, NO_PERSISTENCE, LOG);
+        followerActorContext.setCommitIndex(-1);
+        followerActorContext.setLastApplied(-1);
+        followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
 
         return followerActorContext;
     }
@@ -380,7 +460,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         static Props props(Map<String, String> peerAddresses, RaftActorContext fromContext) {
             DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
             configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
-            configParams.setElectionTimeoutFactor(1);
+            configParams.setElectionTimeoutFactor(10);
             return Props.create(MockLeaderRaftActor.class, peerAddresses, configParams, fromContext);
         }
     }
@@ -400,11 +480,10 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
 
         @Override
         public void handleCommand(Object message) {
-            if(dropMessageOfType != null && dropMessageOfType.equals(message.getClass())) {
-                return;
+            if(dropMessageOfType == null || !dropMessageOfType.equals(message.getClass())) {
+                super.handleCommand(message);
             }
 
-            super.handleCommand(message);
             collectorActor.tell(message, getSender());
         }