Fix issues when persistence enabled
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorServerConfigurationSupportTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertTrue;
12 import static org.junit.Assert.fail;
13 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.assertNoneMatching;
14 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
15 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
16 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
17 import akka.actor.ActorRef;
18 import akka.actor.Props;
19 import akka.actor.UntypedActor;
20 import akka.dispatch.Dispatchers;
21 import akka.testkit.JavaTestKit;
22 import akka.testkit.TestActorRef;
23 import com.google.common.base.Optional;
24 import com.google.common.base.Stopwatch;
25 import com.google.common.collect.ImmutableMap;
26 import com.google.common.collect.Maps;
27 import com.google.common.collect.Sets;
28 import java.util.Arrays;
29 import java.util.Collections;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.concurrent.TimeUnit;
33 import org.junit.After;
34 import org.junit.Before;
35 import org.junit.Test;
36 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
37 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
38 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
39 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
40 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
41 import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
42 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
43 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
44 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
45 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
46 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
47 import org.opendaylight.controller.cluster.raft.messages.AddServer;
48 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
49 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
50 import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
51 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
52 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
53 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
54 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
55 import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
56 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
57 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
58 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
59 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
60 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
61 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
62 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
63 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
64 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
65 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
66 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
67 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
68 import org.slf4j.Logger;
69 import org.slf4j.LoggerFactory;
70 import scala.concurrent.duration.FiniteDuration;
71
72 /**
73  * Unit tests for RaftActorServerConfigurationSupport.
74  *
75  * @author Thomas Pantelis
76  */
77 public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
78     static final String LEADER_ID = "leader";
79     static final String FOLLOWER_ID = "follower";
80     static final String FOLLOWER_ID2 = "follower2";
81     static final String NEW_SERVER_ID = "new-server";
82     static final String NEW_SERVER_ID2 = "new-server2";
83     private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupportTest.class);
84     private static final Class<?> COMMIT_MESSAGE_CLASS = RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT.getClass();
85     private static final boolean NO_PERSISTENCE = false;
86     private static final boolean PERSISTENT = true;
87
88     private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
89
90     private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
91             Props.create(ForwardMessageToBehaviorActor.class).withDispatcher(Dispatchers.DefaultDispatcherId()),
92             actorFactory.generateActorId(FOLLOWER_ID));
93
94     private TestActorRef<MockNewFollowerRaftActor> newFollowerRaftActor;
95     private TestActorRef<MessageCollectorActor> newFollowerCollectorActor;
96     private RaftActorContext newFollowerActorContext;
97
98     private final JavaTestKit testKit = new JavaTestKit(getSystem());
99
100     @Before
101     public void setup() {
102         InMemoryJournal.clear();
103         InMemorySnapshotStore.clear();
104     }
105
106     private void setupNewFollower() {
107         DefaultConfigParamsImpl configParams = newFollowerConfigParams();
108
109         newFollowerCollectorActor = actorFactory.createTestActor(
110                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
111                 actorFactory.generateActorId(NEW_SERVER_ID + "Collector"));
112         newFollowerRaftActor = actorFactory.createTestActor(MockNewFollowerRaftActor.props(
113                 configParams, newFollowerCollectorActor).withDispatcher(Dispatchers.DefaultDispatcherId()),
114                 actorFactory.generateActorId(NEW_SERVER_ID));
115
116         try {
117             newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
118         } catch (Exception e) {
119             newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
120         }
121     }
122
123     private static DefaultConfigParamsImpl newFollowerConfigParams() {
124         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
125         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
126         configParams.setElectionTimeoutFactor(100000);
127         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
128         return configParams;
129     }
130
131     @After
132     public void tearDown() throws Exception {
133         actorFactory.close();
134     }
135
136     @Test
137     public void testAddServerWithExistingFollower() throws Exception {
138         LOG.info("testAddServerWithExistingFollower starting");
139         setupNewFollower();
140         RaftActorContextImpl followerActorContext = newFollowerContext(FOLLOWER_ID, followerActor);
141         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
142                 0, 3, 1).build());
143         followerActorContext.setCommitIndex(2);
144         followerActorContext.setLastApplied(2);
145
146         Follower follower = new Follower(followerActorContext);
147         followerActor.underlyingActor().setBehavior(follower);
148         followerActorContext.setCurrentBehavior(follower);
149
150         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
151                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
152                         followerActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
153                 actorFactory.generateActorId(LEADER_ID));
154
155         // Expect initial heartbeat from the leader.
156         expectFirstMatching(followerActor, AppendEntries.class);
157         clearMessages(followerActor);
158
159         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
160         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
161
162         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
163
164         // Leader should install snapshot - capture and verify ApplySnapshot contents
165
166         ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
167         @SuppressWarnings("unchecked")
168         List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
169         assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
170
171         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
172         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
173         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
174
175         // Verify ServerConfigurationPayload entry in leader's log
176
177         expectFirstMatching(leaderCollectorActor, ApplyState.class);
178         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
179         assertEquals("Leader journal last index", 3, leaderActorContext.getReplicatedLog().lastIndex());
180         assertEquals("Leader commit index", 3, leaderActorContext.getCommitIndex());
181         assertEquals("Leader last applied index", 3, leaderActorContext.getLastApplied());
182         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
183                 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
184
185         // Verify ServerConfigurationPayload entry in both followers
186
187         expectFirstMatching(followerActor, ApplyState.class);
188         assertEquals("Follower journal last index", 3, followerActorContext.getReplicatedLog().lastIndex());
189         verifyServerConfigurationPayloadEntry(followerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
190                 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
191
192         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
193         assertEquals("New follower journal last index", 3, newFollowerActorContext.getReplicatedLog().lastIndex());
194         verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
195                 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
196
197         // Verify new server config was applied in both followers
198
199         assertEquals("Follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID), followerActorContext.getPeerIds());
200
201         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, FOLLOWER_ID), newFollowerActorContext.getPeerIds());
202
203         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
204         expectFirstMatching(followerActor, ApplyState.class);
205
206         assertEquals("Follower commit index", 3, followerActorContext.getCommitIndex());
207         assertEquals("Follower last applied index", 3, followerActorContext.getLastApplied());
208         assertEquals("New follower commit index", 3, newFollowerActorContext.getCommitIndex());
209         assertEquals("New follower last applied index", 3, newFollowerActorContext.getLastApplied());
210
211         assertEquals("Leader persisted ReplicatedLogImplEntry entries", 0,
212                 InMemoryJournal.get(LEADER_ID, ReplicatedLogImplEntry.class).size());
213         assertEquals("Leader persisted ServerConfigurationPayload entries", 1,
214                 InMemoryJournal.get(LEADER_ID, ServerConfigurationPayload.class).size());
215
216         assertEquals("New follower persisted ReplicatedLogImplEntry entries", 0,
217                 InMemoryJournal.get(NEW_SERVER_ID, ReplicatedLogImplEntry.class).size());
218         assertEquals("New follower persisted ServerConfigurationPayload entries", 1,
219                 InMemoryJournal.get(NEW_SERVER_ID, ServerConfigurationPayload.class).size());
220
221         LOG.info("testAddServerWithExistingFollower ending");
222     }
223
224     @Test
225     public void testAddServerWithNoExistingFollower() throws Exception {
226         LOG.info("testAddServerWithNoExistingFollower starting");
227
228         setupNewFollower();
229         RaftActorContext initialActorContext = new MockRaftActorContext();
230         initialActorContext.setCommitIndex(1);
231         initialActorContext.setLastApplied(1);
232         initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
233                 0, 2, 1).build());
234
235         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
236                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
237                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
238                 actorFactory.generateActorId(LEADER_ID));
239
240         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
241         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
242
243         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
244
245         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
246
247         // Leader should install snapshot - capture and verify ApplySnapshot contents
248
249         ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
250         @SuppressWarnings("unchecked")
251         List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
252         assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
253
254         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
255         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
256         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
257
258         // Verify ServerConfigurationPayload entry in leader's log
259
260         expectFirstMatching(leaderCollectorActor, ApplyState.class);
261         assertEquals("Leader journal last index", 2, leaderActorContext.getReplicatedLog().lastIndex());
262         assertEquals("Leader commit index", 2, leaderActorContext.getCommitIndex());
263         assertEquals("Leader last applied index", 2, leaderActorContext.getLastApplied());
264         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
265                 votingServer(NEW_SERVER_ID));
266
267         // Verify ServerConfigurationPayload entry in the new follower
268
269         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
270         assertEquals("New follower journal last index", 2, newFollowerActorContext.getReplicatedLog().lastIndex());
271         verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
272                 votingServer(NEW_SERVER_ID));
273
274         // Verify new server config was applied in the new follower
275
276         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
277
278         LOG.info("testAddServerWithNoExistingFollower ending");
279     }
280
281     @Test
282     public void testAddServersAsNonVoting() throws Exception {
283         LOG.info("testAddServersAsNonVoting starting");
284
285         setupNewFollower();
286         RaftActorContext initialActorContext = new MockRaftActorContext();
287
288         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
289                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
290                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
291                 actorFactory.generateActorId(LEADER_ID));
292
293         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
294         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
295
296         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
297
298         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), false), testKit.getRef());
299
300         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
301         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
302         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
303
304         // Verify ServerConfigurationPayload entry in leader's log
305
306         expectFirstMatching(leaderCollectorActor, ApplyState.class);
307
308         assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
309         assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
310         assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
311         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
312                 nonVotingServer(NEW_SERVER_ID));
313
314         // Verify ServerConfigurationPayload entry in the new follower
315
316         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
317         assertEquals("New follower journal last index", 0, newFollowerActorContext.getReplicatedLog().lastIndex());
318         verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
319                 nonVotingServer(NEW_SERVER_ID));
320
321         // Verify new server config was applied in the new follower
322
323         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
324
325         assertNoneMatching(newFollowerCollectorActor, InstallSnapshot.class, 500);
326
327         // Add another non-voting server.
328
329         clearMessages(leaderCollectorActor);
330
331         RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
332         Follower newFollower2 = new Follower(follower2ActorContext);
333         followerActor.underlyingActor().setBehavior(newFollower2);
334
335         leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit.getRef());
336
337         addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
338         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
339         assertEquals("getLeaderHint", java.util.Optional.of(LEADER_ID), addServerReply.getLeaderHint());
340
341         expectFirstMatching(leaderCollectorActor, ApplyState.class);
342         assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
343         assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
344         assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
345         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
346                 votingServer(LEADER_ID), nonVotingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
347
348         LOG.info("testAddServersAsNonVoting ending");
349     }
350
351     @Test
352     public void testAddServerWithOperationInProgress() throws Exception {
353         LOG.info("testAddServerWithOperationInProgress starting");
354
355         setupNewFollower();
356         RaftActorContext initialActorContext = new MockRaftActorContext();
357
358         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
359                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
360                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
361                 actorFactory.generateActorId(LEADER_ID));
362
363         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
364         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
365
366         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
367
368         RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
369         Follower newFollower2 = new Follower(follower2ActorContext);
370         followerActor.underlyingActor().setBehavior(newFollower2);
371
372         MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
373         newFollowerRaftActorInstance.setDropMessageOfType(InstallSnapshot.class);
374
375         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
376
377         // Wait for leader's install snapshot and capture it
378
379         InstallSnapshot installSnapshot = expectFirstMatching(newFollowerCollectorActor, InstallSnapshot.class);
380
381         // Send a second AddServer - should get queued
382         JavaTestKit testKit2 = new JavaTestKit(getSystem());
383         leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit2.getRef());
384
385         // Continue the first AddServer
386         newFollowerRaftActorInstance.setDropMessageOfType(null);
387         newFollowerRaftActor.tell(installSnapshot, leaderActor);
388
389         // Verify both complete successfully
390         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
391         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
392
393         addServerReply = testKit2.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
394         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
395
396         // Verify ServerConfigurationPayload entries in leader's log
397
398         expectMatching(leaderCollectorActor, ApplyState.class, 2);
399         assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
400         assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
401         assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
402         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
403                 votingServer(LEADER_ID), votingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
404
405         // Verify ServerConfigurationPayload entry in the new follower
406
407         expectMatching(newFollowerCollectorActor, ApplyState.class, 2);
408         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID2),
409                newFollowerActorContext.getPeerIds());
410
411         LOG.info("testAddServerWithOperationInProgress ending");
412     }
413
414     @Test
415     public void testAddServerWithPriorSnapshotInProgress() throws Exception {
416         LOG.info("testAddServerWithPriorSnapshotInProgress starting");
417
418         setupNewFollower();
419         RaftActorContext initialActorContext = new MockRaftActorContext();
420
421         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
422                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
423                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
424                 actorFactory.generateActorId(LEADER_ID));
425
426         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
427         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
428
429         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
430
431         // Drop commit message for now to delay snapshot completion
432         leaderRaftActor.setDropMessageOfType(String.class);
433
434         leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
435
436         Object commitMsg = expectFirstMatching(leaderCollectorActor, COMMIT_MESSAGE_CLASS);
437
438         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
439
440         leaderRaftActor.setDropMessageOfType(null);
441         leaderActor.tell(commitMsg, leaderActor);
442
443         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
444         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
445         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
446
447         expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
448
449         // Verify ServerConfigurationPayload entry in leader's log
450
451         expectFirstMatching(leaderCollectorActor, ApplyState.class);
452         assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
453         assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
454         assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
455         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
456                 votingServer(NEW_SERVER_ID));
457
458         LOG.info("testAddServerWithPriorSnapshotInProgress ending");
459     }
460
461     @Test
462     public void testAddServerWithPriorSnapshotCompleteTimeout() throws Exception {
463         LOG.info("testAddServerWithPriorSnapshotCompleteTimeout starting");
464
465         setupNewFollower();
466         RaftActorContext initialActorContext = new MockRaftActorContext();
467
468         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
469                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
470                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
471                 actorFactory.generateActorId(LEADER_ID));
472
473         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
474         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
475
476         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
477
478         // Drop commit message so the snapshot doesn't complete.
479         leaderRaftActor.setDropMessageOfType(COMMIT_MESSAGE_CLASS);
480
481         leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
482
483         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
484
485         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
486         assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
487
488         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
489
490         LOG.info("testAddServerWithPriorSnapshotCompleteTimeout ending");
491     }
492
493     @Test
494     public void testAddServerWithLeaderChangeBeforePriorSnapshotComplete() throws Exception {
495         LOG.info("testAddServerWithLeaderChangeBeforePriorSnapshotComplete starting");
496
497         setupNewFollower();
498         RaftActorContext initialActorContext = new MockRaftActorContext();
499
500         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
501                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
502                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
503                 actorFactory.generateActorId(LEADER_ID));
504
505         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
506         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
507         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
508
509         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
510
511         // Drop the commit message so the snapshot doesn't complete yet.
512         leaderRaftActor.setDropMessageOfType(COMMIT_MESSAGE_CLASS);
513
514         leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
515
516         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
517
518         Object commitMsg = expectFirstMatching(leaderCollectorActor, COMMIT_MESSAGE_CLASS);
519
520         // Change the leader behavior to follower
521         leaderActor.tell(new Follower(leaderActorContext), leaderActor);
522
523         // Drop CaptureSnapshotReply in case install snapshot is incorrectly initiated after the prior
524         // snapshot completes. This will prevent the invalid snapshot from completing and fail the
525         // isCapturing assertion below.
526         leaderRaftActor.setDropMessageOfType(CaptureSnapshotReply.class);
527
528         // Complete the prior snapshot - this should be a no-op b/c it's no longer the leader
529         leaderActor.tell(commitMsg, leaderActor);
530
531         leaderActor.tell(new RaftActorServerConfigurationSupport.ServerOperationTimeout(NEW_SERVER_ID), leaderActor);
532
533         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
534         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
535
536         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
537         assertEquals("isCapturing", false, leaderActorContext.getSnapshotManager().isCapturing());
538
539         LOG.info("testAddServerWithLeaderChangeBeforePriorSnapshotComplete ending");
540     }
541
542     @Test
543     public void testAddServerWithLeaderChangeDuringInstallSnapshot() throws Exception {
544         LOG.info("testAddServerWithLeaderChangeDuringInstallSnapshot starting");
545
546         setupNewFollower();
547         RaftActorContext initialActorContext = new MockRaftActorContext();
548
549         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
550                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
551                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
552                 actorFactory.generateActorId(LEADER_ID));
553
554         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
555         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
556
557         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(8);
558
559         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
560
561         // Drop the UnInitializedFollowerSnapshotReply to delay it.
562         leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
563
564         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
565
566         UnInitializedFollowerSnapshotReply snapshotReply = expectFirstMatching(leaderCollectorActor,
567                 UnInitializedFollowerSnapshotReply.class);
568
569         // Prevent election timeout when the leader switches to follower
570         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
571
572         // Change the leader behavior to follower
573         leaderActor.tell(new Follower(leaderActorContext), leaderActor);
574
575         // Send the captured UnInitializedFollowerSnapshotReply - should be a no-op
576         leaderRaftActor.setDropMessageOfType(null);
577         leaderActor.tell(snapshotReply, leaderActor);
578
579         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
580         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
581
582         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
583
584         LOG.info("testAddServerWithLeaderChangeDuringInstallSnapshot ending");
585     }
586
587     @Test
588     public void testAddServerWithInstallSnapshotTimeout() throws Exception {
589         LOG.info("testAddServerWithInstallSnapshotTimeout starting");
590
591         setupNewFollower();
592         RaftActorContext initialActorContext = new MockRaftActorContext();
593
594         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
595                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
596                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
597                 actorFactory.generateActorId(LEADER_ID));
598
599         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
600         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
601         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
602
603         // Drop the InstallSnapshot message so it times out
604         newFollowerRaftActor.underlyingActor().setDropMessageOfType(InstallSnapshot.class);
605
606         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
607
608         leaderActor.tell(new UnInitializedFollowerSnapshotReply("bogus"), leaderActor);
609
610         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
611         assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
612
613         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
614         assertEquals("Leader followers size", 0,
615                 ((AbstractLeader)leaderRaftActor.getCurrentBehavior()).getFollowerIds().size());
616
617         LOG.info("testAddServerWithInstallSnapshotTimeout ending");
618     }
619
620     @Test
621     public void testAddServerWithNoLeader() {
622         LOG.info("testAddServerWithNoLeader starting");
623
624         setupNewFollower();
625         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
626         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
627
628         TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
629                 MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
630                         followerActor.path().toString())).config(configParams).persistent(Optional.of(false)).
631                         props().withDispatcher(Dispatchers.DefaultDispatcherId()),
632                 actorFactory.generateActorId(LEADER_ID));
633         noLeaderActor.underlyingActor().waitForInitializeBehaviorComplete();
634
635         noLeaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
636         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
637         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
638
639         LOG.info("testAddServerWithNoLeader ending");
640     }
641
642     @Test
643     public void testAddServerWithNoConsensusReached() {
644         LOG.info("testAddServerWithNoConsensusReached starting");
645
646         setupNewFollower();
647         RaftActorContext initialActorContext = new MockRaftActorContext();
648
649         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
650                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
651                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
652                 actorFactory.generateActorId(LEADER_ID));
653
654         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
655         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
656
657         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
658
659         // Drop UnInitializedFollowerSnapshotReply initially
660         leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
661
662         MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
663         TestActorRef<MessageCollectorActor> newFollowerCollectorActor =
664                 newCollectorActor(newFollowerRaftActorInstance, NEW_SERVER_ID);
665
666         // Drop AppendEntries to the new follower so consensus isn't reached
667         newFollowerRaftActorInstance.setDropMessageOfType(AppendEntries.class);
668
669         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
670
671         // Capture the UnInitializedFollowerSnapshotReply
672         Object snapshotReply = expectFirstMatching(leaderCollectorActor, UnInitializedFollowerSnapshotReply.class);
673
674         // Send the UnInitializedFollowerSnapshotReply to resume the first request
675         leaderRaftActor.setDropMessageOfType(null);
676         leaderActor.tell(snapshotReply, leaderActor);
677
678         expectFirstMatching(newFollowerCollectorActor, AppendEntries.class);
679
680         // Send a second AddServer
681         leaderActor.tell(new AddServer(NEW_SERVER_ID2, "", false), testKit.getRef());
682
683         // The first AddServer should succeed with OK even though consensus wasn't reached
684         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
685         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
686         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
687
688         // Verify ServerConfigurationPayload entry in leader's log
689         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
690                 votingServer(NEW_SERVER_ID));
691
692         // The second AddServer should fail since consensus wasn't reached for the first
693         addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
694         assertEquals("getStatus", ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT, addServerReply.getStatus());
695
696         // Re-send the second AddServer - should also fail
697         leaderActor.tell(new AddServer(NEW_SERVER_ID2, "", false), testKit.getRef());
698         addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
699         assertEquals("getStatus", ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT, addServerReply.getStatus());
700
701         LOG.info("testAddServerWithNoConsensusReached ending");
702     }
703
704     @Test
705     public void testAddServerWithExistingServer() {
706         LOG.info("testAddServerWithExistingServer starting");
707
708         RaftActorContext initialActorContext = new MockRaftActorContext();
709
710         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
711                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
712                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
713                 actorFactory.generateActorId(LEADER_ID));
714
715         leaderActor.tell(new AddServer(FOLLOWER_ID, followerActor.path().toString(), true), testKit.getRef());
716
717         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
718         assertEquals("getStatus", ServerChangeStatus.ALREADY_EXISTS, addServerReply.getStatus());
719
720         LOG.info("testAddServerWithExistingServer ending");
721     }
722
723     @Test
724     public void testAddServerForwardedToLeader() {
725         LOG.info("testAddServerForwardedToLeader starting");
726
727         setupNewFollower();
728         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
729         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
730
731         TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
732                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
733                 actorFactory.generateActorId(LEADER_ID));
734
735         TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
736                 MockRaftActor.builder().id(FOLLOWER_ID).peerAddresses(ImmutableMap.of(LEADER_ID,
737                         leaderActor.path().toString())).config(configParams).persistent(Optional.of(false)).
738                         props().withDispatcher(Dispatchers.DefaultDispatcherId()),
739                 actorFactory.generateActorId(FOLLOWER_ID));
740         followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
741
742         followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
743                 -1, -1, (short)0), leaderActor);
744
745         followerRaftActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
746         expectFirstMatching(leaderActor, AddServer.class);
747
748         LOG.info("testAddServerForwardedToLeader ending");
749     }
750
751     @Test
752     public void testOnApplyState() {
753         LOG.info("testOnApplyState starting");
754
755         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
756         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
757         TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
758                 MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
759                         followerActor.path().toString())).config(configParams).persistent(Optional.of(false)).
760                         props().withDispatcher(Dispatchers.DefaultDispatcherId()),
761                 actorFactory.generateActorId(LEADER_ID));
762
763         RaftActorServerConfigurationSupport support = new RaftActorServerConfigurationSupport(noLeaderActor.underlyingActor());
764
765         ReplicatedLogEntry serverConfigEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
766                 new ServerConfigurationPayload(Collections.<ServerInfo>emptyList()));
767         boolean handled = support.handleMessage(new ApplyState(null, null, serverConfigEntry), ActorRef.noSender());
768         assertEquals("Message handled", true, handled);
769
770         ReplicatedLogEntry nonServerConfigEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
771                 new MockRaftActorContext.MockPayload("1"));
772         handled = support.handleMessage(new ApplyState(null, null, nonServerConfigEntry), ActorRef.noSender());
773         assertEquals("Message handled", false, handled);
774
775         LOG.info("testOnApplyState ending");
776     }
777
778     @Test
779     public void testRemoveServerWithNoLeader() {
780         LOG.info("testRemoveServerWithNoLeader starting");
781
782         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
783         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
784
785         TestActorRef<MockRaftActor> leaderActor = actorFactory.createTestActor(
786                 MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
787                         followerActor.path().toString())).config(configParams).persistent(Optional.of(false)).
788                         props().withDispatcher(Dispatchers.DefaultDispatcherId()),
789                 actorFactory.generateActorId(LEADER_ID));
790         leaderActor.underlyingActor().waitForInitializeBehaviorComplete();
791
792         leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
793         RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
794         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, removeServerReply.getStatus());
795
796         LOG.info("testRemoveServerWithNoLeader ending");
797     }
798
799     @Test
800     public void testRemoveServerNonExistentServer() {
801         LOG.info("testRemoveServerNonExistentServer starting");
802
803         RaftActorContext initialActorContext = new MockRaftActorContext();
804
805         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
806                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
807                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
808                 actorFactory.generateActorId(LEADER_ID));
809
810         leaderActor.tell(new RemoveServer(NEW_SERVER_ID), testKit.getRef());
811         RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
812         assertEquals("getStatus", ServerChangeStatus.DOES_NOT_EXIST, removeServerReply.getStatus());
813
814         LOG.info("testRemoveServerNonExistentServer ending");
815     }
816
817     @Test
818     public void testRemoveServerForwardToLeader() {
819         LOG.info("testRemoveServerForwardToLeader starting");
820
821         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
822         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
823
824         TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
825                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
826                 actorFactory.generateActorId(LEADER_ID));
827
828         TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
829                 MockRaftActor.builder().id(FOLLOWER_ID).peerAddresses(ImmutableMap.of(LEADER_ID,
830                         leaderActor.path().toString())).config(configParams).persistent(Optional.of(false)).
831                         props().withDispatcher(Dispatchers.DefaultDispatcherId()),
832                 actorFactory.generateActorId(FOLLOWER_ID));
833         followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
834
835         followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
836                 -1, -1, (short)0), leaderActor);
837
838         followerRaftActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
839         expectFirstMatching(leaderActor, RemoveServer.class);
840
841         LOG.info("testRemoveServerForwardToLeader ending");
842     }
843
844     @Test
845     public void testRemoveServer() {
846         LOG.info("testRemoveServer starting");
847
848         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
849         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
850         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
851
852         final String followerActorId = actorFactory.generateActorId(FOLLOWER_ID);
853         final String followerActorPath = actorFactory.createTestActorPath(followerActorId);
854         RaftActorContext initialActorContext = new MockRaftActorContext();
855
856         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
857                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActorPath),
858                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
859                 actorFactory.generateActorId(LEADER_ID));
860
861         TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
862
863         TestActorRef<MessageCollectorActor> collector =
864                 actorFactory.createTestActor(MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
865                         actorFactory.generateActorId("collector"));
866         TestActorRef<CollectingMockRaftActor> followerRaftActor = actorFactory.createTestActor(
867                 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
868                         configParams, NO_PERSISTENCE, collector).withDispatcher(Dispatchers.DefaultDispatcherId()),
869                 followerActorId);
870
871         leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
872         RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
873         assertEquals("getStatus", ServerChangeStatus.OK, removeServerReply.getStatus());
874
875         final ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
876         assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
877         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(), votingServer(LEADER_ID));
878
879         RaftActorBehavior currentBehavior = leaderActor.underlyingActor().getCurrentBehavior();
880         assertTrue("Expected Leader", currentBehavior instanceof Leader);
881         assertEquals("Follower ids size", 0, ((Leader)currentBehavior).getFollowerIds().size());
882
883         MessageCollectorActor.expectFirstMatching(collector, ServerRemoved.class);
884
885         LOG.info("testRemoveServer ending");
886     }
887
888     @Test
889     public void testRemoveServerLeader() {
890         LOG.info("testRemoveServerLeader starting");
891
892         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
893         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
894         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
895
896         final String followerActorId = actorFactory.generateActorId(FOLLOWER_ID);
897         final String followerActorPath = actorFactory.createTestActorPath(followerActorId);
898         RaftActorContext initialActorContext = new MockRaftActorContext();
899
900         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
901                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActorPath),
902                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
903                 actorFactory.generateActorId(LEADER_ID));
904
905         TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
906
907         TestActorRef<MessageCollectorActor> followerCollector = actorFactory.createTestActor(MessageCollectorActor.props().
908                 withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId("collector"));
909         actorFactory.createTestActor(
910                 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
911                         configParams, NO_PERSISTENCE, followerCollector).withDispatcher(Dispatchers.DefaultDispatcherId()),
912                 followerActorId);
913
914         leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
915         RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
916         assertEquals("getStatus", ServerChangeStatus.OK, removeServerReply.getStatus());
917
918         final ApplyState applyState = MessageCollectorActor.expectFirstMatching(followerCollector, ApplyState.class);
919         assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
920         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
921                 votingServer(FOLLOWER_ID));
922
923         MessageCollectorActor.expectFirstMatching(leaderCollector, ServerRemoved.class);
924
925         LOG.info("testRemoveServerLeader ending");
926     }
927
928     @Test
929     public void testRemoveServerLeaderWithNoFollowers() {
930         LOG.info("testRemoveServerLeaderWithNoFollowers starting");
931
932         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
933                 MockLeaderRaftActor.props(Collections.<String, String>emptyMap(),
934                         new MockRaftActorContext()).withDispatcher(Dispatchers.DefaultDispatcherId()),
935                 actorFactory.generateActorId(LEADER_ID));
936
937         leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
938         RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
939         assertEquals("getStatus", ServerChangeStatus.NOT_SUPPORTED, removeServerReply.getStatus());
940
941         LOG.info("testRemoveServerLeaderWithNoFollowers ending");
942     }
943
944     @Test
945     public void testChangeServersVotingStatus() {
946         LOG.info("testChangeServersVotingStatus starting");
947
948         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
949         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
950         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
951
952         final String follower1ActorId = actorFactory.generateActorId(FOLLOWER_ID);
953         final String follower1ActorPath = actorFactory.createTestActorPath(follower1ActorId);
954         final String follower2ActorId = actorFactory.generateActorId(FOLLOWER_ID2);
955         final String follower2ActorPath = actorFactory.createTestActorPath(follower2ActorId);
956
957         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
958                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, follower1ActorPath,
959                         FOLLOWER_ID2, follower2ActorPath), new MockRaftActorContext()).
960                         withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
961         TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
962
963         TestActorRef<MessageCollectorActor> follower1Collector = actorFactory.createTestActor(
964                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
965                 actorFactory.generateActorId("collector"));
966         TestActorRef<CollectingMockRaftActor> follower1RaftActor = actorFactory.createTestActor(
967                 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
968                         FOLLOWER_ID2, follower2ActorPath), configParams, NO_PERSISTENCE, follower1Collector).
969                         withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId);
970
971         TestActorRef<MessageCollectorActor> follower2Collector = actorFactory.createTestActor(
972                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
973                 actorFactory.generateActorId("collector"));
974         TestActorRef<CollectingMockRaftActor> follower2RaftActor = actorFactory.createTestActor(
975                 CollectingMockRaftActor.props(FOLLOWER_ID2, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
976                         FOLLOWER_ID, follower1ActorPath), configParams, NO_PERSISTENCE, follower2Collector).
977                         withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId);
978
979         // Send first ChangeServersVotingStatus message
980
981         leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(FOLLOWER_ID, false, FOLLOWER_ID2, false)),
982                 testKit.getRef());
983         ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
984         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
985
986         final ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
987         assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
988         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
989                 votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
990
991         MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
992         verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
993                 votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
994
995         MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
996         verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
997                 votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
998
999         MessageCollectorActor.clearMessages(leaderCollector);
1000         MessageCollectorActor.clearMessages(follower1Collector);
1001         MessageCollectorActor.clearMessages(follower2Collector);
1002
1003         // Send second ChangeServersVotingStatus message
1004
1005         leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(FOLLOWER_ID, true)), testKit.getRef());
1006         reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1007         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1008
1009         MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
1010         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1011                 votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1012
1013         MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
1014         verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1015                 votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1016
1017         MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
1018         verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1019                 votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1020
1021         LOG.info("testChangeServersVotingStatus ending");
1022     }
1023
1024     @Test
1025     public void testChangeLeaderToNonVoting() {
1026         LOG.info("testChangeLeaderToNonVoting starting");
1027
1028         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1029         configParams.setHeartBeatInterval(new FiniteDuration(500, TimeUnit.MILLISECONDS));
1030
1031         final String follower1ActorId = actorFactory.generateActorId(FOLLOWER_ID);
1032         final String follower1ActorPath = actorFactory.createTestActorPath(follower1ActorId);
1033         final String follower2ActorId = actorFactory.generateActorId(FOLLOWER_ID2);
1034         final String follower2ActorPath = actorFactory.createTestActorPath(follower2ActorId);
1035
1036         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
1037                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, follower1ActorPath,
1038                         FOLLOWER_ID2, follower2ActorPath), new MockRaftActorContext()).
1039                         withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
1040         TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
1041
1042         TestActorRef<MessageCollectorActor> follower1Collector = actorFactory.createTestActor(
1043                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1044                 actorFactory.generateActorId("collector"));
1045         TestActorRef<CollectingMockRaftActor> follower1RaftActor = actorFactory.createTestActor(
1046                 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
1047                         FOLLOWER_ID2, follower2ActorPath), configParams, NO_PERSISTENCE, follower1Collector).
1048                         withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId);
1049
1050         TestActorRef<MessageCollectorActor> follower2Collector = actorFactory.createTestActor(
1051                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1052                 actorFactory.generateActorId("collector"));
1053         TestActorRef<CollectingMockRaftActor> follower2RaftActor = actorFactory.createTestActor(
1054                 CollectingMockRaftActor.props(FOLLOWER_ID2, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
1055                         FOLLOWER_ID, follower1ActorPath), configParams, NO_PERSISTENCE, follower2Collector).
1056                         withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId);
1057
1058         // Send ChangeServersVotingStatus message
1059
1060         leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(LEADER_ID, false)), testKit.getRef());
1061         ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1062         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1063
1064         MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
1065         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1066                 nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
1067
1068         MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
1069         verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1070                 nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
1071
1072         MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
1073         verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1074                 nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
1075
1076         verifyRaftState(RaftState.Leader, follower1RaftActor.underlyingActor(), follower2RaftActor.underlyingActor());
1077         verifyRaftState(RaftState.Follower, leaderActor.underlyingActor());
1078
1079         MessageCollectorActor.expectMatching(leaderCollector, AppendEntries.class, 2);
1080
1081         LOG.info("testChangeLeaderToNonVoting ending");
1082     }
1083
1084     @Test
1085     public void testChangeLeaderToNonVotingInSingleNode() {
1086         LOG.info("testChangeLeaderToNonVotingInSingleNode starting");
1087
1088         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
1089                 MockLeaderRaftActor.props(ImmutableMap.of(), new MockRaftActorContext()).
1090                         withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
1091
1092         leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(LEADER_ID, false)), testKit.getRef());
1093         ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1094         assertEquals("getStatus", ServerChangeStatus.INVALID_REQUEST, reply.getStatus());
1095
1096         LOG.info("testChangeLeaderToNonVotingInSingleNode ending");
1097     }
1098
    /**
     * Verifies a ChangeServersVotingStatus request sent to a non-voting node that has no leader.
     *
     * <p>The first attempt, made while node1 has no peer address for node2, must reply with
     * {@code NO_LEADER}. After node2's peer address is set, the same request must reply with {@code OK},
     * leaving node1 as a voting Leader and node2 as a voting Follower with the flipped server config as
     * the last log entry on both.
     */
    @Test
    public void testChangeToVotingWithNoLeader() {
        LOG.info("testChangeToVotingWithNoLeader starting");

        DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
        configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
        configParams.setElectionTimeoutFactor(5);

        final String node1ID = "node1";
        final String node2ID = "node2";

        // Set up a persisted ServerConfigurationPayload. Initially node1 and node2 will come up as
        // non-voting via the server config. The server config will also contain 2 voting peers that are
        // down (ie no actors created).

        ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
                new ServerInfo(node1ID, false), new ServerInfo(node2ID, false),
                new ServerInfo("downNode1", true), new ServerInfo("downNode2", true)));
        ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, 1, persistedServerConfig);

        // Seed both nodes' journals with an election term, the server config entry and an
        // ApplyJournalEntries for index 0.
        InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "downNode1"));
        InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
        InMemoryJournal.addEntry(node1ID, 3, new ApplyJournalEntries(0));
        InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "downNode2"));
        InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
        InMemoryJournal.addEntry(node2ID, 3, new ApplyJournalEntries(0));

        TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
                MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
                actorFactory.generateActorId("collector"));
        TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
                CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
                        PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
        CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();

        TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
                MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
                actorFactory.generateActorId("collector"));
        TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
                CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
                        PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
        CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();

        node1RaftActor.waitForInitializeBehaviorComplete();
        node2RaftActor.waitForInitializeBehaviorComplete();

        // Verify the intended server config was loaded and applied.
        verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
                nonVotingServer(node1ID), nonVotingServer(node2ID), votingServer("downNode1"),
                votingServer("downNode2"));
        assertEquals("isVotingMember", false, node1RaftActor.getRaftActorContext().isVotingMember());
        assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
        assertEquals("getLeaderId", null, node1RaftActor.getLeaderId());

        verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
                nonVotingServer(node1ID), nonVotingServer(node2ID), votingServer("downNode1"),
                votingServer("downNode2"));
        assertEquals("isVotingMember", false, node2RaftActor.getRaftActorContext().isVotingMember());

        // For the test, we send a ChangeServersVotingStatus message to node1 to flip the voting states for
        // each server, ie node1 and node2 to voting and the 2 down nodes to non-voting. This should cause
        // node1 to try to elect itself as leader in order to apply the new server config. Since the 2
        // down nodes are switched to non-voting, node1 should only need a vote from node2.

        // First send the message such that node1 has no peer address for node2 - should fail.

        ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true,
                node2ID, true, "downNode1", false, "downNode2", false));
        node1RaftActorRef.tell(changeServers, testKit.getRef());
        ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
        assertEquals("getStatus", ServerChangeStatus.NO_LEADER, reply.getStatus());

        // Send an AppendEntries so node1 has a leaderId

        MessageCollectorActor.clearMessages(node1Collector);

        long term = node1RaftActor.getRaftActorContext().getTermInformation().getCurrentTerm();
        node1RaftActorRef.tell(new AppendEntries(term, "downNode1", -1L, -1L,
                Collections.<ReplicatedLogEntry>emptyList(), 0, -1, (short)1), ActorRef.noSender());

        // Wait for the ElectionTimeout to clear the leaderId. The leaderId must be null so on the
        // ChangeServersVotingStatus message, it will try to elect a leader.

        MessageCollectorActor.expectFirstMatching(node1Collector, ElectionTimeout.class);

        // Update node2's peer address and send the message again

        node1RaftActor.setPeerAddress(node2ID, node2RaftActorRef.path().toString());

        node1RaftActorRef.tell(changeServers, testKit.getRef());
        reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
        assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());

        // The new server config is expected at log index 1 on both nodes.
        ApplyJournalEntries apply = MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class);
        assertEquals("getToIndex", 1, apply.getToIndex());
        verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
                votingServer(node1ID), votingServer(node2ID), nonVotingServer("downNode1"),
                nonVotingServer("downNode2"));
        assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
        assertEquals("getRaftState", RaftState.Leader, node1RaftActor.getRaftState());

        apply = MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
        assertEquals("getToIndex", 1, apply.getToIndex());
        verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
                votingServer(node1ID), votingServer(node2ID), nonVotingServer("downNode1"),
                nonVotingServer("downNode2"));
        assertEquals("isVotingMember", true, node2RaftActor.getRaftActorContext().isVotingMember());
        assertEquals("getRaftState", RaftState.Follower, node2RaftActor.getRaftState());

        LOG.info("testChangeToVotingWithNoLeader ending");
    }
1210
    /**
     * Verifies that a ChangeServersVotingStatus request to a leaderless, non-voting node1 replies with
     * {@code NO_LEADER} when node2 drops RequestVote messages, and that node1 afterwards still reports
     * the original server config (node1 non-voting, node2 voting) and is a Follower.
     */
    @Test
    public void testChangeToVotingWithNoLeaderAndElectionTimeout() {
        LOG.info("testChangeToVotingWithNoLeaderAndElectionTimeout starting");

        final String node1ID = "node1";
        final String node2ID = "node2";

        // Resolve each node's peer address from its id; any other id resolves to null.
        PeerAddressResolver peerAddressResolver = peerId -> peerId.equals(node1ID) ? actorFactory.createTestActorPath(node1ID) :
            peerId.equals(node2ID) ? actorFactory.createTestActorPath(node2ID) : null;

        ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
                new ServerInfo(node1ID, false), new ServerInfo(node2ID, true)));
        ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, 1, persistedServerConfig);

        InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
        InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
        InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
        InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);

        // node1 gets a short election timeout (factor 1 with a 100 ms heartbeat).
        DefaultConfigParamsImpl configParams1 = new DefaultConfigParamsImpl();
        configParams1.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
        configParams1.setElectionTimeoutFactor(1);
        configParams1.setPeerAddressResolver(peerAddressResolver);
        TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
                MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
                actorFactory.generateActorId("collector"));
        TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
                CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams1,
                        PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
        CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();

        // node2 gets a very large election timeout factor.
        DefaultConfigParamsImpl configParams2 = new DefaultConfigParamsImpl();
        configParams2.setElectionTimeoutFactor(1000000);
        configParams2.setPeerAddressResolver(peerAddressResolver);
        TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
                MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
                actorFactory.generateActorId("collector"));
        TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
                CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams2,
                        PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
        CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();

        // Send a ChangeServersVotingStatus message to node1 to change node1 to voting. This should cause
        // node1 to try to elect itself as leader in order to apply the new server config. But we'll drop
        // RequestVote messages in node2 which should cause node1 to time out and revert back to the previous
        // server config and fail with NO_LEADER. Note that node1 shouldn't forward the request to node2 b/c
        // node2 was previously voting.

        node2RaftActor.setDropMessageOfType(RequestVote.class);

        ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true));
        node1RaftActorRef.tell(changeServers, testKit.getRef());
        ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
        assertEquals("getStatus", ServerChangeStatus.NO_LEADER, reply.getStatus());

        assertEquals("Server config", Sets.newHashSet(nonVotingServer(node1ID), votingServer(node2ID)),
                Sets.newHashSet(node1RaftActor.getRaftActorContext().getPeerServerInfo(true).getServerConfig()));
        assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());

        LOG.info("testChangeToVotingWithNoLeaderAndElectionTimeout ending");
    }
1272
    /**
     * Verifies that a ChangeServersVotingStatus request sent to node1, whose journal holds fewer entries
     * than node2's, still replies with {@code OK}, and that node2 ends up as Leader and node1 as a voting
     * Follower with both servers voting in the last log entry.
     */
    @Test
    public void testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout() {
        LOG.info("testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout starting");

        final String node1ID = "node1";
        final String node2ID = "node2";

        // Resolve each node's peer address from its id; any other id resolves to null.
        PeerAddressResolver peerAddressResolver = peerId -> peerId.equals(node1ID) ? actorFactory.createTestActorPath(node1ID) :
            peerId.equals(node2ID) ? actorFactory.createTestActorPath(node2ID) : null;

        DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
        configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
        configParams.setElectionTimeoutFactor(3);
        configParams.setPeerAddressResolver(peerAddressResolver);

        ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
                new ServerInfo(node1ID, false), new ServerInfo(node2ID, false)));
        ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, 1, persistedServerConfig);

        // node2's journal gets one more log entry (index 1) than node1's, so node2's log is ahead.
        InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
        InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
        InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
        InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
        InMemoryJournal.addEntry(node2ID, 3, new ReplicatedLogImplEntry(1, 1,
                new MockRaftActorContext.MockPayload("2")));
        InMemoryJournal.addEntry(node2ID, 4, new ApplyJournalEntries(1));

        TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
                MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
                actorFactory.generateActorId("collector"));
        TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
                CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
                        PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
        CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();

        TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
                MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
                actorFactory.generateActorId("collector"));
        TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
                CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
                        PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
        CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();

        // Send a ChangeServersVotingStatus message to node1 to change node1 to voting. This should cause
        // node1 to try to elect itself as leader in order to apply the new server config. However node1's log
        // is behind node2's so node2 should not grant node1's vote. This should cause node1 to time out and
        // forward the request to node2.

        ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(
                ImmutableMap.of(node1ID, true, node2ID, true));
        node1RaftActorRef.tell(changeServers, testKit.getRef());
        ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
        assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());

        MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
        verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
                votingServer(node1ID), votingServer(node2ID));
        assertEquals("getRaftState", RaftState.Leader, node2RaftActor.getRaftState());

        MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class);
        verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
                votingServer(node1ID), votingServer(node2ID));
        assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
        assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());

        LOG.info("testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout ending");
    }
1340
    /**
     * Verifies that a ChangeServersVotingStatus request sent to a leaderless, non-voting node1 replies
     * with {@code OK} after node2 (which drops RequestVote messages) is sent TimeoutNow, and that node2
     * ends up as Leader and node1 as a voting Follower with both servers voting in the last log entry.
     */
    @Test
    public void testChangeToVotingWithNoLeaderAndOtherLeaderElected() {
        LOG.info("testChangeToVotingWithNoLeaderAndOtherLeaderElected starting");

        DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
        configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
        configParams.setElectionTimeoutFactor(100000);

        final String node1ID = "node1";
        final String node2ID = "node2";

        // Resolve each node's peer address from its id; any other id resolves to null.
        configParams.setPeerAddressResolver(peerId -> peerId.equals(node1ID) ? actorFactory.createTestActorPath(node1ID) :
            peerId.equals(node2ID) ? actorFactory.createTestActorPath(node2ID) : null);

        ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
                new ServerInfo(node1ID, false), new ServerInfo(node2ID, true)));
        ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, 1, persistedServerConfig);

        InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
        InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
        InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
        InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);

        TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
                MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
                actorFactory.generateActorId("collector"));
        TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
                CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
                        PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
        CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();

        TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
                MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
                actorFactory.generateActorId("collector"));
        TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
                CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
                        PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
        CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();

        // Send a ChangeServersVotingStatus message to node1 to change node1 to voting. This should cause
        // node1 to try to elect itself as leader in order to apply the new server config. But we'll drop
        // RequestVote messages in node2 and make it the leader so node1 should forward the server change
        // request to node2 when node2 is elected.

        node2RaftActor.setDropMessageOfType(RequestVote.class);

        ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true,
                node2ID, true));
        node1RaftActorRef.tell(changeServers, testKit.getRef());

        // Wait until node2 has received (and dropped) node1's RequestVote before triggering node2.
        MessageCollectorActor.expectFirstMatching(node2Collector, RequestVote.class);

        node2RaftActorRef.tell(TimeoutNow.INSTANCE, ActorRef.noSender());

        ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
        assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());

        MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class);
        verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
                votingServer(node1ID), votingServer(node2ID));
        assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
        assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());

        MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
        verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
                votingServer(node1ID), votingServer(node2ID));
        assertEquals("getRaftState", RaftState.Leader, node2RaftActor.getRaftState());

        LOG.info("testChangeToVotingWithNoLeaderAndOtherLeaderElected ending");
    }
1411
1412     private static void verifyRaftState(RaftState expState, RaftActor... raftActors) {
1413         Stopwatch sw = Stopwatch.createStarted();
1414         while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
1415             for(RaftActor raftActor: raftActors) {
1416                 if(raftActor.getRaftState() == expState) {
1417                     return;
1418                 }
1419             }
1420         }
1421
1422         fail("None of the RaftActors have state " + expState);
1423     }
1424
1425     private static ServerInfo votingServer(String id) {
1426         return new ServerInfo(id, true);
1427     }
1428
1429     private static ServerInfo nonVotingServer(String id) {
1430         return new ServerInfo(id, false);
1431     }
1432
1433     private TestActorRef<MessageCollectorActor> newLeaderCollectorActor(MockLeaderRaftActor leaderRaftActor) {
1434         return newCollectorActor(leaderRaftActor, LEADER_ID);
1435     }
1436
1437     private TestActorRef<MessageCollectorActor> newCollectorActor(AbstractMockRaftActor raftActor, String id) {
1438         TestActorRef<MessageCollectorActor> collectorActor = actorFactory.createTestActor(
1439                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1440                 actorFactory.generateActorId(id + "Collector"));
1441         raftActor.setCollectorActor(collectorActor);
1442         return collectorActor;
1443     }
1444
1445     private static void verifyServerConfigurationPayloadEntry(ReplicatedLog log, ServerInfo... expected) {
1446         ReplicatedLogEntry logEntry = log.get(log.lastIndex());
1447         assertEquals("Last log entry payload class", ServerConfigurationPayload.class, logEntry.getData().getClass());
1448         ServerConfigurationPayload payload = (ServerConfigurationPayload)logEntry.getData();
1449         assertEquals("Server config", Sets.newHashSet(expected), Sets.newHashSet(payload.getServerConfig()));
1450     }
1451
1452     private static RaftActorContextImpl newFollowerContext(String id, TestActorRef<? extends UntypedActor> actor) {
1453         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1454         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1455         configParams.setElectionTimeoutFactor(100000);
1456         NonPersistentDataProvider noPersistence = new NonPersistentDataProvider();
1457         ElectionTermImpl termInfo = new ElectionTermImpl(noPersistence, id, LOG);
1458         termInfo.update(1, LEADER_ID);
1459         return new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
1460                 id, termInfo, -1, -1, ImmutableMap.of(LEADER_ID, ""), configParams, noPersistence, LOG);
1461     }
1462
1463     static abstract class AbstractMockRaftActor extends MockRaftActor {
1464         private volatile TestActorRef<MessageCollectorActor> collectorActor;
1465         private volatile Class<?> dropMessageOfType;
1466
1467         AbstractMockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
1468                 boolean persistent, TestActorRef<MessageCollectorActor> collectorActor) {
1469             super(builder().id(id).peerAddresses(peerAddresses).config(config.get()).
1470                     persistent(Optional.of(persistent)));
1471             this.collectorActor = collectorActor;
1472         }
1473
1474         void setDropMessageOfType(Class<?> dropMessageOfType) {
1475             this.dropMessageOfType = dropMessageOfType;
1476         }
1477
1478         void setCollectorActor(TestActorRef<MessageCollectorActor> collectorActor) {
1479             this.collectorActor = collectorActor;
1480         }
1481
1482         @Override
1483         public void handleCommand(Object message) {
1484             if(dropMessageOfType == null || !dropMessageOfType.equals(message.getClass())) {
1485                 super.handleCommand(message);
1486             }
1487
1488             if(collectorActor != null) {
1489                 collectorActor.tell(message, getSender());
1490             }
1491         }
1492     }
1493
1494     public static class CollectingMockRaftActor extends AbstractMockRaftActor {
1495
1496         CollectingMockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
1497                 boolean persistent, TestActorRef<MessageCollectorActor> collectorActor) {
1498             super(id, peerAddresses, config, persistent, collectorActor);
1499             snapshotCohortDelegate = new RaftActorSnapshotCohort() {
1500                 @Override
1501                 public void createSnapshot(ActorRef actorRef) {
1502                     actorRef.tell(new CaptureSnapshotReply(new byte[0]), actorRef);
1503                 }
1504
1505                 @Override
1506                 public void applySnapshot(byte[] snapshotBytes) {
1507                 }
1508             };
1509         }
1510
1511         public static Props props(final String id, final Map<String, String> peerAddresses,
1512                 ConfigParams config, boolean persistent, TestActorRef<MessageCollectorActor> collectorActor){
1513
1514             return Props.create(CollectingMockRaftActor.class, id, peerAddresses, Optional.of(config),
1515                     persistent, collectorActor);
1516         }
1517
1518     }
1519
1520     public static class MockLeaderRaftActor extends AbstractMockRaftActor {
1521         public MockLeaderRaftActor(Map<String, String> peerAddresses, ConfigParams config,
1522                 RaftActorContext fromContext) {
1523             super(LEADER_ID, peerAddresses, Optional.of(config), NO_PERSISTENCE, null);
1524             setPersistence(false);
1525
1526             RaftActorContext context = getRaftActorContext();
1527             for(int i = 0; i < fromContext.getReplicatedLog().size(); i++) {
1528                 ReplicatedLogEntry entry = fromContext.getReplicatedLog().get(i);
1529                 getState().add(entry.getData());
1530                 context.getReplicatedLog().append(entry);
1531             }
1532
1533             context.setCommitIndex(fromContext.getCommitIndex());
1534             context.setLastApplied(fromContext.getLastApplied());
1535             context.getTermInformation().update(fromContext.getTermInformation().getCurrentTerm(),
1536                     fromContext.getTermInformation().getVotedFor());
1537         }
1538
1539         @Override
1540         protected void initializeBehavior() {
1541             changeCurrentBehavior(new Leader(getRaftActorContext()));
1542             initializeBehaviorComplete.countDown();
1543         }
1544
1545         @Override
1546         public void createSnapshot(ActorRef actorRef) {
1547             try {
1548                 actorRef.tell(new CaptureSnapshotReply(RaftActorTest.fromObject(getState()).toByteArray()), actorRef);
1549             } catch (Exception e) {
1550                 LOG.error("createSnapshot failed", e);
1551             }
1552         }
1553
1554         static Props props(Map<String, String> peerAddresses, RaftActorContext fromContext) {
1555             DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1556             configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1557             configParams.setElectionTimeoutFactor(10);
1558             return Props.create(MockLeaderRaftActor.class, peerAddresses, configParams, fromContext);
1559         }
1560     }
1561
1562     public static class MockNewFollowerRaftActor extends AbstractMockRaftActor {
1563         public MockNewFollowerRaftActor(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
1564             super(NEW_SERVER_ID, Maps.<String, String>newHashMap(), Optional.of(config), NO_PERSISTENCE, collectorActor);
1565             setPersistence(false);
1566         }
1567
1568         static Props props(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
1569             return Props.create(MockNewFollowerRaftActor.class, config, collectorActor);
1570         }
1571     }
1572 }