Implement change to voting with no leader
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorServerConfigurationSupportTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertTrue;
12 import static org.junit.Assert.fail;
13 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.assertNoneMatching;
14 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
15 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
16 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
17 import akka.actor.ActorRef;
18 import akka.actor.Props;
19 import akka.actor.UntypedActor;
20 import akka.dispatch.Dispatchers;
21 import akka.testkit.JavaTestKit;
22 import akka.testkit.TestActorRef;
23 import com.google.common.base.Optional;
24 import com.google.common.base.Stopwatch;
25 import com.google.common.collect.ImmutableMap;
26 import com.google.common.collect.Maps;
27 import com.google.common.collect.Sets;
28 import java.util.Arrays;
29 import java.util.Collections;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.concurrent.TimeUnit;
33 import org.junit.After;
34 import org.junit.Before;
35 import org.junit.Test;
36 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
37 import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
38 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
39 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
40 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
41 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
42 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
43 import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
44 import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete;
45 import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
46 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
47 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
48 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
49 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
50 import org.opendaylight.controller.cluster.raft.messages.AddServer;
51 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
52 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
53 import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
54 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
55 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
56 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
57 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
58 import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
59 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
60 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
61 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
62 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
63 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
64 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
65 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
66 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
67 import org.slf4j.Logger;
68 import org.slf4j.LoggerFactory;
69 import scala.concurrent.duration.FiniteDuration;
70
71 /**
72  * Unit tests for RaftActorServerConfigurationSupport.
73  *
74  * @author Thomas Pantelis
75  */
76 public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
77     static final String LEADER_ID = "leader";
78     static final String FOLLOWER_ID = "follower";
79     static final String FOLLOWER_ID2 = "follower2";
80     static final String NEW_SERVER_ID = "new-server";
81     static final String NEW_SERVER_ID2 = "new-server2";
82     private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupportTest.class);
83     private static final boolean NO_PERSISTENCE = false;
84     private static final boolean PERSISTENT = true;
85
86     private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
87
88     private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
89             Props.create(ForwardMessageToBehaviorActor.class).withDispatcher(Dispatchers.DefaultDispatcherId()),
90             actorFactory.generateActorId(FOLLOWER_ID));
91
92     private TestActorRef<MockNewFollowerRaftActor> newFollowerRaftActor;
93     private TestActorRef<MessageCollectorActor> newFollowerCollectorActor;
94     private RaftActorContext newFollowerActorContext;
95
96     private final JavaTestKit testKit = new JavaTestKit(getSystem());
97
98     @Before
99     public void setup() {
100         InMemoryJournal.clear();
101         InMemorySnapshotStore.clear();
102     }
103
104     private void setupNewFollower() {
105         DefaultConfigParamsImpl configParams = newFollowerConfigParams();
106
107         newFollowerCollectorActor = actorFactory.createTestActor(
108                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
109                 actorFactory.generateActorId(NEW_SERVER_ID + "Collector"));
110         newFollowerRaftActor = actorFactory.createTestActor(MockNewFollowerRaftActor.props(
111                 configParams, newFollowerCollectorActor).withDispatcher(Dispatchers.DefaultDispatcherId()),
112                 actorFactory.generateActorId(NEW_SERVER_ID));
113
114         try {
115             newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
116         } catch (Exception e) {
117             newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
118         }
119     }
120
121     private static DefaultConfigParamsImpl newFollowerConfigParams() {
122         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
123         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
124         configParams.setElectionTimeoutFactor(100000);
125         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
126         return configParams;
127     }
128
129     @After
130     public void tearDown() throws Exception {
131         actorFactory.close();
132     }
133
134     @Test
135     public void testAddServerWithExistingFollower() throws Exception {
136         LOG.info("testAddServerWithExistingFollower starting");
137         setupNewFollower();
138         RaftActorContext followerActorContext = newFollowerContext(FOLLOWER_ID, followerActor);
139         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
140                 0, 3, 1).build());
141         followerActorContext.setCommitIndex(2);
142         followerActorContext.setLastApplied(2);
143
144         Follower follower = new Follower(followerActorContext);
145         followerActor.underlyingActor().setBehavior(follower);
146
147         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
148                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
149                         followerActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
150                 actorFactory.generateActorId(LEADER_ID));
151
152         // Expect initial heartbeat from the leader.
153         expectFirstMatching(followerActor, AppendEntries.class);
154         clearMessages(followerActor);
155
156         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
157         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
158
159         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
160
161         // Leader should install snapshot - capture and verify ApplySnapshot contents
162
163         ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
164         @SuppressWarnings("unchecked")
165         List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
166         assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
167
168         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
169         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
170         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
171
172         // Verify ServerConfigurationPayload entry in leader's log
173
174         expectFirstMatching(leaderCollectorActor, ApplyState.class);
175         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
176         assertEquals("Leader journal last index", 3, leaderActorContext.getReplicatedLog().lastIndex());
177         assertEquals("Leader commit index", 3, leaderActorContext.getCommitIndex());
178         assertEquals("Leader last applied index", 3, leaderActorContext.getLastApplied());
179         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
180                 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
181
182         // Verify ServerConfigurationPayload entry in both followers
183
184         expectFirstMatching(followerActor, ApplyState.class);
185         assertEquals("Follower journal last index", 3, followerActorContext.getReplicatedLog().lastIndex());
186         verifyServerConfigurationPayloadEntry(followerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
187                 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
188
189         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
190         assertEquals("New follower journal last index", 3, newFollowerActorContext.getReplicatedLog().lastIndex());
191         verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
192                 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
193
194         // Verify new server config was applied in both followers
195
196         assertEquals("Follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID), followerActorContext.getPeerIds());
197
198         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, FOLLOWER_ID), newFollowerActorContext.getPeerIds());
199
200         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
201         expectFirstMatching(followerActor, ApplyState.class);
202
203         assertEquals("Follower commit index", 3, followerActorContext.getCommitIndex());
204         assertEquals("Follower last applied index", 3, followerActorContext.getLastApplied());
205         assertEquals("New follower commit index", 3, newFollowerActorContext.getCommitIndex());
206         assertEquals("New follower last applied index", 3, newFollowerActorContext.getLastApplied());
207
208         List<ReplicatedLogImplEntry> persistedLogEntries = InMemoryJournal.get(LEADER_ID, ReplicatedLogImplEntry.class);
209         assertEquals("Leader ReplicatedLogImplEntry entries", 1, persistedLogEntries.size());
210         ReplicatedLogImplEntry logEntry = persistedLogEntries.get(0);
211         assertEquals("Leader ReplicatedLogImplEntry getTerm", 1, logEntry.getTerm());
212         assertEquals("Leader ReplicatedLogImplEntry getIndex", 3, logEntry.getIndex());
213         assertEquals("Leader ReplicatedLogImplEntry getData", ServerConfigurationPayload.class, logEntry.getData().getClass());
214
215         persistedLogEntries = InMemoryJournal.get(NEW_SERVER_ID, ReplicatedLogImplEntry.class);
216         assertEquals("New follower ReplicatedLogImplEntry entries", 1, persistedLogEntries.size());
217         logEntry = persistedLogEntries.get(0);
218         assertEquals("New follower ReplicatedLogImplEntry getTerm", 1, logEntry.getTerm());
219         assertEquals("New follower ReplicatedLogImplEntry getIndex", 3, logEntry.getIndex());
220         assertEquals("New follower ReplicatedLogImplEntry getData", ServerConfigurationPayload.class,
221                 logEntry.getData().getClass());
222
223         LOG.info("testAddServerWithExistingFollower ending");
224     }
225
226     @Test
227     public void testAddServerWithNoExistingFollower() throws Exception {
228         LOG.info("testAddServerWithNoExistingFollower starting");
229
230         setupNewFollower();
231         RaftActorContext initialActorContext = new MockRaftActorContext();
232         initialActorContext.setCommitIndex(1);
233         initialActorContext.setLastApplied(1);
234         initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
235                 0, 2, 1).build());
236
237         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
238                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
239                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
240                 actorFactory.generateActorId(LEADER_ID));
241
242         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
243         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
244
245         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
246
247         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
248
249         // Leader should install snapshot - capture and verify ApplySnapshot contents
250
251         ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
252         @SuppressWarnings("unchecked")
253         List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
254         assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
255
256         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
257         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
258         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
259
260         // Verify ServerConfigurationPayload entry in leader's log
261
262         expectFirstMatching(leaderCollectorActor, ApplyState.class);
263         assertEquals("Leader journal last index", 2, leaderActorContext.getReplicatedLog().lastIndex());
264         assertEquals("Leader commit index", 2, leaderActorContext.getCommitIndex());
265         assertEquals("Leader last applied index", 2, leaderActorContext.getLastApplied());
266         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
267                 votingServer(NEW_SERVER_ID));
268
269         // Verify ServerConfigurationPayload entry in the new follower
270
271         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
272         assertEquals("New follower journal last index", 2, newFollowerActorContext.getReplicatedLog().lastIndex());
273         verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
274                 votingServer(NEW_SERVER_ID));
275
276         // Verify new server config was applied in the new follower
277
278         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
279
280         LOG.info("testAddServerWithNoExistingFollower ending");
281     }
282
283     @Test
284     public void testAddServersAsNonVoting() throws Exception {
285         LOG.info("testAddServersAsNonVoting starting");
286
287         setupNewFollower();
288         RaftActorContext initialActorContext = new MockRaftActorContext();
289
290         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
291                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
292                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
293                 actorFactory.generateActorId(LEADER_ID));
294
295         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
296         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
297
298         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
299
300         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), false), testKit.getRef());
301
302         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
303         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
304         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
305
306         // Verify ServerConfigurationPayload entry in leader's log
307
308         expectFirstMatching(leaderCollectorActor, ApplyState.class);
309
310         assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
311         assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
312         assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
313         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
314                 nonVotingServer(NEW_SERVER_ID));
315
316         // Verify ServerConfigurationPayload entry in the new follower
317
318         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
319         assertEquals("New follower journal last index", 0, newFollowerActorContext.getReplicatedLog().lastIndex());
320         verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
321                 nonVotingServer(NEW_SERVER_ID));
322
323         // Verify new server config was applied in the new follower
324
325         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
326
327         assertNoneMatching(newFollowerCollectorActor, InstallSnapshot.class, 500);
328
329         // Add another non-voting server.
330
331         clearMessages(leaderCollectorActor);
332
333         RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
334         Follower newFollower2 = new Follower(follower2ActorContext);
335         followerActor.underlyingActor().setBehavior(newFollower2);
336
337         leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit.getRef());
338
339         addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
340         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
341         assertEquals("getLeaderHint", java.util.Optional.of(LEADER_ID), addServerReply.getLeaderHint());
342
343         expectFirstMatching(leaderCollectorActor, ApplyState.class);
344         assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
345         assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
346         assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
347         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
348                 votingServer(LEADER_ID), nonVotingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
349
350         LOG.info("testAddServersAsNonVoting ending");
351     }
352
353     @Test
354     public void testAddServerWithOperationInProgress() throws Exception {
355         LOG.info("testAddServerWithOperationInProgress starting");
356
357         setupNewFollower();
358         RaftActorContext initialActorContext = new MockRaftActorContext();
359
360         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
361                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
362                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
363                 actorFactory.generateActorId(LEADER_ID));
364
365         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
366         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
367
368         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
369
370         RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
371         Follower newFollower2 = new Follower(follower2ActorContext);
372         followerActor.underlyingActor().setBehavior(newFollower2);
373
374         MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
375         newFollowerRaftActorInstance.setDropMessageOfType(InstallSnapshot.SERIALIZABLE_CLASS);
376
377         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
378
379         // Wait for leader's install snapshot and capture it
380
381         InstallSnapshot installSnapshot = expectFirstMatching(newFollowerCollectorActor, InstallSnapshot.class);
382
383         // Send a second AddServer - should get queued
384         JavaTestKit testKit2 = new JavaTestKit(getSystem());
385         leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit2.getRef());
386
387         // Continue the first AddServer
388         newFollowerRaftActorInstance.setDropMessageOfType(null);
389         newFollowerRaftActor.tell(installSnapshot, leaderActor);
390
391         // Verify both complete successfully
392         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
393         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
394
395         addServerReply = testKit2.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
396         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
397
398         // Verify ServerConfigurationPayload entries in leader's log
399
400         expectMatching(leaderCollectorActor, ApplyState.class, 2);
401         assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
402         assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
403         assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
404         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
405                 votingServer(LEADER_ID), votingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
406
407         // Verify ServerConfigurationPayload entry in the new follower
408
409         expectMatching(newFollowerCollectorActor, ApplyState.class, 2);
410         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID2),
411                newFollowerActorContext.getPeerIds());
412
413         LOG.info("testAddServerWithOperationInProgress ending");
414     }
415
416     @Test
417     public void testAddServerWithPriorSnapshotInProgress() throws Exception {
418         LOG.info("testAddServerWithPriorSnapshotInProgress starting");
419
420         setupNewFollower();
421         RaftActorContext initialActorContext = new MockRaftActorContext();
422
423         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
424                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
425                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
426                 actorFactory.generateActorId(LEADER_ID));
427
428         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
429         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
430
431         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
432
433         // Drop commit message for now to delay snapshot completion
434         leaderRaftActor.setDropMessageOfType(String.class);
435
436         leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
437
438         String commitMsg = expectFirstMatching(leaderCollectorActor, String.class);
439
440         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
441
442         leaderRaftActor.setDropMessageOfType(null);
443         leaderActor.tell(commitMsg, leaderActor);
444
445         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
446         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
447         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
448
449         expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
450
451         // Verify ServerConfigurationPayload entry in leader's log
452
453         expectFirstMatching(leaderCollectorActor, ApplyState.class);
454         assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
455         assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
456         assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
457         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
458                 votingServer(NEW_SERVER_ID));
459
460         LOG.info("testAddServerWithPriorSnapshotInProgress ending");
461     }
462
463     @Test
464     public void testAddServerWithPriorSnapshotCompleteTimeout() throws Exception {
465         LOG.info("testAddServerWithPriorSnapshotCompleteTimeout starting");
466
467         setupNewFollower();
468         RaftActorContext initialActorContext = new MockRaftActorContext();
469
470         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
471                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
472                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
473                 actorFactory.generateActorId(LEADER_ID));
474
475         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
476         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
477
478         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
479
480         // Drop commit message so the snapshot doesn't complete.
481         leaderRaftActor.setDropMessageOfType(String.class);
482
483         leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
484
485         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
486
487         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
488         assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
489
490         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
491
492         LOG.info("testAddServerWithPriorSnapshotCompleteTimeout ending");
493     }
494
495     @Test
496     public void testAddServerWithLeaderChangeBeforePriorSnapshotComplete() throws Exception {
497         LOG.info("testAddServerWithLeaderChangeBeforePriorSnapshotComplete starting");
498
499         setupNewFollower();
500         RaftActorContext initialActorContext = new MockRaftActorContext();
501
502         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
503                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
504                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
505                 actorFactory.generateActorId(LEADER_ID));
506
507         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
508         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
509         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
510
511         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
512
513         // Drop the commit message so the snapshot doesn't complete yet.
514         leaderRaftActor.setDropMessageOfType(String.class);
515
516         leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
517
518         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
519
520         String commitMsg = expectFirstMatching(leaderCollectorActor, String.class);
521
522         // Change the leader behavior to follower
523         leaderActor.tell(new Follower(leaderActorContext), leaderActor);
524
525         // Drop CaptureSnapshotReply in case install snapshot is incorrectly initiated after the prior
526         // snapshot completes. This will prevent the invalid snapshot from completing and fail the
527         // isCapturing assertion below.
528         leaderRaftActor.setDropMessageOfType(CaptureSnapshotReply.class);
529
530         // Complete the prior snapshot - this should be a no-op b/c it's no longer the leader
531         leaderActor.tell(commitMsg, leaderActor);
532
533         leaderActor.tell(new RaftActorServerConfigurationSupport.ServerOperationTimeout(NEW_SERVER_ID), leaderActor);
534
535         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
536         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
537
538         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
539         assertEquals("isCapturing", false, leaderActorContext.getSnapshotManager().isCapturing());
540
541         LOG.info("testAddServerWithLeaderChangeBeforePriorSnapshotComplete ending");
542     }
543
544     @Test
545     public void testAddServerWithLeaderChangeDuringInstallSnapshot() throws Exception {
546         LOG.info("testAddServerWithLeaderChangeDuringInstallSnapshot starting");
547
548         setupNewFollower();
549         RaftActorContext initialActorContext = new MockRaftActorContext();
550
551         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
552                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
553                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
554                 actorFactory.generateActorId(LEADER_ID));
555
556         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
557         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
558
559         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(8);
560
561         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
562
563         // Drop the UnInitializedFollowerSnapshotReply to delay it.
564         leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
565
566         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
567
568         UnInitializedFollowerSnapshotReply snapshotReply = expectFirstMatching(leaderCollectorActor,
569                 UnInitializedFollowerSnapshotReply.class);
570
571         // Prevent election timeout when the leader switches to follower
572         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
573
574         // Change the leader behavior to follower
575         leaderActor.tell(new Follower(leaderActorContext), leaderActor);
576
577         // Send the captured UnInitializedFollowerSnapshotReply - should be a no-op
578         leaderRaftActor.setDropMessageOfType(null);
579         leaderActor.tell(snapshotReply, leaderActor);
580
581         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
582         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
583
584         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
585
586         LOG.info("testAddServerWithLeaderChangeDuringInstallSnapshot ending");
587     }
588
589     @Test
590     public void testAddServerWithInstallSnapshotTimeout() throws Exception {
591         LOG.info("testAddServerWithInstallSnapshotTimeout starting");
592
593         setupNewFollower();
594         RaftActorContext initialActorContext = new MockRaftActorContext();
595
596         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
597                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
598                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
599                 actorFactory.generateActorId(LEADER_ID));
600
601         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
602         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
603         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
604
605         // Drop the InstallSnapshot message so it times out
606         newFollowerRaftActor.underlyingActor().setDropMessageOfType(InstallSnapshot.SERIALIZABLE_CLASS);
607
608         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
609
610         leaderActor.tell(new UnInitializedFollowerSnapshotReply("bogus"), leaderActor);
611
612         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
613         assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
614
615         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
616         assertEquals("Leader followers size", 0,
617                 ((AbstractLeader)leaderRaftActor.getCurrentBehavior()).getFollowerIds().size());
618
619         LOG.info("testAddServerWithInstallSnapshotTimeout ending");
620     }
621
622     @Test
623     public void testAddServerWithNoLeader() {
624         LOG.info("testAddServerWithNoLeader starting");
625
626         setupNewFollower();
627         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
628         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
629
630         TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
631                 MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
632                         followerActor.path().toString())).config(configParams).persistent(Optional.of(false)).
633                         props().withDispatcher(Dispatchers.DefaultDispatcherId()),
634                 actorFactory.generateActorId(LEADER_ID));
635         noLeaderActor.underlyingActor().waitForInitializeBehaviorComplete();
636
637         noLeaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
638         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
639         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
640
641         LOG.info("testAddServerWithNoLeader ending");
642     }
643
644     @Test
645     public void testAddServerWithNoConsensusReached() {
646         LOG.info("testAddServerWithNoConsensusReached starting");
647
648         setupNewFollower();
649         RaftActorContext initialActorContext = new MockRaftActorContext();
650
651         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
652                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
653                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
654                 actorFactory.generateActorId(LEADER_ID));
655
656         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
657         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
658
659         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
660
661         // Drop UnInitializedFollowerSnapshotReply initially
662         leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
663
664         MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
665         TestActorRef<MessageCollectorActor> newFollowerCollectorActor =
666                 newCollectorActor(newFollowerRaftActorInstance, NEW_SERVER_ID);
667
668         // Drop AppendEntries to the new follower so consensus isn't reached
669         newFollowerRaftActorInstance.setDropMessageOfType(AppendEntries.class);
670
671         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
672
673         // Capture the UnInitializedFollowerSnapshotReply
674         Object snapshotReply = expectFirstMatching(leaderCollectorActor, UnInitializedFollowerSnapshotReply.class);
675
676         // Send the UnInitializedFollowerSnapshotReply to resume the first request
677         leaderRaftActor.setDropMessageOfType(null);
678         leaderActor.tell(snapshotReply, leaderActor);
679
680         expectFirstMatching(newFollowerCollectorActor, AppendEntries.class);
681
682         // Send a second AddServer
683         leaderActor.tell(new AddServer(NEW_SERVER_ID2, "", false), testKit.getRef());
684
685         // The first AddServer should succeed with OK even though consensus wasn't reached
686         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
687         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
688         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
689
690         // Verify ServerConfigurationPayload entry in leader's log
691         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
692                 votingServer(NEW_SERVER_ID));
693
694         // The second AddServer should fail since consensus wasn't reached for the first
695         addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
696         assertEquals("getStatus", ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT, addServerReply.getStatus());
697
698         // Re-send the second AddServer - should also fail
699         leaderActor.tell(new AddServer(NEW_SERVER_ID2, "", false), testKit.getRef());
700         addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
701         assertEquals("getStatus", ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT, addServerReply.getStatus());
702
703         LOG.info("testAddServerWithNoConsensusReached ending");
704     }
705
706     @Test
707     public void testAddServerWithExistingServer() {
708         LOG.info("testAddServerWithExistingServer starting");
709
710         RaftActorContext initialActorContext = new MockRaftActorContext();
711
712         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
713                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
714                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
715                 actorFactory.generateActorId(LEADER_ID));
716
717         leaderActor.tell(new AddServer(FOLLOWER_ID, followerActor.path().toString(), true), testKit.getRef());
718
719         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
720         assertEquals("getStatus", ServerChangeStatus.ALREADY_EXISTS, addServerReply.getStatus());
721
722         LOG.info("testAddServerWithExistingServer ending");
723     }
724
725     @Test
726     public void testAddServerForwardedToLeader() {
727         LOG.info("testAddServerForwardedToLeader starting");
728
729         setupNewFollower();
730         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
731         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
732
733         TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
734                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
735                 actorFactory.generateActorId(LEADER_ID));
736
737         TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
738                 MockRaftActor.builder().id(FOLLOWER_ID).peerAddresses(ImmutableMap.of(LEADER_ID,
739                         leaderActor.path().toString())).config(configParams).persistent(Optional.of(false)).
740                         props().withDispatcher(Dispatchers.DefaultDispatcherId()),
741                 actorFactory.generateActorId(FOLLOWER_ID));
742         followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
743
744         followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
745                 -1, -1, (short)0), leaderActor);
746
747         followerRaftActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
748         expectFirstMatching(leaderActor, AddServer.class);
749
750         LOG.info("testAddServerForwardedToLeader ending");
751     }
752
753     @Test
754     public void testOnApplyState() {
755         LOG.info("testOnApplyState starting");
756
757         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
758         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
759         TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
760                 MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
761                         followerActor.path().toString())).config(configParams).persistent(Optional.of(false)).
762                         props().withDispatcher(Dispatchers.DefaultDispatcherId()),
763                 actorFactory.generateActorId(LEADER_ID));
764
765         RaftActorServerConfigurationSupport support = new RaftActorServerConfigurationSupport(noLeaderActor.underlyingActor());
766
767         ReplicatedLogEntry serverConfigEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
768                 new ServerConfigurationPayload(Collections.<ServerInfo>emptyList()));
769         boolean handled = support.handleMessage(new ApplyState(null, null, serverConfigEntry), ActorRef.noSender());
770         assertEquals("Message handled", true, handled);
771
772         ReplicatedLogEntry nonServerConfigEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
773                 new MockRaftActorContext.MockPayload("1"));
774         handled = support.handleMessage(new ApplyState(null, null, nonServerConfigEntry), ActorRef.noSender());
775         assertEquals("Message handled", false, handled);
776
777         LOG.info("testOnApplyState ending");
778     }
779
780     @Test
781     public void testRemoveServerWithNoLeader() {
782         LOG.info("testRemoveServerWithNoLeader starting");
783
784         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
785         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
786
787         TestActorRef<MockRaftActor> leaderActor = actorFactory.createTestActor(
788                 MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
789                         followerActor.path().toString())).config(configParams).persistent(Optional.of(false)).
790                         props().withDispatcher(Dispatchers.DefaultDispatcherId()),
791                 actorFactory.generateActorId(LEADER_ID));
792         leaderActor.underlyingActor().waitForInitializeBehaviorComplete();
793
794         leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
795         RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
796         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, removeServerReply.getStatus());
797
798         LOG.info("testRemoveServerWithNoLeader ending");
799     }
800
801     @Test
802     public void testRemoveServerNonExistentServer() {
803         LOG.info("testRemoveServerNonExistentServer starting");
804
805         RaftActorContext initialActorContext = new MockRaftActorContext();
806
807         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
808                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
809                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
810                 actorFactory.generateActorId(LEADER_ID));
811
812         leaderActor.tell(new RemoveServer(NEW_SERVER_ID), testKit.getRef());
813         RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
814         assertEquals("getStatus", ServerChangeStatus.DOES_NOT_EXIST, removeServerReply.getStatus());
815
816         LOG.info("testRemoveServerNonExistentServer ending");
817     }
818
819     @Test
820     public void testRemoveServerForwardToLeader() {
821         LOG.info("testRemoveServerForwardToLeader starting");
822
823         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
824         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
825
826         TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
827                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
828                 actorFactory.generateActorId(LEADER_ID));
829
830         TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
831                 MockRaftActor.builder().id(FOLLOWER_ID).peerAddresses(ImmutableMap.of(LEADER_ID,
832                         leaderActor.path().toString())).config(configParams).persistent(Optional.of(false)).
833                         props().withDispatcher(Dispatchers.DefaultDispatcherId()),
834                 actorFactory.generateActorId(FOLLOWER_ID));
835         followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
836
837         followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
838                 -1, -1, (short)0), leaderActor);
839
840         followerRaftActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
841         expectFirstMatching(leaderActor, RemoveServer.class);
842
843         LOG.info("testRemoveServerForwardToLeader ending");
844     }
845
846     @Test
847     public void testRemoveServer() {
848         LOG.info("testRemoveServer starting");
849
850         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
851         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
852         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
853
854         final String followerActorId = actorFactory.generateActorId(FOLLOWER_ID);
855         final String followerActorPath = actorFactory.createTestActorPath(followerActorId);
856         RaftActorContext initialActorContext = new MockRaftActorContext();
857
858         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
859                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActorPath),
860                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
861                 actorFactory.generateActorId(LEADER_ID));
862
863         TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
864
865         TestActorRef<MessageCollectorActor> collector =
866                 actorFactory.createTestActor(MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
867                         actorFactory.generateActorId("collector"));
868         TestActorRef<CollectingMockRaftActor> followerRaftActor = actorFactory.createTestActor(
869                 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
870                         configParams, NO_PERSISTENCE, collector).withDispatcher(Dispatchers.DefaultDispatcherId()),
871                 followerActorId);
872
873         leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
874         RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
875         assertEquals("getStatus", ServerChangeStatus.OK, removeServerReply.getStatus());
876
877         final ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
878         assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
879         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(), votingServer(LEADER_ID));
880
881         RaftActorBehavior currentBehavior = leaderActor.underlyingActor().getCurrentBehavior();
882         assertTrue("Expected Leader", currentBehavior instanceof Leader);
883         assertEquals("Follower ids size", 0, ((Leader)currentBehavior).getFollowerIds().size());
884
885         MessageCollectorActor.expectFirstMatching(collector, ServerRemoved.class);
886
887         LOG.info("testRemoveServer ending");
888     }
889
890     @Test
891     public void testRemoveServerLeader() {
892         LOG.info("testRemoveServerLeader starting");
893
894         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
895         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
896         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
897
898         final String followerActorId = actorFactory.generateActorId(FOLLOWER_ID);
899         final String followerActorPath = actorFactory.createTestActorPath(followerActorId);
900         RaftActorContext initialActorContext = new MockRaftActorContext();
901
902         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
903                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActorPath),
904                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
905                 actorFactory.generateActorId(LEADER_ID));
906
907         TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
908
909         TestActorRef<MessageCollectorActor> followerCollector = actorFactory.createTestActor(MessageCollectorActor.props().
910                 withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId("collector"));
911         actorFactory.createTestActor(
912                 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
913                         configParams, NO_PERSISTENCE, followerCollector).withDispatcher(Dispatchers.DefaultDispatcherId()),
914                 followerActorId);
915
916         leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
917         RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
918         assertEquals("getStatus", ServerChangeStatus.OK, removeServerReply.getStatus());
919
920         final ApplyState applyState = MessageCollectorActor.expectFirstMatching(followerCollector, ApplyState.class);
921         assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
922         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
923                 votingServer(FOLLOWER_ID));
924
925         MessageCollectorActor.expectFirstMatching(leaderCollector, ServerRemoved.class);
926
927         LOG.info("testRemoveServerLeader ending");
928     }
929
930     @Test
931     public void testRemoveServerLeaderWithNoFollowers() {
932         LOG.info("testRemoveServerLeaderWithNoFollowers starting");
933
934         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
935                 MockLeaderRaftActor.props(Collections.<String, String>emptyMap(),
936                         new MockRaftActorContext()).withDispatcher(Dispatchers.DefaultDispatcherId()),
937                 actorFactory.generateActorId(LEADER_ID));
938
939         leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
940         RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
941         assertEquals("getStatus", ServerChangeStatus.NOT_SUPPORTED, removeServerReply.getStatus());
942
943         LOG.info("testRemoveServerLeaderWithNoFollowers ending");
944     }
945
946     @Test
947     public void testChangeServersVotingStatus() {
948         LOG.info("testChangeServersVotingStatus starting");
949
950         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
951         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
952         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
953
954         final String follower1ActorId = actorFactory.generateActorId(FOLLOWER_ID);
955         final String follower1ActorPath = actorFactory.createTestActorPath(follower1ActorId);
956         final String follower2ActorId = actorFactory.generateActorId(FOLLOWER_ID2);
957         final String follower2ActorPath = actorFactory.createTestActorPath(follower2ActorId);
958
959         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
960                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, follower1ActorPath,
961                         FOLLOWER_ID2, follower2ActorPath), new MockRaftActorContext()).
962                         withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
963         TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
964
965         TestActorRef<MessageCollectorActor> follower1Collector = actorFactory.createTestActor(
966                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
967                 actorFactory.generateActorId("collector"));
968         TestActorRef<CollectingMockRaftActor> follower1RaftActor = actorFactory.createTestActor(
969                 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
970                         FOLLOWER_ID2, follower2ActorPath), configParams, NO_PERSISTENCE, follower1Collector).
971                         withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId);
972
973         TestActorRef<MessageCollectorActor> follower2Collector = actorFactory.createTestActor(
974                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
975                 actorFactory.generateActorId("collector"));
976         TestActorRef<CollectingMockRaftActor> follower2RaftActor = actorFactory.createTestActor(
977                 CollectingMockRaftActor.props(FOLLOWER_ID2, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
978                         FOLLOWER_ID, follower1ActorPath), configParams, NO_PERSISTENCE, follower2Collector).
979                         withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId);
980
981         // Send first ChangeServersVotingStatus message
982
983         leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(FOLLOWER_ID, false, FOLLOWER_ID2, false)),
984                 testKit.getRef());
985         ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
986         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
987
988         final ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
989         assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
990         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
991                 votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
992
993         MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
994         verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
995                 votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
996
997         MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
998         verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
999                 votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1000
1001         MessageCollectorActor.clearMessages(leaderCollector);
1002         MessageCollectorActor.clearMessages(follower1Collector);
1003         MessageCollectorActor.clearMessages(follower2Collector);
1004
1005         // Send second ChangeServersVotingStatus message
1006
1007         leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(FOLLOWER_ID, true)), testKit.getRef());
1008         reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1009         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1010
1011         MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
1012         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1013                 votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1014
1015         MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
1016         verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1017                 votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1018
1019         MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
1020         verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1021                 votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1022
1023         LOG.info("testChangeServersVotingStatus ending");
1024     }
1025
1026     @Test
1027     public void testChangeLeaderToNonVoting() {
1028         LOG.info("testChangeLeaderToNonVoting starting");
1029
1030         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1031         configParams.setHeartBeatInterval(new FiniteDuration(500, TimeUnit.MILLISECONDS));
1032
1033         final String follower1ActorId = actorFactory.generateActorId(FOLLOWER_ID);
1034         final String follower1ActorPath = actorFactory.createTestActorPath(follower1ActorId);
1035         final String follower2ActorId = actorFactory.generateActorId(FOLLOWER_ID2);
1036         final String follower2ActorPath = actorFactory.createTestActorPath(follower2ActorId);
1037
1038         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
1039                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, follower1ActorPath,
1040                         FOLLOWER_ID2, follower2ActorPath), new MockRaftActorContext()).
1041                         withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
1042         TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
1043
1044         TestActorRef<MessageCollectorActor> follower1Collector = actorFactory.createTestActor(
1045                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1046                 actorFactory.generateActorId("collector"));
1047         TestActorRef<CollectingMockRaftActor> follower1RaftActor = actorFactory.createTestActor(
1048                 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
1049                         FOLLOWER_ID2, follower2ActorPath), configParams, NO_PERSISTENCE, follower1Collector).
1050                         withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId);
1051
1052         TestActorRef<MessageCollectorActor> follower2Collector = actorFactory.createTestActor(
1053                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1054                 actorFactory.generateActorId("collector"));
1055         TestActorRef<CollectingMockRaftActor> follower2RaftActor = actorFactory.createTestActor(
1056                 CollectingMockRaftActor.props(FOLLOWER_ID2, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
1057                         FOLLOWER_ID, follower1ActorPath), configParams, NO_PERSISTENCE, follower2Collector).
1058                         withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId);
1059
1060         // Send ChangeServersVotingStatus message
1061
1062         leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(LEADER_ID, false)), testKit.getRef());
1063         ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1064         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1065
1066         MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
1067         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1068                 nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
1069
1070         MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
1071         verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1072                 nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
1073
1074         MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
1075         verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1076                 nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
1077
1078         verifyRaftState(RaftState.Leader, follower1RaftActor.underlyingActor(), follower2RaftActor.underlyingActor());
1079         verifyRaftState(RaftState.Follower, leaderActor.underlyingActor());
1080
1081         MessageCollectorActor.expectMatching(leaderCollector, AppendEntries.class, 2);
1082
1083         LOG.info("testChangeLeaderToNonVoting ending");
1084     }
1085
1086     @Test
1087     public void testChangeToVotingWithNoLeader() {
1088         LOG.info("testChangeToVotingWithNoLeader starting");
1089
1090         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1091         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1092
1093         final String node1ID = "node1";
1094         final String node2ID = "node2";
1095
1096         // Set up a persisted ServerConfigurationPayload. Initially node1 and node2 will come up as non-voting.
1097         // via the server config. The server config will also contain 2 voting peers that are down (ie no
1098         // actors created).
1099
1100         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1101                 new ServerInfo(node1ID, false), new ServerInfo(node2ID, false),
1102                 new ServerInfo("downNode1", true), new ServerInfo("downNode2", true)));
1103         ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, 1, persistedServerConfig);
1104
1105         InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "downNode1"));
1106         InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1107         InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "downNode2"));
1108         InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1109
1110         TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
1111                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1112                 actorFactory.generateActorId("collector"));
1113         TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1114                 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
1115                         PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1116         CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1117
1118         TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
1119                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1120                 actorFactory.generateActorId("collector"));
1121         TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1122                 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
1123                         PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1124         CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1125
1126         // Wait for snapshot after recovery
1127         MessageCollectorActor.expectFirstMatching(node1Collector, SnapshotComplete.class);
1128
1129         // Verify the intended server config was loaded and applied.
1130         verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1131                 nonVotingServer(node1ID), nonVotingServer(node2ID), votingServer("downNode1"),
1132                 votingServer("downNode2"));
1133         assertEquals("isVotingMember", false, node1RaftActor.getRaftActorContext().isVotingMember());
1134         assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1135         assertEquals("getLeaderId", null, node1RaftActor.getLeaderId());
1136
1137         MessageCollectorActor.expectFirstMatching(node2Collector, SnapshotComplete.class);
1138         assertEquals("isVotingMember", false, node2RaftActor.getRaftActorContext().isVotingMember());
1139
1140         // For the test, we send a ChangeServersVotingStatus message to node1 to flip the voting states for
1141         // each server, ie node1 and node2 to voting and the 2 down nodes to non-voting. This should cause
1142         // node1 to try to elect itself as leader in order to apply the new server config. Since the 2
1143         // down nodes are switched to non-voting, node1 should only need a vote from node2.
1144
1145         // First send the message such that node1 has no peer address for node2 - should fail.
1146
1147         ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true,
1148                 node2ID, true, "downNode1", false, "downNode2", false));
1149         node1RaftActorRef.tell(changeServers, testKit.getRef());
1150         ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1151         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, reply.getStatus());
1152
1153         // Update node2's peer address and send the message again
1154
1155         node1RaftActor.setPeerAddress(node2ID, node2RaftActorRef.path().toString());
1156
1157         node1RaftActorRef.tell(changeServers, testKit.getRef());
1158         reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1159         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1160
1161         ApplyJournalEntries apply = MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class);
1162         assertEquals("getToIndex", 1, apply.getToIndex());
1163         verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1164                 votingServer(node1ID), votingServer(node2ID), nonVotingServer("downNode1"),
1165                 nonVotingServer("downNode2"));
1166         assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
1167         assertEquals("getRaftState", RaftState.Leader, node1RaftActor.getRaftState());
1168
1169         apply = MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
1170         assertEquals("getToIndex", 1, apply.getToIndex());
1171         verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1172                 votingServer(node1ID), votingServer(node2ID), nonVotingServer("downNode1"),
1173                 nonVotingServer("downNode2"));
1174         assertEquals("isVotingMember", true, node2RaftActor.getRaftActorContext().isVotingMember());
1175         assertEquals("getRaftState", RaftState.Follower, node2RaftActor.getRaftState());
1176
1177         LOG.info("testChangeToVotingWithNoLeader ending");
1178     }
1179
1180     @Test
1181     public void testChangeToVotingWithNoLeaderAndElectionTimeout() {
1182         LOG.info("testChangeToVotingWithNoLeaderAndElectionTimeout starting");
1183
1184         final String node1ID = "node1";
1185         final String node2ID = "node2";
1186
1187         PeerAddressResolver peerAddressResolver = new PeerAddressResolver() {
1188             @Override
1189             public String resolve(String peerId) {
1190                 return peerId.equals(node1ID) ? actorFactory.createTestActorPath(node1ID) :
1191                     peerId.equals(node2ID) ? actorFactory.createTestActorPath(node2ID) : null;
1192             }
1193         };
1194
1195         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1196                 new ServerInfo(node1ID, false), new ServerInfo(node2ID, true)));
1197         ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, 1, persistedServerConfig);
1198
1199         InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
1200         InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1201         InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
1202         InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1203
1204         DefaultConfigParamsImpl configParams1 = new DefaultConfigParamsImpl();
1205         configParams1.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1206         configParams1.setElectionTimeoutFactor(1);
1207         configParams1.setPeerAddressResolver(peerAddressResolver);
1208         TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
1209                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1210                 actorFactory.generateActorId("collector"));
1211         TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1212                 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams1,
1213                         PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1214         CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1215
1216         DefaultConfigParamsImpl configParams2 = new DefaultConfigParamsImpl();
1217         configParams2.setElectionTimeoutFactor(1000000);
1218         configParams2.setPeerAddressResolver(peerAddressResolver);
1219         TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
1220                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1221                 actorFactory.generateActorId("collector"));
1222         TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1223                 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams2,
1224                         PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1225         CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1226
1227         // Wait for snapshot after recovery
1228         MessageCollectorActor.expectFirstMatching(node1Collector, SnapshotComplete.class);
1229
1230         // Send a ChangeServersVotingStatus message to node1 to change mode1 to voting. This should cause
1231         // node1 to try to elect itself as leader in order to apply the new server config. But we'll drop
1232         // RequestVote messages in node2 which should cause node1 to time out and revert back to the previous
1233         // server config and fail with NO_LEADER. Note that node1 shouldn't forward the request to node2 b/c
1234         // node2 was previously voting.
1235
1236         node2RaftActor.setDropMessageOfType(RequestVote.class);
1237
1238         ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true));
1239         node1RaftActorRef.tell(changeServers, testKit.getRef());
1240         ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1241         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, reply.getStatus());
1242
1243         assertEquals("Server config", Sets.newHashSet(nonVotingServer(node1ID), votingServer(node2ID)),
1244                 Sets.newHashSet(node1RaftActor.getRaftActorContext().getPeerServerInfo(true).getServerConfig()));
1245         assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1246
1247         LOG.info("testChangeToVotingWithNoLeaderAndElectionTimeout ending");
1248     }
1249
1250     @Test
1251     public void testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout() {
1252         LOG.info("testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout starting");
1253
1254         final String node1ID = "node1";
1255         final String node2ID = "node2";
1256
1257         PeerAddressResolver peerAddressResolver = new PeerAddressResolver() {
1258             @Override
1259             public String resolve(String peerId) {
1260                 return peerId.equals(node1ID) ? actorFactory.createTestActorPath(node1ID) :
1261                     peerId.equals(node2ID) ? actorFactory.createTestActorPath(node2ID) : null;
1262             }
1263         };
1264
1265         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1266         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1267         configParams.setElectionTimeoutFactor(3);
1268         configParams.setPeerAddressResolver(peerAddressResolver);
1269
1270         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1271                 new ServerInfo(node1ID, false), new ServerInfo(node2ID, false)));
1272         ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, 1, persistedServerConfig);
1273
1274         InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
1275         InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1276         InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
1277         InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1278         InMemoryJournal.addEntry(node2ID, 3, new ReplicatedLogImplEntry(1, 1,
1279                 new MockRaftActorContext.MockPayload("2")));
1280
1281         TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
1282                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1283                 actorFactory.generateActorId("collector"));
1284         TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1285                 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
1286                         PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1287         CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1288
1289         TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
1290                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1291                 actorFactory.generateActorId("collector"));
1292         TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1293                 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
1294                         PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1295         CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1296
1297         // Wait for snapshot after recovery
1298         MessageCollectorActor.expectFirstMatching(node1Collector, SnapshotComplete.class);
1299
1300         // Send a ChangeServersVotingStatus message to node1 to change mode1 to voting. This should cause
1301         // node1 to try to elect itself as leader in order to apply the new server config. However node1's log
1302         // is behind node2's so node2 should not grant node1's vote. This should cause node1 to time out and
1303         // forward the request to node2.
1304
1305         ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(
1306                 ImmutableMap.of(node1ID, true, node2ID, true));
1307         node1RaftActorRef.tell(changeServers, testKit.getRef());
1308         ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1309         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1310
1311         MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
1312         verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1313                 votingServer(node1ID), votingServer(node2ID));
1314         assertEquals("getRaftState", RaftState.Leader, node2RaftActor.getRaftState());
1315
1316         MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class);
1317         verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1318                 votingServer(node1ID), votingServer(node2ID));
1319         assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
1320         assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1321
1322         LOG.info("testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout ending");
1323     }
1324
1325     @Test
1326     public void testChangeToVotingWithNoLeaderAndOtherLeaderElected() {
1327         LOG.info("testChangeToVotingWithNoLeaderAndOtherLeaderElected starting");
1328
1329         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1330         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1331         configParams.setElectionTimeoutFactor(100000);
1332
1333         final String node1ID = "node1";
1334         final String node2ID = "node2";
1335
1336         configParams.setPeerAddressResolver(new PeerAddressResolver() {
1337             @Override
1338             public String resolve(String peerId) {
1339                 return peerId.equals(node1ID) ? actorFactory.createTestActorPath(node1ID) :
1340                     peerId.equals(node2ID) ? actorFactory.createTestActorPath(node2ID) : null;
1341             }
1342         });
1343
1344         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1345                 new ServerInfo(node1ID, false), new ServerInfo(node2ID, true)));
1346         ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, 1, persistedServerConfig);
1347
1348         InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
1349         InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1350         InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
1351         InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1352
1353         TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
1354                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1355                 actorFactory.generateActorId("collector"));
1356         TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1357                 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
1358                         PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1359         CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1360
1361         TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
1362                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1363                 actorFactory.generateActorId("collector"));
1364         TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1365                 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
1366                         PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1367         CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1368
1369         // Wait for snapshot after recovery
1370         MessageCollectorActor.expectFirstMatching(node1Collector, SnapshotComplete.class);
1371
1372         // Send a ChangeServersVotingStatus message to node1 to change node1 to voting. This should cause
1373         // node1 to try to elect itself as leader in order to apply the new server config. But we'll drop
1374         // RequestVote messages in node2 and make it the leader so node1 should forward the server change
1375         // request to node2 when node2 is elected.
1376
1377         node2RaftActor.setDropMessageOfType(RequestVote.class);
1378
1379         ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true,
1380                 node2ID, true));
1381         node1RaftActorRef.tell(changeServers, testKit.getRef());
1382
1383         MessageCollectorActor.expectFirstMatching(node2Collector, RequestVote.class);
1384
1385         node2RaftActorRef.tell(ElectionTimeout.INSTANCE, ActorRef.noSender());
1386
1387         ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1388         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1389
1390         MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class);
1391         verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1392                 votingServer(node1ID), votingServer(node2ID));
1393         assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
1394         assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1395
1396         MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
1397         verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1398                 votingServer(node1ID), votingServer(node2ID));
1399         assertEquals("getRaftState", RaftState.Leader, node2RaftActor.getRaftState());
1400
1401         LOG.info("testChangeToVotingWithNoLeaderAndOtherLeaderElected ending");
1402     }
1403
1404     private void verifyRaftState(RaftState expState, RaftActor... raftActors) {
1405         Stopwatch sw = Stopwatch.createStarted();
1406         while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
1407             for(RaftActor raftActor: raftActors) {
1408                 if(raftActor.getRaftState() == expState) {
1409                     return;
1410                 }
1411             }
1412         }
1413
1414         fail("None of the RaftActors have state " + expState);
1415     }
1416
1417     private static ServerInfo votingServer(String id) {
1418         return new ServerInfo(id, true);
1419     }
1420
1421     private static ServerInfo nonVotingServer(String id) {
1422         return new ServerInfo(id, false);
1423     }
1424
1425     private TestActorRef<MessageCollectorActor> newLeaderCollectorActor(MockLeaderRaftActor leaderRaftActor) {
1426         return newCollectorActor(leaderRaftActor, LEADER_ID);
1427     }
1428
1429     private TestActorRef<MessageCollectorActor> newCollectorActor(AbstractMockRaftActor raftActor, String id) {
1430         TestActorRef<MessageCollectorActor> collectorActor = actorFactory.createTestActor(
1431                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1432                 actorFactory.generateActorId(id + "Collector"));
1433         raftActor.setCollectorActor(collectorActor);
1434         return collectorActor;
1435     }
1436
1437     private static void verifyServerConfigurationPayloadEntry(ReplicatedLog log, ServerInfo... expected) {
1438         ReplicatedLogEntry logEntry = log.get(log.lastIndex());
1439         assertEquals("Last log entry payload class", ServerConfigurationPayload.class, logEntry.getData().getClass());
1440         ServerConfigurationPayload payload = (ServerConfigurationPayload)logEntry.getData();
1441         assertEquals("Server config", Sets.newHashSet(expected), Sets.newHashSet(payload.getServerConfig()));
1442     }
1443
1444     private static RaftActorContext newFollowerContext(String id, TestActorRef<? extends UntypedActor> actor) {
1445         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1446         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1447         configParams.setElectionTimeoutFactor(100000);
1448         NonPersistentDataProvider noPersistence = new NonPersistentDataProvider();
1449         ElectionTermImpl termInfo = new ElectionTermImpl(noPersistence, id, LOG);
1450         termInfo.update(1, LEADER_ID);
1451         return new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
1452                 id, termInfo, -1, -1, ImmutableMap.of(LEADER_ID, ""), configParams, noPersistence, LOG);
1453     }
1454
1455     static abstract class AbstractMockRaftActor extends MockRaftActor {
1456         private volatile TestActorRef<MessageCollectorActor> collectorActor;
1457         private volatile Class<?> dropMessageOfType;
1458
1459         AbstractMockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
1460                 boolean persistent, TestActorRef<MessageCollectorActor> collectorActor) {
1461             super(builder().id(id).peerAddresses(peerAddresses).config(config.get()).
1462                     persistent(Optional.of(persistent)));
1463             this.collectorActor = collectorActor;
1464         }
1465
1466         void setDropMessageOfType(Class<?> dropMessageOfType) {
1467             this.dropMessageOfType = dropMessageOfType;
1468         }
1469
1470         void setCollectorActor(TestActorRef<MessageCollectorActor> collectorActor) {
1471             this.collectorActor = collectorActor;
1472         }
1473
1474         @Override
1475         public void handleCommand(Object message) {
1476             if(dropMessageOfType == null || !dropMessageOfType.equals(message.getClass())) {
1477                 super.handleCommand(message);
1478             }
1479
1480             if(collectorActor != null) {
1481                 collectorActor.tell(message, getSender());
1482             }
1483         }
1484     }
1485
1486     public static class CollectingMockRaftActor extends AbstractMockRaftActor {
1487
1488         CollectingMockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
1489                 boolean persistent, TestActorRef<MessageCollectorActor> collectorActor) {
1490             super(id, peerAddresses, config, persistent, collectorActor);
1491             snapshotCohortDelegate = new RaftActorSnapshotCohort() {
1492                 @Override
1493                 public void createSnapshot(ActorRef actorRef) {
1494                     actorRef.tell(new CaptureSnapshotReply(new byte[0]), actorRef);
1495                 }
1496
1497                 @Override
1498                 public void applySnapshot(byte[] snapshotBytes) {
1499                 }
1500             };
1501         }
1502
1503         public static Props props(final String id, final Map<String, String> peerAddresses,
1504                 ConfigParams config, boolean persistent, TestActorRef<MessageCollectorActor> collectorActor){
1505
1506             return Props.create(CollectingMockRaftActor.class, id, peerAddresses, Optional.of(config),
1507                     persistent, collectorActor);
1508         }
1509
1510     }
1511
1512     public static class MockLeaderRaftActor extends AbstractMockRaftActor {
1513         public MockLeaderRaftActor(Map<String, String> peerAddresses, ConfigParams config,
1514                 RaftActorContext fromContext) {
1515             super(LEADER_ID, peerAddresses, Optional.of(config), NO_PERSISTENCE, null);
1516             setPersistence(false);
1517
1518             RaftActorContext context = getRaftActorContext();
1519             for(int i = 0; i < fromContext.getReplicatedLog().size(); i++) {
1520                 ReplicatedLogEntry entry = fromContext.getReplicatedLog().get(i);
1521                 getState().add(entry.getData());
1522                 context.getReplicatedLog().append(entry);
1523             }
1524
1525             context.setCommitIndex(fromContext.getCommitIndex());
1526             context.setLastApplied(fromContext.getLastApplied());
1527             context.getTermInformation().update(fromContext.getTermInformation().getCurrentTerm(),
1528                     fromContext.getTermInformation().getVotedFor());
1529         }
1530
1531         @Override
1532         protected void initializeBehavior() {
1533             changeCurrentBehavior(new Leader(getRaftActorContext()));
1534             initializeBehaviorComplete.countDown();
1535         }
1536
1537         @Override
1538         public void createSnapshot(ActorRef actorRef) {
1539             try {
1540                 actorRef.tell(new CaptureSnapshotReply(RaftActorTest.fromObject(getState()).toByteArray()), actorRef);
1541             } catch (Exception e) {
1542                 LOG.error("createSnapshot failed", e);
1543             }
1544         }
1545
1546         static Props props(Map<String, String> peerAddresses, RaftActorContext fromContext) {
1547             DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1548             configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1549             configParams.setElectionTimeoutFactor(10);
1550             return Props.create(MockLeaderRaftActor.class, peerAddresses, configParams, fromContext);
1551         }
1552     }
1553
1554     public static class MockNewFollowerRaftActor extends AbstractMockRaftActor {
1555         public MockNewFollowerRaftActor(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
1556             super(NEW_SERVER_ID, Maps.<String, String>newHashMap(), Optional.of(config), NO_PERSISTENCE, collectorActor);
1557             setPersistence(false);
1558         }
1559
1560         static Props props(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
1561             return Props.create(MockNewFollowerRaftActor.class, config, collectorActor);
1562         }
1563     }
1564 }