CDS: updateMinReplicaCount on RemoveServer
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorServerConfigurationSupportTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertTrue;
12 import static org.junit.Assert.fail;
13 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.assertNoneMatching;
14 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
15 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
16 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
17
18 import akka.actor.ActorRef;
19 import akka.actor.Props;
20 import akka.actor.UntypedActor;
21 import akka.dispatch.Dispatchers;
22 import akka.testkit.JavaTestKit;
23 import akka.testkit.TestActorRef;
24 import com.google.common.base.Optional;
25 import com.google.common.base.Stopwatch;
26 import com.google.common.collect.ImmutableMap;
27 import com.google.common.collect.Maps;
28 import com.google.common.collect.Sets;
29 import com.google.common.io.ByteSource;
30 import java.io.OutputStream;
31 import java.util.ArrayList;
32 import java.util.Arrays;
33 import java.util.Collections;
34 import java.util.List;
35 import java.util.Map;
36 import java.util.concurrent.TimeUnit;
37 import org.apache.commons.lang3.SerializationUtils;
38 import org.junit.After;
39 import org.junit.Before;
40 import org.junit.Test;
41 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
42 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
43 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
44 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
45 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
46 import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
47 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
48 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
49 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
50 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
51 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
52 import org.opendaylight.controller.cluster.raft.messages.AddServer;
53 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
54 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
55 import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
56 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
57 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
58 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
59 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
60 import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
61 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
62 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
63 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
64 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
65 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
66 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
67 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
68 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
69 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
70 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
71 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
72 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
73 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
74 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
75 import org.slf4j.Logger;
76 import org.slf4j.LoggerFactory;
77 import scala.concurrent.duration.FiniteDuration;
78
79 /**
80  * Unit tests for RaftActorServerConfigurationSupport.
81  *
82  * @author Thomas Pantelis
83  */
84 public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
85     static final String LEADER_ID = "leader";
86     static final String FOLLOWER_ID = "follower";
87     static final String FOLLOWER_ID2 = "follower2";
88     static final String NEW_SERVER_ID = "new-server";
89     static final String NEW_SERVER_ID2 = "new-server2";
90     private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupportTest.class);
91     private static final Class<?> COMMIT_MESSAGE_CLASS = RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT.getClass();
92     private static final boolean NO_PERSISTENCE = false;
93     private static final boolean PERSISTENT = true;
94
95     private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
96
97     private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
98             Props.create(ForwardMessageToBehaviorActor.class).withDispatcher(Dispatchers.DefaultDispatcherId()),
99             actorFactory.generateActorId(FOLLOWER_ID));
100
101     private TestActorRef<MockNewFollowerRaftActor> newFollowerRaftActor;
102     private TestActorRef<MessageCollectorActor> newFollowerCollectorActor;
103     private RaftActorContext newFollowerActorContext;
104
105     private final JavaTestKit testKit = new JavaTestKit(getSystem());
106
107     @Before
108     public void setup() {
109         InMemoryJournal.clear();
110         InMemorySnapshotStore.clear();
111     }
112
113     @SuppressWarnings("checkstyle:IllegalCatch")
114     private void setupNewFollower() {
115         DefaultConfigParamsImpl configParams = newFollowerConfigParams();
116
117         newFollowerCollectorActor = actorFactory.createTestActor(
118                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
119                 actorFactory.generateActorId(NEW_SERVER_ID + "Collector"));
120         newFollowerRaftActor = actorFactory.createTestActor(MockNewFollowerRaftActor.props(
121                 configParams, newFollowerCollectorActor).withDispatcher(Dispatchers.DefaultDispatcherId()),
122                 actorFactory.generateActorId(NEW_SERVER_ID));
123
124         try {
125             newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
126         } catch (Exception e) {
127             newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
128         }
129     }
130
131     private static DefaultConfigParamsImpl newFollowerConfigParams() {
132         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
133         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
134         configParams.setElectionTimeoutFactor(100000);
135         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
136         return configParams;
137     }
138
139     @After
140     public void tearDown() throws Exception {
141         actorFactory.close();
142     }
143
144     @Test
145     public void testAddServerWithExistingFollower() throws Exception {
146         LOG.info("testAddServerWithExistingFollower starting");
147         setupNewFollower();
148         RaftActorContextImpl followerActorContext = newFollowerContext(FOLLOWER_ID, followerActor);
149         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
150                 0, 3, 1).build());
151         followerActorContext.setCommitIndex(2);
152         followerActorContext.setLastApplied(2);
153
154         Follower follower = new Follower(followerActorContext);
155         followerActor.underlyingActor().setBehavior(follower);
156         followerActorContext.setCurrentBehavior(follower);
157
158         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
159                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
160                         followerActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
161                 actorFactory.generateActorId(LEADER_ID));
162
163         // Expect initial heartbeat from the leader.
164         expectFirstMatching(followerActor, AppendEntries.class);
165         clearMessages(followerActor);
166
167         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
168         final TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
169
170         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
171
172         // Leader should install snapshot - capture and verify ApplySnapshot contents
173
174         ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
175         List<Object> snapshotState = MockRaftActor.fromState(applySnapshot.getSnapshot().getState());
176         assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
177
178         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
179         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
180         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
181
182         // Verify ServerConfigurationPayload entry in leader's log
183
184         expectFirstMatching(leaderCollectorActor, ApplyState.class);
185         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
186         assertEquals("Leader journal last index", 3, leaderActorContext.getReplicatedLog().lastIndex());
187         assertEquals("Leader commit index", 3, leaderActorContext.getCommitIndex());
188         assertEquals("Leader last applied index", 3, leaderActorContext.getLastApplied());
189         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
190                 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
191
192         // Verify ServerConfigurationPayload entry in both followers
193
194         expectFirstMatching(followerActor, ApplyState.class);
195         assertEquals("Follower journal last index", 3, followerActorContext.getReplicatedLog().lastIndex());
196         verifyServerConfigurationPayloadEntry(followerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
197                 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
198
199         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
200         assertEquals("New follower journal last index", 3, newFollowerActorContext.getReplicatedLog().lastIndex());
201         verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
202                 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
203
204         // Verify new server config was applied in both followers
205
206         assertEquals("Follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID), followerActorContext.getPeerIds());
207
208         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, FOLLOWER_ID),
209                 newFollowerActorContext.getPeerIds());
210
211         assertEquals("Follower commit index", 3, followerActorContext.getCommitIndex());
212         assertEquals("Follower last applied index", 3, followerActorContext.getLastApplied());
213         assertEquals("New follower commit index", 3, newFollowerActorContext.getCommitIndex());
214         assertEquals("New follower last applied index", 3, newFollowerActorContext.getLastApplied());
215
216         assertEquals("Leader persisted ReplicatedLogImplEntry entries", 0,
217                 InMemoryJournal.get(LEADER_ID, SimpleReplicatedLogEntry.class).size());
218         assertEquals("Leader persisted ServerConfigurationPayload entries", 1,
219                 InMemoryJournal.get(LEADER_ID, ServerConfigurationPayload.class).size());
220
221         assertEquals("New follower persisted ReplicatedLogImplEntry entries", 0,
222                 InMemoryJournal.get(NEW_SERVER_ID, SimpleReplicatedLogEntry.class).size());
223         assertEquals("New follower persisted ServerConfigurationPayload entries", 1,
224                 InMemoryJournal.get(NEW_SERVER_ID, ServerConfigurationPayload.class).size());
225
226         LOG.info("testAddServerWithExistingFollower ending");
227     }
228
229     @Test
230     public void testAddServerWithNoExistingFollower() throws Exception {
231         LOG.info("testAddServerWithNoExistingFollower starting");
232
233         setupNewFollower();
234         RaftActorContext initialActorContext = new MockRaftActorContext();
235         initialActorContext.setCommitIndex(1);
236         initialActorContext.setLastApplied(1);
237         initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
238                 0, 2, 1).build());
239
240         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
241                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
242                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
243                 actorFactory.generateActorId(LEADER_ID));
244
245         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
246         final RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
247
248         final TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
249
250         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
251
252         // Leader should install snapshot - capture and verify ApplySnapshot contents
253
254         ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
255         List<Object> snapshotState = MockRaftActor.fromState(applySnapshot.getSnapshot().getState());
256         assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
257
258         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
259         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
260         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
261
262         // Verify ServerConfigurationPayload entry in leader's log
263
264         expectFirstMatching(leaderCollectorActor, ApplyState.class);
265         assertEquals("Leader journal last index", 2, leaderActorContext.getReplicatedLog().lastIndex());
266         assertEquals("Leader commit index", 2, leaderActorContext.getCommitIndex());
267         assertEquals("Leader last applied index", 2, leaderActorContext.getLastApplied());
268         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
269                 votingServer(NEW_SERVER_ID));
270
271         // Verify ServerConfigurationPayload entry in the new follower
272
273         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
274         assertEquals("New follower journal last index", 2, newFollowerActorContext.getReplicatedLog().lastIndex());
275         verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
276                 votingServer(NEW_SERVER_ID));
277
278         // Verify new server config was applied in the new follower
279
280         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
281
282         LOG.info("testAddServerWithNoExistingFollower ending");
283     }
284
285     @Test
286     public void testAddServersAsNonVoting() throws Exception {
287         LOG.info("testAddServersAsNonVoting starting");
288
289         setupNewFollower();
290         RaftActorContext initialActorContext = new MockRaftActorContext();
291
292         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
293                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
294                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
295                 actorFactory.generateActorId(LEADER_ID));
296
297         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
298         final RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
299
300         final TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
301
302         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), false), testKit.getRef());
303
304         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
305         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
306         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
307
308         // Verify ServerConfigurationPayload entry in leader's log
309
310         expectFirstMatching(leaderCollectorActor, ApplyState.class);
311
312         assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
313         assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
314         assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
315         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
316                 nonVotingServer(NEW_SERVER_ID));
317
318         // Verify ServerConfigurationPayload entry in the new follower
319
320         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
321         assertEquals("New follower journal last index", 0, newFollowerActorContext.getReplicatedLog().lastIndex());
322         verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
323                 nonVotingServer(NEW_SERVER_ID));
324
325         // Verify new server config was applied in the new follower
326
327         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
328
329         assertNoneMatching(newFollowerCollectorActor, InstallSnapshot.class, 500);
330
331         // Add another non-voting server.
332
333         clearMessages(leaderCollectorActor);
334
335         RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
336         Follower newFollower2 = new Follower(follower2ActorContext);
337         followerActor.underlyingActor().setBehavior(newFollower2);
338
339         leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit.getRef());
340
341         addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
342         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
343         assertEquals("getLeaderHint", java.util.Optional.of(LEADER_ID), addServerReply.getLeaderHint());
344
345         expectFirstMatching(leaderCollectorActor, ApplyState.class);
346         assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
347         assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
348         assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
349         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
350                 votingServer(LEADER_ID), nonVotingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
351
352         LOG.info("testAddServersAsNonVoting ending");
353     }
354
355     @Test
356     public void testAddServerWithOperationInProgress() throws Exception {
357         LOG.info("testAddServerWithOperationInProgress starting");
358
359         setupNewFollower();
360         RaftActorContext initialActorContext = new MockRaftActorContext();
361
362         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
363                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
364                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
365                 actorFactory.generateActorId(LEADER_ID));
366
367         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
368         final RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
369
370         final TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
371
372         RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
373         Follower newFollower2 = new Follower(follower2ActorContext);
374         followerActor.underlyingActor().setBehavior(newFollower2);
375
376         MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
377         newFollowerRaftActorInstance.setDropMessageOfType(InstallSnapshot.class);
378
379         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
380
381         // Wait for leader's install snapshot and capture it
382
383         InstallSnapshot installSnapshot = expectFirstMatching(newFollowerCollectorActor, InstallSnapshot.class);
384
385         // Send a second AddServer - should get queued
386         JavaTestKit testKit2 = new JavaTestKit(getSystem());
387         leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit2.getRef());
388
389         // Continue the first AddServer
390         newFollowerRaftActorInstance.setDropMessageOfType(null);
391         newFollowerRaftActor.tell(installSnapshot, leaderActor);
392
393         // Verify both complete successfully
394         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
395         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
396
397         addServerReply = testKit2.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
398         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
399
400         // Verify ServerConfigurationPayload entries in leader's log
401
402         expectMatching(leaderCollectorActor, ApplyState.class, 2);
403         assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
404         assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
405         assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
406         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
407                 votingServer(LEADER_ID), votingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
408
409         // Verify ServerConfigurationPayload entry in the new follower
410
411         expectMatching(newFollowerCollectorActor, ApplyState.class, 2);
412         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID2),
413                newFollowerActorContext.getPeerIds());
414
415         LOG.info("testAddServerWithOperationInProgress ending");
416     }
417
418     @Test
419     public void testAddServerWithPriorSnapshotInProgress() throws Exception {
420         LOG.info("testAddServerWithPriorSnapshotInProgress starting");
421
422         setupNewFollower();
423         RaftActorContext initialActorContext = new MockRaftActorContext();
424
425         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
426                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
427                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
428                 actorFactory.generateActorId(LEADER_ID));
429
430         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
431         final RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
432
433         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
434
435         // Drop commit message for now to delay snapshot completion
436         leaderRaftActor.setDropMessageOfType(String.class);
437
438         leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
439
440         Object commitMsg = expectFirstMatching(leaderCollectorActor, COMMIT_MESSAGE_CLASS);
441
442         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
443
444         leaderRaftActor.setDropMessageOfType(null);
445         leaderActor.tell(commitMsg, leaderActor);
446
447         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
448         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
449         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
450
451         expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
452
453         // Verify ServerConfigurationPayload entry in leader's log
454
455         expectFirstMatching(leaderCollectorActor, ApplyState.class);
456         assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
457         assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
458         assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
459         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
460                 votingServer(NEW_SERVER_ID));
461
462         LOG.info("testAddServerWithPriorSnapshotInProgress ending");
463     }
464
465     @Test
466     public void testAddServerWithPriorSnapshotCompleteTimeout() throws Exception {
467         LOG.info("testAddServerWithPriorSnapshotCompleteTimeout starting");
468
469         setupNewFollower();
470         RaftActorContext initialActorContext = new MockRaftActorContext();
471
472         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
473                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
474                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
475                 actorFactory.generateActorId(LEADER_ID));
476
477         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
478         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
479
480         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
481
482         // Drop commit message so the snapshot doesn't complete.
483         leaderRaftActor.setDropMessageOfType(COMMIT_MESSAGE_CLASS);
484
485         leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
486
487         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
488
489         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
490         assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
491
492         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
493
494         LOG.info("testAddServerWithPriorSnapshotCompleteTimeout ending");
495     }
496
497     @Test
498     public void testAddServerWithLeaderChangeBeforePriorSnapshotComplete() throws Exception {
499         LOG.info("testAddServerWithLeaderChangeBeforePriorSnapshotComplete starting");
500
501         setupNewFollower();
502         RaftActorContext initialActorContext = new MockRaftActorContext();
503
504         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
505                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
506                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
507                 actorFactory.generateActorId(LEADER_ID));
508
509         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
510         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
511         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
512
513         final TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
514
515         // Drop the commit message so the snapshot doesn't complete yet.
516         leaderRaftActor.setDropMessageOfType(COMMIT_MESSAGE_CLASS);
517
518         leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
519
520         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
521
522         Object commitMsg = expectFirstMatching(leaderCollectorActor, COMMIT_MESSAGE_CLASS);
523
524         // Change the leader behavior to follower
525         leaderActor.tell(new Follower(leaderActorContext), leaderActor);
526
527         // Drop CaptureSnapshotReply in case install snapshot is incorrectly initiated after the prior
528         // snapshot completes. This will prevent the invalid snapshot from completing and fail the
529         // isCapturing assertion below.
530         leaderRaftActor.setDropMessageOfType(CaptureSnapshotReply.class);
531
532         // Complete the prior snapshot - this should be a no-op b/c it's no longer the leader
533         leaderActor.tell(commitMsg, leaderActor);
534
535         leaderActor.tell(new RaftActorServerConfigurationSupport.ServerOperationTimeout(NEW_SERVER_ID), leaderActor);
536
537         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
538         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
539
540         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
541         assertEquals("isCapturing", false, leaderActorContext.getSnapshotManager().isCapturing());
542
543         LOG.info("testAddServerWithLeaderChangeBeforePriorSnapshotComplete ending");
544     }
545
546     @Test
547     public void testAddServerWithLeaderChangeDuringInstallSnapshot() throws Exception {
548         LOG.info("testAddServerWithLeaderChangeDuringInstallSnapshot starting");
549
550         setupNewFollower();
551         RaftActorContext initialActorContext = new MockRaftActorContext();
552
553         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
554                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
555                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
556                 actorFactory.generateActorId(LEADER_ID));
557
558         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
559         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
560
561         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(8);
562
563         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
564
565         // Drop the UnInitializedFollowerSnapshotReply to delay it.
566         leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
567
568         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
569
570         final UnInitializedFollowerSnapshotReply snapshotReply = expectFirstMatching(leaderCollectorActor,
571                 UnInitializedFollowerSnapshotReply.class);
572
573         // Prevent election timeout when the leader switches to follower
574         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
575
576         // Change the leader behavior to follower
577         leaderActor.tell(new Follower(leaderActorContext), leaderActor);
578
579         // Send the captured UnInitializedFollowerSnapshotReply - should be a no-op
580         leaderRaftActor.setDropMessageOfType(null);
581         leaderActor.tell(snapshotReply, leaderActor);
582
583         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
584         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
585
586         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
587
588         LOG.info("testAddServerWithLeaderChangeDuringInstallSnapshot ending");
589     }
590
591     @Test
592     public void testAddServerWithInstallSnapshotTimeout() throws Exception {
593         LOG.info("testAddServerWithInstallSnapshotTimeout starting");
594
595         setupNewFollower();
596         RaftActorContext initialActorContext = new MockRaftActorContext();
597
598         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
599                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
600                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
601                 actorFactory.generateActorId(LEADER_ID));
602
603         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
604         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
605         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
606
607         // Drop the InstallSnapshot message so it times out
608         newFollowerRaftActor.underlyingActor().setDropMessageOfType(InstallSnapshot.class);
609
610         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
611
612         leaderActor.tell(new UnInitializedFollowerSnapshotReply("bogus"), leaderActor);
613
614         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
615         assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
616
617         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
618         assertEquals("Leader followers size", 0,
619                 ((AbstractLeader)leaderRaftActor.getCurrentBehavior()).getFollowerIds().size());
620
621         LOG.info("testAddServerWithInstallSnapshotTimeout ending");
622     }
623
624     @Test
625     public void testAddServerWithNoLeader() {
626         LOG.info("testAddServerWithNoLeader starting");
627
628         setupNewFollower();
629         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
630         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
631
632         TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
633                 MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
634                         followerActor.path().toString())).config(configParams).persistent(Optional.of(false))
635                         .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
636                 actorFactory.generateActorId(LEADER_ID));
637         noLeaderActor.underlyingActor().waitForInitializeBehaviorComplete();
638
639         noLeaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true),
640                 testKit.getRef());
641         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
642         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
643
644         LOG.info("testAddServerWithNoLeader ending");
645     }
646
647     @Test
648     public void testAddServerWithNoConsensusReached() {
649         LOG.info("testAddServerWithNoConsensusReached starting");
650
651         setupNewFollower();
652         RaftActorContext initialActorContext = new MockRaftActorContext();
653
654         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
655                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
656                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
657                 actorFactory.generateActorId(LEADER_ID));
658
659         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
660         final RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
661
662         final TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
663
664         // Drop UnInitializedFollowerSnapshotReply initially
665         leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
666
667         MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
668         newFollowerCollectorActor = newCollectorActor(newFollowerRaftActorInstance, NEW_SERVER_ID);
669
670         // Drop AppendEntries to the new follower so consensus isn't reached
671         newFollowerRaftActorInstance.setDropMessageOfType(AppendEntries.class);
672
673         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
674
675         // Capture the UnInitializedFollowerSnapshotReply
676         Object snapshotReply = expectFirstMatching(leaderCollectorActor, UnInitializedFollowerSnapshotReply.class);
677
678         // Send the UnInitializedFollowerSnapshotReply to resume the first request
679         leaderRaftActor.setDropMessageOfType(null);
680         leaderActor.tell(snapshotReply, leaderActor);
681
682         expectFirstMatching(newFollowerCollectorActor, AppendEntries.class);
683
684         // Send a second AddServer
685         leaderActor.tell(new AddServer(NEW_SERVER_ID2, "", false), testKit.getRef());
686
687         // The first AddServer should succeed with OK even though consensus wasn't reached
688         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
689         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
690         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
691
692         // Verify ServerConfigurationPayload entry in leader's log
693         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
694                 votingServer(NEW_SERVER_ID));
695
696         // The second AddServer should fail since consensus wasn't reached for the first
697         addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
698         assertEquals("getStatus", ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT, addServerReply.getStatus());
699
700         // Re-send the second AddServer - should also fail
701         leaderActor.tell(new AddServer(NEW_SERVER_ID2, "", false), testKit.getRef());
702         addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
703         assertEquals("getStatus", ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT, addServerReply.getStatus());
704
705         LOG.info("testAddServerWithNoConsensusReached ending");
706     }
707
708     @Test
709     public void testAddServerWithExistingServer() {
710         LOG.info("testAddServerWithExistingServer starting");
711
712         RaftActorContext initialActorContext = new MockRaftActorContext();
713
714         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
715                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
716                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
717                 actorFactory.generateActorId(LEADER_ID));
718
719         leaderActor.tell(new AddServer(FOLLOWER_ID, followerActor.path().toString(), true), testKit.getRef());
720
721         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
722         assertEquals("getStatus", ServerChangeStatus.ALREADY_EXISTS, addServerReply.getStatus());
723
724         LOG.info("testAddServerWithExistingServer ending");
725     }
726
727     @Test
728     public void testAddServerForwardedToLeader() {
729         LOG.info("testAddServerForwardedToLeader starting");
730
731         setupNewFollower();
732         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
733         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
734
735         TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
736                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
737                 actorFactory.generateActorId(LEADER_ID));
738
739         TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
740                 MockRaftActor.builder().id(FOLLOWER_ID).peerAddresses(ImmutableMap.of(LEADER_ID,
741                         leaderActor.path().toString())).config(configParams).persistent(Optional.of(false))
742                         .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
743                 actorFactory.generateActorId(FOLLOWER_ID));
744         followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
745
746         followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
747                 -1, -1, (short)0), leaderActor);
748
749         followerRaftActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true),
750                 testKit.getRef());
751         expectFirstMatching(leaderActor, AddServer.class);
752
753         LOG.info("testAddServerForwardedToLeader ending");
754     }
755
756     @Test
757     public void testOnApplyState() {
758         LOG.info("testOnApplyState starting");
759
760         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
761         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
762         TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
763                 MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
764                         followerActor.path().toString())).config(configParams).persistent(Optional.of(false))
765                         .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
766                 actorFactory.generateActorId(LEADER_ID));
767
768         RaftActorServerConfigurationSupport support = new RaftActorServerConfigurationSupport(
769                 noLeaderActor.underlyingActor());
770
771         ReplicatedLogEntry serverConfigEntry = new SimpleReplicatedLogEntry(1, 1,
772                 new ServerConfigurationPayload(Collections.<ServerInfo>emptyList()));
773         boolean handled = support.handleMessage(new ApplyState(null, null, serverConfigEntry), ActorRef.noSender());
774         assertEquals("Message handled", true, handled);
775
776         ReplicatedLogEntry nonServerConfigEntry = new SimpleReplicatedLogEntry(1, 1,
777                 new MockRaftActorContext.MockPayload("1"));
778         handled = support.handleMessage(new ApplyState(null, null, nonServerConfigEntry), ActorRef.noSender());
779         assertEquals("Message handled", false, handled);
780
781         LOG.info("testOnApplyState ending");
782     }
783
784     @Test
785     public void testRemoveServerWithNoLeader() {
786         LOG.info("testRemoveServerWithNoLeader starting");
787
788         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
789         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
790
791         TestActorRef<MockRaftActor> leaderActor = actorFactory.createTestActor(
792                 MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
793                         followerActor.path().toString())).config(configParams).persistent(Optional.of(false))
794                         .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
795                 actorFactory.generateActorId(LEADER_ID));
796         leaderActor.underlyingActor().waitForInitializeBehaviorComplete();
797
798         leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
799         RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"),
800                 RemoveServerReply.class);
801         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, removeServerReply.getStatus());
802
803         LOG.info("testRemoveServerWithNoLeader ending");
804     }
805
806     @Test
807     public void testRemoveServerNonExistentServer() {
808         LOG.info("testRemoveServerNonExistentServer starting");
809
810         RaftActorContext initialActorContext = new MockRaftActorContext();
811
812         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
813                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
814                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
815                 actorFactory.generateActorId(LEADER_ID));
816
817         leaderActor.tell(new RemoveServer(NEW_SERVER_ID), testKit.getRef());
818         RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"),
819                 RemoveServerReply.class);
820         assertEquals("getStatus", ServerChangeStatus.DOES_NOT_EXIST, removeServerReply.getStatus());
821
822         LOG.info("testRemoveServerNonExistentServer ending");
823     }
824
825     @Test
826     public void testRemoveServerForwardToLeader() {
827         LOG.info("testRemoveServerForwardToLeader starting");
828
829         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
830         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
831
832         TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
833                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
834                 actorFactory.generateActorId(LEADER_ID));
835
836         TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
837                 MockRaftActor.builder().id(FOLLOWER_ID).peerAddresses(ImmutableMap.of(LEADER_ID,
838                         leaderActor.path().toString())).config(configParams).persistent(Optional.of(false))
839                         .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
840                 actorFactory.generateActorId(FOLLOWER_ID));
841         followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
842
843         followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
844                 -1, -1, (short)0), leaderActor);
845
846         followerRaftActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
847         expectFirstMatching(leaderActor, RemoveServer.class);
848
849         LOG.info("testRemoveServerForwardToLeader ending");
850     }
851
852     @Test
853     public void testRemoveServer() throws Exception {
854         LOG.info("testRemoveServer starting");
855
856         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
857         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
858         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
859
860         final String follower1ActorId = actorFactory.generateActorId(FOLLOWER_ID);
861         final String follower1ActorPath = actorFactory.createTestActorPath(follower1ActorId);
862         final String follower2ActorId = actorFactory.generateActorId(FOLLOWER_ID2);
863         final String follower2ActorPath = actorFactory.createTestActorPath(follower2ActorId);
864         RaftActorContext initialActorContext = new MockRaftActorContext();
865
866         final String downNodeId = "downNode";
867         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(MockLeaderRaftActor.props(
868                 ImmutableMap.of(FOLLOWER_ID, follower1ActorPath, FOLLOWER_ID2, follower2ActorPath, downNodeId, ""),
869                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
870                 actorFactory.generateActorId(LEADER_ID));
871
872         final TestActorRef<MessageCollectorActor> leaderCollector =
873                 newLeaderCollectorActor(leaderActor.underlyingActor());
874
875         TestActorRef<MessageCollectorActor> follower1Collector = actorFactory.createTestActor(
876                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
877                         actorFactory.generateActorId("collector"));
878         final TestActorRef<CollectingMockRaftActor> follower1Actor = actorFactory.createTestActor(
879                 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
880                         FOLLOWER_ID2, follower2ActorPath, downNodeId, ""), configParams, NO_PERSISTENCE,
881                         follower1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId);
882
883         TestActorRef<MessageCollectorActor> follower2Collector = actorFactory.createTestActor(
884                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
885                         actorFactory.generateActorId("collector"));
886         final TestActorRef<CollectingMockRaftActor> follower2Actor = actorFactory.createTestActor(
887                 CollectingMockRaftActor.props(FOLLOWER_ID2, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
888                         FOLLOWER_ID, follower1ActorPath, downNodeId, ""), configParams, NO_PERSISTENCE,
889                         follower2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId);
890
891         leaderActor.underlyingActor().waitForInitializeBehaviorComplete();
892         follower1Actor.underlyingActor().waitForInitializeBehaviorComplete();
893         follower2Actor.underlyingActor().waitForInitializeBehaviorComplete();
894
895         leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
896         RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"),
897                 RemoveServerReply.class);
898         assertEquals("getStatus", ServerChangeStatus.OK, removeServerReply.getStatus());
899
900         ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
901         assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
902         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
903                 votingServer(LEADER_ID), votingServer(FOLLOWER_ID2), votingServer(downNodeId));
904
905         applyState = MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
906         assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
907         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
908                 votingServer(LEADER_ID), votingServer(FOLLOWER_ID2), votingServer(downNodeId));
909
910         RaftActorBehavior currentBehavior = leaderActor.underlyingActor().getCurrentBehavior();
911         assertTrue("Expected Leader", currentBehavior instanceof Leader);
912         assertEquals("Follower ids size", 2, ((Leader)currentBehavior).getFollowerIds().size());
913
914         MessageCollectorActor.expectFirstMatching(follower1Collector, ServerRemoved.class);
915
916         LOG.info("testRemoveServer ending");
917     }
918
919     @Test
920     public void testRemoveServerLeader() {
921         LOG.info("testRemoveServerLeader starting");
922
923         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
924         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
925         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
926
927         final String followerActorId = actorFactory.generateActorId(FOLLOWER_ID);
928         final String followerActorPath = actorFactory.createTestActorPath(followerActorId);
929         RaftActorContext initialActorContext = new MockRaftActorContext();
930
931         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
932                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActorPath),
933                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
934                 actorFactory.generateActorId(LEADER_ID));
935
936         final TestActorRef<MessageCollectorActor> leaderCollector =
937                 newLeaderCollectorActor(leaderActor.underlyingActor());
938
939         final TestActorRef<MessageCollectorActor> followerCollector =
940                 actorFactory.createTestActor(MessageCollectorActor.props()
941                 .withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId("collector"));
942         actorFactory.createTestActor(
943                 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
944                         configParams, NO_PERSISTENCE, followerCollector)
945                         .withDispatcher(Dispatchers.DefaultDispatcherId()),
946                 followerActorId);
947
948         leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
949         RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"),
950                 RemoveServerReply.class);
951         assertEquals("getStatus", ServerChangeStatus.OK, removeServerReply.getStatus());
952
953         final ApplyState applyState = MessageCollectorActor.expectFirstMatching(followerCollector, ApplyState.class);
954         assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
955         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
956                 votingServer(FOLLOWER_ID));
957
958         MessageCollectorActor.expectFirstMatching(leaderCollector, ServerRemoved.class);
959
960         LOG.info("testRemoveServerLeader ending");
961     }
962
963     @Test
964     public void testRemoveServerLeaderWithNoFollowers() {
965         LOG.info("testRemoveServerLeaderWithNoFollowers starting");
966
967         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
968                 MockLeaderRaftActor.props(Collections.<String, String>emptyMap(),
969                         new MockRaftActorContext()).withDispatcher(Dispatchers.DefaultDispatcherId()),
970                 actorFactory.generateActorId(LEADER_ID));
971
972         leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
973         RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"),
974                 RemoveServerReply.class);
975         assertEquals("getStatus", ServerChangeStatus.NOT_SUPPORTED, removeServerReply.getStatus());
976
977         LOG.info("testRemoveServerLeaderWithNoFollowers ending");
978     }
979
980     @Test
981     public void testChangeServersVotingStatus() {
982         LOG.info("testChangeServersVotingStatus starting");
983
984         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
985         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
986         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
987
988         final String follower1ActorId = actorFactory.generateActorId(FOLLOWER_ID);
989         final String follower1ActorPath = actorFactory.createTestActorPath(follower1ActorId);
990         final String follower2ActorId = actorFactory.generateActorId(FOLLOWER_ID2);
991         final String follower2ActorPath = actorFactory.createTestActorPath(follower2ActorId);
992
993         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
994                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, follower1ActorPath,
995                         FOLLOWER_ID2, follower2ActorPath), new MockRaftActorContext())
996                         .withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
997         TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
998
999         TestActorRef<MessageCollectorActor> follower1Collector = actorFactory.createTestActor(
1000                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1001                 actorFactory.generateActorId("collector"));
1002         final TestActorRef<CollectingMockRaftActor> follower1RaftActor = actorFactory.createTestActor(
1003                 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
1004                         FOLLOWER_ID2, follower2ActorPath), configParams, NO_PERSISTENCE, follower1Collector)
1005                         .withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId);
1006
1007         TestActorRef<MessageCollectorActor> follower2Collector = actorFactory.createTestActor(
1008                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1009                 actorFactory.generateActorId("collector"));
1010         final TestActorRef<CollectingMockRaftActor> follower2RaftActor = actorFactory.createTestActor(
1011                 CollectingMockRaftActor.props(FOLLOWER_ID2, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
1012                         FOLLOWER_ID, follower1ActorPath), configParams, NO_PERSISTENCE, follower2Collector)
1013                         .withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId);
1014
1015         // Send first ChangeServersVotingStatus message
1016
1017         leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(FOLLOWER_ID, false, FOLLOWER_ID2, false)),
1018                 testKit.getRef());
1019         ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1020         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1021
1022         final ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
1023         assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
1024         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1025                 votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1026
1027         MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
1028         verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext()
1029                 .getReplicatedLog(), votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID),
1030                 nonVotingServer(FOLLOWER_ID2));
1031
1032         MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
1033         verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext()
1034                 .getReplicatedLog(), votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID),
1035                 nonVotingServer(FOLLOWER_ID2));
1036
1037         MessageCollectorActor.clearMessages(leaderCollector);
1038         MessageCollectorActor.clearMessages(follower1Collector);
1039         MessageCollectorActor.clearMessages(follower2Collector);
1040
1041         // Send second ChangeServersVotingStatus message
1042
1043         leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(FOLLOWER_ID, true)), testKit.getRef());
1044         reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1045         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1046
1047         MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
1048         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1049                 votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1050
1051         MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
1052         verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext()
1053                 .getReplicatedLog(), votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1054
1055         MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
1056         verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext()
1057                 .getReplicatedLog(), votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1058
1059         LOG.info("testChangeServersVotingStatus ending");
1060     }
1061
1062     @Test
1063     public void testChangeLeaderToNonVoting() {
1064         LOG.info("testChangeLeaderToNonVoting starting");
1065
1066         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1067         configParams.setHeartBeatInterval(new FiniteDuration(500, TimeUnit.MILLISECONDS));
1068
1069         final String follower1ActorId = actorFactory.generateActorId(FOLLOWER_ID);
1070         final String follower1ActorPath = actorFactory.createTestActorPath(follower1ActorId);
1071         final String follower2ActorId = actorFactory.generateActorId(FOLLOWER_ID2);
1072         final String follower2ActorPath = actorFactory.createTestActorPath(follower2ActorId);
1073
1074         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
1075                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, follower1ActorPath,
1076                         FOLLOWER_ID2, follower2ActorPath), new MockRaftActorContext())
1077                         .withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
1078         TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
1079
1080         TestActorRef<MessageCollectorActor> follower1Collector = actorFactory.createTestActor(
1081                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1082                 actorFactory.generateActorId("collector"));
1083         final TestActorRef<CollectingMockRaftActor> follower1RaftActor = actorFactory.createTestActor(
1084                 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
1085                         FOLLOWER_ID2, follower2ActorPath), configParams, NO_PERSISTENCE, follower1Collector)
1086                         .withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId);
1087
1088         TestActorRef<MessageCollectorActor> follower2Collector = actorFactory.createTestActor(
1089                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1090                 actorFactory.generateActorId("collector"));
1091         final TestActorRef<CollectingMockRaftActor> follower2RaftActor = actorFactory.createTestActor(
1092                 CollectingMockRaftActor.props(FOLLOWER_ID2, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
1093                         FOLLOWER_ID, follower1ActorPath), configParams, NO_PERSISTENCE, follower2Collector)
1094                         .withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId);
1095
1096         // Send ChangeServersVotingStatus message
1097
1098         leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(LEADER_ID, false)), testKit.getRef());
1099         ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1100         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1101
1102         MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
1103         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1104                 nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
1105
1106         MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
1107         verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext()
1108                 .getReplicatedLog(), nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
1109
1110         MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
1111         verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext()
1112                 .getReplicatedLog(), nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
1113
1114         verifyRaftState(RaftState.Leader, follower1RaftActor.underlyingActor(), follower2RaftActor.underlyingActor());
1115         verifyRaftState(RaftState.Follower, leaderActor.underlyingActor());
1116
1117         MessageCollectorActor.expectMatching(leaderCollector, AppendEntries.class, 2);
1118
1119         LOG.info("testChangeLeaderToNonVoting ending");
1120     }
1121
1122     @Test
1123     public void testChangeLeaderToNonVotingInSingleNode() {
1124         LOG.info("testChangeLeaderToNonVotingInSingleNode starting");
1125
1126         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
1127                 MockLeaderRaftActor.props(ImmutableMap.of(), new MockRaftActorContext())
1128                         .withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
1129
1130         leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(LEADER_ID, false)), testKit.getRef());
1131         ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1132         assertEquals("getStatus", ServerChangeStatus.INVALID_REQUEST, reply.getStatus());
1133
1134         LOG.info("testChangeLeaderToNonVotingInSingleNode ending");
1135     }
1136
1137     @Test
1138     public void testChangeToVotingWithNoLeader() {
1139         LOG.info("testChangeToVotingWithNoLeader starting");
1140
1141         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1142         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1143         configParams.setElectionTimeoutFactor(5);
1144
1145         final String node1ID = "node1";
1146         final String node2ID = "node2";
1147
1148         // Set up a persisted ServerConfigurationPayload. Initially node1 and node2 will come up as non-voting.
1149         // via the server config. The server config will also contain 2 voting peers that are down (ie no
1150         // actors created).
1151
1152         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1153                 new ServerInfo(node1ID, false), new ServerInfo(node2ID, false),
1154                 new ServerInfo("downNode1", true), new ServerInfo("downNode2", true)));
1155         SimpleReplicatedLogEntry persistedServerConfigEntry = new SimpleReplicatedLogEntry(0, 1, persistedServerConfig);
1156
1157         InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "downNode1"));
1158         InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1159         InMemoryJournal.addEntry(node1ID, 3, new ApplyJournalEntries(0));
1160         InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "downNode2"));
1161         InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1162         InMemoryJournal.addEntry(node2ID, 3, new ApplyJournalEntries(0));
1163
1164         TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
1165                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1166                 actorFactory.generateActorId("collector"));
1167         TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1168                 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
1169                         PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1170         CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1171
1172         TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
1173                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1174                 actorFactory.generateActorId("collector"));
1175         TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1176                 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
1177                         PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1178         CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1179
1180         node1RaftActor.waitForInitializeBehaviorComplete();
1181         node2RaftActor.waitForInitializeBehaviorComplete();
1182
1183         // Verify the intended server config was loaded and applied.
1184         verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1185                 nonVotingServer(node1ID), nonVotingServer(node2ID), votingServer("downNode1"),
1186                 votingServer("downNode2"));
1187         assertEquals("isVotingMember", false, node1RaftActor.getRaftActorContext().isVotingMember());
1188         assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1189         assertEquals("getLeaderId", null, node1RaftActor.getLeaderId());
1190
1191         verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1192                 nonVotingServer(node1ID), nonVotingServer(node2ID), votingServer("downNode1"),
1193                 votingServer("downNode2"));
1194         assertEquals("isVotingMember", false, node2RaftActor.getRaftActorContext().isVotingMember());
1195
1196         // For the test, we send a ChangeServersVotingStatus message to node1 to flip the voting states for
1197         // each server, ie node1 and node2 to voting and the 2 down nodes to non-voting. This should cause
1198         // node1 to try to elect itself as leader in order to apply the new server config. Since the 2
1199         // down nodes are switched to non-voting, node1 should only need a vote from node2.
1200
1201         // First send the message such that node1 has no peer address for node2 - should fail.
1202
1203         ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true,
1204                 node2ID, true, "downNode1", false, "downNode2", false));
1205         node1RaftActorRef.tell(changeServers, testKit.getRef());
1206         ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1207         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, reply.getStatus());
1208
1209         // Send an AppendEntries so node1 has a leaderId
1210
1211         MessageCollectorActor.clearMessages(node1Collector);
1212
1213         long term = node1RaftActor.getRaftActorContext().getTermInformation().getCurrentTerm();
1214         node1RaftActorRef.tell(new AppendEntries(term, "downNode1", -1L, -1L,
1215                 Collections.<ReplicatedLogEntry>emptyList(), 0, -1, (short)1), ActorRef.noSender());
1216
1217         // Wait for the ElectionTimeout to clear the leaderId. he leaderId must be null so on the
1218         // ChangeServersVotingStatus message, it will try to elect a leader.
1219
1220         MessageCollectorActor.expectFirstMatching(node1Collector, ElectionTimeout.class);
1221
1222         // Update node2's peer address and send the message again
1223
1224         node1RaftActor.setPeerAddress(node2ID, node2RaftActorRef.path().toString());
1225
1226         node1RaftActorRef.tell(changeServers, testKit.getRef());
1227         reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1228         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1229
1230         ApplyJournalEntries apply = MessageCollectorActor.expectFirstMatching(node1Collector,
1231                 ApplyJournalEntries.class);
1232         assertEquals("getToIndex", 1, apply.getToIndex());
1233         verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1234                 votingServer(node1ID), votingServer(node2ID), nonVotingServer("downNode1"),
1235                 nonVotingServer("downNode2"));
1236         assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
1237         assertEquals("getRaftState", RaftState.Leader, node1RaftActor.getRaftState());
1238
1239         apply = MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
1240         assertEquals("getToIndex", 1, apply.getToIndex());
1241         verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1242                 votingServer(node1ID), votingServer(node2ID), nonVotingServer("downNode1"),
1243                 nonVotingServer("downNode2"));
1244         assertEquals("isVotingMember", true, node2RaftActor.getRaftActorContext().isVotingMember());
1245         assertEquals("getRaftState", RaftState.Follower, node2RaftActor.getRaftState());
1246
1247         LOG.info("testChangeToVotingWithNoLeader ending");
1248     }
1249
1250     @Test
1251     public void testChangeToVotingWithNoLeaderAndElectionTimeout() {
1252         LOG.info("testChangeToVotingWithNoLeaderAndElectionTimeout starting");
1253
1254         final String node1ID = "node1";
1255         final String node2ID = "node2";
1256
1257         final PeerAddressResolver peerAddressResolver = peerId -> peerId.equals(node1ID)
1258                 ? actorFactory.createTestActorPath(node1ID) : peerId.equals(node2ID)
1259                         ? actorFactory.createTestActorPath(node2ID) : null;
1260
1261         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1262                 new ServerInfo(node1ID, false), new ServerInfo(node2ID, true)));
1263         SimpleReplicatedLogEntry persistedServerConfigEntry = new SimpleReplicatedLogEntry(0, 1, persistedServerConfig);
1264
1265         InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
1266         InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1267         InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
1268         InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1269
1270         DefaultConfigParamsImpl configParams1 = new DefaultConfigParamsImpl();
1271         configParams1.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1272         configParams1.setElectionTimeoutFactor(1);
1273         configParams1.setPeerAddressResolver(peerAddressResolver);
1274         TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
1275                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1276                 actorFactory.generateActorId("collector"));
1277         TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1278                 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams1,
1279                         PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1280         final CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1281
1282         DefaultConfigParamsImpl configParams2 = new DefaultConfigParamsImpl();
1283         configParams2.setElectionTimeoutFactor(1000000);
1284         configParams2.setPeerAddressResolver(peerAddressResolver);
1285         TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
1286                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1287                 actorFactory.generateActorId("collector"));
1288         TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1289                 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams2,
1290                         PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1291         CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1292
1293         // Send a ChangeServersVotingStatus message to node1 to change mode1 to voting. This should cause
1294         // node1 to try to elect itself as leader in order to apply the new server config. But we'll drop
1295         // RequestVote messages in node2 which should cause node1 to time out and revert back to the previous
1296         // server config and fail with NO_LEADER. Note that node1 shouldn't forward the request to node2 b/c
1297         // node2 was previously voting.
1298
1299         node2RaftActor.setDropMessageOfType(RequestVote.class);
1300
1301         ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true));
1302         node1RaftActorRef.tell(changeServers, testKit.getRef());
1303         ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1304         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, reply.getStatus());
1305
1306         assertEquals("Server config", Sets.newHashSet(nonVotingServer(node1ID), votingServer(node2ID)),
1307                 Sets.newHashSet(node1RaftActor.getRaftActorContext().getPeerServerInfo(true).getServerConfig()));
1308         assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1309
1310         LOG.info("testChangeToVotingWithNoLeaderAndElectionTimeout ending");
1311     }
1312
1313     @Test
1314     public void testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout() {
1315         LOG.info("testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout starting");
1316
1317         final String node1ID = "node1";
1318         final String node2ID = "node2";
1319
1320         final PeerAddressResolver peerAddressResolver = peerId -> peerId.equals(node1ID)
1321                 ? actorFactory.createTestActorPath(node1ID) : peerId.equals(node2ID)
1322                         ? actorFactory.createTestActorPath(node2ID) : null;
1323
1324         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1325         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1326         configParams.setElectionTimeoutFactor(3);
1327         configParams.setPeerAddressResolver(peerAddressResolver);
1328
1329         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1330                 new ServerInfo(node1ID, false), new ServerInfo(node2ID, false)));
1331         SimpleReplicatedLogEntry persistedServerConfigEntry = new SimpleReplicatedLogEntry(0, 1, persistedServerConfig);
1332
1333         InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
1334         InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1335         InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
1336         InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1337         InMemoryJournal.addEntry(node2ID, 3, new SimpleReplicatedLogEntry(1, 1,
1338                 new MockRaftActorContext.MockPayload("2")));
1339         InMemoryJournal.addEntry(node2ID, 4, new ApplyJournalEntries(1));
1340
1341         TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
1342                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1343                 actorFactory.generateActorId("collector"));
1344         TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1345                 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
1346                         PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1347         final CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1348
1349         TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
1350                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1351                 actorFactory.generateActorId("collector"));
1352         TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1353                 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
1354                         PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1355         final CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1356
1357         // Send a ChangeServersVotingStatus message to node1 to change mode1 to voting. This should cause
1358         // node1 to try to elect itself as leader in order to apply the new server config. However node1's log
1359         // is behind node2's so node2 should not grant node1's vote. This should cause node1 to time out and
1360         // forward the request to node2.
1361
1362         ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(
1363                 ImmutableMap.of(node1ID, true, node2ID, true));
1364         node1RaftActorRef.tell(changeServers, testKit.getRef());
1365         ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1366         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1367
1368         MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
1369         verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1370                 votingServer(node1ID), votingServer(node2ID));
1371         assertEquals("getRaftState", RaftState.Leader, node2RaftActor.getRaftState());
1372
1373         MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class);
1374         verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1375                 votingServer(node1ID), votingServer(node2ID));
1376         assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
1377         assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1378
1379         LOG.info("testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout ending");
1380     }
1381
1382     @Test
1383     public void testChangeToVotingWithNoLeaderAndOtherLeaderElected() {
1384         LOG.info("testChangeToVotingWithNoLeaderAndOtherLeaderElected starting");
1385
1386         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1387         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1388         configParams.setElectionTimeoutFactor(100000);
1389
1390         final String node1ID = "node1";
1391         final String node2ID = "node2";
1392
1393         configParams.setPeerAddressResolver(peerId -> peerId.equals(node1ID)
1394                 ? actorFactory.createTestActorPath(node1ID) : peerId.equals(node2ID)
1395                         ? actorFactory.createTestActorPath(node2ID) : null);
1396
1397         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1398                 new ServerInfo(node1ID, false), new ServerInfo(node2ID, true)));
1399         SimpleReplicatedLogEntry persistedServerConfigEntry = new SimpleReplicatedLogEntry(0, 1, persistedServerConfig);
1400
1401         InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
1402         InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1403         InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
1404         InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1405
1406         TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
1407                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1408                 actorFactory.generateActorId("collector"));
1409         TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1410                 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
1411                         PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1412         final CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1413
1414         TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
1415                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1416                 actorFactory.generateActorId("collector"));
1417         TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1418                 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
1419                         PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1420         CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1421
1422         // Send a ChangeServersVotingStatus message to node1 to change node1 to voting. This should cause
1423         // node1 to try to elect itself as leader in order to apply the new server config. But we'll drop
1424         // RequestVote messages in node2 and make it the leader so node1 should forward the server change
1425         // request to node2 when node2 is elected.
1426
1427         node2RaftActor.setDropMessageOfType(RequestVote.class);
1428
1429         ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true,
1430                 node2ID, true));
1431         node1RaftActorRef.tell(changeServers, testKit.getRef());
1432
1433         MessageCollectorActor.expectFirstMatching(node2Collector, RequestVote.class);
1434
1435         node2RaftActorRef.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
1436
1437         ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1438         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1439
1440         MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class);
1441         verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1442                 votingServer(node1ID), votingServer(node2ID));
1443         assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
1444         assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1445
1446         MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
1447         verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1448                 votingServer(node1ID), votingServer(node2ID));
1449         assertEquals("getRaftState", RaftState.Leader, node2RaftActor.getRaftState());
1450
1451         LOG.info("testChangeToVotingWithNoLeaderAndOtherLeaderElected ending");
1452     }
1453
1454     private static void verifyRaftState(RaftState expState, RaftActor... raftActors) {
1455         Stopwatch sw = Stopwatch.createStarted();
1456         while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
1457             for (RaftActor raftActor : raftActors) {
1458                 if (raftActor.getRaftState() == expState) {
1459                     return;
1460                 }
1461             }
1462         }
1463
1464         fail("None of the RaftActors have state " + expState);
1465     }
1466
1467     private static ServerInfo votingServer(String id) {
1468         return new ServerInfo(id, true);
1469     }
1470
1471     private static ServerInfo nonVotingServer(String id) {
1472         return new ServerInfo(id, false);
1473     }
1474
1475     private TestActorRef<MessageCollectorActor> newLeaderCollectorActor(MockLeaderRaftActor leaderRaftActor) {
1476         return newCollectorActor(leaderRaftActor, LEADER_ID);
1477     }
1478
1479     private TestActorRef<MessageCollectorActor> newCollectorActor(AbstractMockRaftActor raftActor, String id) {
1480         TestActorRef<MessageCollectorActor> collectorActor = actorFactory.createTestActor(
1481                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1482                 actorFactory.generateActorId(id + "Collector"));
1483         raftActor.setCollectorActor(collectorActor);
1484         return collectorActor;
1485     }
1486
1487     private static void verifyServerConfigurationPayloadEntry(ReplicatedLog log, ServerInfo... expected) {
1488         ReplicatedLogEntry logEntry = log.get(log.lastIndex());
1489         assertEquals("Last log entry payload class", ServerConfigurationPayload.class, logEntry.getData().getClass());
1490         ServerConfigurationPayload payload = (ServerConfigurationPayload)logEntry.getData();
1491         assertEquals("Server config", Sets.newHashSet(expected), Sets.newHashSet(payload.getServerConfig()));
1492     }
1493
1494     private static RaftActorContextImpl newFollowerContext(String id, TestActorRef<? extends UntypedActor> actor) {
1495         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1496         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1497         configParams.setElectionTimeoutFactor(100000);
1498         NonPersistentDataProvider noPersistence = new NonPersistentDataProvider();
1499         ElectionTermImpl termInfo = new ElectionTermImpl(noPersistence, id, LOG);
1500         termInfo.update(1, LEADER_ID);
1501         return new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
1502                 id, termInfo, -1, -1, ImmutableMap.of(LEADER_ID, ""), configParams,
1503                 noPersistence, applyState -> actor.tell(applyState, actor), LOG);
1504     }
1505
1506     abstract static class AbstractMockRaftActor extends MockRaftActor {
1507         private volatile TestActorRef<MessageCollectorActor> collectorActor;
1508         private volatile Class<?> dropMessageOfType;
1509
1510         AbstractMockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
1511                 boolean persistent, TestActorRef<MessageCollectorActor> collectorActor) {
1512             super(builder().id(id).peerAddresses(peerAddresses).config(config.get())
1513                     .persistent(Optional.of(persistent)));
1514             this.collectorActor = collectorActor;
1515         }
1516
1517         void setDropMessageOfType(Class<?> dropMessageOfType) {
1518             this.dropMessageOfType = dropMessageOfType;
1519         }
1520
1521         void setCollectorActor(TestActorRef<MessageCollectorActor> collectorActor) {
1522             this.collectorActor = collectorActor;
1523         }
1524
1525         @Override
1526         public void handleCommand(Object message) {
1527             if (dropMessageOfType == null || !dropMessageOfType.equals(message.getClass())) {
1528                 super.handleCommand(message);
1529             }
1530
1531             if (collectorActor != null) {
1532                 collectorActor.tell(message, getSender());
1533             }
1534         }
1535     }
1536
1537     public static class CollectingMockRaftActor extends AbstractMockRaftActor {
1538
1539         CollectingMockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
1540                 boolean persistent, TestActorRef<MessageCollectorActor> collectorActor) {
1541             super(id, peerAddresses, config, persistent, collectorActor);
1542             snapshotCohortDelegate = new RaftActorSnapshotCohort() {
1543                 @Override
1544                 public void createSnapshot(ActorRef actorRef, java.util.Optional<OutputStream> installSnapshotStream) {
1545                     actorRef.tell(new CaptureSnapshotReply(ByteState.empty(), installSnapshotStream), actorRef);
1546                 }
1547
1548                 @Override
1549                 public void applySnapshot(
1550                         org.opendaylight.controller.cluster.raft.persisted.Snapshot.State snapshotState) {
1551                 }
1552
1553                 @Override
1554                 public org.opendaylight.controller.cluster.raft.persisted.Snapshot.State deserializeSnapshot(
1555                         ByteSource snapshotBytes) {
1556                     throw new UnsupportedOperationException();
1557                 }
1558             };
1559         }
1560
1561         public static Props props(final String id, final Map<String, String> peerAddresses,
1562                 ConfigParams config, boolean persistent, TestActorRef<MessageCollectorActor> collectorActor) {
1563
1564             return Props.create(CollectingMockRaftActor.class, id, peerAddresses, Optional.of(config),
1565                     persistent, collectorActor);
1566         }
1567
1568     }
1569
1570     public static class MockLeaderRaftActor extends AbstractMockRaftActor {
1571         public MockLeaderRaftActor(Map<String, String> peerAddresses, ConfigParams config,
1572                 RaftActorContext fromContext) {
1573             super(LEADER_ID, peerAddresses, Optional.of(config), NO_PERSISTENCE, null);
1574             setPersistence(false);
1575
1576             RaftActorContext context = getRaftActorContext();
1577             for (int i = 0; i < fromContext.getReplicatedLog().size(); i++) {
1578                 ReplicatedLogEntry entry = fromContext.getReplicatedLog().get(i);
1579                 getState().add(entry.getData());
1580                 context.getReplicatedLog().append(entry);
1581             }
1582
1583             context.setCommitIndex(fromContext.getCommitIndex());
1584             context.setLastApplied(fromContext.getLastApplied());
1585             context.getTermInformation().update(fromContext.getTermInformation().getCurrentTerm(),
1586                     fromContext.getTermInformation().getVotedFor());
1587         }
1588
1589         @Override
1590         protected void initializeBehavior() {
1591             changeCurrentBehavior(new Leader(getRaftActorContext()));
1592             initializeBehaviorComplete.countDown();
1593         }
1594
1595         @Override
1596         @SuppressWarnings("checkstyle:IllegalCatch")
1597         public void createSnapshot(ActorRef actorRef, java.util.Optional<OutputStream> installSnapshotStream) {
1598             MockSnapshotState snapshotState = new MockSnapshotState(new ArrayList<>(getState()));
1599             if (installSnapshotStream.isPresent()) {
1600                 SerializationUtils.serialize(snapshotState, installSnapshotStream.get());
1601             }
1602
1603             actorRef.tell(new CaptureSnapshotReply(snapshotState, installSnapshotStream), actorRef);
1604         }
1605
1606         static Props props(Map<String, String> peerAddresses, RaftActorContext fromContext) {
1607             DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1608             configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1609             configParams.setElectionTimeoutFactor(10);
1610             return Props.create(MockLeaderRaftActor.class, peerAddresses, configParams, fromContext);
1611         }
1612     }
1613
1614     public static class MockNewFollowerRaftActor extends AbstractMockRaftActor {
1615         public MockNewFollowerRaftActor(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
1616             super(NEW_SERVER_ID, Maps.<String, String>newHashMap(), Optional.of(config), NO_PERSISTENCE,
1617                     collectorActor);
1618             setPersistence(false);
1619         }
1620
1621         static Props props(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
1622             return Props.create(MockNewFollowerRaftActor.class, config, collectorActor);
1623         }
1624     }
1625 }