Improve segmented journal actor metrics
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorServerConfigurationSupportTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertTrue;
12 import static org.junit.Assert.fail;
13 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.assertNoneMatching;
14 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
15 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
16 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
17
18 import akka.actor.AbstractActor;
19 import akka.actor.ActorRef;
20 import akka.actor.Props;
21 import akka.dispatch.Dispatchers;
22 import akka.testkit.TestActorRef;
23 import akka.testkit.javadsl.TestKit;
24 import com.google.common.base.Stopwatch;
25 import com.google.common.collect.ImmutableMap;
26 import com.google.common.collect.ImmutableSet;
27 import com.google.common.io.ByteSource;
28 import com.google.common.util.concurrent.MoreExecutors;
29 import java.io.OutputStream;
30 import java.time.Duration;
31 import java.util.ArrayList;
32 import java.util.Arrays;
33 import java.util.Collections;
34 import java.util.HashMap;
35 import java.util.HashSet;
36 import java.util.List;
37 import java.util.Map;
38 import java.util.Optional;
39 import java.util.concurrent.TimeUnit;
40 import org.apache.commons.lang3.SerializationUtils;
41 import org.junit.After;
42 import org.junit.Before;
43 import org.junit.Test;
44 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
45 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
46 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
47 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
48 import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
49 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
50 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
51 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
52 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
53 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
54 import org.opendaylight.controller.cluster.raft.messages.AddServer;
55 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
56 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
57 import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
58 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
59 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
60 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
61 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
62 import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
63 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
64 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
65 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
66 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
67 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
68 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
69 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
70 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
71 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
72 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
73 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
74 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
75 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
76 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
77 import org.slf4j.Logger;
78 import org.slf4j.LoggerFactory;
79 import scala.concurrent.duration.FiniteDuration;
80
81 /**
82  * Unit tests for RaftActorServerConfigurationSupport.
83  *
84  * @author Thomas Pantelis
85  */
86 public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
87     static final String LEADER_ID = "leader";
88     static final String FOLLOWER_ID = "follower";
89     static final String FOLLOWER_ID2 = "follower2";
90     static final String NEW_SERVER_ID = "new-server";
91     static final String NEW_SERVER_ID2 = "new-server2";
92     private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupportTest.class);
93     private static final Class<?> COMMIT_MESSAGE_CLASS = RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT.getClass();
94     private static final boolean NO_PERSISTENCE = false;
95     private static final boolean PERSISTENT = true;
96
97     private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
98
99     private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
100             Props.create(ForwardMessageToBehaviorActor.class).withDispatcher(Dispatchers.DefaultDispatcherId()),
101             actorFactory.generateActorId(FOLLOWER_ID));
102
103     private TestActorRef<MockNewFollowerRaftActor> newFollowerRaftActor;
104     private ActorRef newFollowerCollectorActor;
105     private RaftActorContext newFollowerActorContext;
106
107     private final TestKit testKit = new TestKit(getSystem());
108
109     @Before
110     public void setup() {
111         InMemoryJournal.clear();
112         InMemorySnapshotStore.clear();
113     }
114
115     @SuppressWarnings("checkstyle:IllegalCatch")
116     private void setupNewFollower() {
117         DefaultConfigParamsImpl configParams = newFollowerConfigParams();
118
119         newFollowerCollectorActor = actorFactory.createActor(MessageCollectorActor.props(),
120                 actorFactory.generateActorId(NEW_SERVER_ID + "Collector"));
121         newFollowerRaftActor = actorFactory.createTestActor(MockNewFollowerRaftActor.props(
122                 configParams, newFollowerCollectorActor).withDispatcher(Dispatchers.DefaultDispatcherId()),
123                 actorFactory.generateActorId(NEW_SERVER_ID));
124
125         try {
126             newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
127         } catch (Exception e) {
128             newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
129         }
130     }
131
132     private static DefaultConfigParamsImpl newFollowerConfigParams() {
133         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
134         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
135         configParams.setElectionTimeoutFactor(100000);
136         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
137         return configParams;
138     }
139
140     @After
141     public void tearDown() {
142         actorFactory.close();
143     }
144
145     @Test
146     public void testAddServerWithExistingFollower() {
147         LOG.info("testAddServerWithExistingFollower starting");
148         setupNewFollower();
149         RaftActorContextImpl followerActorContext = newFollowerContext(FOLLOWER_ID, followerActor);
150         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
151                 0, 3, 1).build());
152         followerActorContext.setCommitIndex(2);
153         followerActorContext.setLastApplied(2);
154
155         Follower follower = new Follower(followerActorContext);
156         followerActor.underlyingActor().setBehavior(follower);
157         followerActorContext.setCurrentBehavior(follower);
158
159         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
160                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
161                         followerActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
162                 actorFactory.generateActorId(LEADER_ID));
163
164         // Expect initial heartbeat from the leader.
165         expectFirstMatching(followerActor, AppendEntries.class);
166         clearMessages(followerActor);
167
168         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
169         final ActorRef leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
170
171         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
172
173         // Leader should install snapshot - capture and verify ApplySnapshot contents
174
175         ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
176         List<Object> snapshotState = MockRaftActor.fromState(applySnapshot.getSnapshot().getState());
177         assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
178
179         AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
180         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
181         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
182
183         // Verify ServerConfigurationPayload entry in leader's log
184
185         expectFirstMatching(leaderCollectorActor, ApplyState.class);
186         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
187         assertEquals("Leader journal last index", 3, leaderActorContext.getReplicatedLog().lastIndex());
188         assertEquals("Leader commit index", 3, leaderActorContext.getCommitIndex());
189         assertEquals("Leader last applied index", 3, leaderActorContext.getLastApplied());
190         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
191                 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
192
193         // Verify ServerConfigurationPayload entry in both followers
194
195         expectFirstMatching(followerActor, ApplyState.class);
196         assertEquals("Follower journal last index", 3, followerActorContext.getReplicatedLog().lastIndex());
197         verifyServerConfigurationPayloadEntry(followerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
198                 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
199
200         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
201         assertEquals("New follower journal last index", 3, newFollowerActorContext.getReplicatedLog().lastIndex());
202         verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
203                 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
204
205         // Verify new server config was applied in both followers
206
207         assertEquals("Follower peers", ImmutableSet.of(LEADER_ID, NEW_SERVER_ID), followerActorContext.getPeerIds());
208
209         assertEquals("New follower peers", ImmutableSet.of(LEADER_ID, FOLLOWER_ID),
210                 newFollowerActorContext.getPeerIds());
211
212         assertEquals("Follower commit index", 3, followerActorContext.getCommitIndex());
213         assertEquals("Follower last applied index", 3, followerActorContext.getLastApplied());
214         assertEquals("New follower commit index", 3, newFollowerActorContext.getCommitIndex());
215         assertEquals("New follower last applied index", 3, newFollowerActorContext.getLastApplied());
216
217         assertEquals("Leader persisted ReplicatedLogImplEntry entries", 0,
218                 InMemoryJournal.get(LEADER_ID, SimpleReplicatedLogEntry.class).size());
219         assertEquals("Leader persisted ServerConfigurationPayload entries", 1,
220                 InMemoryJournal.get(LEADER_ID, ServerConfigurationPayload.class).size());
221
222         assertEquals("New follower persisted ReplicatedLogImplEntry entries", 0,
223                 InMemoryJournal.get(NEW_SERVER_ID, SimpleReplicatedLogEntry.class).size());
224         assertEquals("New follower persisted ServerConfigurationPayload entries", 1,
225                 InMemoryJournal.get(NEW_SERVER_ID, ServerConfigurationPayload.class).size());
226
227         LOG.info("testAddServerWithExistingFollower ending");
228     }
229
230     @Test
231     public void testAddServerWithNoExistingFollower() {
232         LOG.info("testAddServerWithNoExistingFollower starting");
233
234         setupNewFollower();
235         RaftActorContext initialActorContext = new MockRaftActorContext();
236         initialActorContext.setCommitIndex(1);
237         initialActorContext.setLastApplied(1);
238         initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
239                 0, 2, 1).build());
240
241         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
242                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
243                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
244                 actorFactory.generateActorId(LEADER_ID));
245
246         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
247         final RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
248
249         final ActorRef leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
250
251         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
252
253         // Leader should install snapshot - capture and verify ApplySnapshot contents
254
255         ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
256         List<Object> snapshotState = MockRaftActor.fromState(applySnapshot.getSnapshot().getState());
257         assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
258
259         AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
260         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
261         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
262
263         // Verify ServerConfigurationPayload entry in leader's log
264
265         expectFirstMatching(leaderCollectorActor, ApplyState.class);
266         assertEquals("Leader journal last index", 2, leaderActorContext.getReplicatedLog().lastIndex());
267         assertEquals("Leader commit index", 2, leaderActorContext.getCommitIndex());
268         assertEquals("Leader last applied index", 2, leaderActorContext.getLastApplied());
269         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
270                 votingServer(NEW_SERVER_ID));
271
272         // Verify ServerConfigurationPayload entry in the new follower
273
274         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
275         assertEquals("New follower journal last index", 2, newFollowerActorContext.getReplicatedLog().lastIndex());
276         verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
277                 votingServer(NEW_SERVER_ID));
278
279         // Verify new server config was applied in the new follower
280
281         assertEquals("New follower peers", ImmutableSet.of(LEADER_ID), newFollowerActorContext.getPeerIds());
282
283         LOG.info("testAddServerWithNoExistingFollower ending");
284     }
285
286     @Test
287     public void testAddServersAsNonVoting() {
288         LOG.info("testAddServersAsNonVoting starting");
289
290         setupNewFollower();
291         RaftActorContext initialActorContext = new MockRaftActorContext();
292
293         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
294                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
295                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
296                 actorFactory.generateActorId(LEADER_ID));
297
298         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
299         final RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
300
301         final ActorRef leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
302
303         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), false), testKit.getRef());
304
305         AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
306         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
307         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
308
309         // Verify ServerConfigurationPayload entry in leader's log
310
311         expectFirstMatching(leaderCollectorActor, ApplyState.class);
312
313         assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
314         assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
315         assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
316         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
317                 nonVotingServer(NEW_SERVER_ID));
318
319         // Verify ServerConfigurationPayload entry in the new follower
320
321         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
322         assertEquals("New follower journal last index", 0, newFollowerActorContext.getReplicatedLog().lastIndex());
323         verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
324                 nonVotingServer(NEW_SERVER_ID));
325
326         // Verify new server config was applied in the new follower
327
328         assertEquals("New follower peers", ImmutableSet.of(LEADER_ID), newFollowerActorContext.getPeerIds());
329
330         assertNoneMatching(newFollowerCollectorActor, InstallSnapshot.class, 500);
331
332         // Add another non-voting server.
333
334         clearMessages(leaderCollectorActor);
335
336         RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
337         Follower newFollower2 = new Follower(follower2ActorContext);
338         followerActor.underlyingActor().setBehavior(newFollower2);
339
340         leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit.getRef());
341
342         addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
343         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
344         assertEquals("getLeaderHint", Optional.of(LEADER_ID), addServerReply.getLeaderHint());
345
346         expectFirstMatching(leaderCollectorActor, ApplyState.class);
347         assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
348         assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
349         assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
350         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
351                 votingServer(LEADER_ID), nonVotingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
352
353         LOG.info("testAddServersAsNonVoting ending");
354     }
355
356     @Test
357     public void testAddServerWithOperationInProgress() {
358         LOG.info("testAddServerWithOperationInProgress starting");
359
360         setupNewFollower();
361         RaftActorContext initialActorContext = new MockRaftActorContext();
362
363         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
364                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
365                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
366                 actorFactory.generateActorId(LEADER_ID));
367
368         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
369         final RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
370
371         final ActorRef leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
372
373         RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
374         Follower newFollower2 = new Follower(follower2ActorContext);
375         followerActor.underlyingActor().setBehavior(newFollower2);
376
377         MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
378         newFollowerRaftActorInstance.setDropMessageOfType(InstallSnapshot.class);
379
380         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
381
382         // Wait for leader's install snapshot and capture it
383
384         InstallSnapshot installSnapshot = expectFirstMatching(newFollowerCollectorActor, InstallSnapshot.class);
385
386         // Send a second AddServer - should get queued
387         TestKit testKit2 = new TestKit(getSystem());
388         leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit2.getRef());
389
390         // Continue the first AddServer
391         newFollowerRaftActorInstance.setDropMessageOfType(null);
392         newFollowerRaftActor.tell(installSnapshot, leaderActor);
393
394         // Verify both complete successfully
395         AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
396         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
397
398         addServerReply = testKit2.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
399         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
400
401         // Verify ServerConfigurationPayload entries in leader's log
402
403         expectMatching(leaderCollectorActor, ApplyState.class, 2);
404         assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
405         assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
406         assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
407         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
408                 votingServer(LEADER_ID), votingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
409
410         // Verify ServerConfigurationPayload entry in the new follower
411
412         expectMatching(newFollowerCollectorActor, ApplyState.class, 2);
413         assertEquals("New follower peers", ImmutableSet.of(LEADER_ID, NEW_SERVER_ID2),
414                newFollowerActorContext.getPeerIds());
415
416         LOG.info("testAddServerWithOperationInProgress ending");
417     }
418
419     @Test
420     public void testAddServerWithPriorSnapshotInProgress() {
421         LOG.info("testAddServerWithPriorSnapshotInProgress starting");
422
423         setupNewFollower();
424         RaftActorContext initialActorContext = new MockRaftActorContext();
425
426         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
427                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
428                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
429                 actorFactory.generateActorId(LEADER_ID));
430
431         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
432         final RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
433
434         ActorRef leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
435
436         // Drop commit message for now to delay snapshot completion
437         leaderRaftActor.setDropMessageOfType(String.class);
438
439         leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
440
441         Object commitMsg = expectFirstMatching(leaderCollectorActor, COMMIT_MESSAGE_CLASS);
442
443         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
444
445         leaderRaftActor.setDropMessageOfType(null);
446         leaderActor.tell(commitMsg, leaderActor);
447
448         AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
449         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
450         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
451
452         expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
453
454         // Verify ServerConfigurationPayload entry in leader's log
455
456         expectFirstMatching(leaderCollectorActor, ApplyState.class);
457         assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
458         assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
459         assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
460         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
461                 votingServer(NEW_SERVER_ID));
462
463         LOG.info("testAddServerWithPriorSnapshotInProgress ending");
464     }
465
466     @Test
467     public void testAddServerWithPriorSnapshotCompleteTimeout() {
468         LOG.info("testAddServerWithPriorSnapshotCompleteTimeout starting");
469
470         setupNewFollower();
471         RaftActorContext initialActorContext = new MockRaftActorContext();
472
473         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
474                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
475                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
476                 actorFactory.generateActorId(LEADER_ID));
477
478         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
479         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
480
481         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
482
483         // Drop commit message so the snapshot doesn't complete.
484         leaderRaftActor.setDropMessageOfType(COMMIT_MESSAGE_CLASS);
485
486         leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
487
488         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
489
490         AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
491         assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
492
493         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
494
495         LOG.info("testAddServerWithPriorSnapshotCompleteTimeout ending");
496     }
497
498     @Test
499     public void testAddServerWithLeaderChangeBeforePriorSnapshotComplete() {
500         LOG.info("testAddServerWithLeaderChangeBeforePriorSnapshotComplete starting");
501
502         setupNewFollower();
503         RaftActorContext initialActorContext = new MockRaftActorContext();
504
505         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
506                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
507                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
508                 actorFactory.generateActorId(LEADER_ID));
509
510         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
511         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
512         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
513
514         final ActorRef leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
515
516         // Drop the commit message so the snapshot doesn't complete yet.
517         leaderRaftActor.setDropMessageOfType(COMMIT_MESSAGE_CLASS);
518
519         leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
520
521         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
522
523         Object commitMsg = expectFirstMatching(leaderCollectorActor, COMMIT_MESSAGE_CLASS);
524
525         // Change the leader behavior to follower
526         leaderActor.tell(new Follower(leaderActorContext), leaderActor);
527
528         // Drop CaptureSnapshotReply in case install snapshot is incorrectly initiated after the prior
529         // snapshot completes. This will prevent the invalid snapshot from completing and fail the
530         // isCapturing assertion below.
531         leaderRaftActor.setDropMessageOfType(CaptureSnapshotReply.class);
532
533         // Complete the prior snapshot - this should be a no-op b/c it's no longer the leader
534         leaderActor.tell(commitMsg, leaderActor);
535
536         leaderActor.tell(new RaftActorServerConfigurationSupport.ServerOperationTimeout(NEW_SERVER_ID), leaderActor);
537
538         AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
539         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
540
541         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
542         assertEquals("isCapturing", false, leaderActorContext.getSnapshotManager().isCapturing());
543
544         LOG.info("testAddServerWithLeaderChangeBeforePriorSnapshotComplete ending");
545     }
546
547     @Test
548     public void testAddServerWithLeaderChangeDuringInstallSnapshot() {
549         LOG.info("testAddServerWithLeaderChangeDuringInstallSnapshot starting");
550
551         setupNewFollower();
552         RaftActorContext initialActorContext = new MockRaftActorContext();
553
554         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
555                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
556                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
557                 actorFactory.generateActorId(LEADER_ID));
558
559         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
560         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
561
562         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(8);
563
564         ActorRef leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
565
566         // Drop the UnInitializedFollowerSnapshotReply to delay it.
567         leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
568
569         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
570
571         final UnInitializedFollowerSnapshotReply snapshotReply = expectFirstMatching(leaderCollectorActor,
572                 UnInitializedFollowerSnapshotReply.class);
573
574         // Prevent election timeout when the leader switches to follower
575         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
576
577         // Change the leader behavior to follower
578         leaderActor.tell(new Follower(leaderActorContext), leaderActor);
579
580         // Send the captured UnInitializedFollowerSnapshotReply - should be a no-op
581         leaderRaftActor.setDropMessageOfType(null);
582         leaderActor.tell(snapshotReply, leaderActor);
583
584         AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
585         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
586
587         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
588
589         LOG.info("testAddServerWithLeaderChangeDuringInstallSnapshot ending");
590     }
591
592     @Test
593     public void testAddServerWithInstallSnapshotTimeout() {
594         LOG.info("testAddServerWithInstallSnapshotTimeout starting");
595
596         setupNewFollower();
597         RaftActorContext initialActorContext = new MockRaftActorContext();
598
599         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
600                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
601                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
602                 actorFactory.generateActorId(LEADER_ID));
603
604         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
605         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
606         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
607
608         // Drop the InstallSnapshot message so it times out
609         newFollowerRaftActor.underlyingActor().setDropMessageOfType(InstallSnapshot.class);
610
611         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
612
613         leaderActor.tell(new UnInitializedFollowerSnapshotReply("bogus"), leaderActor);
614
615         AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
616         assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
617
618         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
619         assertEquals("Leader followers size", 0,
620                 ((AbstractLeader)leaderRaftActor.getCurrentBehavior()).getFollowerIds().size());
621
622         LOG.info("testAddServerWithInstallSnapshotTimeout ending");
623     }
624
625     @Test
626     public void testAddServerWithNoLeader() {
627         LOG.info("testAddServerWithNoLeader starting");
628
629         setupNewFollower();
630         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
631         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
632
633         TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
634                 MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
635                         followerActor.path().toString())).config(configParams).persistent(Optional.of(false))
636                         .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
637                 actorFactory.generateActorId(LEADER_ID));
638         noLeaderActor.underlyingActor().waitForInitializeBehaviorComplete();
639
640         noLeaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true),
641                 testKit.getRef());
642         AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
643         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
644
645         LOG.info("testAddServerWithNoLeader ending");
646     }
647
648     @Test
649     public void testAddServerWithNoConsensusReached() {
650         LOG.info("testAddServerWithNoConsensusReached starting");
651
652         setupNewFollower();
653         RaftActorContext initialActorContext = new MockRaftActorContext();
654
655         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
656                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
657                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
658                 actorFactory.generateActorId(LEADER_ID));
659
660         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
661         final RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
662
663         final ActorRef leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
664
665         // Drop UnInitializedFollowerSnapshotReply initially
666         leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
667
668         MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
669         newFollowerCollectorActor = newCollectorActor(newFollowerRaftActorInstance, NEW_SERVER_ID);
670
671         // Drop AppendEntries to the new follower so consensus isn't reached
672         newFollowerRaftActorInstance.setDropMessageOfType(AppendEntries.class);
673
674         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
675
676         // Capture the UnInitializedFollowerSnapshotReply
677         Object snapshotReply = expectFirstMatching(leaderCollectorActor, UnInitializedFollowerSnapshotReply.class);
678
679         // Send the UnInitializedFollowerSnapshotReply to resume the first request
680         leaderRaftActor.setDropMessageOfType(null);
681         leaderActor.tell(snapshotReply, leaderActor);
682
683         expectFirstMatching(newFollowerCollectorActor, AppendEntries.class);
684
685         // Send a second AddServer
686         leaderActor.tell(new AddServer(NEW_SERVER_ID2, "", false), testKit.getRef());
687
688         // The first AddServer should succeed with OK even though consensus wasn't reached
689         AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
690         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
691         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
692
693         // Verify ServerConfigurationPayload entry in leader's log
694         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
695                 votingServer(NEW_SERVER_ID));
696
697         // The second AddServer should fail since consensus wasn't reached for the first
698         addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
699         assertEquals("getStatus", ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT, addServerReply.getStatus());
700
701         // Re-send the second AddServer - should also fail
702         leaderActor.tell(new AddServer(NEW_SERVER_ID2, "", false), testKit.getRef());
703         addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
704         assertEquals("getStatus", ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT, addServerReply.getStatus());
705
706         LOG.info("testAddServerWithNoConsensusReached ending");
707     }
708
709     @Test
710     public void testAddServerWithExistingServer() {
711         LOG.info("testAddServerWithExistingServer starting");
712
713         RaftActorContext initialActorContext = new MockRaftActorContext();
714
715         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
716                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
717                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
718                 actorFactory.generateActorId(LEADER_ID));
719
720         leaderActor.tell(new AddServer(FOLLOWER_ID, followerActor.path().toString(), true), testKit.getRef());
721
722         AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
723         assertEquals("getStatus", ServerChangeStatus.ALREADY_EXISTS, addServerReply.getStatus());
724
725         LOG.info("testAddServerWithExistingServer ending");
726     }
727
728     @Test
729     public void testAddServerForwardedToLeader() {
730         LOG.info("testAddServerForwardedToLeader starting");
731
732         setupNewFollower();
733         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
734         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
735
736         ActorRef leaderActor = actorFactory.createActor(
737                 MessageCollectorActor.props(), actorFactory.generateActorId(LEADER_ID));
738
739         TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
740                 MockRaftActor.builder().id(FOLLOWER_ID).peerAddresses(ImmutableMap.of(LEADER_ID,
741                         leaderActor.path().toString())).config(configParams).persistent(Optional.of(false))
742                         .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
743                 actorFactory.generateActorId(FOLLOWER_ID));
744         followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
745
746         followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
747                 -1, -1, (short)0), leaderActor);
748
749         followerRaftActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true),
750                 testKit.getRef());
751         expectFirstMatching(leaderActor, AddServer.class);
752
753         LOG.info("testAddServerForwardedToLeader ending");
754     }
755
756     @Test
757     public void testOnApplyState() {
758         LOG.info("testOnApplyState starting");
759
760         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
761         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
762         TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
763                 MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
764                         followerActor.path().toString())).config(configParams).persistent(Optional.of(false))
765                         .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
766                 actorFactory.generateActorId(LEADER_ID));
767
768         RaftActorServerConfigurationSupport support = new RaftActorServerConfigurationSupport(
769                 noLeaderActor.underlyingActor());
770
771         ReplicatedLogEntry serverConfigEntry = new SimpleReplicatedLogEntry(1, 1,
772                 new ServerConfigurationPayload(Collections.<ServerInfo>emptyList()));
773         boolean handled = support.handleMessage(new ApplyState(null, null, serverConfigEntry), ActorRef.noSender());
774         assertEquals("Message handled", true, handled);
775
776         ReplicatedLogEntry nonServerConfigEntry = new SimpleReplicatedLogEntry(1, 1,
777                 new MockRaftActorContext.MockPayload("1"));
778         handled = support.handleMessage(new ApplyState(null, null, nonServerConfigEntry), ActorRef.noSender());
779         assertEquals("Message handled", false, handled);
780
781         LOG.info("testOnApplyState ending");
782     }
783
784     @Test
785     public void testRemoveServerWithNoLeader() {
786         LOG.info("testRemoveServerWithNoLeader starting");
787
788         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
789         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
790
791         TestActorRef<MockRaftActor> leaderActor = actorFactory.createTestActor(
792                 MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
793                         followerActor.path().toString())).config(configParams).persistent(Optional.of(false))
794                         .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
795                 actorFactory.generateActorId(LEADER_ID));
796         leaderActor.underlyingActor().waitForInitializeBehaviorComplete();
797
798         leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
799         RemoveServerReply removeServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), RemoveServerReply.class);
800         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, removeServerReply.getStatus());
801
802         LOG.info("testRemoveServerWithNoLeader ending");
803     }
804
805     @Test
806     public void testRemoveServerNonExistentServer() {
807         LOG.info("testRemoveServerNonExistentServer starting");
808
809         RaftActorContext initialActorContext = new MockRaftActorContext();
810
811         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
812                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
813                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
814                 actorFactory.generateActorId(LEADER_ID));
815
816         leaderActor.tell(new RemoveServer(NEW_SERVER_ID), testKit.getRef());
817         RemoveServerReply removeServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), RemoveServerReply.class);
818         assertEquals("getStatus", ServerChangeStatus.DOES_NOT_EXIST, removeServerReply.getStatus());
819
820         LOG.info("testRemoveServerNonExistentServer ending");
821     }
822
823     @Test
824     public void testRemoveServerForwardToLeader() {
825         LOG.info("testRemoveServerForwardToLeader starting");
826
827         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
828         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
829
830         ActorRef leaderActor = actorFactory.createTestActor(
831                 MessageCollectorActor.props(), actorFactory.generateActorId(LEADER_ID));
832
833         TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
834                 MockRaftActor.builder().id(FOLLOWER_ID).peerAddresses(ImmutableMap.of(LEADER_ID,
835                         leaderActor.path().toString())).config(configParams).persistent(Optional.of(false))
836                         .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
837                 actorFactory.generateActorId(FOLLOWER_ID));
838         followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
839
840         followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
841                 -1, -1, (short)0), leaderActor);
842
843         followerRaftActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
844         expectFirstMatching(leaderActor, RemoveServer.class);
845
846         LOG.info("testRemoveServerForwardToLeader ending");
847     }
848
849     @Test
850     public void testRemoveServer() {
851         LOG.info("testRemoveServer starting");
852
853         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
854         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
855         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
856
857         final String follower1ActorId = actorFactory.generateActorId(FOLLOWER_ID);
858         final String follower1ActorPath = actorFactory.createTestActorPath(follower1ActorId);
859         final String follower2ActorId = actorFactory.generateActorId(FOLLOWER_ID2);
860         final String follower2ActorPath = actorFactory.createTestActorPath(follower2ActorId);
861         RaftActorContext initialActorContext = new MockRaftActorContext();
862
863         final String downNodeId = "downNode";
864         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(MockLeaderRaftActor.props(
865                 ImmutableMap.of(FOLLOWER_ID, follower1ActorPath, FOLLOWER_ID2, follower2ActorPath, downNodeId, ""),
866                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
867                 actorFactory.generateActorId(LEADER_ID));
868
869         final ActorRef leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
870
871         ActorRef follower1Collector = actorFactory.createActor(
872                 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
873         final TestActorRef<CollectingMockRaftActor> follower1Actor = actorFactory.createTestActor(
874                 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
875                         FOLLOWER_ID2, follower2ActorPath, downNodeId, ""), configParams, NO_PERSISTENCE,
876                         follower1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId);
877
878         ActorRef follower2Collector = actorFactory.createActor(
879                 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
880         final TestActorRef<CollectingMockRaftActor> follower2Actor = actorFactory.createTestActor(
881                 CollectingMockRaftActor.props(FOLLOWER_ID2, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
882                         FOLLOWER_ID, follower1ActorPath, downNodeId, ""), configParams, NO_PERSISTENCE,
883                         follower2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId);
884
885         leaderActor.underlyingActor().waitForInitializeBehaviorComplete();
886         follower1Actor.underlyingActor().waitForInitializeBehaviorComplete();
887         follower2Actor.underlyingActor().waitForInitializeBehaviorComplete();
888
889         leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
890         RemoveServerReply removeServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), RemoveServerReply.class);
891         assertEquals("getStatus", ServerChangeStatus.OK, removeServerReply.getStatus());
892
893         ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
894         assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
895         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
896                 votingServer(LEADER_ID), votingServer(FOLLOWER_ID2), votingServer(downNodeId));
897
898         applyState = MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
899         assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
900         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
901                 votingServer(LEADER_ID), votingServer(FOLLOWER_ID2), votingServer(downNodeId));
902
903         RaftActorBehavior currentBehavior = leaderActor.underlyingActor().getCurrentBehavior();
904         assertTrue("Expected Leader", currentBehavior instanceof Leader);
905         assertEquals("Follower ids size", 2, ((Leader)currentBehavior).getFollowerIds().size());
906
907         MessageCollectorActor.expectFirstMatching(follower1Collector, ServerRemoved.class);
908
909         LOG.info("testRemoveServer ending");
910     }
911
912     @Test
913     public void testRemoveServerLeader() {
914         LOG.info("testRemoveServerLeader starting");
915
916         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
917         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
918         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
919
920         final String followerActorId = actorFactory.generateActorId(FOLLOWER_ID);
921         final String followerActorPath = actorFactory.createTestActorPath(followerActorId);
922         RaftActorContext initialActorContext = new MockRaftActorContext();
923
924         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
925                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActorPath),
926                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
927                 actorFactory.generateActorId(LEADER_ID));
928
929         final ActorRef leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
930
931         final ActorRef followerCollector =
932                 actorFactory.createActor(MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
933         actorFactory.createTestActor(
934                 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
935                         configParams, NO_PERSISTENCE, followerCollector)
936                         .withDispatcher(Dispatchers.DefaultDispatcherId()),
937                 followerActorId);
938
939         leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
940         RemoveServerReply removeServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), RemoveServerReply.class);
941         assertEquals("getStatus", ServerChangeStatus.OK, removeServerReply.getStatus());
942
943         final ApplyState applyState = MessageCollectorActor.expectFirstMatching(followerCollector, ApplyState.class);
944         assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
945         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
946                 votingServer(FOLLOWER_ID));
947
948         MessageCollectorActor.expectFirstMatching(leaderCollector, ServerRemoved.class);
949
950         LOG.info("testRemoveServerLeader ending");
951     }
952
953     @Test
954     public void testRemoveServerLeaderWithNoFollowers() {
955         LOG.info("testRemoveServerLeaderWithNoFollowers starting");
956
957         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
958                 MockLeaderRaftActor.props(Collections.<String, String>emptyMap(),
959                         new MockRaftActorContext()).withDispatcher(Dispatchers.DefaultDispatcherId()),
960                 actorFactory.generateActorId(LEADER_ID));
961
962         leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
963         RemoveServerReply removeServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), RemoveServerReply.class);
964         assertEquals("getStatus", ServerChangeStatus.NOT_SUPPORTED, removeServerReply.getStatus());
965
966         LOG.info("testRemoveServerLeaderWithNoFollowers ending");
967     }
968
969     @Test
970     public void testChangeServersVotingStatus() {
971         LOG.info("testChangeServersVotingStatus starting");
972
973         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
974         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
975         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
976
977         final String follower1ActorId = actorFactory.generateActorId(FOLLOWER_ID);
978         final String follower1ActorPath = actorFactory.createTestActorPath(follower1ActorId);
979         final String follower2ActorId = actorFactory.generateActorId(FOLLOWER_ID2);
980         final String follower2ActorPath = actorFactory.createTestActorPath(follower2ActorId);
981
982         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
983                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, follower1ActorPath,
984                         FOLLOWER_ID2, follower2ActorPath), new MockRaftActorContext())
985                         .withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
986         ActorRef leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
987
988         ActorRef follower1Collector = actorFactory.createActor(
989                 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
990         final TestActorRef<CollectingMockRaftActor> follower1RaftActor = actorFactory.createTestActor(
991                 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
992                         FOLLOWER_ID2, follower2ActorPath), configParams, NO_PERSISTENCE, follower1Collector)
993                         .withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId);
994
995         ActorRef follower2Collector = actorFactory.createActor(
996                 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
997         final TestActorRef<CollectingMockRaftActor> follower2RaftActor = actorFactory.createTestActor(
998                 CollectingMockRaftActor.props(FOLLOWER_ID2, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
999                         FOLLOWER_ID, follower1ActorPath), configParams, NO_PERSISTENCE, follower2Collector)
1000                         .withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId);
1001
1002         // Send first ChangeServersVotingStatus message
1003
1004         leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(FOLLOWER_ID, false, FOLLOWER_ID2, false)),
1005                 testKit.getRef());
1006         ServerChangeReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), ServerChangeReply.class);
1007         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1008
1009         final ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
1010         assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
1011         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1012                 votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1013
1014         MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
1015         verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext()
1016                 .getReplicatedLog(), votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID),
1017                 nonVotingServer(FOLLOWER_ID2));
1018
1019         MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
1020         verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext()
1021                 .getReplicatedLog(), votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID),
1022                 nonVotingServer(FOLLOWER_ID2));
1023
1024         MessageCollectorActor.clearMessages(leaderCollector);
1025         MessageCollectorActor.clearMessages(follower1Collector);
1026         MessageCollectorActor.clearMessages(follower2Collector);
1027
1028         // Send second ChangeServersVotingStatus message
1029
1030         leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(FOLLOWER_ID, true)), testKit.getRef());
1031         reply = testKit.expectMsgClass(Duration.ofSeconds(5), ServerChangeReply.class);
1032         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1033
1034         MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
1035         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1036                 votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1037
1038         MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
1039         verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext()
1040                 .getReplicatedLog(), votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1041
1042         MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
1043         verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext()
1044                 .getReplicatedLog(), votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1045
1046         LOG.info("testChangeServersVotingStatus ending");
1047     }
1048
1049     @Test
1050     public void testChangeLeaderToNonVoting() {
1051         LOG.info("testChangeLeaderToNonVoting starting");
1052
1053         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1054         configParams.setHeartBeatInterval(new FiniteDuration(500, TimeUnit.MILLISECONDS));
1055
1056         final String follower1ActorId = actorFactory.generateActorId(FOLLOWER_ID);
1057         final String follower1ActorPath = actorFactory.createTestActorPath(follower1ActorId);
1058         final String follower2ActorId = actorFactory.generateActorId(FOLLOWER_ID2);
1059         final String follower2ActorPath = actorFactory.createTestActorPath(follower2ActorId);
1060
1061         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
1062                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, follower1ActorPath,
1063                         FOLLOWER_ID2, follower2ActorPath), new MockRaftActorContext())
1064                         .withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
1065         ActorRef leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
1066
1067         ActorRef follower1Collector = actorFactory.createActor(
1068                 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1069         final TestActorRef<CollectingMockRaftActor> follower1RaftActor = actorFactory.createTestActor(
1070                 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
1071                         FOLLOWER_ID2, follower2ActorPath), configParams, NO_PERSISTENCE, follower1Collector)
1072                         .withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId);
1073
1074         ActorRef follower2Collector = actorFactory.createActor(
1075                 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1076         final TestActorRef<CollectingMockRaftActor> follower2RaftActor = actorFactory.createTestActor(
1077                 CollectingMockRaftActor.props(FOLLOWER_ID2, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
1078                         FOLLOWER_ID, follower1ActorPath), configParams, NO_PERSISTENCE, follower2Collector)
1079                         .withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId);
1080
1081         // Send ChangeServersVotingStatus message
1082
1083         leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(LEADER_ID, false)), testKit.getRef());
1084         ServerChangeReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), ServerChangeReply.class);
1085         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1086
1087         MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
1088         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1089                 nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
1090
1091         MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
1092         verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext()
1093                 .getReplicatedLog(), nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
1094
1095         MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
1096         verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext()
1097                 .getReplicatedLog(), nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
1098
1099         verifyRaftState(RaftState.Leader, follower1RaftActor.underlyingActor(), follower2RaftActor.underlyingActor());
1100         verifyRaftState(RaftState.Follower, leaderActor.underlyingActor());
1101
1102         MessageCollectorActor.expectMatching(leaderCollector, AppendEntries.class, 2);
1103
1104         LOG.info("testChangeLeaderToNonVoting ending");
1105     }
1106
1107     @Test
1108     public void testChangeLeaderToNonVotingInSingleNode() {
1109         LOG.info("testChangeLeaderToNonVotingInSingleNode starting");
1110
1111         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
1112                 MockLeaderRaftActor.props(ImmutableMap.of(), new MockRaftActorContext())
1113                         .withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
1114
1115         leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(LEADER_ID, false)), testKit.getRef());
1116         ServerChangeReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), ServerChangeReply.class);
1117         assertEquals("getStatus", ServerChangeStatus.INVALID_REQUEST, reply.getStatus());
1118
1119         LOG.info("testChangeLeaderToNonVotingInSingleNode ending");
1120     }
1121
1122     @Test
1123     public void testChangeToVotingWithNoLeader() {
1124         LOG.info("testChangeToVotingWithNoLeader starting");
1125
1126         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1127         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1128         configParams.setElectionTimeoutFactor(5);
1129
1130         final String node1ID = "node1";
1131         final String node2ID = "node2";
1132
1133         // Set up a persisted ServerConfigurationPayload. Initially node1 and node2 will come up as non-voting.
1134         // via the server config. The server config will also contain 2 voting peers that are down (ie no
1135         // actors created).
1136
1137         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1138                 new ServerInfo(node1ID, false), new ServerInfo(node2ID, false),
1139                 new ServerInfo("downNode1", true), new ServerInfo("downNode2", true)));
1140         SimpleReplicatedLogEntry persistedServerConfigEntry = new SimpleReplicatedLogEntry(0, 1, persistedServerConfig);
1141
1142         InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "downNode1"));
1143         InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1144         InMemoryJournal.addEntry(node1ID, 3, new ApplyJournalEntries(0));
1145         InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "downNode2"));
1146         InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1147         InMemoryJournal.addEntry(node2ID, 3, new ApplyJournalEntries(0));
1148
1149         ActorRef node1Collector = actorFactory.createActor(
1150                 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1151         TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1152                 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
1153                         PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1154         CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1155
1156         ActorRef node2Collector = actorFactory.createActor(
1157                 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1158         TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1159                 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
1160                         PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1161         CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1162
1163         node1RaftActor.waitForInitializeBehaviorComplete();
1164         node2RaftActor.waitForInitializeBehaviorComplete();
1165
1166         // Verify the intended server config was loaded and applied.
1167         verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1168                 nonVotingServer(node1ID), nonVotingServer(node2ID), votingServer("downNode1"),
1169                 votingServer("downNode2"));
1170         assertEquals("isVotingMember", false, node1RaftActor.getRaftActorContext().isVotingMember());
1171         assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1172         assertEquals("getLeaderId", null, node1RaftActor.getLeaderId());
1173
1174         verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1175                 nonVotingServer(node1ID), nonVotingServer(node2ID), votingServer("downNode1"),
1176                 votingServer("downNode2"));
1177         assertEquals("isVotingMember", false, node2RaftActor.getRaftActorContext().isVotingMember());
1178
1179         // For the test, we send a ChangeServersVotingStatus message to node1 to flip the voting states for
1180         // each server, ie node1 and node2 to voting and the 2 down nodes to non-voting. This should cause
1181         // node1 to try to elect itself as leader in order to apply the new server config. Since the 2
1182         // down nodes are switched to non-voting, node1 should only need a vote from node2.
1183
1184         // First send the message such that node1 has no peer address for node2 - should fail.
1185
1186         ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true,
1187                 node2ID, true, "downNode1", false, "downNode2", false));
1188         node1RaftActorRef.tell(changeServers, testKit.getRef());
1189         ServerChangeReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), ServerChangeReply.class);
1190         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, reply.getStatus());
1191         assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1192
1193         // Send an AppendEntries so node1 has a leaderId
1194
1195         long term = node1RaftActor.getRaftActorContext().getTermInformation().getCurrentTerm();
1196         node1RaftActorRef.tell(new AppendEntries(term, "downNode1", -1L, -1L,
1197                 Collections.<ReplicatedLogEntry>emptyList(), 0, -1, (short)1), ActorRef.noSender());
1198
1199         // Wait for the ElectionTimeout to clear the leaderId. The leaderId must be null so on the next
1200         // ChangeServersVotingStatus message, it will try to elect a leader.
1201
1202         AbstractRaftActorIntegrationTest.verifyRaftState(node1RaftActorRef,
1203             rs -> assertEquals("getLeader", null, rs.getLeader()));
1204
1205         // Update node2's peer address and send the message again
1206
1207         node1RaftActor.setPeerAddress(node2ID, node2RaftActorRef.path().toString());
1208
1209         node1RaftActorRef.tell(changeServers, testKit.getRef());
1210         reply = testKit.expectMsgClass(Duration.ofSeconds(5), ServerChangeReply.class);
1211         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1212
1213         ApplyJournalEntries apply = MessageCollectorActor.expectFirstMatching(node1Collector,
1214                 ApplyJournalEntries.class);
1215         assertEquals("getToIndex", 1, apply.getToIndex());
1216         verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1217                 votingServer(node1ID), votingServer(node2ID), nonVotingServer("downNode1"),
1218                 nonVotingServer("downNode2"));
1219         assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
1220         assertEquals("getRaftState", RaftState.Leader, node1RaftActor.getRaftState());
1221
1222         apply = MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
1223         assertEquals("getToIndex", 1, apply.getToIndex());
1224         verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1225                 votingServer(node1ID), votingServer(node2ID), nonVotingServer("downNode1"),
1226                 nonVotingServer("downNode2"));
1227         assertEquals("isVotingMember", true, node2RaftActor.getRaftActorContext().isVotingMember());
1228         assertEquals("getRaftState", RaftState.Follower, node2RaftActor.getRaftState());
1229
1230         LOG.info("testChangeToVotingWithNoLeader ending");
1231     }
1232
1233     @Test
1234     public void testChangeToVotingWithNoLeaderAndElectionTimeout() {
1235         LOG.info("testChangeToVotingWithNoLeaderAndElectionTimeout starting");
1236
1237         final String node1ID = "node1";
1238         final String node2ID = "node2";
1239
1240         final PeerAddressResolver peerAddressResolver = peerId -> peerId.equals(node1ID)
1241                 ? actorFactory.createTestActorPath(node1ID) : peerId.equals(node2ID)
1242                         ? actorFactory.createTestActorPath(node2ID) : null;
1243
1244         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1245                 new ServerInfo(node1ID, false), new ServerInfo(node2ID, true)));
1246         SimpleReplicatedLogEntry persistedServerConfigEntry = new SimpleReplicatedLogEntry(0, 1, persistedServerConfig);
1247
1248         InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
1249         InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1250         InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
1251         InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1252
1253         DefaultConfigParamsImpl configParams1 = new DefaultConfigParamsImpl();
1254         configParams1.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1255         configParams1.setElectionTimeoutFactor(1);
1256         configParams1.setPeerAddressResolver(peerAddressResolver);
1257         ActorRef node1Collector = actorFactory.createActor(
1258                 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1259         TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1260                 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams1,
1261                         PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1262         final CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1263
1264         DefaultConfigParamsImpl configParams2 = new DefaultConfigParamsImpl();
1265         configParams2.setElectionTimeoutFactor(1000000);
1266         configParams2.setPeerAddressResolver(peerAddressResolver);
1267         ActorRef node2Collector = actorFactory.createActor(
1268                 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1269         TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1270                 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams2,
1271                         PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1272         CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1273
1274         // Send a ChangeServersVotingStatus message to node1 to change mode1 to voting. This should cause
1275         // node1 to try to elect itself as leader in order to apply the new server config. But we'll drop
1276         // RequestVote messages in node2 which should cause node1 to time out and revert back to the previous
1277         // server config and fail with NO_LEADER. Note that node1 shouldn't forward the request to node2 b/c
1278         // node2 was previously voting.
1279
1280         node2RaftActor.setDropMessageOfType(RequestVote.class);
1281
1282         ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true));
1283         node1RaftActorRef.tell(changeServers, testKit.getRef());
1284         ServerChangeReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), ServerChangeReply.class);
1285         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, reply.getStatus());
1286
1287         assertEquals("Server config", ImmutableSet.of(nonVotingServer(node1ID), votingServer(node2ID)),
1288             new HashSet<>(node1RaftActor.getRaftActorContext().getPeerServerInfo(true).getServerConfig()));
1289         assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1290
1291         LOG.info("testChangeToVotingWithNoLeaderAndElectionTimeout ending");
1292     }
1293
1294     @Test
1295     public void testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout() {
1296         LOG.info("testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout starting");
1297
1298         final String node1ID = "node1";
1299         final String node2ID = "node2";
1300
1301         final PeerAddressResolver peerAddressResolver = peerId -> peerId.equals(node1ID)
1302                 ? actorFactory.createTestActorPath(node1ID) : peerId.equals(node2ID)
1303                         ? actorFactory.createTestActorPath(node2ID) : null;
1304
1305         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1306         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1307         configParams.setElectionTimeoutFactor(3);
1308         configParams.setPeerAddressResolver(peerAddressResolver);
1309
1310         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1311                 new ServerInfo(node1ID, false), new ServerInfo(node2ID, false)));
1312         SimpleReplicatedLogEntry persistedServerConfigEntry = new SimpleReplicatedLogEntry(0, 1, persistedServerConfig);
1313
1314         InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
1315         InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1316         InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
1317         InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1318         InMemoryJournal.addEntry(node2ID, 3, new SimpleReplicatedLogEntry(1, 1,
1319                 new MockRaftActorContext.MockPayload("2")));
1320         InMemoryJournal.addEntry(node2ID, 4, new ApplyJournalEntries(1));
1321
1322         ActorRef node1Collector = actorFactory.createActor(
1323                 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1324         TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1325                 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
1326                         PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1327         final CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1328
1329         ActorRef node2Collector = actorFactory.createActor(
1330                 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1331         TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1332                 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
1333                         PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1334         final CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1335
1336         // Send a ChangeServersVotingStatus message to node1 to change mode1 to voting. This should cause
1337         // node1 to try to elect itself as leader in order to apply the new server config. However node1's log
1338         // is behind node2's so node2 should not grant node1's vote. This should cause node1 to time out and
1339         // forward the request to node2.
1340
1341         ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(
1342                 ImmutableMap.of(node1ID, true, node2ID, true));
1343         node1RaftActorRef.tell(changeServers, testKit.getRef());
1344         ServerChangeReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), ServerChangeReply.class);
1345         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1346
1347         MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
1348         verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1349                 votingServer(node1ID), votingServer(node2ID));
1350         assertEquals("getRaftState", RaftState.Leader, node2RaftActor.getRaftState());
1351
1352         MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class);
1353         verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1354                 votingServer(node1ID), votingServer(node2ID));
1355         assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
1356         assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1357
1358         LOG.info("testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout ending");
1359     }
1360
1361     @Test
1362     public void testChangeToVotingWithNoLeaderAndOtherLeaderElected() {
1363         LOG.info("testChangeToVotingWithNoLeaderAndOtherLeaderElected starting");
1364
1365         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1366         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1367         configParams.setElectionTimeoutFactor(100000);
1368
1369         final String node1ID = "node1";
1370         final String node2ID = "node2";
1371
1372         configParams.setPeerAddressResolver(peerId -> peerId.equals(node1ID)
1373                 ? actorFactory.createTestActorPath(node1ID) : peerId.equals(node2ID)
1374                         ? actorFactory.createTestActorPath(node2ID) : null);
1375
1376         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1377                 new ServerInfo(node1ID, false), new ServerInfo(node2ID, true)));
1378         SimpleReplicatedLogEntry persistedServerConfigEntry = new SimpleReplicatedLogEntry(0, 1, persistedServerConfig);
1379
1380         InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
1381         InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1382         InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
1383         InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1384
1385         ActorRef node1Collector = actorFactory.createActor(
1386                 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1387         TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1388                 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
1389                         PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1390         final CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1391
1392         ActorRef node2Collector = actorFactory.createActor(
1393                 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1394         TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1395                 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
1396                         PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1397         CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1398
1399         // Send a ChangeServersVotingStatus message to node1 to change node1 to voting. This should cause
1400         // node1 to try to elect itself as leader in order to apply the new server config. But we'll drop
1401         // RequestVote messages in node2 and make it the leader so node1 should forward the server change
1402         // request to node2 when node2 is elected.
1403
1404         node2RaftActor.setDropMessageOfType(RequestVote.class);
1405
1406         ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true,
1407                 node2ID, true));
1408         node1RaftActorRef.tell(changeServers, testKit.getRef());
1409
1410         MessageCollectorActor.expectFirstMatching(node2Collector, RequestVote.class);
1411
1412         node2RaftActorRef.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
1413
1414         ServerChangeReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), ServerChangeReply.class);
1415         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1416
1417         MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class);
1418         verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1419                 votingServer(node1ID), votingServer(node2ID));
1420         assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
1421         assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1422
1423         MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
1424         verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1425                 votingServer(node1ID), votingServer(node2ID));
1426         assertEquals("getRaftState", RaftState.Leader, node2RaftActor.getRaftState());
1427
1428         LOG.info("testChangeToVotingWithNoLeaderAndOtherLeaderElected ending");
1429     }
1430
1431     private static void verifyRaftState(final RaftState expState, final RaftActor... raftActors) {
1432         Stopwatch sw = Stopwatch.createStarted();
1433         while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
1434             for (RaftActor raftActor : raftActors) {
1435                 if (raftActor.getRaftState() == expState) {
1436                     return;
1437                 }
1438             }
1439         }
1440
1441         fail("None of the RaftActors have state " + expState);
1442     }
1443
1444     private static ServerInfo votingServer(final String id) {
1445         return new ServerInfo(id, true);
1446     }
1447
1448     private static ServerInfo nonVotingServer(final String id) {
1449         return new ServerInfo(id, false);
1450     }
1451
1452     private ActorRef newLeaderCollectorActor(final MockLeaderRaftActor leaderRaftActor) {
1453         return newCollectorActor(leaderRaftActor, LEADER_ID);
1454     }
1455
1456     private ActorRef newCollectorActor(final AbstractMockRaftActor raftActor, final String id) {
1457         ActorRef collectorActor = actorFactory.createTestActor(
1458                 MessageCollectorActor.props(), actorFactory.generateActorId(id + "Collector"));
1459         raftActor.setCollectorActor(collectorActor);
1460         return collectorActor;
1461     }
1462
1463     private static void verifyServerConfigurationPayloadEntry(final ReplicatedLog log, final ServerInfo... expected) {
1464         ReplicatedLogEntry logEntry = log.get(log.lastIndex());
1465         assertEquals("Last log entry payload class", ServerConfigurationPayload.class, logEntry.getData().getClass());
1466         ServerConfigurationPayload payload = (ServerConfigurationPayload)logEntry.getData();
1467         assertEquals("Server config", ImmutableSet.copyOf(expected), new HashSet<>(payload.getServerConfig()));
1468     }
1469
1470     private static RaftActorContextImpl newFollowerContext(final String id,
1471             final TestActorRef<? extends AbstractActor> actor) {
1472         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1473         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1474         configParams.setElectionTimeoutFactor(100000);
1475         NonPersistentDataProvider noPersistence = new NonPersistentDataProvider(Runnable::run);
1476         ElectionTermImpl termInfo = new ElectionTermImpl(noPersistence, id, LOG);
1477         termInfo.update(1, LEADER_ID);
1478         return new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
1479                 id, termInfo, -1, -1, ImmutableMap.of(LEADER_ID, ""), configParams,
1480                 noPersistence, applyState -> actor.tell(applyState, actor), LOG,  MoreExecutors.directExecutor());
1481     }
1482
1483     abstract static class AbstractMockRaftActor extends MockRaftActor {
1484         private volatile ActorRef collectorActor;
1485         private volatile Class<?> dropMessageOfType;
1486
1487         AbstractMockRaftActor(final String id, final Map<String, String> peerAddresses,
1488                 final Optional<ConfigParams> config, final boolean persistent, final ActorRef collectorActor) {
1489             super(builder().id(id).peerAddresses(peerAddresses).config(config.get())
1490                     .persistent(Optional.of(persistent)));
1491             this.collectorActor = collectorActor;
1492         }
1493
1494         void setDropMessageOfType(final Class<?> dropMessageOfType) {
1495             this.dropMessageOfType = dropMessageOfType;
1496         }
1497
1498         void setCollectorActor(final ActorRef collectorActor) {
1499             this.collectorActor = collectorActor;
1500         }
1501
1502         @Override
1503         public void handleCommand(final Object message) {
1504             if (dropMessageOfType == null || !dropMessageOfType.equals(message.getClass())) {
1505                 super.handleCommand(message);
1506             }
1507
1508             if (collectorActor != null) {
1509                 collectorActor.tell(message, getSender());
1510             }
1511         }
1512     }
1513
1514     public static class CollectingMockRaftActor extends AbstractMockRaftActor {
1515
1516         CollectingMockRaftActor(final String id, final Map<String, String> peerAddresses,
1517                 final Optional<ConfigParams> config, final boolean persistent, final ActorRef collectorActor) {
1518             super(id, peerAddresses, config, persistent, collectorActor);
1519             snapshotCohortDelegate = new RaftActorSnapshotCohort() {
1520                 @Override
1521                 public void createSnapshot(final ActorRef actorRef,
1522                         final Optional<OutputStream> installSnapshotStream) {
1523                     actorRef.tell(new CaptureSnapshotReply(ByteState.empty(), installSnapshotStream), actorRef);
1524                 }
1525
1526                 @Override
1527                 public void applySnapshot(
1528                         final org.opendaylight.controller.cluster.raft.persisted.Snapshot.State snapshotState) {
1529                 }
1530
1531                 @Override
1532                 public org.opendaylight.controller.cluster.raft.persisted.Snapshot.State deserializeSnapshot(
1533                         final ByteSource snapshotBytes) {
1534                     throw new UnsupportedOperationException();
1535                 }
1536             };
1537         }
1538
1539         public static Props props(final String id, final Map<String, String> peerAddresses,
1540                 final ConfigParams config, final boolean persistent, final ActorRef collectorActor) {
1541
1542             return Props.create(CollectingMockRaftActor.class, id, peerAddresses, Optional.of(config),
1543                     persistent, collectorActor);
1544         }
1545
1546     }
1547
1548     public static class MockLeaderRaftActor extends AbstractMockRaftActor {
1549         public MockLeaderRaftActor(final Map<String, String> peerAddresses, final ConfigParams config,
1550                 final RaftActorContext fromContext) {
1551             super(LEADER_ID, peerAddresses, Optional.of(config), NO_PERSISTENCE, null);
1552             setPersistence(false);
1553
1554             RaftActorContext context = getRaftActorContext();
1555             for (int i = 0; i < fromContext.getReplicatedLog().size(); i++) {
1556                 ReplicatedLogEntry entry = fromContext.getReplicatedLog().get(i);
1557                 getState().add(entry.getData());
1558                 context.getReplicatedLog().append(entry);
1559             }
1560
1561             context.setCommitIndex(fromContext.getCommitIndex());
1562             context.setLastApplied(fromContext.getLastApplied());
1563             context.getTermInformation().update(fromContext.getTermInformation().getCurrentTerm(),
1564                     fromContext.getTermInformation().getVotedFor());
1565         }
1566
1567         @Override
1568         protected void initializeBehavior() {
1569             changeCurrentBehavior(new Leader(getRaftActorContext()));
1570             initializeBehaviorComplete.countDown();
1571         }
1572
1573         @Override
1574         @SuppressWarnings("checkstyle:IllegalCatch")
1575         public void createSnapshot(final ActorRef actorRef, final Optional<OutputStream> installSnapshotStream) {
1576             MockSnapshotState snapshotState = new MockSnapshotState(new ArrayList<>(getState()));
1577             if (installSnapshotStream.isPresent()) {
1578                 SerializationUtils.serialize(snapshotState, installSnapshotStream.get());
1579             }
1580
1581             actorRef.tell(new CaptureSnapshotReply(snapshotState, installSnapshotStream), actorRef);
1582         }
1583
1584         static Props props(final Map<String, String> peerAddresses, final RaftActorContext fromContext) {
1585             DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1586             configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1587             configParams.setElectionTimeoutFactor(10);
1588             return Props.create(MockLeaderRaftActor.class, peerAddresses, configParams, fromContext);
1589         }
1590     }
1591
1592     public static class MockNewFollowerRaftActor extends AbstractMockRaftActor {
1593         public MockNewFollowerRaftActor(final ConfigParams config, final ActorRef collectorActor) {
1594             super(NEW_SERVER_ID, new HashMap<>(), Optional.of(config), NO_PERSISTENCE, collectorActor);
1595             setPersistence(false);
1596         }
1597
1598         static Props props(final ConfigParams config, final ActorRef collectorActor) {
1599             return Props.create(MockNewFollowerRaftActor.class, config, collectorActor);
1600         }
1601     }
1602 }