630ec7fb930302b5d0a98e1b20517207e246ec93
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorServerConfigurationSupportTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertTrue;
12 import static org.junit.Assert.fail;
13 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.assertNoneMatching;
14 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
15 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
16 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
17 import akka.actor.ActorRef;
18 import akka.actor.Props;
19 import akka.actor.UntypedActor;
20 import akka.dispatch.Dispatchers;
21 import akka.testkit.JavaTestKit;
22 import akka.testkit.TestActorRef;
23 import com.google.common.base.Optional;
24 import com.google.common.base.Stopwatch;
25 import com.google.common.collect.ImmutableMap;
26 import com.google.common.collect.Maps;
27 import com.google.common.collect.Sets;
28 import java.util.Arrays;
29 import java.util.Collections;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.concurrent.TimeUnit;
33 import org.junit.After;
34 import org.junit.Before;
35 import org.junit.Test;
36 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
37 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
38 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
39 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
40 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
41 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
42 import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
43 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
44 import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
45 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
46 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
47 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
48 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
49 import org.opendaylight.controller.cluster.raft.messages.AddServer;
50 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
51 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
52 import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
53 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
54 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
55 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
56 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
57 import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
58 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
59 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
60 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
61 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
62 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
63 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
64 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
65 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
66 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
67 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
68 import org.slf4j.Logger;
69 import org.slf4j.LoggerFactory;
70 import scala.concurrent.duration.FiniteDuration;
71
72 /**
73  * Unit tests for RaftActorServerConfigurationSupport.
74  *
75  * @author Thomas Pantelis
76  */
77 public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
// Member ids used by the tests in this class to name the leader, the existing followers and the
// servers that get added.
78     static final String LEADER_ID = "leader";
79     static final String FOLLOWER_ID = "follower";
80     static final String FOLLOWER_ID2 = "follower2";
81     static final String NEW_SERVER_ID = "new-server";
82     static final String NEW_SERVER_ID2 = "new-server2";
83     private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupportTest.class);
// Runtime class of the COMMIT_SNAPSHOT message; tests pass it to setDropMessageOfType() and
// expectFirstMatching() to intercept or capture the snapshot commit message.
84     private static final Class<?> COMMIT_MESSAGE_CLASS = RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT.getClass();
// Readable names for a boolean persistence flag; NOTE(review): not referenced in the visible part of
// the file - presumably passed to helper methods further down.
85     private static final boolean NO_PERSISTENCE = false;
86     private static final boolean PERSISTENT = true;
87
// Creates the test actors for a test; closed in tearDown().
88     private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
89
// Test actor that forwards to whatever behavior a test installs on it; the tests use it either as the
// pre-existing follower or as the second server to add.
90     private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
91             Props.create(ForwardMessageToBehaviorActor.class).withDispatcher(Dispatchers.DefaultDispatcherId()),
92             actorFactory.generateActorId(FOLLOWER_ID));
93
// State of the server being added; populated by setupNewFollower().
94     private TestActorRef<MockNewFollowerRaftActor> newFollowerRaftActor;
95     private TestActorRef<MessageCollectorActor> newFollowerCollectorActor;
96     private RaftActorContext newFollowerActorContext;
97
// Its ref is used as the sender of AddServer requests so that the replies can be awaited on it.
98     private final JavaTestKit testKit = new JavaTestKit(getSystem());
99
100     @Before
101     public void setup() {
102         InMemoryJournal.clear();
103         InMemorySnapshotStore.clear();
104     }
105
106     private void setupNewFollower() {
107         DefaultConfigParamsImpl configParams = newFollowerConfigParams();
108
109         newFollowerCollectorActor = actorFactory.createTestActor(
110                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
111                 actorFactory.generateActorId(NEW_SERVER_ID + "Collector"));
112         newFollowerRaftActor = actorFactory.createTestActor(MockNewFollowerRaftActor.props(
113                 configParams, newFollowerCollectorActor).withDispatcher(Dispatchers.DefaultDispatcherId()),
114                 actorFactory.generateActorId(NEW_SERVER_ID));
115
116         try {
117             newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
118         } catch (Exception e) {
119             newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
120         }
121     }
122
123     private static DefaultConfigParamsImpl newFollowerConfigParams() {
124         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
125         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
126         configParams.setElectionTimeoutFactor(100000);
127         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
128         return configParams;
129     }
130
131     @After
132     public void tearDown() throws Exception {
133         actorFactory.close();
134     }
135
136     @Test
137     public void testAddServerWithExistingFollower() throws Exception {
138         LOG.info("testAddServerWithExistingFollower starting");
139         setupNewFollower();
140         RaftActorContextImpl followerActorContext = newFollowerContext(FOLLOWER_ID, followerActor);
141         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
142                 0, 3, 1).build());
143         followerActorContext.setCommitIndex(2);
144         followerActorContext.setLastApplied(2);
145
146         Follower follower = new Follower(followerActorContext);
147         followerActor.underlyingActor().setBehavior(follower);
148         followerActorContext.setCurrentBehavior(follower);
149
150         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
151                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
152                         followerActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
153                 actorFactory.generateActorId(LEADER_ID));
154
155         // Expect initial heartbeat from the leader.
156         expectFirstMatching(followerActor, AppendEntries.class);
157         clearMessages(followerActor);
158
159         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
160         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
161
162         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
163
164         // Leader should install snapshot - capture and verify ApplySnapshot contents
165
166         ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
167         @SuppressWarnings("unchecked")
168         List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
169         assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
170
171         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
172         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
173         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
174
175         // Verify ServerConfigurationPayload entry in leader's log
176
177         expectFirstMatching(leaderCollectorActor, ApplyState.class);
178         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
179         assertEquals("Leader journal last index", 3, leaderActorContext.getReplicatedLog().lastIndex());
180         assertEquals("Leader commit index", 3, leaderActorContext.getCommitIndex());
181         assertEquals("Leader last applied index", 3, leaderActorContext.getLastApplied());
182         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
183                 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
184
185         // Verify ServerConfigurationPayload entry in both followers
186
187         expectFirstMatching(followerActor, ApplyState.class);
188         assertEquals("Follower journal last index", 3, followerActorContext.getReplicatedLog().lastIndex());
189         verifyServerConfigurationPayloadEntry(followerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
190                 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
191
192         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
193         assertEquals("New follower journal last index", 3, newFollowerActorContext.getReplicatedLog().lastIndex());
194         verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
195                 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
196
197         // Verify new server config was applied in both followers
198
199         assertEquals("Follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID), followerActorContext.getPeerIds());
200
201         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, FOLLOWER_ID), newFollowerActorContext.getPeerIds());
202
203         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
204         expectFirstMatching(followerActor, ApplyState.class);
205
206         assertEquals("Follower commit index", 3, followerActorContext.getCommitIndex());
207         assertEquals("Follower last applied index", 3, followerActorContext.getLastApplied());
208         assertEquals("New follower commit index", 3, newFollowerActorContext.getCommitIndex());
209         assertEquals("New follower last applied index", 3, newFollowerActorContext.getLastApplied());
210
211         List<ReplicatedLogImplEntry> persistedLogEntries = InMemoryJournal.get(LEADER_ID, ReplicatedLogImplEntry.class);
212         assertEquals("Leader ReplicatedLogImplEntry entries", 1, persistedLogEntries.size());
213         ReplicatedLogImplEntry logEntry = persistedLogEntries.get(0);
214         assertEquals("Leader ReplicatedLogImplEntry getTerm", 1, logEntry.getTerm());
215         assertEquals("Leader ReplicatedLogImplEntry getIndex", 3, logEntry.getIndex());
216         assertEquals("Leader ReplicatedLogImplEntry getData", ServerConfigurationPayload.class, logEntry.getData().getClass());
217
218         persistedLogEntries = InMemoryJournal.get(NEW_SERVER_ID, ReplicatedLogImplEntry.class);
219         assertEquals("New follower ReplicatedLogImplEntry entries", 1, persistedLogEntries.size());
220         logEntry = persistedLogEntries.get(0);
221         assertEquals("New follower ReplicatedLogImplEntry getTerm", 1, logEntry.getTerm());
222         assertEquals("New follower ReplicatedLogImplEntry getIndex", 3, logEntry.getIndex());
223         assertEquals("New follower ReplicatedLogImplEntry getData", ServerConfigurationPayload.class,
224                 logEntry.getData().getClass());
225
226         LOG.info("testAddServerWithExistingFollower ending");
227     }
228
229     @Test
230     public void testAddServerWithNoExistingFollower() throws Exception {
231         LOG.info("testAddServerWithNoExistingFollower starting");
232
233         setupNewFollower();
234         RaftActorContext initialActorContext = new MockRaftActorContext();
235         initialActorContext.setCommitIndex(1);
236         initialActorContext.setLastApplied(1);
237         initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
238                 0, 2, 1).build());
239
240         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
241                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
242                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
243                 actorFactory.generateActorId(LEADER_ID));
244
245         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
246         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
247
248         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
249
250         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
251
252         // Leader should install snapshot - capture and verify ApplySnapshot contents
253
254         ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
255         @SuppressWarnings("unchecked")
256         List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
257         assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
258
259         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
260         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
261         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
262
263         // Verify ServerConfigurationPayload entry in leader's log
264
265         expectFirstMatching(leaderCollectorActor, ApplyState.class);
266         assertEquals("Leader journal last index", 2, leaderActorContext.getReplicatedLog().lastIndex());
267         assertEquals("Leader commit index", 2, leaderActorContext.getCommitIndex());
268         assertEquals("Leader last applied index", 2, leaderActorContext.getLastApplied());
269         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
270                 votingServer(NEW_SERVER_ID));
271
272         // Verify ServerConfigurationPayload entry in the new follower
273
274         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
275         assertEquals("New follower journal last index", 2, newFollowerActorContext.getReplicatedLog().lastIndex());
276         verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
277                 votingServer(NEW_SERVER_ID));
278
279         // Verify new server config was applied in the new follower
280
281         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
282
283         LOG.info("testAddServerWithNoExistingFollower ending");
284     }
285
286     @Test
287     public void testAddServersAsNonVoting() throws Exception {
288         LOG.info("testAddServersAsNonVoting starting");
289
290         setupNewFollower();
291         RaftActorContext initialActorContext = new MockRaftActorContext();
292
293         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
294                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
295                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
296                 actorFactory.generateActorId(LEADER_ID));
297
298         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
299         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
300
301         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
302
303         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), false), testKit.getRef());
304
305         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
306         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
307         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
308
309         // Verify ServerConfigurationPayload entry in leader's log
310
311         expectFirstMatching(leaderCollectorActor, ApplyState.class);
312
313         assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
314         assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
315         assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
316         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
317                 nonVotingServer(NEW_SERVER_ID));
318
319         // Verify ServerConfigurationPayload entry in the new follower
320
321         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
322         assertEquals("New follower journal last index", 0, newFollowerActorContext.getReplicatedLog().lastIndex());
323         verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
324                 nonVotingServer(NEW_SERVER_ID));
325
326         // Verify new server config was applied in the new follower
327
328         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
329
330         assertNoneMatching(newFollowerCollectorActor, InstallSnapshot.class, 500);
331
332         // Add another non-voting server.
333
334         clearMessages(leaderCollectorActor);
335
336         RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
337         Follower newFollower2 = new Follower(follower2ActorContext);
338         followerActor.underlyingActor().setBehavior(newFollower2);
339
340         leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit.getRef());
341
342         addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
343         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
344         assertEquals("getLeaderHint", java.util.Optional.of(LEADER_ID), addServerReply.getLeaderHint());
345
346         expectFirstMatching(leaderCollectorActor, ApplyState.class);
347         assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
348         assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
349         assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
350         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
351                 votingServer(LEADER_ID), nonVotingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
352
353         LOG.info("testAddServersAsNonVoting ending");
354     }
355
356     @Test
357     public void testAddServerWithOperationInProgress() throws Exception {
358         LOG.info("testAddServerWithOperationInProgress starting");
359
360         setupNewFollower();
361         RaftActorContext initialActorContext = new MockRaftActorContext();
362
363         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
364                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
365                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
366                 actorFactory.generateActorId(LEADER_ID));
367
368         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
369         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
370
371         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
372
373         RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
374         Follower newFollower2 = new Follower(follower2ActorContext);
375         followerActor.underlyingActor().setBehavior(newFollower2);
376
377         MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
378         newFollowerRaftActorInstance.setDropMessageOfType(InstallSnapshot.class);
379
380         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
381
382         // Wait for leader's install snapshot and capture it
383
384         InstallSnapshot installSnapshot = expectFirstMatching(newFollowerCollectorActor, InstallSnapshot.class);
385
386         // Send a second AddServer - should get queued
387         JavaTestKit testKit2 = new JavaTestKit(getSystem());
388         leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit2.getRef());
389
390         // Continue the first AddServer
391         newFollowerRaftActorInstance.setDropMessageOfType(null);
392         newFollowerRaftActor.tell(installSnapshot, leaderActor);
393
394         // Verify both complete successfully
395         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
396         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
397
398         addServerReply = testKit2.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
399         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
400
401         // Verify ServerConfigurationPayload entries in leader's log
402
403         expectMatching(leaderCollectorActor, ApplyState.class, 2);
404         assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
405         assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
406         assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
407         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
408                 votingServer(LEADER_ID), votingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
409
410         // Verify ServerConfigurationPayload entry in the new follower
411
412         expectMatching(newFollowerCollectorActor, ApplyState.class, 2);
413         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID2),
414                newFollowerActorContext.getPeerIds());
415
416         LOG.info("testAddServerWithOperationInProgress ending");
417     }
418
419     @Test
420     public void testAddServerWithPriorSnapshotInProgress() throws Exception {
421         LOG.info("testAddServerWithPriorSnapshotInProgress starting");
422
423         setupNewFollower();
424         RaftActorContext initialActorContext = new MockRaftActorContext();
425
426         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
427                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
428                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
429                 actorFactory.generateActorId(LEADER_ID));
430
431         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
432         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
433
434         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
435
436         // Drop commit message for now to delay snapshot completion
437         leaderRaftActor.setDropMessageOfType(String.class);
438
439         leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
440
441         Object commitMsg = expectFirstMatching(leaderCollectorActor, COMMIT_MESSAGE_CLASS);
442
443         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
444
445         leaderRaftActor.setDropMessageOfType(null);
446         leaderActor.tell(commitMsg, leaderActor);
447
448         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
449         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
450         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
451
452         expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
453
454         // Verify ServerConfigurationPayload entry in leader's log
455
456         expectFirstMatching(leaderCollectorActor, ApplyState.class);
457         assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
458         assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
459         assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
460         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
461                 votingServer(NEW_SERVER_ID));
462
463         LOG.info("testAddServerWithPriorSnapshotInProgress ending");
464     }
465
466     @Test
467     public void testAddServerWithPriorSnapshotCompleteTimeout() throws Exception {
468         LOG.info("testAddServerWithPriorSnapshotCompleteTimeout starting");
469
470         setupNewFollower();
471         RaftActorContext initialActorContext = new MockRaftActorContext();
472
473         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
474                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
475                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
476                 actorFactory.generateActorId(LEADER_ID));
477
478         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
479         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
480
481         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
482
483         // Drop commit message so the snapshot doesn't complete.
484         leaderRaftActor.setDropMessageOfType(COMMIT_MESSAGE_CLASS);
485
486         leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
487
488         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
489
490         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
491         assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
492
493         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
494
495         LOG.info("testAddServerWithPriorSnapshotCompleteTimeout ending");
496     }
497
498     @Test
499     public void testAddServerWithLeaderChangeBeforePriorSnapshotComplete() throws Exception {
500         LOG.info("testAddServerWithLeaderChangeBeforePriorSnapshotComplete starting");
501
502         setupNewFollower();
503         RaftActorContext initialActorContext = new MockRaftActorContext();
504
505         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
506                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
507                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
508                 actorFactory.generateActorId(LEADER_ID));
509
510         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
511         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
512         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
513
514         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
515
516         // Drop the commit message so the snapshot doesn't complete yet.
517         leaderRaftActor.setDropMessageOfType(COMMIT_MESSAGE_CLASS);
518
519         leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
520
521         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
522
523         Object commitMsg = expectFirstMatching(leaderCollectorActor, COMMIT_MESSAGE_CLASS);
524
525         // Change the leader behavior to follower
526         leaderActor.tell(new Follower(leaderActorContext), leaderActor);
527
528         // Drop CaptureSnapshotReply in case install snapshot is incorrectly initiated after the prior
529         // snapshot completes. This will prevent the invalid snapshot from completing and fail the
530         // isCapturing assertion below.
531         leaderRaftActor.setDropMessageOfType(CaptureSnapshotReply.class);
532
533         // Complete the prior snapshot - this should be a no-op b/c it's no longer the leader
534         leaderActor.tell(commitMsg, leaderActor);
535
536         leaderActor.tell(new RaftActorServerConfigurationSupport.ServerOperationTimeout(NEW_SERVER_ID), leaderActor);
537
538         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
539         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
540
541         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
542         assertEquals("isCapturing", false, leaderActorContext.getSnapshotManager().isCapturing());
543
544         LOG.info("testAddServerWithLeaderChangeBeforePriorSnapshotComplete ending");
545     }
546
547     @Test
548     public void testAddServerWithLeaderChangeDuringInstallSnapshot() throws Exception {
549         LOG.info("testAddServerWithLeaderChangeDuringInstallSnapshot starting");
550
551         setupNewFollower();
552         RaftActorContext initialActorContext = new MockRaftActorContext();
553
554         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
555                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
556                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
557                 actorFactory.generateActorId(LEADER_ID));
558
559         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
560         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
561
562         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(8);
563
564         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
565
566         // Drop the UnInitializedFollowerSnapshotReply to delay it.
567         leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
568
569         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
570
571         UnInitializedFollowerSnapshotReply snapshotReply = expectFirstMatching(leaderCollectorActor,
572                 UnInitializedFollowerSnapshotReply.class);
573
574         // Prevent election timeout when the leader switches to follower
575         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
576
577         // Change the leader behavior to follower
578         leaderActor.tell(new Follower(leaderActorContext), leaderActor);
579
580         // Send the captured UnInitializedFollowerSnapshotReply - should be a no-op
581         leaderRaftActor.setDropMessageOfType(null);
582         leaderActor.tell(snapshotReply, leaderActor);
583
584         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
585         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
586
587         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
588
589         LOG.info("testAddServerWithLeaderChangeDuringInstallSnapshot ending");
590     }
591
592     @Test
593     public void testAddServerWithInstallSnapshotTimeout() throws Exception {
594         LOG.info("testAddServerWithInstallSnapshotTimeout starting");
595
596         setupNewFollower();
597         RaftActorContext initialActorContext = new MockRaftActorContext();
598
599         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
600                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
601                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
602                 actorFactory.generateActorId(LEADER_ID));
603
604         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
605         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
606         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
607
608         // Drop the InstallSnapshot message so it times out
609         newFollowerRaftActor.underlyingActor().setDropMessageOfType(InstallSnapshot.class);
610
611         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
612
613         leaderActor.tell(new UnInitializedFollowerSnapshotReply("bogus"), leaderActor);
614
615         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
616         assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
617
618         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
619         assertEquals("Leader followers size", 0,
620                 ((AbstractLeader)leaderRaftActor.getCurrentBehavior()).getFollowerIds().size());
621
622         LOG.info("testAddServerWithInstallSnapshotTimeout ending");
623     }
624
625     @Test
626     public void testAddServerWithNoLeader() {
627         LOG.info("testAddServerWithNoLeader starting");
628
629         setupNewFollower();
630         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
631         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
632
633         TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
634                 MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
635                         followerActor.path().toString())).config(configParams).persistent(Optional.of(false)).
636                         props().withDispatcher(Dispatchers.DefaultDispatcherId()),
637                 actorFactory.generateActorId(LEADER_ID));
638         noLeaderActor.underlyingActor().waitForInitializeBehaviorComplete();
639
640         noLeaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
641         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
642         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
643
644         LOG.info("testAddServerWithNoLeader ending");
645     }
646
647     @Test
648     public void testAddServerWithNoConsensusReached() {
649         LOG.info("testAddServerWithNoConsensusReached starting");
650
651         setupNewFollower();
652         RaftActorContext initialActorContext = new MockRaftActorContext();
653
654         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
655                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
656                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
657                 actorFactory.generateActorId(LEADER_ID));
658
659         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
660         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
661
662         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
663
664         // Drop UnInitializedFollowerSnapshotReply initially
665         leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
666
667         MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
668         TestActorRef<MessageCollectorActor> newFollowerCollectorActor =
669                 newCollectorActor(newFollowerRaftActorInstance, NEW_SERVER_ID);
670
671         // Drop AppendEntries to the new follower so consensus isn't reached
672         newFollowerRaftActorInstance.setDropMessageOfType(AppendEntries.class);
673
674         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
675
676         // Capture the UnInitializedFollowerSnapshotReply
677         Object snapshotReply = expectFirstMatching(leaderCollectorActor, UnInitializedFollowerSnapshotReply.class);
678
679         // Send the UnInitializedFollowerSnapshotReply to resume the first request
680         leaderRaftActor.setDropMessageOfType(null);
681         leaderActor.tell(snapshotReply, leaderActor);
682
683         expectFirstMatching(newFollowerCollectorActor, AppendEntries.class);
684
685         // Send a second AddServer
686         leaderActor.tell(new AddServer(NEW_SERVER_ID2, "", false), testKit.getRef());
687
688         // The first AddServer should succeed with OK even though consensus wasn't reached
689         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
690         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
691         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
692
693         // Verify ServerConfigurationPayload entry in leader's log
694         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
695                 votingServer(NEW_SERVER_ID));
696
697         // The second AddServer should fail since consensus wasn't reached for the first
698         addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
699         assertEquals("getStatus", ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT, addServerReply.getStatus());
700
701         // Re-send the second AddServer - should also fail
702         leaderActor.tell(new AddServer(NEW_SERVER_ID2, "", false), testKit.getRef());
703         addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
704         assertEquals("getStatus", ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT, addServerReply.getStatus());
705
706         LOG.info("testAddServerWithNoConsensusReached ending");
707     }
708
709     @Test
710     public void testAddServerWithExistingServer() {
711         LOG.info("testAddServerWithExistingServer starting");
712
713         RaftActorContext initialActorContext = new MockRaftActorContext();
714
715         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
716                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
717                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
718                 actorFactory.generateActorId(LEADER_ID));
719
720         leaderActor.tell(new AddServer(FOLLOWER_ID, followerActor.path().toString(), true), testKit.getRef());
721
722         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
723         assertEquals("getStatus", ServerChangeStatus.ALREADY_EXISTS, addServerReply.getStatus());
724
725         LOG.info("testAddServerWithExistingServer ending");
726     }
727
728     @Test
729     public void testAddServerForwardedToLeader() {
730         LOG.info("testAddServerForwardedToLeader starting");
731
732         setupNewFollower();
733         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
734         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
735
736         TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
737                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
738                 actorFactory.generateActorId(LEADER_ID));
739
740         TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
741                 MockRaftActor.builder().id(FOLLOWER_ID).peerAddresses(ImmutableMap.of(LEADER_ID,
742                         leaderActor.path().toString())).config(configParams).persistent(Optional.of(false)).
743                         props().withDispatcher(Dispatchers.DefaultDispatcherId()),
744                 actorFactory.generateActorId(FOLLOWER_ID));
745         followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
746
747         followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
748                 -1, -1, (short)0), leaderActor);
749
750         followerRaftActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
751         expectFirstMatching(leaderActor, AddServer.class);
752
753         LOG.info("testAddServerForwardedToLeader ending");
754     }
755
756     @Test
757     public void testOnApplyState() {
758         LOG.info("testOnApplyState starting");
759
760         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
761         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
762         TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
763                 MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
764                         followerActor.path().toString())).config(configParams).persistent(Optional.of(false)).
765                         props().withDispatcher(Dispatchers.DefaultDispatcherId()),
766                 actorFactory.generateActorId(LEADER_ID));
767
768         RaftActorServerConfigurationSupport support = new RaftActorServerConfigurationSupport(noLeaderActor.underlyingActor());
769
770         ReplicatedLogEntry serverConfigEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
771                 new ServerConfigurationPayload(Collections.<ServerInfo>emptyList()));
772         boolean handled = support.handleMessage(new ApplyState(null, null, serverConfigEntry), ActorRef.noSender());
773         assertEquals("Message handled", true, handled);
774
775         ReplicatedLogEntry nonServerConfigEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
776                 new MockRaftActorContext.MockPayload("1"));
777         handled = support.handleMessage(new ApplyState(null, null, nonServerConfigEntry), ActorRef.noSender());
778         assertEquals("Message handled", false, handled);
779
780         LOG.info("testOnApplyState ending");
781     }
782
783     @Test
784     public void testRemoveServerWithNoLeader() {
785         LOG.info("testRemoveServerWithNoLeader starting");
786
787         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
788         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
789
790         TestActorRef<MockRaftActor> leaderActor = actorFactory.createTestActor(
791                 MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
792                         followerActor.path().toString())).config(configParams).persistent(Optional.of(false)).
793                         props().withDispatcher(Dispatchers.DefaultDispatcherId()),
794                 actorFactory.generateActorId(LEADER_ID));
795         leaderActor.underlyingActor().waitForInitializeBehaviorComplete();
796
797         leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
798         RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
799         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, removeServerReply.getStatus());
800
801         LOG.info("testRemoveServerWithNoLeader ending");
802     }
803
804     @Test
805     public void testRemoveServerNonExistentServer() {
806         LOG.info("testRemoveServerNonExistentServer starting");
807
808         RaftActorContext initialActorContext = new MockRaftActorContext();
809
810         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
811                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
812                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
813                 actorFactory.generateActorId(LEADER_ID));
814
815         leaderActor.tell(new RemoveServer(NEW_SERVER_ID), testKit.getRef());
816         RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
817         assertEquals("getStatus", ServerChangeStatus.DOES_NOT_EXIST, removeServerReply.getStatus());
818
819         LOG.info("testRemoveServerNonExistentServer ending");
820     }
821
822     @Test
823     public void testRemoveServerForwardToLeader() {
824         LOG.info("testRemoveServerForwardToLeader starting");
825
826         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
827         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
828
829         TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
830                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
831                 actorFactory.generateActorId(LEADER_ID));
832
833         TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
834                 MockRaftActor.builder().id(FOLLOWER_ID).peerAddresses(ImmutableMap.of(LEADER_ID,
835                         leaderActor.path().toString())).config(configParams).persistent(Optional.of(false)).
836                         props().withDispatcher(Dispatchers.DefaultDispatcherId()),
837                 actorFactory.generateActorId(FOLLOWER_ID));
838         followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
839
840         followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
841                 -1, -1, (short)0), leaderActor);
842
843         followerRaftActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
844         expectFirstMatching(leaderActor, RemoveServer.class);
845
846         LOG.info("testRemoveServerForwardToLeader ending");
847     }
848
849     @Test
850     public void testRemoveServer() {
851         LOG.info("testRemoveServer starting");
852
853         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
854         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
855         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
856
857         final String followerActorId = actorFactory.generateActorId(FOLLOWER_ID);
858         final String followerActorPath = actorFactory.createTestActorPath(followerActorId);
859         RaftActorContext initialActorContext = new MockRaftActorContext();
860
861         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
862                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActorPath),
863                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
864                 actorFactory.generateActorId(LEADER_ID));
865
866         TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
867
868         TestActorRef<MessageCollectorActor> collector =
869                 actorFactory.createTestActor(MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
870                         actorFactory.generateActorId("collector"));
871         TestActorRef<CollectingMockRaftActor> followerRaftActor = actorFactory.createTestActor(
872                 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
873                         configParams, NO_PERSISTENCE, collector).withDispatcher(Dispatchers.DefaultDispatcherId()),
874                 followerActorId);
875
876         leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
877         RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
878         assertEquals("getStatus", ServerChangeStatus.OK, removeServerReply.getStatus());
879
880         final ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
881         assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
882         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(), votingServer(LEADER_ID));
883
884         RaftActorBehavior currentBehavior = leaderActor.underlyingActor().getCurrentBehavior();
885         assertTrue("Expected Leader", currentBehavior instanceof Leader);
886         assertEquals("Follower ids size", 0, ((Leader)currentBehavior).getFollowerIds().size());
887
888         MessageCollectorActor.expectFirstMatching(collector, ServerRemoved.class);
889
890         LOG.info("testRemoveServer ending");
891     }
892
893     @Test
894     public void testRemoveServerLeader() {
895         LOG.info("testRemoveServerLeader starting");
896
897         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
898         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
899         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
900
901         final String followerActorId = actorFactory.generateActorId(FOLLOWER_ID);
902         final String followerActorPath = actorFactory.createTestActorPath(followerActorId);
903         RaftActorContext initialActorContext = new MockRaftActorContext();
904
905         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
906                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActorPath),
907                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
908                 actorFactory.generateActorId(LEADER_ID));
909
910         TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
911
912         TestActorRef<MessageCollectorActor> followerCollector = actorFactory.createTestActor(MessageCollectorActor.props().
913                 withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId("collector"));
914         actorFactory.createTestActor(
915                 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
916                         configParams, NO_PERSISTENCE, followerCollector).withDispatcher(Dispatchers.DefaultDispatcherId()),
917                 followerActorId);
918
919         leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
920         RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
921         assertEquals("getStatus", ServerChangeStatus.OK, removeServerReply.getStatus());
922
923         final ApplyState applyState = MessageCollectorActor.expectFirstMatching(followerCollector, ApplyState.class);
924         assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
925         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
926                 votingServer(FOLLOWER_ID));
927
928         MessageCollectorActor.expectFirstMatching(leaderCollector, ServerRemoved.class);
929
930         LOG.info("testRemoveServerLeader ending");
931     }
932
933     @Test
934     public void testRemoveServerLeaderWithNoFollowers() {
935         LOG.info("testRemoveServerLeaderWithNoFollowers starting");
936
937         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
938                 MockLeaderRaftActor.props(Collections.<String, String>emptyMap(),
939                         new MockRaftActorContext()).withDispatcher(Dispatchers.DefaultDispatcherId()),
940                 actorFactory.generateActorId(LEADER_ID));
941
942         leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
943         RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
944         assertEquals("getStatus", ServerChangeStatus.NOT_SUPPORTED, removeServerReply.getStatus());
945
946         LOG.info("testRemoveServerLeaderWithNoFollowers ending");
947     }
948
949     @Test
950     public void testChangeServersVotingStatus() {
951         LOG.info("testChangeServersVotingStatus starting");
952
953         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
954         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
955         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
956
957         final String follower1ActorId = actorFactory.generateActorId(FOLLOWER_ID);
958         final String follower1ActorPath = actorFactory.createTestActorPath(follower1ActorId);
959         final String follower2ActorId = actorFactory.generateActorId(FOLLOWER_ID2);
960         final String follower2ActorPath = actorFactory.createTestActorPath(follower2ActorId);
961
962         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
963                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, follower1ActorPath,
964                         FOLLOWER_ID2, follower2ActorPath), new MockRaftActorContext()).
965                         withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
966         TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
967
968         TestActorRef<MessageCollectorActor> follower1Collector = actorFactory.createTestActor(
969                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
970                 actorFactory.generateActorId("collector"));
971         TestActorRef<CollectingMockRaftActor> follower1RaftActor = actorFactory.createTestActor(
972                 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
973                         FOLLOWER_ID2, follower2ActorPath), configParams, NO_PERSISTENCE, follower1Collector).
974                         withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId);
975
976         TestActorRef<MessageCollectorActor> follower2Collector = actorFactory.createTestActor(
977                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
978                 actorFactory.generateActorId("collector"));
979         TestActorRef<CollectingMockRaftActor> follower2RaftActor = actorFactory.createTestActor(
980                 CollectingMockRaftActor.props(FOLLOWER_ID2, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
981                         FOLLOWER_ID, follower1ActorPath), configParams, NO_PERSISTENCE, follower2Collector).
982                         withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId);
983
984         // Send first ChangeServersVotingStatus message
985
986         leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(FOLLOWER_ID, false, FOLLOWER_ID2, false)),
987                 testKit.getRef());
988         ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
989         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
990
991         final ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
992         assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
993         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
994                 votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
995
996         MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
997         verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
998                 votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
999
1000         MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
1001         verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1002                 votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1003
1004         MessageCollectorActor.clearMessages(leaderCollector);
1005         MessageCollectorActor.clearMessages(follower1Collector);
1006         MessageCollectorActor.clearMessages(follower2Collector);
1007
1008         // Send second ChangeServersVotingStatus message
1009
1010         leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(FOLLOWER_ID, true)), testKit.getRef());
1011         reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1012         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1013
1014         MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
1015         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1016                 votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1017
1018         MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
1019         verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1020                 votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1021
1022         MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
1023         verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1024                 votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1025
1026         LOG.info("testChangeServersVotingStatus ending");
1027     }
1028
1029     @Test
1030     public void testChangeLeaderToNonVoting() {
1031         LOG.info("testChangeLeaderToNonVoting starting");
1032
1033         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1034         configParams.setHeartBeatInterval(new FiniteDuration(500, TimeUnit.MILLISECONDS));
1035
1036         final String follower1ActorId = actorFactory.generateActorId(FOLLOWER_ID);
1037         final String follower1ActorPath = actorFactory.createTestActorPath(follower1ActorId);
1038         final String follower2ActorId = actorFactory.generateActorId(FOLLOWER_ID2);
1039         final String follower2ActorPath = actorFactory.createTestActorPath(follower2ActorId);
1040
1041         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
1042                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, follower1ActorPath,
1043                         FOLLOWER_ID2, follower2ActorPath), new MockRaftActorContext()).
1044                         withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
1045         TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
1046
1047         TestActorRef<MessageCollectorActor> follower1Collector = actorFactory.createTestActor(
1048                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1049                 actorFactory.generateActorId("collector"));
1050         TestActorRef<CollectingMockRaftActor> follower1RaftActor = actorFactory.createTestActor(
1051                 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
1052                         FOLLOWER_ID2, follower2ActorPath), configParams, NO_PERSISTENCE, follower1Collector).
1053                         withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId);
1054
1055         TestActorRef<MessageCollectorActor> follower2Collector = actorFactory.createTestActor(
1056                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1057                 actorFactory.generateActorId("collector"));
1058         TestActorRef<CollectingMockRaftActor> follower2RaftActor = actorFactory.createTestActor(
1059                 CollectingMockRaftActor.props(FOLLOWER_ID2, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
1060                         FOLLOWER_ID, follower1ActorPath), configParams, NO_PERSISTENCE, follower2Collector).
1061                         withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId);
1062
1063         // Send ChangeServersVotingStatus message
1064
1065         leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(LEADER_ID, false)), testKit.getRef());
1066         ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1067         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1068
1069         MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
1070         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1071                 nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
1072
1073         MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
1074         verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1075                 nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
1076
1077         MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
1078         verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1079                 nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
1080
1081         verifyRaftState(RaftState.Leader, follower1RaftActor.underlyingActor(), follower2RaftActor.underlyingActor());
1082         verifyRaftState(RaftState.Follower, leaderActor.underlyingActor());
1083
1084         MessageCollectorActor.expectMatching(leaderCollector, AppendEntries.class, 2);
1085
1086         LOG.info("testChangeLeaderToNonVoting ending");
1087     }
1088
1089     @Test
1090     public void testChangeLeaderToNonVotingInSingleNode() {
1091         LOG.info("testChangeLeaderToNonVotingInSingleNode starting");
1092
1093         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
1094                 MockLeaderRaftActor.props(ImmutableMap.of(), new MockRaftActorContext()).
1095                         withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
1096
1097         leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(LEADER_ID, false)), testKit.getRef());
1098         ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1099         assertEquals("getStatus", ServerChangeStatus.INVALID_REQUEST, reply.getStatus());
1100
1101         LOG.info("testChangeLeaderToNonVotingInSingleNode ending");
1102     }
1103
1104     @Test
1105     public void testChangeToVotingWithNoLeader() {
             // Scenario: node1 and node2 recover as non-voting members of a 4-server config whose two
             // voting members are down, so there is no leader. A ChangeServersVotingStatus request sent
             // to node1 is expected to fail with NO_LEADER while node1 has no address for node2, and to
             // succeed once the address is set, leaving node1 as Leader and node2 as Follower with the
             // flipped server config applied on both.
1106         LOG.info("testChangeToVotingWithNoLeader starting");
1107
1108         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1109         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1110         configParams.setElectionTimeoutFactor(5);
1111
1112         final String node1ID = "node1";
1113         final String node2ID = "node2";
1114
1115         // Set up a persisted ServerConfigurationPayload. Initially node1 and node2 will come up as non-voting
1116         // via the server config. The server config will also contain 2 voting peers that are down (ie no
1117         // actors created).
1118
1119         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1120                 new ServerInfo(node1ID, false), new ServerInfo(node2ID, false),
1121                 new ServerInfo("downNode1", true), new ServerInfo("downNode2", true)));
1122         ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, 1, persistedServerConfig);
1123
             // Seed each node's in-memory journal with an election term update, the server config log
             // entry and an ApplyJournalEntries marker so they are picked up when the actors start.
1124         InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "downNode1"));
1125         InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1126         InMemoryJournal.addEntry(node1ID, 3, new ApplyJournalEntries(0));
1127         InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "downNode2"));
1128         InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1129         InMemoryJournal.addEntry(node2ID, 3, new ApplyJournalEntries(0));
1130
             // Both raft actors are created with an empty peer address map, so initially neither has an
             // address for the other.
1131         TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
1132                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1133                 actorFactory.generateActorId("collector"));
1134         TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1135                 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
1136                         PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1137         CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1138
1139         TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
1140                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1141                 actorFactory.generateActorId("collector"));
1142         TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1143                 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
1144                         PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1145         CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1146
1147         node1RaftActor.waitForInitializeBehaviorComplete();
1148         node2RaftActor.waitForInitializeBehaviorComplete();
1149
1150         // Verify the intended server config was loaded and applied.
1151         verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1152                 nonVotingServer(node1ID), nonVotingServer(node2ID), votingServer("downNode1"),
1153                 votingServer("downNode2"));
1154         assertEquals("isVotingMember", false, node1RaftActor.getRaftActorContext().isVotingMember());
1155         assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1156         assertEquals("getLeaderId", null, node1RaftActor.getLeaderId());
1157
1158         verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1159                 nonVotingServer(node1ID), nonVotingServer(node2ID), votingServer("downNode1"),
1160                 votingServer("downNode2"));
1161         assertEquals("isVotingMember", false, node2RaftActor.getRaftActorContext().isVotingMember());
1162
1163         // For the test, we send a ChangeServersVotingStatus message to node1 to flip the voting states for
1164         // each server, ie node1 and node2 to voting and the 2 down nodes to non-voting. This should cause
1165         // node1 to try to elect itself as leader in order to apply the new server config. Since the 2
1166         // down nodes are switched to non-voting, node1 should only need a vote from node2.
1167
1168         // First send the message such that node1 has no peer address for node2 - should fail.
1169
1170         ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true,
1171                 node2ID, true, "downNode1", false, "downNode2", false));
1172         node1RaftActorRef.tell(changeServers, testKit.getRef());
1173         ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1174         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, reply.getStatus());
1175
1176         // Send an AppendEntries so node1 has a leaderId
1177
             // Clear previously collected messages first so the ElectionTimeout awaited below is one
             // that arrives after this point.
1178         MessageCollectorActor.clearMessages(node1Collector);
1179
1180         long term = node1RaftActor.getRaftActorContext().getTermInformation().getCurrentTerm();
1181         node1RaftActorRef.tell(new AppendEntries(term, "downNode1", -1L, -1L,
1182                 Collections.<ReplicatedLogEntry>emptyList(), 0, -1, (short)1), ActorRef.noSender());
1183
1184         // Wait for the ElectionTimeout to clear the leaderId. The leaderId must be null so on the
1185         // ChangeServersVotingStatus message, it will try to elect a leader.
1186
1187         MessageCollectorActor.expectFirstMatching(node1Collector, ElectionTimeout.class);
1188
1189         // Update node2's peer address and send the message again
1190
1191         node1RaftActor.setPeerAddress(node2ID, node2RaftActorRef.path().toString());
1192
1193         node1RaftActorRef.tell(changeServers, testKit.getRef());
1194         reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1195         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1196
             // The persisted server config was created with index 0, so the new server config is expected
             // to be applied at index 1 on both nodes.
1197         ApplyJournalEntries apply = MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class);
1198         assertEquals("getToIndex", 1, apply.getToIndex());
1199         verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1200                 votingServer(node1ID), votingServer(node2ID), nonVotingServer("downNode1"),
1201                 nonVotingServer("downNode2"));
1202         assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
1203         assertEquals("getRaftState", RaftState.Leader, node1RaftActor.getRaftState());
1204
1205         apply = MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
1206         assertEquals("getToIndex", 1, apply.getToIndex());
1207         verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1208                 votingServer(node1ID), votingServer(node2ID), nonVotingServer("downNode1"),
1209                 nonVotingServer("downNode2"));
1210         assertEquals("isVotingMember", true, node2RaftActor.getRaftActorContext().isVotingMember());
1211         assertEquals("getRaftState", RaftState.Follower, node2RaftActor.getRaftState());
1212
1213         LOG.info("testChangeToVotingWithNoLeader ending");
1214     }
1215
1216     @Test
1217     public void testChangeToVotingWithNoLeaderAndElectionTimeout() {
             // Scenario: node1 (non-voting) is asked to become voting while there is no leader, and node2
             // (voting) drops every RequestVote. The request is expected to fail with NO_LEADER, with
             // node1 left as a Follower holding the original server config.
1218         LOG.info("testChangeToVotingWithNoLeaderAndElectionTimeout starting");
1219
1220         final String node1ID = "node1";
1221         final String node2ID = "node2";
1222
             // Resolves the two node ids to their test actor paths; any other peer id resolves to null.
1223         PeerAddressResolver peerAddressResolver = peerId -> peerId.equals(node1ID) ? actorFactory.createTestActorPath(node1ID) :
1224             peerId.equals(node2ID) ? actorFactory.createTestActorPath(node2ID) : null;
1225
1226         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1227                 new ServerInfo(node1ID, false), new ServerInfo(node2ID, true)));
1228         ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, 1, persistedServerConfig);
1229
             // Seed both journals with an election term update and the server config log entry.
1230         InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
1231         InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1232         InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
1233         InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1234
             // node1 gets a short heartbeat interval and an election timeout factor of 1.
1235         DefaultConfigParamsImpl configParams1 = new DefaultConfigParamsImpl();
1236         configParams1.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1237         configParams1.setElectionTimeoutFactor(1);
1238         configParams1.setPeerAddressResolver(peerAddressResolver);
1239         TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
1240                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1241                 actorFactory.generateActorId("collector"));
1242         TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1243                 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams1,
1244                         PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1245         CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1246
             // node2 gets a very large election timeout factor - presumably so that it never starts an
             // election of its own during the test (NOTE(review): confirm intent).
1247         DefaultConfigParamsImpl configParams2 = new DefaultConfigParamsImpl();
1248         configParams2.setElectionTimeoutFactor(1000000);
1249         configParams2.setPeerAddressResolver(peerAddressResolver);
1250         TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
1251                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1252                 actorFactory.generateActorId("collector"));
1253         TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1254                 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams2,
1255                         PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1256         CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1257
1258         // Send a ChangeServersVotingStatus message to node1 to change node1 to voting. This should cause
1259         // node1 to try to elect itself as leader in order to apply the new server config. But we'll drop
1260         // RequestVote messages in node2 which should cause node1 to time out and revert back to the previous
1261         // server config and fail with NO_LEADER. Note that node1 shouldn't forward the request to node2 b/c
1262         // node2 was previously voting.
1263
1264         node2RaftActor.setDropMessageOfType(RequestVote.class);
1265
1266         ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true));
1267         node1RaftActorRef.tell(changeServers, testKit.getRef());
1268         ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1269         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, reply.getStatus());
1270
             // node1's server config must be back to the persisted one: node1 non-voting, node2 voting.
1271         assertEquals("Server config", Sets.newHashSet(nonVotingServer(node1ID), votingServer(node2ID)),
1272                 Sets.newHashSet(node1RaftActor.getRaftActorContext().getPeerServerInfo(true).getServerConfig()));
1273         assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1274
1275         LOG.info("testChangeToVotingWithNoLeaderAndElectionTimeout ending");
1276     }
1277
1278     @Test
1279     public void testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout() {
             // Scenario: node1 and node2 both start as non-voting and node2's journal holds one more log
             // entry than node1's. A request sent to node1 to make both voting is expected to succeed,
             // with node2 ending up as Leader and node1 as a voting Follower.
1280         LOG.info("testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout starting");
1281
1282         final String node1ID = "node1";
1283         final String node2ID = "node2";
1284
             // Resolves the two node ids to their test actor paths; any other peer id resolves to null.
1285         PeerAddressResolver peerAddressResolver = peerId -> peerId.equals(node1ID) ? actorFactory.createTestActorPath(node1ID) :
1286             peerId.equals(node2ID) ? actorFactory.createTestActorPath(node2ID) : null;
1287
1288         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1289         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1290         configParams.setElectionTimeoutFactor(3);
1291         configParams.setPeerAddressResolver(peerAddressResolver);
1292
1293         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1294                 new ServerInfo(node1ID, false), new ServerInfo(node2ID, false)));
1295         ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, 1, persistedServerConfig);
1296
             // Both journals get the election term update and the server config entry. node2 additionally
             // gets a second log entry plus an ApplyJournalEntries marker, which puts its log ahead of node1's.
1297         InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
1298         InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1299         InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
1300         InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1301         InMemoryJournal.addEntry(node2ID, 3, new ReplicatedLogImplEntry(1, 1,
1302                 new MockRaftActorContext.MockPayload("2")));
1303         InMemoryJournal.addEntry(node2ID, 4, new ApplyJournalEntries(1));
1304
1305         TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
1306                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1307                 actorFactory.generateActorId("collector"));
1308         TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1309                 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
1310                         PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1311         CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1312
1313         TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
1314                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1315                 actorFactory.generateActorId("collector"));
1316         TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1317                 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
1318                         PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1319         CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1320
1321         // Send a ChangeServersVotingStatus message to node1 to change node1 to voting. This should cause
1322         // node1 to try to elect itself as leader in order to apply the new server config. However node1's log
1323         // is behind node2's so node2 should not grant node1's vote. This should cause node1 to time out and
1324         // forward the request to node2.
1325
1326         ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(
1327                 ImmutableMap.of(node1ID, true, node2ID, true));
1328         node1RaftActorRef.tell(changeServers, testKit.getRef());
1329         ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1330         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1331
             // node2 is expected to have applied the new config and to be the Leader.
1332         MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
1333         verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1334                 votingServer(node1ID), votingServer(node2ID));
1335         assertEquals("getRaftState", RaftState.Leader, node2RaftActor.getRaftState());
1336
             // node1 is expected to have applied the same config as a voting Follower.
1337         MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class);
1338         verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1339                 votingServer(node1ID), votingServer(node2ID));
1340         assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
1341         assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1342
1343         LOG.info("testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout ending");
1344     }
1345
1346     @Test
1347     public void testChangeToVotingWithNoLeaderAndOtherLeaderElected() {
             // Scenario: node1 (non-voting) is asked to make both nodes voting while there is no leader.
             // node2 drops RequestVote messages and is then sent TimeoutNow. The request is expected to
             // succeed with node2 as Leader and node1 as a voting Follower.
1348         LOG.info("testChangeToVotingWithNoLeaderAndOtherLeaderElected starting");
1349
1350         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1351         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1352         configParams.setElectionTimeoutFactor(100000);
1353
1354         final String node1ID = "node1";
1355         final String node2ID = "node2";
1356
             // Resolves the two node ids to their test actor paths; any other peer id resolves to null.
1357         configParams.setPeerAddressResolver(peerId -> peerId.equals(node1ID) ? actorFactory.createTestActorPath(node1ID) :
1358             peerId.equals(node2ID) ? actorFactory.createTestActorPath(node2ID) : null);
1359
1360         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1361                 new ServerInfo(node1ID, false), new ServerInfo(node2ID, true)));
1362         ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, 1, persistedServerConfig);
1363
             // Seed both journals with an election term update and the server config log entry.
1364         InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
1365         InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1366         InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
1367         InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1368
1369         TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
1370                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1371                 actorFactory.generateActorId("collector"));
1372         TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1373                 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
1374                         PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1375         CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1376
1377         TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
1378                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1379                 actorFactory.generateActorId("collector"));
1380         TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1381                 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
1382                         PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1383         CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1384
1385         // Send a ChangeServersVotingStatus message to node1 to change node1 to voting. This should cause
1386         // node1 to try to elect itself as leader in order to apply the new server config. But we'll drop
1387         // RequestVote messages in node2 and make it the leader so node1 should forward the server change
1388         // request to node2 when node2 is elected.
1389
1390         node2RaftActor.setDropMessageOfType(RequestVote.class);
1391
1392         ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true,
1393                 node2ID, true));
1394         node1RaftActorRef.tell(changeServers, testKit.getRef());
1395
             // Dropped messages are still forwarded to the collector, so this confirms node1's RequestVote
             // reached node2 before node2 is sent TimeoutNow.
1396         MessageCollectorActor.expectFirstMatching(node2Collector, RequestVote.class);
1397
             // NOTE(review): TimeoutNow presumably makes node2 start an election right away - confirm.
1398         node2RaftActorRef.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
1399
1400         ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1401         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1402
1403         MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class);
1404         verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1405                 votingServer(node1ID), votingServer(node2ID));
1406         assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
1407         assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1408
1409         MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
1410         verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1411                 votingServer(node1ID), votingServer(node2ID));
1412         assertEquals("getRaftState", RaftState.Leader, node2RaftActor.getRaftState());
1413
1414         LOG.info("testChangeToVotingWithNoLeaderAndOtherLeaderElected ending");
1415     }
1416
1417     private static void verifyRaftState(RaftState expState, RaftActor... raftActors) {
1418         Stopwatch sw = Stopwatch.createStarted();
1419         while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
1420             for(RaftActor raftActor: raftActors) {
1421                 if(raftActor.getRaftState() == expState) {
1422                     return;
1423                 }
1424             }
1425         }
1426
1427         fail("None of the RaftActors have state " + expState);
1428     }
1429
1430     private static ServerInfo votingServer(String id) {
1431         return new ServerInfo(id, true);
1432     }
1433
1434     private static ServerInfo nonVotingServer(String id) {
1435         return new ServerInfo(id, false);
1436     }
1437
1438     private TestActorRef<MessageCollectorActor> newLeaderCollectorActor(MockLeaderRaftActor leaderRaftActor) {
1439         return newCollectorActor(leaderRaftActor, LEADER_ID);
1440     }
1441
1442     private TestActorRef<MessageCollectorActor> newCollectorActor(AbstractMockRaftActor raftActor, String id) {
1443         TestActorRef<MessageCollectorActor> collectorActor = actorFactory.createTestActor(
1444                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1445                 actorFactory.generateActorId(id + "Collector"));
1446         raftActor.setCollectorActor(collectorActor);
1447         return collectorActor;
1448     }
1449
1450     private static void verifyServerConfigurationPayloadEntry(ReplicatedLog log, ServerInfo... expected) {
1451         ReplicatedLogEntry logEntry = log.get(log.lastIndex());
1452         assertEquals("Last log entry payload class", ServerConfigurationPayload.class, logEntry.getData().getClass());
1453         ServerConfigurationPayload payload = (ServerConfigurationPayload)logEntry.getData();
1454         assertEquals("Server config", Sets.newHashSet(expected), Sets.newHashSet(payload.getServerConfig()));
1455     }
1456
1457     private static RaftActorContextImpl newFollowerContext(String id, TestActorRef<? extends UntypedActor> actor) {
1458         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1459         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1460         configParams.setElectionTimeoutFactor(100000);
1461         NonPersistentDataProvider noPersistence = new NonPersistentDataProvider();
1462         ElectionTermImpl termInfo = new ElectionTermImpl(noPersistence, id, LOG);
1463         termInfo.update(1, LEADER_ID);
1464         return new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
1465                 id, termInfo, -1, -1, ImmutableMap.of(LEADER_ID, ""), configParams, noPersistence, LOG);
1466     }
1467
1468     static abstract class AbstractMockRaftActor extends MockRaftActor {
1469         private volatile TestActorRef<MessageCollectorActor> collectorActor;
1470         private volatile Class<?> dropMessageOfType;
1471
1472         AbstractMockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
1473                 boolean persistent, TestActorRef<MessageCollectorActor> collectorActor) {
1474             super(builder().id(id).peerAddresses(peerAddresses).config(config.get()).
1475                     persistent(Optional.of(persistent)));
1476             this.collectorActor = collectorActor;
1477         }
1478
1479         void setDropMessageOfType(Class<?> dropMessageOfType) {
1480             this.dropMessageOfType = dropMessageOfType;
1481         }
1482
1483         void setCollectorActor(TestActorRef<MessageCollectorActor> collectorActor) {
1484             this.collectorActor = collectorActor;
1485         }
1486
1487         @Override
1488         public void handleCommand(Object message) {
1489             if(dropMessageOfType == null || !dropMessageOfType.equals(message.getClass())) {
1490                 super.handleCommand(message);
1491             }
1492
1493             if(collectorActor != null) {
1494                 collectorActor.tell(message, getSender());
1495             }
1496         }
1497     }
1498
1499     public static class CollectingMockRaftActor extends AbstractMockRaftActor {
1500
1501         CollectingMockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
1502                 boolean persistent, TestActorRef<MessageCollectorActor> collectorActor) {
1503             super(id, peerAddresses, config, persistent, collectorActor);
1504             snapshotCohortDelegate = new RaftActorSnapshotCohort() {
1505                 @Override
1506                 public void createSnapshot(ActorRef actorRef) {
1507                     actorRef.tell(new CaptureSnapshotReply(new byte[0]), actorRef);
1508                 }
1509
1510                 @Override
1511                 public void applySnapshot(byte[] snapshotBytes) {
1512                 }
1513             };
1514         }
1515
1516         public static Props props(final String id, final Map<String, String> peerAddresses,
1517                 ConfigParams config, boolean persistent, TestActorRef<MessageCollectorActor> collectorActor){
1518
1519             return Props.create(CollectingMockRaftActor.class, id, peerAddresses, Optional.of(config),
1520                     persistent, collectorActor);
1521         }
1522
1523     }
1524
1525     public static class MockLeaderRaftActor extends AbstractMockRaftActor {
1526         public MockLeaderRaftActor(Map<String, String> peerAddresses, ConfigParams config,
1527                 RaftActorContext fromContext) {
1528             super(LEADER_ID, peerAddresses, Optional.of(config), NO_PERSISTENCE, null);
1529             setPersistence(false);
1530
1531             RaftActorContext context = getRaftActorContext();
1532             for(int i = 0; i < fromContext.getReplicatedLog().size(); i++) {
1533                 ReplicatedLogEntry entry = fromContext.getReplicatedLog().get(i);
1534                 getState().add(entry.getData());
1535                 context.getReplicatedLog().append(entry);
1536             }
1537
1538             context.setCommitIndex(fromContext.getCommitIndex());
1539             context.setLastApplied(fromContext.getLastApplied());
1540             context.getTermInformation().update(fromContext.getTermInformation().getCurrentTerm(),
1541                     fromContext.getTermInformation().getVotedFor());
1542         }
1543
1544         @Override
1545         protected void initializeBehavior() {
1546             changeCurrentBehavior(new Leader(getRaftActorContext()));
1547             initializeBehaviorComplete.countDown();
1548         }
1549
1550         @Override
1551         public void createSnapshot(ActorRef actorRef) {
1552             try {
1553                 actorRef.tell(new CaptureSnapshotReply(RaftActorTest.fromObject(getState()).toByteArray()), actorRef);
1554             } catch (Exception e) {
1555                 LOG.error("createSnapshot failed", e);
1556             }
1557         }
1558
1559         static Props props(Map<String, String> peerAddresses, RaftActorContext fromContext) {
1560             DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1561             configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1562             configParams.setElectionTimeoutFactor(10);
1563             return Props.create(MockLeaderRaftActor.class, peerAddresses, configParams, fromContext);
1564         }
1565     }
1566
1567     public static class MockNewFollowerRaftActor extends AbstractMockRaftActor {
1568         public MockNewFollowerRaftActor(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
1569             super(NEW_SERVER_ID, Maps.<String, String>newHashMap(), Optional.of(config), NO_PERSISTENCE, collectorActor);
1570             setPersistence(false);
1571         }
1572
1573         static Props props(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
1574             return Props.create(MockNewFollowerRaftActor.class, config, collectorActor);
1575         }
1576     }
1577 }