Fix timing issue in testChangeToVotingWithNoLeader
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorServerConfigurationSupportTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertTrue;
12 import static org.junit.Assert.fail;
13 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.assertNoneMatching;
14 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
15 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
16 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
17
18 import akka.actor.ActorRef;
19 import akka.actor.Props;
20 import akka.actor.UntypedActor;
21 import akka.dispatch.Dispatchers;
22 import akka.testkit.JavaTestKit;
23 import akka.testkit.TestActorRef;
24 import com.google.common.base.Optional;
25 import com.google.common.base.Stopwatch;
26 import com.google.common.collect.ImmutableMap;
27 import com.google.common.collect.Maps;
28 import com.google.common.collect.Sets;
29 import com.google.common.io.ByteSource;
30 import java.io.OutputStream;
31 import java.util.ArrayList;
32 import java.util.Arrays;
33 import java.util.Collections;
34 import java.util.List;
35 import java.util.Map;
36 import java.util.concurrent.TimeUnit;
37 import org.apache.commons.lang3.SerializationUtils;
38 import org.junit.After;
39 import org.junit.Before;
40 import org.junit.Test;
41 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
42 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
43 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
44 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
45 import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
46 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
47 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
48 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
49 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
50 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
51 import org.opendaylight.controller.cluster.raft.messages.AddServer;
52 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
53 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
54 import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
55 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
56 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
57 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
58 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
59 import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
60 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
61 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
62 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
63 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
64 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
65 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
66 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
67 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
68 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
69 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
70 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
71 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
72 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
73 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
74 import org.slf4j.Logger;
75 import org.slf4j.LoggerFactory;
76 import scala.concurrent.duration.FiniteDuration;
77
78 /**
79  * Unit tests for RaftActorServerConfigurationSupport.
80  *
81  * @author Thomas Pantelis
82  */
83 public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
    // Ids used for the raft members that take part in the tests.
    static final String LEADER_ID = "leader";
    static final String FOLLOWER_ID = "follower";
    static final String FOLLOWER_ID2 = "follower2";
    static final String NEW_SERVER_ID = "new-server";
    static final String NEW_SERVER_ID2 = "new-server2";
    private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupportTest.class);
    // Runtime class of the snapshot commit message; used to drop or capture that message in tests.
    private static final Class<?> COMMIT_MESSAGE_CLASS = RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT.getClass();
    // Named boolean flags; NOTE(review): presumably passed to helpers outside this excerpt - confirm.
    private static final boolean NO_PERSISTENCE = false;
    private static final boolean PERSISTENT = true;

    // Factory through which every test actor is created; closed in tearDown().
    private final TestActorFactory actorFactory = new TestActorFactory(getSystem());

    // Pre-existing follower actor; tests install a Follower behavior on it via setBehavior().
    private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
            Props.create(ForwardMessageToBehaviorActor.class).withDispatcher(Dispatchers.DefaultDispatcherId()),
            actorFactory.generateActorId(FOLLOWER_ID));

    // State for the server being added; populated by setupNewFollower().
    private TestActorRef<MockNewFollowerRaftActor> newFollowerRaftActor;
    private TestActorRef<MessageCollectorActor> newFollowerCollectorActor;
    private RaftActorContext newFollowerActorContext;

    // Test kit whose ref is used as the sender of server change requests so that replies can be asserted.
    private final JavaTestKit testKit = new JavaTestKit(getSystem());
105
    /**
     * Clears the in-memory journal and the in-memory snapshot store before each test.
     */
    @Before
    public void setup() {
        InMemoryJournal.clear();
        InMemorySnapshotStore.clear();
    }
111
112     @SuppressWarnings("checkstyle:IllegalCatch")
113     private void setupNewFollower() {
114         DefaultConfigParamsImpl configParams = newFollowerConfigParams();
115
116         newFollowerCollectorActor = actorFactory.createTestActor(
117                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
118                 actorFactory.generateActorId(NEW_SERVER_ID + "Collector"));
119         newFollowerRaftActor = actorFactory.createTestActor(MockNewFollowerRaftActor.props(
120                 configParams, newFollowerCollectorActor).withDispatcher(Dispatchers.DefaultDispatcherId()),
121                 actorFactory.generateActorId(NEW_SERVER_ID));
122
123         try {
124             newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
125         } catch (Exception e) {
126             newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
127         }
128     }
129
130     private static DefaultConfigParamsImpl newFollowerConfigParams() {
131         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
132         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
133         configParams.setElectionTimeoutFactor(100000);
134         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
135         return configParams;
136     }
137
    /**
     * Closes the actor factory after each test.
     * NOTE(review): presumably this stops every actor the factory created - confirm against TestActorFactory.
     *
     * @throws Exception if closing the factory fails
     */
    @After
    public void tearDown() throws Exception {
        actorFactory.close();
    }
142
143     @Test
144     public void testAddServerWithExistingFollower() throws Exception {
145         LOG.info("testAddServerWithExistingFollower starting");
146         setupNewFollower();
147         RaftActorContextImpl followerActorContext = newFollowerContext(FOLLOWER_ID, followerActor);
148         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
149                 0, 3, 1).build());
150         followerActorContext.setCommitIndex(2);
151         followerActorContext.setLastApplied(2);
152
153         Follower follower = new Follower(followerActorContext);
154         followerActor.underlyingActor().setBehavior(follower);
155         followerActorContext.setCurrentBehavior(follower);
156
157         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
158                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
159                         followerActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
160                 actorFactory.generateActorId(LEADER_ID));
161
162         // Expect initial heartbeat from the leader.
163         expectFirstMatching(followerActor, AppendEntries.class);
164         clearMessages(followerActor);
165
166         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
167         final TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
168
169         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
170
171         // Leader should install snapshot - capture and verify ApplySnapshot contents
172
173         ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
174         List<Object> snapshotState = MockRaftActor.fromState(applySnapshot.getSnapshot().getState());
175         assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
176
177         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
178         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
179         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
180
181         // Verify ServerConfigurationPayload entry in leader's log
182
183         expectFirstMatching(leaderCollectorActor, ApplyState.class);
184         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
185         assertEquals("Leader journal last index", 3, leaderActorContext.getReplicatedLog().lastIndex());
186         assertEquals("Leader commit index", 3, leaderActorContext.getCommitIndex());
187         assertEquals("Leader last applied index", 3, leaderActorContext.getLastApplied());
188         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
189                 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
190
191         // Verify ServerConfigurationPayload entry in both followers
192
193         expectFirstMatching(followerActor, ApplyState.class);
194         assertEquals("Follower journal last index", 3, followerActorContext.getReplicatedLog().lastIndex());
195         verifyServerConfigurationPayloadEntry(followerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
196                 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
197
198         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
199         assertEquals("New follower journal last index", 3, newFollowerActorContext.getReplicatedLog().lastIndex());
200         verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
201                 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
202
203         // Verify new server config was applied in both followers
204
205         assertEquals("Follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID), followerActorContext.getPeerIds());
206
207         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, FOLLOWER_ID),
208                 newFollowerActorContext.getPeerIds());
209
210         assertEquals("Follower commit index", 3, followerActorContext.getCommitIndex());
211         assertEquals("Follower last applied index", 3, followerActorContext.getLastApplied());
212         assertEquals("New follower commit index", 3, newFollowerActorContext.getCommitIndex());
213         assertEquals("New follower last applied index", 3, newFollowerActorContext.getLastApplied());
214
215         assertEquals("Leader persisted ReplicatedLogImplEntry entries", 0,
216                 InMemoryJournal.get(LEADER_ID, SimpleReplicatedLogEntry.class).size());
217         assertEquals("Leader persisted ServerConfigurationPayload entries", 1,
218                 InMemoryJournal.get(LEADER_ID, ServerConfigurationPayload.class).size());
219
220         assertEquals("New follower persisted ReplicatedLogImplEntry entries", 0,
221                 InMemoryJournal.get(NEW_SERVER_ID, SimpleReplicatedLogEntry.class).size());
222         assertEquals("New follower persisted ServerConfigurationPayload entries", 1,
223                 InMemoryJournal.get(NEW_SERVER_ID, ServerConfigurationPayload.class).size());
224
225         LOG.info("testAddServerWithExistingFollower ending");
226     }
227
228     @Test
229     public void testAddServerWithNoExistingFollower() throws Exception {
230         LOG.info("testAddServerWithNoExistingFollower starting");
231
232         setupNewFollower();
233         RaftActorContext initialActorContext = new MockRaftActorContext();
234         initialActorContext.setCommitIndex(1);
235         initialActorContext.setLastApplied(1);
236         initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
237                 0, 2, 1).build());
238
239         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
240                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
241                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
242                 actorFactory.generateActorId(LEADER_ID));
243
244         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
245         final RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
246
247         final TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
248
249         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
250
251         // Leader should install snapshot - capture and verify ApplySnapshot contents
252
253         ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
254         List<Object> snapshotState = MockRaftActor.fromState(applySnapshot.getSnapshot().getState());
255         assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
256
257         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
258         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
259         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
260
261         // Verify ServerConfigurationPayload entry in leader's log
262
263         expectFirstMatching(leaderCollectorActor, ApplyState.class);
264         assertEquals("Leader journal last index", 2, leaderActorContext.getReplicatedLog().lastIndex());
265         assertEquals("Leader commit index", 2, leaderActorContext.getCommitIndex());
266         assertEquals("Leader last applied index", 2, leaderActorContext.getLastApplied());
267         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
268                 votingServer(NEW_SERVER_ID));
269
270         // Verify ServerConfigurationPayload entry in the new follower
271
272         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
273         assertEquals("New follower journal last index", 2, newFollowerActorContext.getReplicatedLog().lastIndex());
274         verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
275                 votingServer(NEW_SERVER_ID));
276
277         // Verify new server config was applied in the new follower
278
279         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
280
281         LOG.info("testAddServerWithNoExistingFollower ending");
282     }
283
284     @Test
285     public void testAddServersAsNonVoting() throws Exception {
286         LOG.info("testAddServersAsNonVoting starting");
287
288         setupNewFollower();
289         RaftActorContext initialActorContext = new MockRaftActorContext();
290
291         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
292                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
293                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
294                 actorFactory.generateActorId(LEADER_ID));
295
296         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
297         final RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
298
299         final TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
300
301         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), false), testKit.getRef());
302
303         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
304         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
305         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
306
307         // Verify ServerConfigurationPayload entry in leader's log
308
309         expectFirstMatching(leaderCollectorActor, ApplyState.class);
310
311         assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
312         assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
313         assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
314         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
315                 nonVotingServer(NEW_SERVER_ID));
316
317         // Verify ServerConfigurationPayload entry in the new follower
318
319         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
320         assertEquals("New follower journal last index", 0, newFollowerActorContext.getReplicatedLog().lastIndex());
321         verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
322                 nonVotingServer(NEW_SERVER_ID));
323
324         // Verify new server config was applied in the new follower
325
326         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
327
328         assertNoneMatching(newFollowerCollectorActor, InstallSnapshot.class, 500);
329
330         // Add another non-voting server.
331
332         clearMessages(leaderCollectorActor);
333
334         RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
335         Follower newFollower2 = new Follower(follower2ActorContext);
336         followerActor.underlyingActor().setBehavior(newFollower2);
337
338         leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit.getRef());
339
340         addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
341         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
342         assertEquals("getLeaderHint", java.util.Optional.of(LEADER_ID), addServerReply.getLeaderHint());
343
344         expectFirstMatching(leaderCollectorActor, ApplyState.class);
345         assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
346         assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
347         assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
348         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
349                 votingServer(LEADER_ID), nonVotingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
350
351         LOG.info("testAddServersAsNonVoting ending");
352     }
353
354     @Test
355     public void testAddServerWithOperationInProgress() throws Exception {
356         LOG.info("testAddServerWithOperationInProgress starting");
357
358         setupNewFollower();
359         RaftActorContext initialActorContext = new MockRaftActorContext();
360
361         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
362                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
363                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
364                 actorFactory.generateActorId(LEADER_ID));
365
366         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
367         final RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
368
369         final TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
370
371         RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
372         Follower newFollower2 = new Follower(follower2ActorContext);
373         followerActor.underlyingActor().setBehavior(newFollower2);
374
375         MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
376         newFollowerRaftActorInstance.setDropMessageOfType(InstallSnapshot.class);
377
378         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
379
380         // Wait for leader's install snapshot and capture it
381
382         InstallSnapshot installSnapshot = expectFirstMatching(newFollowerCollectorActor, InstallSnapshot.class);
383
384         // Send a second AddServer - should get queued
385         JavaTestKit testKit2 = new JavaTestKit(getSystem());
386         leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit2.getRef());
387
388         // Continue the first AddServer
389         newFollowerRaftActorInstance.setDropMessageOfType(null);
390         newFollowerRaftActor.tell(installSnapshot, leaderActor);
391
392         // Verify both complete successfully
393         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
394         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
395
396         addServerReply = testKit2.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
397         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
398
399         // Verify ServerConfigurationPayload entries in leader's log
400
401         expectMatching(leaderCollectorActor, ApplyState.class, 2);
402         assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
403         assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
404         assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
405         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
406                 votingServer(LEADER_ID), votingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
407
408         // Verify ServerConfigurationPayload entry in the new follower
409
410         expectMatching(newFollowerCollectorActor, ApplyState.class, 2);
411         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID2),
412                newFollowerActorContext.getPeerIds());
413
414         LOG.info("testAddServerWithOperationInProgress ending");
415     }
416
417     @Test
418     public void testAddServerWithPriorSnapshotInProgress() throws Exception {
419         LOG.info("testAddServerWithPriorSnapshotInProgress starting");
420
421         setupNewFollower();
422         RaftActorContext initialActorContext = new MockRaftActorContext();
423
424         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
425                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
426                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
427                 actorFactory.generateActorId(LEADER_ID));
428
429         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
430         final RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
431
432         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
433
434         // Drop commit message for now to delay snapshot completion
435         leaderRaftActor.setDropMessageOfType(String.class);
436
437         leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
438
439         Object commitMsg = expectFirstMatching(leaderCollectorActor, COMMIT_MESSAGE_CLASS);
440
441         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
442
443         leaderRaftActor.setDropMessageOfType(null);
444         leaderActor.tell(commitMsg, leaderActor);
445
446         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
447         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
448         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
449
450         expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
451
452         // Verify ServerConfigurationPayload entry in leader's log
453
454         expectFirstMatching(leaderCollectorActor, ApplyState.class);
455         assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
456         assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
457         assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
458         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
459                 votingServer(NEW_SERVER_ID));
460
461         LOG.info("testAddServerWithPriorSnapshotInProgress ending");
462     }
463
464     @Test
465     public void testAddServerWithPriorSnapshotCompleteTimeout() throws Exception {
466         LOG.info("testAddServerWithPriorSnapshotCompleteTimeout starting");
467
468         setupNewFollower();
469         RaftActorContext initialActorContext = new MockRaftActorContext();
470
471         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
472                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
473                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
474                 actorFactory.generateActorId(LEADER_ID));
475
476         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
477         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
478
479         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
480
481         // Drop commit message so the snapshot doesn't complete.
482         leaderRaftActor.setDropMessageOfType(COMMIT_MESSAGE_CLASS);
483
484         leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
485
486         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
487
488         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
489         assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
490
491         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
492
493         LOG.info("testAddServerWithPriorSnapshotCompleteTimeout ending");
494     }
495
496     @Test
497     public void testAddServerWithLeaderChangeBeforePriorSnapshotComplete() throws Exception {
498         LOG.info("testAddServerWithLeaderChangeBeforePriorSnapshotComplete starting");
499
500         setupNewFollower();
501         RaftActorContext initialActorContext = new MockRaftActorContext();
502
503         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
504                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
505                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
506                 actorFactory.generateActorId(LEADER_ID));
507
508         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
509         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
510         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
511
512         final TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
513
514         // Drop the commit message so the snapshot doesn't complete yet.
515         leaderRaftActor.setDropMessageOfType(COMMIT_MESSAGE_CLASS);
516
517         leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
518
519         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
520
521         Object commitMsg = expectFirstMatching(leaderCollectorActor, COMMIT_MESSAGE_CLASS);
522
523         // Change the leader behavior to follower
524         leaderActor.tell(new Follower(leaderActorContext), leaderActor);
525
526         // Drop CaptureSnapshotReply in case install snapshot is incorrectly initiated after the prior
527         // snapshot completes. This will prevent the invalid snapshot from completing and fail the
528         // isCapturing assertion below.
529         leaderRaftActor.setDropMessageOfType(CaptureSnapshotReply.class);
530
531         // Complete the prior snapshot - this should be a no-op b/c it's no longer the leader
532         leaderActor.tell(commitMsg, leaderActor);
533
534         leaderActor.tell(new RaftActorServerConfigurationSupport.ServerOperationTimeout(NEW_SERVER_ID), leaderActor);
535
536         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
537         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
538
539         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
540         assertEquals("isCapturing", false, leaderActorContext.getSnapshotManager().isCapturing());
541
542         LOG.info("testAddServerWithLeaderChangeBeforePriorSnapshotComplete ending");
543     }
544
545     @Test
546     public void testAddServerWithLeaderChangeDuringInstallSnapshot() throws Exception {
            // Scenario: an AddServer request is in flight (the leader is waiting on the
            // UnInitializedFollowerSnapshotReply) when the actor is switched to the Follower behavior.
            // The requester is expected to get NO_LEADER and the new server must not be added as a peer.
547         LOG.info("testAddServerWithLeaderChangeDuringInstallSnapshot starting");
548
549         setupNewFollower();
550         RaftActorContext initialActorContext = new MockRaftActorContext();
551
            // The leader starts with an empty peer map.
552         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
553                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
554                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
555                 actorFactory.generateActorId(LEADER_ID));
556
557         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
558         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
559
560         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(8);
561
562         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
563
564         // Drop the UnInitializedFollowerSnapshotReply to delay it.
565         leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
566
567         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
568
            // The dropped reply is still visible to the collector; keep it so it can be replayed later.
569         final UnInitializedFollowerSnapshotReply snapshotReply = expectFirstMatching(leaderCollectorActor,
570                 UnInitializedFollowerSnapshotReply.class);
571
572         // Prevent election timeout when the leader switches to follower
573         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
574
575         // Change the leader behavior to follower
576         leaderActor.tell(new Follower(leaderActorContext), leaderActor);
577
578         // Send the captured UnInitializedFollowerSnapshotReply - should be a no-op
579         leaderRaftActor.setDropMessageOfType(null);
580         leaderActor.tell(snapshotReply, leaderActor);
581
582         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
583         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
584
            // The new server must not have been registered as a peer.
585         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
586
587         LOG.info("testAddServerWithLeaderChangeDuringInstallSnapshot ending");
588     }
589
590     @Test
591     public void testAddServerWithInstallSnapshotTimeout() throws Exception {
            // Scenario: the new follower never processes InstallSnapshot, so the AddServer request
            // is expected to finish with TIMEOUT and leave the leader without peers or followers.
592         LOG.info("testAddServerWithInstallSnapshotTimeout starting");
593
594         setupNewFollower();
595         RaftActorContext initialActorContext = new MockRaftActorContext();
596
597         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
598                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
599                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
600                 actorFactory.generateActorId(LEADER_ID));
601
602         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
603         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
            // NOTE(review): factor 1 presumably shortens the server-operation timeout so the
            // TIMEOUT reply arrives within the 5 second wait below - confirm against the support class.
604         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
605
606         // Drop the InstallSnapshot message so it times out
607         newFollowerRaftActor.underlyingActor().setDropMessageOfType(InstallSnapshot.class);
608
609         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
610
            // A snapshot reply for a follower id ("bogus") other than the one being added; the
            // TIMEOUT assertion below still has to hold after it is delivered.
611         leaderActor.tell(new UnInitializedFollowerSnapshotReply("bogus"), leaderActor);
612
613         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
614         assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
615
            // Neither the context nor the leader behavior may retain the new server.
616         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
617         assertEquals("Leader followers size", 0,
618                 ((AbstractLeader)leaderRaftActor.getCurrentBehavior()).getFollowerIds().size());
619
620         LOG.info("testAddServerWithInstallSnapshotTimeout ending");
621     }
622
623     @Test
624     public void testAddServerWithNoLeader() {
            // Scenario: AddServer is sent to a raft actor that is not a leader and has not learned
            // of one; the reply is expected to carry NO_LEADER.
625         LOG.info("testAddServerWithNoLeader starting");
626
627         setupNewFollower();
628         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
            // NOTE(review): the one-day heartbeat interval presumably keeps election timers from
            // firing during the test so the actor stays leaderless - confirm.
629         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
630
631         TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
632                 MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
633                         followerActor.path().toString())).config(configParams).persistent(Optional.of(false))
634                         .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
635                 actorFactory.generateActorId(LEADER_ID));
            // Block until the actor has finished initializing its behavior before sending the request.
636         noLeaderActor.underlyingActor().waitForInitializeBehaviorComplete();
637
638         noLeaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true),
639                 testKit.getRef());
640         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
641         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
642
643         LOG.info("testAddServerWithNoLeader ending");
644     }
645
646     @Test
647     public void testAddServerWithNoConsensusReached() {
            // Scenario: the new follower drops AppendEntries, so the configuration change from the
            // first AddServer never reaches consensus. The first request is still expected to get OK,
            // while later AddServer requests are expected to get PRIOR_REQUEST_CONSENSUS_TIMEOUT.
648         LOG.info("testAddServerWithNoConsensusReached starting");
649
650         setupNewFollower();
651         RaftActorContext initialActorContext = new MockRaftActorContext();
652
653         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
654                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
655                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
656                 actorFactory.generateActorId(LEADER_ID));
657
658         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
659         final RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
660
661         final TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
662
663         // Drop UnInitializedFollowerSnapshotReply initially
664         leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
665
666         MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
667         newFollowerCollectorActor = newCollectorActor(newFollowerRaftActorInstance, NEW_SERVER_ID);
668
669         // Drop AppendEntries to the new follower so consensus isn't reached
670         newFollowerRaftActorInstance.setDropMessageOfType(AppendEntries.class);
671
672         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
673
674         // Capture the UnInitializedFollowerSnapshotReply
675         Object snapshotReply = expectFirstMatching(leaderCollectorActor, UnInitializedFollowerSnapshotReply.class);
676
677         // Send the UnInitializedFollowerSnapshotReply to resume the first request
678         leaderRaftActor.setDropMessageOfType(null);
679         leaderActor.tell(snapshotReply, leaderActor);
680
            // Wait until the new follower's collector has seen an AppendEntries before issuing the
            // second request.
681         expectFirstMatching(newFollowerCollectorActor, AppendEntries.class);
682
683         // Send a second AddServer
684         leaderActor.tell(new AddServer(NEW_SERVER_ID2, "", false), testKit.getRef());
685
686         // The first AddServer should succeed with OK even though consensus wasn't reached
687         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
688         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
689         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
690
691         // Verify ServerConfigurationPayload entry in leader's log
692         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
693                 votingServer(NEW_SERVER_ID));
694
695         // The second AddServer should fail since consensus wasn't reached for the first
696         addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
697         assertEquals("getStatus", ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT, addServerReply.getStatus());
698
699         // Re-send the second AddServer - should also fail
700         leaderActor.tell(new AddServer(NEW_SERVER_ID2, "", false), testKit.getRef());
701         addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
702         assertEquals("getStatus", ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT, addServerReply.getStatus());
703
704         LOG.info("testAddServerWithNoConsensusReached ending");
705     }
706
707     @Test
708     public void testAddServerWithExistingServer() {
            // Scenario: AddServer names a server id (FOLLOWER_ID) that is already in the leader's
            // peer map; the reply is expected to carry ALREADY_EXISTS.
709         LOG.info("testAddServerWithExistingServer starting");
710
711         RaftActorContext initialActorContext = new MockRaftActorContext();
712
            // The leader is created with FOLLOWER_ID already registered as a peer.
713         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
714                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
715                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
716                 actorFactory.generateActorId(LEADER_ID));
717
718         leaderActor.tell(new AddServer(FOLLOWER_ID, followerActor.path().toString(), true), testKit.getRef());
719
720         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
721         assertEquals("getStatus", ServerChangeStatus.ALREADY_EXISTS, addServerReply.getStatus());
722
723         LOG.info("testAddServerWithExistingServer ending");
724     }
725
726     @Test
727     public void testAddServerForwardedToLeader() {
            // Scenario: AddServer is sent to a follower raft actor; the message is expected to show
            // up at the actor registered as its LEADER_ID peer.
728         LOG.info("testAddServerForwardedToLeader starting");
729
730         setupNewFollower();
731         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
732         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
733
            // The "leader" here is only a MessageCollectorActor, so the test can inspect what it receives.
734         TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
735                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
736                 actorFactory.generateActorId(LEADER_ID));
737
738         TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
739                 MockRaftActor.builder().id(FOLLOWER_ID).peerAddresses(ImmutableMap.of(LEADER_ID,
740                         leaderActor.path().toString())).config(configParams).persistent(Optional.of(false))
741                         .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
742                 actorFactory.generateActorId(FOLLOWER_ID));
743         followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
744
            // NOTE(review): this empty AppendEntries naming LEADER_ID, sent with leaderActor as the
            // sender, presumably makes the follower record LEADER_ID as its current leader - confirm.
745         followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
746                 -1, -1, (short)0), leaderActor);
747
748         followerRaftActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true),
749                 testKit.getRef());
            // The collector standing in for the leader must receive the AddServer message.
750         expectFirstMatching(leaderActor, AddServer.class);
751
752         LOG.info("testAddServerForwardedToLeader ending");
753     }
754
755     @Test
756     public void testOnApplyState() {
            // Verifies the return value of RaftActorServerConfigurationSupport.handleMessage for
            // ApplyState messages: true when the log entry carries a ServerConfigurationPayload,
            // false when it carries any other payload (here a MockPayload).
757         LOG.info("testOnApplyState starting");
758
759         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
760         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
761         TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
762                 MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
763                         followerActor.path().toString())).config(configParams).persistent(Optional.of(false))
764                         .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
765                 actorFactory.generateActorId(LEADER_ID));
766
            // The support object is built directly around the underlying actor and invoked
            // synchronously from the test thread rather than via actor messaging.
767         RaftActorServerConfigurationSupport support = new RaftActorServerConfigurationSupport(
768                 noLeaderActor.underlyingActor());
769
770         ReplicatedLogEntry serverConfigEntry = new SimpleReplicatedLogEntry(1, 1,
771                 new ServerConfigurationPayload(Collections.<ServerInfo>emptyList()));
772         boolean handled = support.handleMessage(new ApplyState(null, null, serverConfigEntry), ActorRef.noSender());
773         assertEquals("Message handled", true, handled);
774
775         ReplicatedLogEntry nonServerConfigEntry = new SimpleReplicatedLogEntry(1, 1,
776                 new MockRaftActorContext.MockPayload("1"));
777         handled = support.handleMessage(new ApplyState(null, null, nonServerConfigEntry), ActorRef.noSender());
778         assertEquals("Message handled", false, handled);
779
780         LOG.info("testOnApplyState ending");
781     }
782
783     @Test
784     public void testRemoveServerWithNoLeader() {
            // Scenario: RemoveServer is sent to a raft actor that is not a leader and has not learned
            // of one; the reply is expected to carry NO_LEADER.
785         LOG.info("testRemoveServerWithNoLeader starting");
786
787         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
            // NOTE(review): the one-day heartbeat interval presumably keeps election timers from
            // firing during the test so the actor stays leaderless - confirm.
788         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
789
            // Despite the variable name, this is a plain MockRaftActor, not a MockLeaderRaftActor.
790         TestActorRef<MockRaftActor> leaderActor = actorFactory.createTestActor(
791                 MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
792                         followerActor.path().toString())).config(configParams).persistent(Optional.of(false))
793                         .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
794                 actorFactory.generateActorId(LEADER_ID));
795         leaderActor.underlyingActor().waitForInitializeBehaviorComplete();
796
797         leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
798         RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"),
799                 RemoveServerReply.class);
800         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, removeServerReply.getStatus());
801
802         LOG.info("testRemoveServerWithNoLeader ending");
803     }
804
805     @Test
806     public void testRemoveServerNonExistentServer() {
            // Scenario: RemoveServer names a server id (NEW_SERVER_ID) that is not among the
            // leader's peers; the reply is expected to carry DOES_NOT_EXIST.
807         LOG.info("testRemoveServerNonExistentServer starting");
808
809         RaftActorContext initialActorContext = new MockRaftActorContext();
810
            // The only configured peer is FOLLOWER_ID.
811         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
812                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
813                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
814                 actorFactory.generateActorId(LEADER_ID));
815
816         leaderActor.tell(new RemoveServer(NEW_SERVER_ID), testKit.getRef());
817         RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"),
818                 RemoveServerReply.class);
819         assertEquals("getStatus", ServerChangeStatus.DOES_NOT_EXIST, removeServerReply.getStatus());
820
821         LOG.info("testRemoveServerNonExistentServer ending");
822     }
823
824     @Test
825     public void testRemoveServerForwardToLeader() {
            // Scenario: RemoveServer is sent to a follower raft actor; the message is expected to
            // show up at the actor registered as its LEADER_ID peer.
826         LOG.info("testRemoveServerForwardToLeader starting");
827
828         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
829         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
830
            // The "leader" here is only a MessageCollectorActor, so the test can inspect what it receives.
831         TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
832                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
833                 actorFactory.generateActorId(LEADER_ID));
834
835         TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
836                 MockRaftActor.builder().id(FOLLOWER_ID).peerAddresses(ImmutableMap.of(LEADER_ID,
837                         leaderActor.path().toString())).config(configParams).persistent(Optional.of(false))
838                         .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
839                 actorFactory.generateActorId(FOLLOWER_ID));
840         followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
841
            // NOTE(review): this empty AppendEntries naming LEADER_ID, sent with leaderActor as the
            // sender, presumably makes the follower record LEADER_ID as its current leader - confirm.
842         followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
843                 -1, -1, (short)0), leaderActor);
844
845         followerRaftActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
            // The collector standing in for the leader must receive the RemoveServer message.
846         expectFirstMatching(leaderActor, RemoveServer.class);
847
848         LOG.info("testRemoveServerForwardToLeader ending");
849     }
850
851     @Test
852     public void testRemoveServer() throws Exception {
853         LOG.info("testRemoveServer starting");
854
855         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
856         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
857         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
858
859         final String follower1ActorId = actorFactory.generateActorId(FOLLOWER_ID);
860         final String follower1ActorPath = actorFactory.createTestActorPath(follower1ActorId);
861         final String follower2ActorId = actorFactory.generateActorId(FOLLOWER_ID2);
862         final String follower2ActorPath = actorFactory.createTestActorPath(follower2ActorId);
863         RaftActorContext initialActorContext = new MockRaftActorContext();
864
865         final String downNodeId = "downNode";
866         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(MockLeaderRaftActor.props(
867                 ImmutableMap.of(FOLLOWER_ID, follower1ActorPath, FOLLOWER_ID2, follower2ActorPath, downNodeId, ""),
868                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
869                 actorFactory.generateActorId(LEADER_ID));
870
871         final TestActorRef<MessageCollectorActor> leaderCollector =
872                 newLeaderCollectorActor(leaderActor.underlyingActor());
873
874         TestActorRef<MessageCollectorActor> follower1Collector = actorFactory.createTestActor(
875                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
876                         actorFactory.generateActorId("collector"));
877         final TestActorRef<CollectingMockRaftActor> follower1Actor = actorFactory.createTestActor(
878                 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
879                         FOLLOWER_ID2, follower2ActorPath, downNodeId, ""), configParams, NO_PERSISTENCE,
880                         follower1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId);
881
882         TestActorRef<MessageCollectorActor> follower2Collector = actorFactory.createTestActor(
883                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
884                         actorFactory.generateActorId("collector"));
885         final TestActorRef<CollectingMockRaftActor> follower2Actor = actorFactory.createTestActor(
886                 CollectingMockRaftActor.props(FOLLOWER_ID2, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
887                         FOLLOWER_ID, follower1ActorPath, downNodeId, ""), configParams, NO_PERSISTENCE,
888                         follower2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId);
889
890         leaderActor.underlyingActor().waitForInitializeBehaviorComplete();
891         follower1Actor.underlyingActor().waitForInitializeBehaviorComplete();
892         follower2Actor.underlyingActor().waitForInitializeBehaviorComplete();
893
894         leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
895         RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"),
896                 RemoveServerReply.class);
897         assertEquals("getStatus", ServerChangeStatus.OK, removeServerReply.getStatus());
898
899         ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
900         assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
901         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
902                 votingServer(LEADER_ID), votingServer(FOLLOWER_ID2), votingServer(downNodeId));
903
904         applyState = MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
905         assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
906         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
907                 votingServer(LEADER_ID), votingServer(FOLLOWER_ID2), votingServer(downNodeId));
908
909         RaftActorBehavior currentBehavior = leaderActor.underlyingActor().getCurrentBehavior();
910         assertTrue("Expected Leader", currentBehavior instanceof Leader);
911         assertEquals("Follower ids size", 2, ((Leader)currentBehavior).getFollowerIds().size());
912
913         MessageCollectorActor.expectFirstMatching(follower1Collector, ServerRemoved.class);
914
915         LOG.info("testRemoveServer ending");
916     }
917
918     @Test
919     public void testRemoveServerLeader() {
            // Scenario: the leader is asked to remove itself while it has one follower. Expected
            // outcome: an OK reply, a configuration containing only FOLLOWER_ID, and the leader's
            // collector receiving ServerRemoved.
920         LOG.info("testRemoveServerLeader starting");
921
922         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
923         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
924         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
925
            // The follower's id/path is generated first so the leader can be configured with it
            // before the follower actor is created.
926         final String followerActorId = actorFactory.generateActorId(FOLLOWER_ID);
927         final String followerActorPath = actorFactory.createTestActorPath(followerActorId);
928         RaftActorContext initialActorContext = new MockRaftActorContext();
929
930         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
931                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActorPath),
932                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
933                 actorFactory.generateActorId(LEADER_ID));
934
935         final TestActorRef<MessageCollectorActor> leaderCollector =
936                 newLeaderCollectorActor(leaderActor.underlyingActor());
937
938         final TestActorRef<MessageCollectorActor> followerCollector =
939                 actorFactory.createTestActor(MessageCollectorActor.props()
940                 .withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId("collector"));
            // The follower actor's ref is not kept; it is observed only through followerCollector.
941         actorFactory.createTestActor(
942                 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
943                         configParams, NO_PERSISTENCE, followerCollector)
944                         .withDispatcher(Dispatchers.DefaultDispatcherId()),
945                 followerActorId);
946
947         leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
948         RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"),
949                 RemoveServerReply.class);
950         assertEquals("getStatus", ServerChangeStatus.OK, removeServerReply.getStatus());
951
            // Wait for the follower to apply the entry at index 0, then check the configuration
            // recorded in the leader's replicated log.
952         final ApplyState applyState = MessageCollectorActor.expectFirstMatching(followerCollector, ApplyState.class);
953         assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
954         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
955                 votingServer(FOLLOWER_ID));
956
957         MessageCollectorActor.expectFirstMatching(leaderCollector, ServerRemoved.class);
958
959         LOG.info("testRemoveServerLeader ending");
960     }
961
962     @Test
963     public void testRemoveServerLeaderWithNoFollowers() {
            // Scenario: a leader with an empty peer map is asked to remove itself; the reply is
            // expected to carry NOT_SUPPORTED.
964         LOG.info("testRemoveServerLeaderWithNoFollowers starting");
965
966         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
967                 MockLeaderRaftActor.props(Collections.<String, String>emptyMap(),
968                         new MockRaftActorContext()).withDispatcher(Dispatchers.DefaultDispatcherId()),
969                 actorFactory.generateActorId(LEADER_ID));
970
971         leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
972         RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"),
973                 RemoveServerReply.class);
974         assertEquals("getStatus", ServerChangeStatus.NOT_SUPPORTED, removeServerReply.getStatus());
975
976         LOG.info("testRemoveServerLeaderWithNoFollowers ending");
977     }
978
979     @Test
980     public void testChangeServersVotingStatus() {
            // Scenario: a leader with two followers receives two ChangeServersVotingStatus requests.
            // The first makes both followers non-voting; the second makes FOLLOWER_ID voting again.
            // After each request the resulting server configuration is verified in the replicated
            // logs of the leader and of both followers.
981         LOG.info("testChangeServersVotingStatus starting");
982
983         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
984         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
985         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
986
            // Actor ids/paths are generated up front so the leader and followers can reference
            // each other's addresses before the follower actors exist.
987         final String follower1ActorId = actorFactory.generateActorId(FOLLOWER_ID);
988         final String follower1ActorPath = actorFactory.createTestActorPath(follower1ActorId);
989         final String follower2ActorId = actorFactory.generateActorId(FOLLOWER_ID2);
990         final String follower2ActorPath = actorFactory.createTestActorPath(follower2ActorId);
991
992         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
993                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, follower1ActorPath,
994                         FOLLOWER_ID2, follower2ActorPath), new MockRaftActorContext())
995                         .withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
996         TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
997
998         TestActorRef<MessageCollectorActor> follower1Collector = actorFactory.createTestActor(
999                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1000                 actorFactory.generateActorId("collector"));
1001         final TestActorRef<CollectingMockRaftActor> follower1RaftActor = actorFactory.createTestActor(
1002                 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
1003                         FOLLOWER_ID2, follower2ActorPath), configParams, NO_PERSISTENCE, follower1Collector)
1004                         .withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId);
1005
1006         TestActorRef<MessageCollectorActor> follower2Collector = actorFactory.createTestActor(
1007                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1008                 actorFactory.generateActorId("collector"));
1009         final TestActorRef<CollectingMockRaftActor> follower2RaftActor = actorFactory.createTestActor(
1010                 CollectingMockRaftActor.props(FOLLOWER_ID2, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
1011                         FOLLOWER_ID, follower1ActorPath), configParams, NO_PERSISTENCE, follower2Collector)
1012                         .withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId);
1013
1014         // Send first ChangeServersVotingStatus message
1015
1016         leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(FOLLOWER_ID, false, FOLLOWER_ID2, false)),
1017                 testKit.getRef());
1018         ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1019         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1020
             // Each ApplyState wait below acts as the synchronization point before the corresponding
             // actor's replicated log is inspected.
1021         final ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
1022         assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
1023         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1024                 votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1025
1026         MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
1027         verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext()
1028                 .getReplicatedLog(), votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID),
1029                 nonVotingServer(FOLLOWER_ID2));
1030
1031         MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
1032         verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext()
1033                 .getReplicatedLog(), votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID),
1034                 nonVotingServer(FOLLOWER_ID2));
1035
             // Clear the collectors so the expectFirstMatching calls for the second request cannot
             // match the ApplyState messages captured for the first one.
1036         MessageCollectorActor.clearMessages(leaderCollector);
1037         MessageCollectorActor.clearMessages(follower1Collector);
1038         MessageCollectorActor.clearMessages(follower2Collector);
1039
1040         // Send second ChangeServersVotingStatus message
1041
1042         leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(FOLLOWER_ID, true)), testKit.getRef());
1043         reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1044         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1045
1046         MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
1047         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1048                 votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1049
1050         MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
1051         verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext()
1052                 .getReplicatedLog(), votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1053
1054         MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
1055         verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext()
1056                 .getReplicatedLog(), votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1057
1058         LOG.info("testChangeServersVotingStatus ending");
1059     }
1060
1061     @Test
1062     public void testChangeLeaderToNonVoting() {
1063         LOG.info("testChangeLeaderToNonVoting starting");
1064
1065         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1066         configParams.setHeartBeatInterval(new FiniteDuration(500, TimeUnit.MILLISECONDS));
1067
1068         final String follower1ActorId = actorFactory.generateActorId(FOLLOWER_ID);
1069         final String follower1ActorPath = actorFactory.createTestActorPath(follower1ActorId);
1070         final String follower2ActorId = actorFactory.generateActorId(FOLLOWER_ID2);
1071         final String follower2ActorPath = actorFactory.createTestActorPath(follower2ActorId);
1072
1073         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
1074                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, follower1ActorPath,
1075                         FOLLOWER_ID2, follower2ActorPath), new MockRaftActorContext())
1076                         .withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
1077         TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
1078
1079         TestActorRef<MessageCollectorActor> follower1Collector = actorFactory.createTestActor(
1080                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1081                 actorFactory.generateActorId("collector"));
1082         final TestActorRef<CollectingMockRaftActor> follower1RaftActor = actorFactory.createTestActor(
1083                 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
1084                         FOLLOWER_ID2, follower2ActorPath), configParams, NO_PERSISTENCE, follower1Collector)
1085                         .withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId);
1086
1087         TestActorRef<MessageCollectorActor> follower2Collector = actorFactory.createTestActor(
1088                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1089                 actorFactory.generateActorId("collector"));
1090         final TestActorRef<CollectingMockRaftActor> follower2RaftActor = actorFactory.createTestActor(
1091                 CollectingMockRaftActor.props(FOLLOWER_ID2, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
1092                         FOLLOWER_ID, follower1ActorPath), configParams, NO_PERSISTENCE, follower2Collector)
1093                         .withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId);
1094
1095         // Send ChangeServersVotingStatus message
1096
1097         leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(LEADER_ID, false)), testKit.getRef());
1098         ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1099         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1100
1101         MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
1102         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1103                 nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
1104
1105         MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
1106         verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext()
1107                 .getReplicatedLog(), nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
1108
1109         MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
1110         verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext()
1111                 .getReplicatedLog(), nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
1112
1113         verifyRaftState(RaftState.Leader, follower1RaftActor.underlyingActor(), follower2RaftActor.underlyingActor());
1114         verifyRaftState(RaftState.Follower, leaderActor.underlyingActor());
1115
1116         MessageCollectorActor.expectMatching(leaderCollector, AppendEntries.class, 2);
1117
1118         LOG.info("testChangeLeaderToNonVoting ending");
1119     }
1120
1121     @Test
1122     public void testChangeLeaderToNonVotingInSingleNode() {
1123         LOG.info("testChangeLeaderToNonVotingInSingleNode starting");
1124
1125         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
1126                 MockLeaderRaftActor.props(ImmutableMap.of(), new MockRaftActorContext())
1127                         .withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
1128
1129         leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(LEADER_ID, false)), testKit.getRef());
1130         ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1131         assertEquals("getStatus", ServerChangeStatus.INVALID_REQUEST, reply.getStatus());
1132
1133         LOG.info("testChangeLeaderToNonVotingInSingleNode ending");
1134     }
1135
1136     @Test
1137     public void testChangeToVotingWithNoLeader() {
1138         LOG.info("testChangeToVotingWithNoLeader starting");
1139
1140         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1141         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1142         configParams.setElectionTimeoutFactor(5);
1143
1144         final String node1ID = "node1";
1145         final String node2ID = "node2";
1146
1147         // Set up a persisted ServerConfigurationPayload. Initially node1 and node2 will come up as non-voting.
1148         // via the server config. The server config will also contain 2 voting peers that are down (ie no
1149         // actors created).
1150
1151         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1152                 new ServerInfo(node1ID, false), new ServerInfo(node2ID, false),
1153                 new ServerInfo("downNode1", true), new ServerInfo("downNode2", true)));
1154         SimpleReplicatedLogEntry persistedServerConfigEntry = new SimpleReplicatedLogEntry(0, 1, persistedServerConfig);
1155
1156         InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "downNode1"));
1157         InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1158         InMemoryJournal.addEntry(node1ID, 3, new ApplyJournalEntries(0));
1159         InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "downNode2"));
1160         InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1161         InMemoryJournal.addEntry(node2ID, 3, new ApplyJournalEntries(0));
1162
1163         TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
1164                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1165                 actorFactory.generateActorId("collector"));
1166         TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1167                 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
1168                         PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1169         CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1170
1171         TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
1172                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1173                 actorFactory.generateActorId("collector"));
1174         TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1175                 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
1176                         PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1177         CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1178
1179         node1RaftActor.waitForInitializeBehaviorComplete();
1180         node2RaftActor.waitForInitializeBehaviorComplete();
1181
1182         // Verify the intended server config was loaded and applied.
1183         verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1184                 nonVotingServer(node1ID), nonVotingServer(node2ID), votingServer("downNode1"),
1185                 votingServer("downNode2"));
1186         assertEquals("isVotingMember", false, node1RaftActor.getRaftActorContext().isVotingMember());
1187         assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1188         assertEquals("getLeaderId", null, node1RaftActor.getLeaderId());
1189
1190         verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1191                 nonVotingServer(node1ID), nonVotingServer(node2ID), votingServer("downNode1"),
1192                 votingServer("downNode2"));
1193         assertEquals("isVotingMember", false, node2RaftActor.getRaftActorContext().isVotingMember());
1194
1195         // For the test, we send a ChangeServersVotingStatus message to node1 to flip the voting states for
1196         // each server, ie node1 and node2 to voting and the 2 down nodes to non-voting. This should cause
1197         // node1 to try to elect itself as leader in order to apply the new server config. Since the 2
1198         // down nodes are switched to non-voting, node1 should only need a vote from node2.
1199
1200         // First send the message such that node1 has no peer address for node2 - should fail.
1201
1202         ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true,
1203                 node2ID, true, "downNode1", false, "downNode2", false));
1204         node1RaftActorRef.tell(changeServers, testKit.getRef());
1205         ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1206         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, reply.getStatus());
1207         assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1208
1209         // Send an AppendEntries so node1 has a leaderId
1210
1211         long term = node1RaftActor.getRaftActorContext().getTermInformation().getCurrentTerm();
1212         node1RaftActorRef.tell(new AppendEntries(term, "downNode1", -1L, -1L,
1213                 Collections.<ReplicatedLogEntry>emptyList(), 0, -1, (short)1), ActorRef.noSender());
1214
1215         // Wait for the ElectionTimeout to clear the leaderId. The leaderId must be null so on the next
1216         // ChangeServersVotingStatus message, it will try to elect a leader.
1217
1218         AbstractRaftActorIntegrationTest.verifyRaftState(node1RaftActorRef,
1219             rs -> assertEquals("getLeader", null, rs.getLeader()));
1220
1221         // Update node2's peer address and send the message again
1222
1223         node1RaftActor.setPeerAddress(node2ID, node2RaftActorRef.path().toString());
1224
1225         node1RaftActorRef.tell(changeServers, testKit.getRef());
1226         reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1227         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1228
1229         ApplyJournalEntries apply = MessageCollectorActor.expectFirstMatching(node1Collector,
1230                 ApplyJournalEntries.class);
1231         assertEquals("getToIndex", 1, apply.getToIndex());
1232         verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1233                 votingServer(node1ID), votingServer(node2ID), nonVotingServer("downNode1"),
1234                 nonVotingServer("downNode2"));
1235         assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
1236         assertEquals("getRaftState", RaftState.Leader, node1RaftActor.getRaftState());
1237
1238         apply = MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
1239         assertEquals("getToIndex", 1, apply.getToIndex());
1240         verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1241                 votingServer(node1ID), votingServer(node2ID), nonVotingServer("downNode1"),
1242                 nonVotingServer("downNode2"));
1243         assertEquals("isVotingMember", true, node2RaftActor.getRaftActorContext().isVotingMember());
1244         assertEquals("getRaftState", RaftState.Follower, node2RaftActor.getRaftState());
1245
1246         LOG.info("testChangeToVotingWithNoLeader ending");
1247     }
1248
1249     @Test
1250     public void testChangeToVotingWithNoLeaderAndElectionTimeout() {
1251         LOG.info("testChangeToVotingWithNoLeaderAndElectionTimeout starting");
1252
1253         final String node1ID = "node1";
1254         final String node2ID = "node2";
1255
1256         final PeerAddressResolver peerAddressResolver = peerId -> peerId.equals(node1ID)
1257                 ? actorFactory.createTestActorPath(node1ID) : peerId.equals(node2ID)
1258                         ? actorFactory.createTestActorPath(node2ID) : null;
1259
1260         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1261                 new ServerInfo(node1ID, false), new ServerInfo(node2ID, true)));
1262         SimpleReplicatedLogEntry persistedServerConfigEntry = new SimpleReplicatedLogEntry(0, 1, persistedServerConfig);
1263
1264         InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
1265         InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1266         InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
1267         InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1268
1269         DefaultConfigParamsImpl configParams1 = new DefaultConfigParamsImpl();
1270         configParams1.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1271         configParams1.setElectionTimeoutFactor(1);
1272         configParams1.setPeerAddressResolver(peerAddressResolver);
1273         TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
1274                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1275                 actorFactory.generateActorId("collector"));
1276         TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1277                 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams1,
1278                         PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1279         final CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1280
1281         DefaultConfigParamsImpl configParams2 = new DefaultConfigParamsImpl();
1282         configParams2.setElectionTimeoutFactor(1000000);
1283         configParams2.setPeerAddressResolver(peerAddressResolver);
1284         TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
1285                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1286                 actorFactory.generateActorId("collector"));
1287         TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1288                 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams2,
1289                         PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1290         CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1291
1292         // Send a ChangeServersVotingStatus message to node1 to change mode1 to voting. This should cause
1293         // node1 to try to elect itself as leader in order to apply the new server config. But we'll drop
1294         // RequestVote messages in node2 which should cause node1 to time out and revert back to the previous
1295         // server config and fail with NO_LEADER. Note that node1 shouldn't forward the request to node2 b/c
1296         // node2 was previously voting.
1297
1298         node2RaftActor.setDropMessageOfType(RequestVote.class);
1299
1300         ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true));
1301         node1RaftActorRef.tell(changeServers, testKit.getRef());
1302         ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1303         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, reply.getStatus());
1304
1305         assertEquals("Server config", Sets.newHashSet(nonVotingServer(node1ID), votingServer(node2ID)),
1306                 Sets.newHashSet(node1RaftActor.getRaftActorContext().getPeerServerInfo(true).getServerConfig()));
1307         assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1308
1309         LOG.info("testChangeToVotingWithNoLeaderAndElectionTimeout ending");
1310     }
1311
1312     @Test
1313     public void testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout() {
1314         LOG.info("testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout starting");
1315
1316         final String node1ID = "node1";
1317         final String node2ID = "node2";
1318
1319         final PeerAddressResolver peerAddressResolver = peerId -> peerId.equals(node1ID)
1320                 ? actorFactory.createTestActorPath(node1ID) : peerId.equals(node2ID)
1321                         ? actorFactory.createTestActorPath(node2ID) : null;
1322
1323         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1324         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1325         configParams.setElectionTimeoutFactor(3);
1326         configParams.setPeerAddressResolver(peerAddressResolver);
1327
1328         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1329                 new ServerInfo(node1ID, false), new ServerInfo(node2ID, false)));
1330         SimpleReplicatedLogEntry persistedServerConfigEntry = new SimpleReplicatedLogEntry(0, 1, persistedServerConfig);
1331
1332         InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
1333         InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1334         InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
1335         InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1336         InMemoryJournal.addEntry(node2ID, 3, new SimpleReplicatedLogEntry(1, 1,
1337                 new MockRaftActorContext.MockPayload("2")));
1338         InMemoryJournal.addEntry(node2ID, 4, new ApplyJournalEntries(1));
1339
1340         TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
1341                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1342                 actorFactory.generateActorId("collector"));
1343         TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1344                 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
1345                         PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1346         final CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1347
1348         TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
1349                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1350                 actorFactory.generateActorId("collector"));
1351         TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1352                 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
1353                         PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1354         final CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1355
1356         // Send a ChangeServersVotingStatus message to node1 to change mode1 to voting. This should cause
1357         // node1 to try to elect itself as leader in order to apply the new server config. However node1's log
1358         // is behind node2's so node2 should not grant node1's vote. This should cause node1 to time out and
1359         // forward the request to node2.
1360
1361         ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(
1362                 ImmutableMap.of(node1ID, true, node2ID, true));
1363         node1RaftActorRef.tell(changeServers, testKit.getRef());
1364         ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1365         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1366
1367         MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
1368         verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1369                 votingServer(node1ID), votingServer(node2ID));
1370         assertEquals("getRaftState", RaftState.Leader, node2RaftActor.getRaftState());
1371
1372         MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class);
1373         verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1374                 votingServer(node1ID), votingServer(node2ID));
1375         assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
1376         assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1377
1378         LOG.info("testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout ending");
1379     }
1380
1381     @Test
1382     public void testChangeToVotingWithNoLeaderAndOtherLeaderElected() {
1383         LOG.info("testChangeToVotingWithNoLeaderAndOtherLeaderElected starting");
1384
1385         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1386         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1387         configParams.setElectionTimeoutFactor(100000);
1388
1389         final String node1ID = "node1";
1390         final String node2ID = "node2";
1391
1392         configParams.setPeerAddressResolver(peerId -> peerId.equals(node1ID)
1393                 ? actorFactory.createTestActorPath(node1ID) : peerId.equals(node2ID)
1394                         ? actorFactory.createTestActorPath(node2ID) : null);
1395
1396         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1397                 new ServerInfo(node1ID, false), new ServerInfo(node2ID, true)));
1398         SimpleReplicatedLogEntry persistedServerConfigEntry = new SimpleReplicatedLogEntry(0, 1, persistedServerConfig);
1399
1400         InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
1401         InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1402         InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
1403         InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1404
1405         TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
1406                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1407                 actorFactory.generateActorId("collector"));
1408         TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1409                 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
1410                         PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1411         final CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1412
1413         TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
1414                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1415                 actorFactory.generateActorId("collector"));
1416         TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1417                 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
1418                         PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1419         CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1420
1421         // Send a ChangeServersVotingStatus message to node1 to change node1 to voting. This should cause
1422         // node1 to try to elect itself as leader in order to apply the new server config. But we'll drop
1423         // RequestVote messages in node2 and make it the leader so node1 should forward the server change
1424         // request to node2 when node2 is elected.
1425
1426         node2RaftActor.setDropMessageOfType(RequestVote.class);
1427
1428         ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true,
1429                 node2ID, true));
1430         node1RaftActorRef.tell(changeServers, testKit.getRef());
1431
1432         MessageCollectorActor.expectFirstMatching(node2Collector, RequestVote.class);
1433
1434         node2RaftActorRef.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
1435
1436         ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1437         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1438
1439         MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class);
1440         verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1441                 votingServer(node1ID), votingServer(node2ID));
1442         assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
1443         assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1444
1445         MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
1446         verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1447                 votingServer(node1ID), votingServer(node2ID));
1448         assertEquals("getRaftState", RaftState.Leader, node2RaftActor.getRaftState());
1449
1450         LOG.info("testChangeToVotingWithNoLeaderAndOtherLeaderElected ending");
1451     }
1452
1453     private static void verifyRaftState(RaftState expState, RaftActor... raftActors) {
1454         Stopwatch sw = Stopwatch.createStarted();
1455         while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
1456             for (RaftActor raftActor : raftActors) {
1457                 if (raftActor.getRaftState() == expState) {
1458                     return;
1459                 }
1460             }
1461         }
1462
1463         fail("None of the RaftActors have state " + expState);
1464     }
1465
1466     private static ServerInfo votingServer(String id) {
1467         return new ServerInfo(id, true);
1468     }
1469
1470     private static ServerInfo nonVotingServer(String id) {
1471         return new ServerInfo(id, false);
1472     }
1473
1474     private TestActorRef<MessageCollectorActor> newLeaderCollectorActor(MockLeaderRaftActor leaderRaftActor) {
1475         return newCollectorActor(leaderRaftActor, LEADER_ID);
1476     }
1477
1478     private TestActorRef<MessageCollectorActor> newCollectorActor(AbstractMockRaftActor raftActor, String id) {
1479         TestActorRef<MessageCollectorActor> collectorActor = actorFactory.createTestActor(
1480                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1481                 actorFactory.generateActorId(id + "Collector"));
1482         raftActor.setCollectorActor(collectorActor);
1483         return collectorActor;
1484     }
1485
1486     private static void verifyServerConfigurationPayloadEntry(ReplicatedLog log, ServerInfo... expected) {
1487         ReplicatedLogEntry logEntry = log.get(log.lastIndex());
1488         assertEquals("Last log entry payload class", ServerConfigurationPayload.class, logEntry.getData().getClass());
1489         ServerConfigurationPayload payload = (ServerConfigurationPayload)logEntry.getData();
1490         assertEquals("Server config", Sets.newHashSet(expected), Sets.newHashSet(payload.getServerConfig()));
1491     }
1492
1493     private static RaftActorContextImpl newFollowerContext(String id, TestActorRef<? extends UntypedActor> actor) {
1494         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1495         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1496         configParams.setElectionTimeoutFactor(100000);
1497         NonPersistentDataProvider noPersistence = new NonPersistentDataProvider();
1498         ElectionTermImpl termInfo = new ElectionTermImpl(noPersistence, id, LOG);
1499         termInfo.update(1, LEADER_ID);
1500         return new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
1501                 id, termInfo, -1, -1, ImmutableMap.of(LEADER_ID, ""), configParams,
1502                 noPersistence, applyState -> actor.tell(applyState, actor), LOG);
1503     }
1504
1505     abstract static class AbstractMockRaftActor extends MockRaftActor {
1506         private volatile TestActorRef<MessageCollectorActor> collectorActor;
1507         private volatile Class<?> dropMessageOfType;
1508
1509         AbstractMockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
1510                 boolean persistent, TestActorRef<MessageCollectorActor> collectorActor) {
1511             super(builder().id(id).peerAddresses(peerAddresses).config(config.get())
1512                     .persistent(Optional.of(persistent)));
1513             this.collectorActor = collectorActor;
1514         }
1515
1516         void setDropMessageOfType(Class<?> dropMessageOfType) {
1517             this.dropMessageOfType = dropMessageOfType;
1518         }
1519
1520         void setCollectorActor(TestActorRef<MessageCollectorActor> collectorActor) {
1521             this.collectorActor = collectorActor;
1522         }
1523
1524         @Override
1525         public void handleCommand(Object message) {
1526             if (dropMessageOfType == null || !dropMessageOfType.equals(message.getClass())) {
1527                 super.handleCommand(message);
1528             }
1529
1530             if (collectorActor != null) {
1531                 collectorActor.tell(message, getSender());
1532             }
1533         }
1534     }
1535
1536     public static class CollectingMockRaftActor extends AbstractMockRaftActor {
1537
1538         CollectingMockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
1539                 boolean persistent, TestActorRef<MessageCollectorActor> collectorActor) {
1540             super(id, peerAddresses, config, persistent, collectorActor);
1541             snapshotCohortDelegate = new RaftActorSnapshotCohort() {
1542                 @Override
1543                 public void createSnapshot(ActorRef actorRef, java.util.Optional<OutputStream> installSnapshotStream) {
1544                     actorRef.tell(new CaptureSnapshotReply(ByteState.empty(), installSnapshotStream), actorRef);
1545                 }
1546
1547                 @Override
1548                 public void applySnapshot(
1549                         org.opendaylight.controller.cluster.raft.persisted.Snapshot.State snapshotState) {
1550                 }
1551
1552                 @Override
1553                 public org.opendaylight.controller.cluster.raft.persisted.Snapshot.State deserializeSnapshot(
1554                         ByteSource snapshotBytes) {
1555                     throw new UnsupportedOperationException();
1556                 }
1557             };
1558         }
1559
1560         public static Props props(final String id, final Map<String, String> peerAddresses,
1561                 ConfigParams config, boolean persistent, TestActorRef<MessageCollectorActor> collectorActor) {
1562
1563             return Props.create(CollectingMockRaftActor.class, id, peerAddresses, Optional.of(config),
1564                     persistent, collectorActor);
1565         }
1566
1567     }
1568
1569     public static class MockLeaderRaftActor extends AbstractMockRaftActor {
1570         public MockLeaderRaftActor(Map<String, String> peerAddresses, ConfigParams config,
1571                 RaftActorContext fromContext) {
1572             super(LEADER_ID, peerAddresses, Optional.of(config), NO_PERSISTENCE, null);
1573             setPersistence(false);
1574
1575             RaftActorContext context = getRaftActorContext();
1576             for (int i = 0; i < fromContext.getReplicatedLog().size(); i++) {
1577                 ReplicatedLogEntry entry = fromContext.getReplicatedLog().get(i);
1578                 getState().add(entry.getData());
1579                 context.getReplicatedLog().append(entry);
1580             }
1581
1582             context.setCommitIndex(fromContext.getCommitIndex());
1583             context.setLastApplied(fromContext.getLastApplied());
1584             context.getTermInformation().update(fromContext.getTermInformation().getCurrentTerm(),
1585                     fromContext.getTermInformation().getVotedFor());
1586         }
1587
1588         @Override
1589         protected void initializeBehavior() {
1590             changeCurrentBehavior(new Leader(getRaftActorContext()));
1591             initializeBehaviorComplete.countDown();
1592         }
1593
1594         @Override
1595         @SuppressWarnings("checkstyle:IllegalCatch")
1596         public void createSnapshot(ActorRef actorRef, java.util.Optional<OutputStream> installSnapshotStream) {
1597             MockSnapshotState snapshotState = new MockSnapshotState(new ArrayList<>(getState()));
1598             if (installSnapshotStream.isPresent()) {
1599                 SerializationUtils.serialize(snapshotState, installSnapshotStream.get());
1600             }
1601
1602             actorRef.tell(new CaptureSnapshotReply(snapshotState, installSnapshotStream), actorRef);
1603         }
1604
1605         static Props props(Map<String, String> peerAddresses, RaftActorContext fromContext) {
1606             DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1607             configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1608             configParams.setElectionTimeoutFactor(10);
1609             return Props.create(MockLeaderRaftActor.class, peerAddresses, configParams, fromContext);
1610         }
1611     }
1612
1613     public static class MockNewFollowerRaftActor extends AbstractMockRaftActor {
1614         public MockNewFollowerRaftActor(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
1615             super(NEW_SERVER_ID, Maps.<String, String>newHashMap(), Optional.of(config), NO_PERSISTENCE,
1616                     collectorActor);
1617             setPersistence(false);
1618         }
1619
1620         static Props props(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
1621             return Props.create(MockNewFollowerRaftActor.class, config, collectorActor);
1622         }
1623     }
1624 }