Implement cluster admin RPCs to change member voting states
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorServerConfigurationSupportTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertTrue;
12 import static org.junit.Assert.fail;
13 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.assertNoneMatching;
14 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
15 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
16 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
17 import akka.actor.ActorRef;
18 import akka.actor.Props;
19 import akka.actor.UntypedActor;
20 import akka.dispatch.Dispatchers;
21 import akka.testkit.JavaTestKit;
22 import akka.testkit.TestActorRef;
23 import com.google.common.base.Optional;
24 import com.google.common.base.Stopwatch;
25 import com.google.common.collect.ImmutableMap;
26 import com.google.common.collect.Maps;
27 import com.google.common.collect.Sets;
28 import java.util.Arrays;
29 import java.util.Collections;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.concurrent.TimeUnit;
33 import org.junit.After;
34 import org.junit.Before;
35 import org.junit.Test;
36 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
37 import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
38 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
39 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
40 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
41 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
42 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
43 import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
44 import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete;
45 import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
46 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
47 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
48 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
49 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
50 import org.opendaylight.controller.cluster.raft.messages.AddServer;
51 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
52 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
53 import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
54 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
55 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
56 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
57 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
58 import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
59 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
60 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
61 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
62 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
63 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
64 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
65 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
66 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
67 import org.slf4j.Logger;
68 import org.slf4j.LoggerFactory;
69 import scala.concurrent.duration.FiniteDuration;
70
71 /**
72  * Unit tests for RaftActorServerConfigurationSupport.
73  *
74  * @author Thomas Pantelis
75  */
76 public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
77     static final String LEADER_ID = "leader";
78     static final String FOLLOWER_ID = "follower";
79     static final String FOLLOWER_ID2 = "follower2";
80     static final String NEW_SERVER_ID = "new-server";
81     static final String NEW_SERVER_ID2 = "new-server2";
82     private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupportTest.class);
83     private static final Class<?> COMMIT_MESSAGE_CLASS = RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT.getClass();
84     private static final boolean NO_PERSISTENCE = false;
85     private static final boolean PERSISTENT = true;
86
87     private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
88
89     private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
90             Props.create(ForwardMessageToBehaviorActor.class).withDispatcher(Dispatchers.DefaultDispatcherId()),
91             actorFactory.generateActorId(FOLLOWER_ID));
92
93     private TestActorRef<MockNewFollowerRaftActor> newFollowerRaftActor;
94     private TestActorRef<MessageCollectorActor> newFollowerCollectorActor;
95     private RaftActorContext newFollowerActorContext;
96
97     private final JavaTestKit testKit = new JavaTestKit(getSystem());
98
99     @Before
100     public void setup() {
101         InMemoryJournal.clear();
102         InMemorySnapshotStore.clear();
103     }
104
105     private void setupNewFollower() {
106         DefaultConfigParamsImpl configParams = newFollowerConfigParams();
107
108         newFollowerCollectorActor = actorFactory.createTestActor(
109                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
110                 actorFactory.generateActorId(NEW_SERVER_ID + "Collector"));
111         newFollowerRaftActor = actorFactory.createTestActor(MockNewFollowerRaftActor.props(
112                 configParams, newFollowerCollectorActor).withDispatcher(Dispatchers.DefaultDispatcherId()),
113                 actorFactory.generateActorId(NEW_SERVER_ID));
114
115         try {
116             newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
117         } catch (Exception e) {
118             newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
119         }
120     }
121
122     private static DefaultConfigParamsImpl newFollowerConfigParams() {
123         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
124         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
125         configParams.setElectionTimeoutFactor(100000);
126         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
127         return configParams;
128     }
129
130     @After
131     public void tearDown() throws Exception {
132         actorFactory.close();
133     }
134
135     @Test
136     public void testAddServerWithExistingFollower() throws Exception {
137         LOG.info("testAddServerWithExistingFollower starting");
138         setupNewFollower();
139         RaftActorContextImpl followerActorContext = newFollowerContext(FOLLOWER_ID, followerActor);
140         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
141                 0, 3, 1).build());
142         followerActorContext.setCommitIndex(2);
143         followerActorContext.setLastApplied(2);
144
145         Follower follower = new Follower(followerActorContext);
146         followerActor.underlyingActor().setBehavior(follower);
147         followerActorContext.setCurrentBehavior(follower);
148
149         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
150                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
151                         followerActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
152                 actorFactory.generateActorId(LEADER_ID));
153
154         // Expect initial heartbeat from the leader.
155         expectFirstMatching(followerActor, AppendEntries.class);
156         clearMessages(followerActor);
157
158         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
159         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
160
161         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
162
163         // Leader should install snapshot - capture and verify ApplySnapshot contents
164
165         ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
166         @SuppressWarnings("unchecked")
167         List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
168         assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
169
170         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
171         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
172         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
173
174         // Verify ServerConfigurationPayload entry in leader's log
175
176         expectFirstMatching(leaderCollectorActor, ApplyState.class);
177         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
178         assertEquals("Leader journal last index", 3, leaderActorContext.getReplicatedLog().lastIndex());
179         assertEquals("Leader commit index", 3, leaderActorContext.getCommitIndex());
180         assertEquals("Leader last applied index", 3, leaderActorContext.getLastApplied());
181         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
182                 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
183
184         // Verify ServerConfigurationPayload entry in both followers
185
186         expectFirstMatching(followerActor, ApplyState.class);
187         assertEquals("Follower journal last index", 3, followerActorContext.getReplicatedLog().lastIndex());
188         verifyServerConfigurationPayloadEntry(followerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
189                 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
190
191         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
192         assertEquals("New follower journal last index", 3, newFollowerActorContext.getReplicatedLog().lastIndex());
193         verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
194                 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
195
196         // Verify new server config was applied in both followers
197
198         assertEquals("Follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID), followerActorContext.getPeerIds());
199
200         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, FOLLOWER_ID), newFollowerActorContext.getPeerIds());
201
202         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
203         expectFirstMatching(followerActor, ApplyState.class);
204
205         assertEquals("Follower commit index", 3, followerActorContext.getCommitIndex());
206         assertEquals("Follower last applied index", 3, followerActorContext.getLastApplied());
207         assertEquals("New follower commit index", 3, newFollowerActorContext.getCommitIndex());
208         assertEquals("New follower last applied index", 3, newFollowerActorContext.getLastApplied());
209
210         List<ReplicatedLogImplEntry> persistedLogEntries = InMemoryJournal.get(LEADER_ID, ReplicatedLogImplEntry.class);
211         assertEquals("Leader ReplicatedLogImplEntry entries", 1, persistedLogEntries.size());
212         ReplicatedLogImplEntry logEntry = persistedLogEntries.get(0);
213         assertEquals("Leader ReplicatedLogImplEntry getTerm", 1, logEntry.getTerm());
214         assertEquals("Leader ReplicatedLogImplEntry getIndex", 3, logEntry.getIndex());
215         assertEquals("Leader ReplicatedLogImplEntry getData", ServerConfigurationPayload.class, logEntry.getData().getClass());
216
217         persistedLogEntries = InMemoryJournal.get(NEW_SERVER_ID, ReplicatedLogImplEntry.class);
218         assertEquals("New follower ReplicatedLogImplEntry entries", 1, persistedLogEntries.size());
219         logEntry = persistedLogEntries.get(0);
220         assertEquals("New follower ReplicatedLogImplEntry getTerm", 1, logEntry.getTerm());
221         assertEquals("New follower ReplicatedLogImplEntry getIndex", 3, logEntry.getIndex());
222         assertEquals("New follower ReplicatedLogImplEntry getData", ServerConfigurationPayload.class,
223                 logEntry.getData().getClass());
224
225         LOG.info("testAddServerWithExistingFollower ending");
226     }
227
228     @Test
229     public void testAddServerWithNoExistingFollower() throws Exception {
230         LOG.info("testAddServerWithNoExistingFollower starting");
231
232         setupNewFollower();
233         RaftActorContext initialActorContext = new MockRaftActorContext();
234         initialActorContext.setCommitIndex(1);
235         initialActorContext.setLastApplied(1);
236         initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
237                 0, 2, 1).build());
238
239         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
240                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
241                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
242                 actorFactory.generateActorId(LEADER_ID));
243
244         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
245         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
246
247         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
248
249         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
250
251         // Leader should install snapshot - capture and verify ApplySnapshot contents
252
253         ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
254         @SuppressWarnings("unchecked")
255         List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
256         assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
257
258         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
259         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
260         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
261
262         // Verify ServerConfigurationPayload entry in leader's log
263
264         expectFirstMatching(leaderCollectorActor, ApplyState.class);
265         assertEquals("Leader journal last index", 2, leaderActorContext.getReplicatedLog().lastIndex());
266         assertEquals("Leader commit index", 2, leaderActorContext.getCommitIndex());
267         assertEquals("Leader last applied index", 2, leaderActorContext.getLastApplied());
268         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
269                 votingServer(NEW_SERVER_ID));
270
271         // Verify ServerConfigurationPayload entry in the new follower
272
273         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
274         assertEquals("New follower journal last index", 2, newFollowerActorContext.getReplicatedLog().lastIndex());
275         verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
276                 votingServer(NEW_SERVER_ID));
277
278         // Verify new server config was applied in the new follower
279
280         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
281
282         LOG.info("testAddServerWithNoExistingFollower ending");
283     }
284
285     @Test
286     public void testAddServersAsNonVoting() throws Exception {
287         LOG.info("testAddServersAsNonVoting starting");
288
289         setupNewFollower();
290         RaftActorContext initialActorContext = new MockRaftActorContext();
291
292         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
293                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
294                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
295                 actorFactory.generateActorId(LEADER_ID));
296
297         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
298         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
299
300         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
301
302         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), false), testKit.getRef());
303
304         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
305         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
306         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
307
308         // Verify ServerConfigurationPayload entry in leader's log
309
310         expectFirstMatching(leaderCollectorActor, ApplyState.class);
311
312         assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
313         assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
314         assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
315         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
316                 nonVotingServer(NEW_SERVER_ID));
317
318         // Verify ServerConfigurationPayload entry in the new follower
319
320         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
321         assertEquals("New follower journal last index", 0, newFollowerActorContext.getReplicatedLog().lastIndex());
322         verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
323                 nonVotingServer(NEW_SERVER_ID));
324
325         // Verify new server config was applied in the new follower
326
327         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
328
329         assertNoneMatching(newFollowerCollectorActor, InstallSnapshot.class, 500);
330
331         // Add another non-voting server.
332
333         clearMessages(leaderCollectorActor);
334
335         RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
336         Follower newFollower2 = new Follower(follower2ActorContext);
337         followerActor.underlyingActor().setBehavior(newFollower2);
338
339         leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit.getRef());
340
341         addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
342         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
343         assertEquals("getLeaderHint", java.util.Optional.of(LEADER_ID), addServerReply.getLeaderHint());
344
345         expectFirstMatching(leaderCollectorActor, ApplyState.class);
346         assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
347         assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
348         assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
349         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
350                 votingServer(LEADER_ID), nonVotingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
351
352         LOG.info("testAddServersAsNonVoting ending");
353     }
354
355     @Test
356     public void testAddServerWithOperationInProgress() throws Exception {
357         LOG.info("testAddServerWithOperationInProgress starting");
358
359         setupNewFollower();
360         RaftActorContext initialActorContext = new MockRaftActorContext();
361
362         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
363                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
364                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
365                 actorFactory.generateActorId(LEADER_ID));
366
367         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
368         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
369
370         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
371
372         RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
373         Follower newFollower2 = new Follower(follower2ActorContext);
374         followerActor.underlyingActor().setBehavior(newFollower2);
375
376         MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
377         newFollowerRaftActorInstance.setDropMessageOfType(InstallSnapshot.SERIALIZABLE_CLASS);
378
379         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
380
381         // Wait for leader's install snapshot and capture it
382
383         InstallSnapshot installSnapshot = expectFirstMatching(newFollowerCollectorActor, InstallSnapshot.class);
384
385         // Send a second AddServer - should get queued
386         JavaTestKit testKit2 = new JavaTestKit(getSystem());
387         leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit2.getRef());
388
389         // Continue the first AddServer
390         newFollowerRaftActorInstance.setDropMessageOfType(null);
391         newFollowerRaftActor.tell(installSnapshot, leaderActor);
392
393         // Verify both complete successfully
394         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
395         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
396
397         addServerReply = testKit2.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
398         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
399
400         // Verify ServerConfigurationPayload entries in leader's log
401
402         expectMatching(leaderCollectorActor, ApplyState.class, 2);
403         assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
404         assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
405         assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
406         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
407                 votingServer(LEADER_ID), votingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
408
409         // Verify ServerConfigurationPayload entry in the new follower
410
411         expectMatching(newFollowerCollectorActor, ApplyState.class, 2);
412         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID2),
413                newFollowerActorContext.getPeerIds());
414
415         LOG.info("testAddServerWithOperationInProgress ending");
416     }
417
418     @Test
419     public void testAddServerWithPriorSnapshotInProgress() throws Exception {
420         LOG.info("testAddServerWithPriorSnapshotInProgress starting");
421
422         setupNewFollower();
423         RaftActorContext initialActorContext = new MockRaftActorContext();
424
425         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
426                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
427                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
428                 actorFactory.generateActorId(LEADER_ID));
429
430         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
431         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
432
433         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
434
435         // Drop commit message for now to delay snapshot completion
436         leaderRaftActor.setDropMessageOfType(String.class);
437
438         leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
439
440         Object commitMsg = expectFirstMatching(leaderCollectorActor, COMMIT_MESSAGE_CLASS);
441
442         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
443
444         leaderRaftActor.setDropMessageOfType(null);
445         leaderActor.tell(commitMsg, leaderActor);
446
447         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
448         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
449         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
450
451         expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
452
453         // Verify ServerConfigurationPayload entry in leader's log
454
455         expectFirstMatching(leaderCollectorActor, ApplyState.class);
456         assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
457         assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
458         assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
459         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
460                 votingServer(NEW_SERVER_ID));
461
462         LOG.info("testAddServerWithPriorSnapshotInProgress ending");
463     }
464
465     @Test
466     public void testAddServerWithPriorSnapshotCompleteTimeout() throws Exception {
467         LOG.info("testAddServerWithPriorSnapshotCompleteTimeout starting");
468
469         setupNewFollower();
470         RaftActorContext initialActorContext = new MockRaftActorContext();
471
472         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
473                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
474                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
475                 actorFactory.generateActorId(LEADER_ID));
476
477         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
478         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
479
480         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
481
482         // Drop commit message so the snapshot doesn't complete.
483         leaderRaftActor.setDropMessageOfType(COMMIT_MESSAGE_CLASS);
484
485         leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
486
487         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
488
489         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
490         assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
491
492         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
493
494         LOG.info("testAddServerWithPriorSnapshotCompleteTimeout ending");
495     }
496
497     @Test
498     public void testAddServerWithLeaderChangeBeforePriorSnapshotComplete() throws Exception {
499         LOG.info("testAddServerWithLeaderChangeBeforePriorSnapshotComplete starting");
500
501         setupNewFollower();
502         RaftActorContext initialActorContext = new MockRaftActorContext();
503
504         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
505                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
506                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
507                 actorFactory.generateActorId(LEADER_ID));
508
509         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
510         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
511         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
512
513         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
514
515         // Drop the commit message so the snapshot doesn't complete yet.
516         leaderRaftActor.setDropMessageOfType(COMMIT_MESSAGE_CLASS);
517
518         leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
519
520         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
521
522         Object commitMsg = expectFirstMatching(leaderCollectorActor, COMMIT_MESSAGE_CLASS);
523
524         // Change the leader behavior to follower
525         leaderActor.tell(new Follower(leaderActorContext), leaderActor);
526
527         // Drop CaptureSnapshotReply in case install snapshot is incorrectly initiated after the prior
528         // snapshot completes. This will prevent the invalid snapshot from completing and fail the
529         // isCapturing assertion below.
530         leaderRaftActor.setDropMessageOfType(CaptureSnapshotReply.class);
531
532         // Complete the prior snapshot - this should be a no-op b/c it's no longer the leader
533         leaderActor.tell(commitMsg, leaderActor);
534
535         leaderActor.tell(new RaftActorServerConfigurationSupport.ServerOperationTimeout(NEW_SERVER_ID), leaderActor);
536
537         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
538         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
539
540         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
541         assertEquals("isCapturing", false, leaderActorContext.getSnapshotManager().isCapturing());
542
543         LOG.info("testAddServerWithLeaderChangeBeforePriorSnapshotComplete ending");
544     }
545
546     @Test
547     public void testAddServerWithLeaderChangeDuringInstallSnapshot() throws Exception {
548         LOG.info("testAddServerWithLeaderChangeDuringInstallSnapshot starting");
549
550         setupNewFollower();
551         RaftActorContext initialActorContext = new MockRaftActorContext();
552
553         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
554                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
555                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
556                 actorFactory.generateActorId(LEADER_ID));
557
558         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
559         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
560
561         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(8);
562
563         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
564
565         // Drop the UnInitializedFollowerSnapshotReply to delay it.
566         leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
567
568         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
569
570         UnInitializedFollowerSnapshotReply snapshotReply = expectFirstMatching(leaderCollectorActor,
571                 UnInitializedFollowerSnapshotReply.class);
572
573         // Prevent election timeout when the leader switches to follower
574         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
575
576         // Change the leader behavior to follower
577         leaderActor.tell(new Follower(leaderActorContext), leaderActor);
578
579         // Send the captured UnInitializedFollowerSnapshotReply - should be a no-op
580         leaderRaftActor.setDropMessageOfType(null);
581         leaderActor.tell(snapshotReply, leaderActor);
582
583         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
584         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
585
586         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
587
588         LOG.info("testAddServerWithLeaderChangeDuringInstallSnapshot ending");
589     }
590
591     @Test
592     public void testAddServerWithInstallSnapshotTimeout() throws Exception {
593         LOG.info("testAddServerWithInstallSnapshotTimeout starting");
594
595         setupNewFollower();
596         RaftActorContext initialActorContext = new MockRaftActorContext();
597
598         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
599                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
600                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
601                 actorFactory.generateActorId(LEADER_ID));
602
603         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
604         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
605         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
606
607         // Drop the InstallSnapshot message so it times out
608         newFollowerRaftActor.underlyingActor().setDropMessageOfType(InstallSnapshot.SERIALIZABLE_CLASS);
609
610         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
611
612         leaderActor.tell(new UnInitializedFollowerSnapshotReply("bogus"), leaderActor);
613
614         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
615         assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
616
617         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
618         assertEquals("Leader followers size", 0,
619                 ((AbstractLeader)leaderRaftActor.getCurrentBehavior()).getFollowerIds().size());
620
621         LOG.info("testAddServerWithInstallSnapshotTimeout ending");
622     }
623
624     @Test
625     public void testAddServerWithNoLeader() {
626         LOG.info("testAddServerWithNoLeader starting");
627
628         setupNewFollower();
629         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
630         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
631
632         TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
633                 MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
634                         followerActor.path().toString())).config(configParams).persistent(Optional.of(false)).
635                         props().withDispatcher(Dispatchers.DefaultDispatcherId()),
636                 actorFactory.generateActorId(LEADER_ID));
637         noLeaderActor.underlyingActor().waitForInitializeBehaviorComplete();
638
639         noLeaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
640         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
641         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
642
643         LOG.info("testAddServerWithNoLeader ending");
644     }
645
646     @Test
647     public void testAddServerWithNoConsensusReached() {
648         LOG.info("testAddServerWithNoConsensusReached starting");
649
650         setupNewFollower();
651         RaftActorContext initialActorContext = new MockRaftActorContext();
652
653         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
654                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
655                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
656                 actorFactory.generateActorId(LEADER_ID));
657
658         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
659         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
660
661         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
662
663         // Drop UnInitializedFollowerSnapshotReply initially
664         leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
665
666         MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
667         TestActorRef<MessageCollectorActor> newFollowerCollectorActor =
668                 newCollectorActor(newFollowerRaftActorInstance, NEW_SERVER_ID);
669
670         // Drop AppendEntries to the new follower so consensus isn't reached
671         newFollowerRaftActorInstance.setDropMessageOfType(AppendEntries.class);
672
673         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
674
675         // Capture the UnInitializedFollowerSnapshotReply
676         Object snapshotReply = expectFirstMatching(leaderCollectorActor, UnInitializedFollowerSnapshotReply.class);
677
678         // Send the UnInitializedFollowerSnapshotReply to resume the first request
679         leaderRaftActor.setDropMessageOfType(null);
680         leaderActor.tell(snapshotReply, leaderActor);
681
682         expectFirstMatching(newFollowerCollectorActor, AppendEntries.class);
683
684         // Send a second AddServer
685         leaderActor.tell(new AddServer(NEW_SERVER_ID2, "", false), testKit.getRef());
686
687         // The first AddServer should succeed with OK even though consensus wasn't reached
688         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
689         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
690         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
691
692         // Verify ServerConfigurationPayload entry in leader's log
693         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
694                 votingServer(NEW_SERVER_ID));
695
696         // The second AddServer should fail since consensus wasn't reached for the first
697         addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
698         assertEquals("getStatus", ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT, addServerReply.getStatus());
699
700         // Re-send the second AddServer - should also fail
701         leaderActor.tell(new AddServer(NEW_SERVER_ID2, "", false), testKit.getRef());
702         addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
703         assertEquals("getStatus", ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT, addServerReply.getStatus());
704
705         LOG.info("testAddServerWithNoConsensusReached ending");
706     }
707
708     @Test
709     public void testAddServerWithExistingServer() {
710         LOG.info("testAddServerWithExistingServer starting");
711
712         RaftActorContext initialActorContext = new MockRaftActorContext();
713
714         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
715                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
716                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
717                 actorFactory.generateActorId(LEADER_ID));
718
719         leaderActor.tell(new AddServer(FOLLOWER_ID, followerActor.path().toString(), true), testKit.getRef());
720
721         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
722         assertEquals("getStatus", ServerChangeStatus.ALREADY_EXISTS, addServerReply.getStatus());
723
724         LOG.info("testAddServerWithExistingServer ending");
725     }
726
727     @Test
728     public void testAddServerForwardedToLeader() {
729         LOG.info("testAddServerForwardedToLeader starting");
730
731         setupNewFollower();
732         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
733         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
734
735         TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
736                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
737                 actorFactory.generateActorId(LEADER_ID));
738
739         TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
740                 MockRaftActor.builder().id(FOLLOWER_ID).peerAddresses(ImmutableMap.of(LEADER_ID,
741                         leaderActor.path().toString())).config(configParams).persistent(Optional.of(false)).
742                         props().withDispatcher(Dispatchers.DefaultDispatcherId()),
743                 actorFactory.generateActorId(FOLLOWER_ID));
744         followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
745
746         followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
747                 -1, -1, (short)0), leaderActor);
748
749         followerRaftActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
750         expectFirstMatching(leaderActor, AddServer.class);
751
752         LOG.info("testAddServerForwardedToLeader ending");
753     }
754
755     @Test
756     public void testOnApplyState() {
757         LOG.info("testOnApplyState starting");
758
759         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
760         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
761         TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
762                 MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
763                         followerActor.path().toString())).config(configParams).persistent(Optional.of(false)).
764                         props().withDispatcher(Dispatchers.DefaultDispatcherId()),
765                 actorFactory.generateActorId(LEADER_ID));
766
767         RaftActorServerConfigurationSupport support = new RaftActorServerConfigurationSupport(noLeaderActor.underlyingActor());
768
769         ReplicatedLogEntry serverConfigEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
770                 new ServerConfigurationPayload(Collections.<ServerInfo>emptyList()));
771         boolean handled = support.handleMessage(new ApplyState(null, null, serverConfigEntry), ActorRef.noSender());
772         assertEquals("Message handled", true, handled);
773
774         ReplicatedLogEntry nonServerConfigEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
775                 new MockRaftActorContext.MockPayload("1"));
776         handled = support.handleMessage(new ApplyState(null, null, nonServerConfigEntry), ActorRef.noSender());
777         assertEquals("Message handled", false, handled);
778
779         LOG.info("testOnApplyState ending");
780     }
781
782     @Test
783     public void testRemoveServerWithNoLeader() {
784         LOG.info("testRemoveServerWithNoLeader starting");
785
786         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
787         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
788
789         TestActorRef<MockRaftActor> leaderActor = actorFactory.createTestActor(
790                 MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
791                         followerActor.path().toString())).config(configParams).persistent(Optional.of(false)).
792                         props().withDispatcher(Dispatchers.DefaultDispatcherId()),
793                 actorFactory.generateActorId(LEADER_ID));
794         leaderActor.underlyingActor().waitForInitializeBehaviorComplete();
795
796         leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
797         RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
798         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, removeServerReply.getStatus());
799
800         LOG.info("testRemoveServerWithNoLeader ending");
801     }
802
803     @Test
804     public void testRemoveServerNonExistentServer() {
805         LOG.info("testRemoveServerNonExistentServer starting");
806
807         RaftActorContext initialActorContext = new MockRaftActorContext();
808
809         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
810                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
811                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
812                 actorFactory.generateActorId(LEADER_ID));
813
814         leaderActor.tell(new RemoveServer(NEW_SERVER_ID), testKit.getRef());
815         RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
816         assertEquals("getStatus", ServerChangeStatus.DOES_NOT_EXIST, removeServerReply.getStatus());
817
818         LOG.info("testRemoveServerNonExistentServer ending");
819     }
820
821     @Test
822     public void testRemoveServerForwardToLeader() {
823         LOG.info("testRemoveServerForwardToLeader starting");
824
825         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
826         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
827
828         TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
829                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
830                 actorFactory.generateActorId(LEADER_ID));
831
832         TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
833                 MockRaftActor.builder().id(FOLLOWER_ID).peerAddresses(ImmutableMap.of(LEADER_ID,
834                         leaderActor.path().toString())).config(configParams).persistent(Optional.of(false)).
835                         props().withDispatcher(Dispatchers.DefaultDispatcherId()),
836                 actorFactory.generateActorId(FOLLOWER_ID));
837         followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
838
839         followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
840                 -1, -1, (short)0), leaderActor);
841
842         followerRaftActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
843         expectFirstMatching(leaderActor, RemoveServer.class);
844
845         LOG.info("testRemoveServerForwardToLeader ending");
846     }
847
848     @Test
849     public void testRemoveServer() {
850         LOG.info("testRemoveServer starting");
851
852         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
853         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
854         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
855
856         final String followerActorId = actorFactory.generateActorId(FOLLOWER_ID);
857         final String followerActorPath = actorFactory.createTestActorPath(followerActorId);
858         RaftActorContext initialActorContext = new MockRaftActorContext();
859
860         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
861                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActorPath),
862                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
863                 actorFactory.generateActorId(LEADER_ID));
864
865         TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
866
867         TestActorRef<MessageCollectorActor> collector =
868                 actorFactory.createTestActor(MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
869                         actorFactory.generateActorId("collector"));
870         TestActorRef<CollectingMockRaftActor> followerRaftActor = actorFactory.createTestActor(
871                 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
872                         configParams, NO_PERSISTENCE, collector).withDispatcher(Dispatchers.DefaultDispatcherId()),
873                 followerActorId);
874
875         leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
876         RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
877         assertEquals("getStatus", ServerChangeStatus.OK, removeServerReply.getStatus());
878
879         final ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
880         assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
881         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(), votingServer(LEADER_ID));
882
883         RaftActorBehavior currentBehavior = leaderActor.underlyingActor().getCurrentBehavior();
884         assertTrue("Expected Leader", currentBehavior instanceof Leader);
885         assertEquals("Follower ids size", 0, ((Leader)currentBehavior).getFollowerIds().size());
886
887         MessageCollectorActor.expectFirstMatching(collector, ServerRemoved.class);
888
889         LOG.info("testRemoveServer ending");
890     }
891
892     @Test
893     public void testRemoveServerLeader() {
894         LOG.info("testRemoveServerLeader starting");
895
896         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
897         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
898         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
899
900         final String followerActorId = actorFactory.generateActorId(FOLLOWER_ID);
901         final String followerActorPath = actorFactory.createTestActorPath(followerActorId);
902         RaftActorContext initialActorContext = new MockRaftActorContext();
903
904         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
905                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActorPath),
906                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
907                 actorFactory.generateActorId(LEADER_ID));
908
909         TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
910
911         TestActorRef<MessageCollectorActor> followerCollector = actorFactory.createTestActor(MessageCollectorActor.props().
912                 withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId("collector"));
913         actorFactory.createTestActor(
914                 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
915                         configParams, NO_PERSISTENCE, followerCollector).withDispatcher(Dispatchers.DefaultDispatcherId()),
916                 followerActorId);
917
918         leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
919         RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
920         assertEquals("getStatus", ServerChangeStatus.OK, removeServerReply.getStatus());
921
922         final ApplyState applyState = MessageCollectorActor.expectFirstMatching(followerCollector, ApplyState.class);
923         assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
924         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
925                 votingServer(FOLLOWER_ID));
926
927         MessageCollectorActor.expectFirstMatching(leaderCollector, ServerRemoved.class);
928
929         LOG.info("testRemoveServerLeader ending");
930     }
931
932     @Test
933     public void testRemoveServerLeaderWithNoFollowers() {
934         LOG.info("testRemoveServerLeaderWithNoFollowers starting");
935
936         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
937                 MockLeaderRaftActor.props(Collections.<String, String>emptyMap(),
938                         new MockRaftActorContext()).withDispatcher(Dispatchers.DefaultDispatcherId()),
939                 actorFactory.generateActorId(LEADER_ID));
940
941         leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
942         RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
943         assertEquals("getStatus", ServerChangeStatus.NOT_SUPPORTED, removeServerReply.getStatus());
944
945         LOG.info("testRemoveServerLeaderWithNoFollowers ending");
946     }
947
948     @Test
949     public void testChangeServersVotingStatus() {
950         LOG.info("testChangeServersVotingStatus starting");
951
952         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
953         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
954         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
955
956         final String follower1ActorId = actorFactory.generateActorId(FOLLOWER_ID);
957         final String follower1ActorPath = actorFactory.createTestActorPath(follower1ActorId);
958         final String follower2ActorId = actorFactory.generateActorId(FOLLOWER_ID2);
959         final String follower2ActorPath = actorFactory.createTestActorPath(follower2ActorId);
960
961         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
962                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, follower1ActorPath,
963                         FOLLOWER_ID2, follower2ActorPath), new MockRaftActorContext()).
964                         withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
965         TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
966
967         TestActorRef<MessageCollectorActor> follower1Collector = actorFactory.createTestActor(
968                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
969                 actorFactory.generateActorId("collector"));
970         TestActorRef<CollectingMockRaftActor> follower1RaftActor = actorFactory.createTestActor(
971                 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
972                         FOLLOWER_ID2, follower2ActorPath), configParams, NO_PERSISTENCE, follower1Collector).
973                         withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId);
974
975         TestActorRef<MessageCollectorActor> follower2Collector = actorFactory.createTestActor(
976                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
977                 actorFactory.generateActorId("collector"));
978         TestActorRef<CollectingMockRaftActor> follower2RaftActor = actorFactory.createTestActor(
979                 CollectingMockRaftActor.props(FOLLOWER_ID2, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
980                         FOLLOWER_ID, follower1ActorPath), configParams, NO_PERSISTENCE, follower2Collector).
981                         withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId);
982
983         // Send first ChangeServersVotingStatus message
984
985         leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(FOLLOWER_ID, false, FOLLOWER_ID2, false)),
986                 testKit.getRef());
987         ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
988         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
989
990         final ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
991         assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
992         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
993                 votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
994
995         MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
996         verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
997                 votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
998
999         MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
1000         verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1001                 votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1002
1003         MessageCollectorActor.clearMessages(leaderCollector);
1004         MessageCollectorActor.clearMessages(follower1Collector);
1005         MessageCollectorActor.clearMessages(follower2Collector);
1006
1007         // Send second ChangeServersVotingStatus message
1008
1009         leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(FOLLOWER_ID, true)), testKit.getRef());
1010         reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1011         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1012
1013         MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
1014         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1015                 votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1016
1017         MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
1018         verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1019                 votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1020
1021         MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
1022         verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1023                 votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1024
1025         LOG.info("testChangeServersVotingStatus ending");
1026     }
1027
1028     @Test
1029     public void testChangeLeaderToNonVoting() {
1030         LOG.info("testChangeLeaderToNonVoting starting");
1031
1032         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1033         configParams.setHeartBeatInterval(new FiniteDuration(500, TimeUnit.MILLISECONDS));
1034
1035         final String follower1ActorId = actorFactory.generateActorId(FOLLOWER_ID);
1036         final String follower1ActorPath = actorFactory.createTestActorPath(follower1ActorId);
1037         final String follower2ActorId = actorFactory.generateActorId(FOLLOWER_ID2);
1038         final String follower2ActorPath = actorFactory.createTestActorPath(follower2ActorId);
1039
1040         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
1041                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, follower1ActorPath,
1042                         FOLLOWER_ID2, follower2ActorPath), new MockRaftActorContext()).
1043                         withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
1044         TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
1045
1046         TestActorRef<MessageCollectorActor> follower1Collector = actorFactory.createTestActor(
1047                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1048                 actorFactory.generateActorId("collector"));
1049         TestActorRef<CollectingMockRaftActor> follower1RaftActor = actorFactory.createTestActor(
1050                 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
1051                         FOLLOWER_ID2, follower2ActorPath), configParams, NO_PERSISTENCE, follower1Collector).
1052                         withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId);
1053
1054         TestActorRef<MessageCollectorActor> follower2Collector = actorFactory.createTestActor(
1055                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1056                 actorFactory.generateActorId("collector"));
1057         TestActorRef<CollectingMockRaftActor> follower2RaftActor = actorFactory.createTestActor(
1058                 CollectingMockRaftActor.props(FOLLOWER_ID2, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
1059                         FOLLOWER_ID, follower1ActorPath), configParams, NO_PERSISTENCE, follower2Collector).
1060                         withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId);
1061
1062         // Send ChangeServersVotingStatus message
1063
1064         leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(LEADER_ID, false)), testKit.getRef());
1065         ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1066         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1067
1068         MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
1069         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1070                 nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
1071
1072         MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
1073         verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1074                 nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
1075
1076         MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
1077         verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1078                 nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
1079
1080         verifyRaftState(RaftState.Leader, follower1RaftActor.underlyingActor(), follower2RaftActor.underlyingActor());
1081         verifyRaftState(RaftState.Follower, leaderActor.underlyingActor());
1082
1083         MessageCollectorActor.expectMatching(leaderCollector, AppendEntries.class, 2);
1084
1085         LOG.info("testChangeLeaderToNonVoting ending");
1086     }
1087
1088     @Test
1089     public void testChangeLeaderToNonVotingInSingleNode() {
1090         LOG.info("testChangeLeaderToNonVotingInSingleNode starting");
1091
1092         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
1093                 MockLeaderRaftActor.props(ImmutableMap.of(), new MockRaftActorContext()).
1094                         withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
1095
1096         leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(LEADER_ID, false)), testKit.getRef());
1097         ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1098         assertEquals("getStatus", ServerChangeStatus.INVALID_REQUEST, reply.getStatus());
1099
1100         LOG.info("testChangeLeaderToNonVotingInSingleNode ending");
1101     }
1102
1103     @Test
1104     public void testChangeToVotingWithNoLeader() {
1105         LOG.info("testChangeToVotingWithNoLeader starting");
1106
1107         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1108         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1109
1110         final String node1ID = "node1";
1111         final String node2ID = "node2";
1112
1113         // Set up a persisted ServerConfigurationPayload. Initially node1 and node2 will come up as non-voting.
1114         // via the server config. The server config will also contain 2 voting peers that are down (ie no
1115         // actors created).
1116
1117         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1118                 new ServerInfo(node1ID, false), new ServerInfo(node2ID, false),
1119                 new ServerInfo("downNode1", true), new ServerInfo("downNode2", true)));
1120         ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, 1, persistedServerConfig);
1121
1122         InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "downNode1"));
1123         InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1124         InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "downNode2"));
1125         InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1126
1127         TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
1128                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1129                 actorFactory.generateActorId("collector"));
1130         TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1131                 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
1132                         PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1133         CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1134
1135         TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
1136                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1137                 actorFactory.generateActorId("collector"));
1138         TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1139                 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
1140                         PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1141         CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1142
1143         // Wait for snapshot after recovery
1144         MessageCollectorActor.expectFirstMatching(node1Collector, SnapshotComplete.class);
1145
1146         // Verify the intended server config was loaded and applied.
1147         verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1148                 nonVotingServer(node1ID), nonVotingServer(node2ID), votingServer("downNode1"),
1149                 votingServer("downNode2"));
1150         assertEquals("isVotingMember", false, node1RaftActor.getRaftActorContext().isVotingMember());
1151         assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1152         assertEquals("getLeaderId", null, node1RaftActor.getLeaderId());
1153
1154         MessageCollectorActor.expectFirstMatching(node2Collector, SnapshotComplete.class);
1155         assertEquals("isVotingMember", false, node2RaftActor.getRaftActorContext().isVotingMember());
1156
1157         // For the test, we send a ChangeServersVotingStatus message to node1 to flip the voting states for
1158         // each server, ie node1 and node2 to voting and the 2 down nodes to non-voting. This should cause
1159         // node1 to try to elect itself as leader in order to apply the new server config. Since the 2
1160         // down nodes are switched to non-voting, node1 should only need a vote from node2.
1161
1162         // First send the message such that node1 has no peer address for node2 - should fail.
1163
1164         ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true,
1165                 node2ID, true, "downNode1", false, "downNode2", false));
1166         node1RaftActorRef.tell(changeServers, testKit.getRef());
1167         ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1168         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, reply.getStatus());
1169
1170         // Update node2's peer address and send the message again
1171
1172         node1RaftActor.setPeerAddress(node2ID, node2RaftActorRef.path().toString());
1173
1174         node1RaftActorRef.tell(changeServers, testKit.getRef());
1175         reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1176         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1177
1178         ApplyJournalEntries apply = MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class);
1179         assertEquals("getToIndex", 1, apply.getToIndex());
1180         verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1181                 votingServer(node1ID), votingServer(node2ID), nonVotingServer("downNode1"),
1182                 nonVotingServer("downNode2"));
1183         assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
1184         assertEquals("getRaftState", RaftState.Leader, node1RaftActor.getRaftState());
1185
1186         apply = MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
1187         assertEquals("getToIndex", 1, apply.getToIndex());
1188         verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1189                 votingServer(node1ID), votingServer(node2ID), nonVotingServer("downNode1"),
1190                 nonVotingServer("downNode2"));
1191         assertEquals("isVotingMember", true, node2RaftActor.getRaftActorContext().isVotingMember());
1192         assertEquals("getRaftState", RaftState.Follower, node2RaftActor.getRaftState());
1193
1194         LOG.info("testChangeToVotingWithNoLeader ending");
1195     }
1196
1197     @Test
1198     public void testChangeToVotingWithNoLeaderAndElectionTimeout() {
1199         LOG.info("testChangeToVotingWithNoLeaderAndElectionTimeout starting");
1200
1201         final String node1ID = "node1";
1202         final String node2ID = "node2";
1203
1204         PeerAddressResolver peerAddressResolver = peerId -> peerId.equals(node1ID) ? actorFactory.createTestActorPath(node1ID) :
1205             peerId.equals(node2ID) ? actorFactory.createTestActorPath(node2ID) : null;
1206
1207         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1208                 new ServerInfo(node1ID, false), new ServerInfo(node2ID, true)));
1209         ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, 1, persistedServerConfig);
1210
1211         InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
1212         InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1213         InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
1214         InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1215
1216         DefaultConfigParamsImpl configParams1 = new DefaultConfigParamsImpl();
1217         configParams1.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1218         configParams1.setElectionTimeoutFactor(1);
1219         configParams1.setPeerAddressResolver(peerAddressResolver);
1220         TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
1221                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1222                 actorFactory.generateActorId("collector"));
1223         TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1224                 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams1,
1225                         PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1226         CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1227
1228         DefaultConfigParamsImpl configParams2 = new DefaultConfigParamsImpl();
1229         configParams2.setElectionTimeoutFactor(1000000);
1230         configParams2.setPeerAddressResolver(peerAddressResolver);
1231         TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
1232                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1233                 actorFactory.generateActorId("collector"));
1234         TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1235                 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams2,
1236                         PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1237         CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1238
1239         // Wait for snapshot after recovery
1240         MessageCollectorActor.expectFirstMatching(node1Collector, SnapshotComplete.class);
1241
1242         // Send a ChangeServersVotingStatus message to node1 to change mode1 to voting. This should cause
1243         // node1 to try to elect itself as leader in order to apply the new server config. But we'll drop
1244         // RequestVote messages in node2 which should cause node1 to time out and revert back to the previous
1245         // server config and fail with NO_LEADER. Note that node1 shouldn't forward the request to node2 b/c
1246         // node2 was previously voting.
1247
1248         node2RaftActor.setDropMessageOfType(RequestVote.class);
1249
1250         ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true));
1251         node1RaftActorRef.tell(changeServers, testKit.getRef());
1252         ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1253         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, reply.getStatus());
1254
1255         assertEquals("Server config", Sets.newHashSet(nonVotingServer(node1ID), votingServer(node2ID)),
1256                 Sets.newHashSet(node1RaftActor.getRaftActorContext().getPeerServerInfo(true).getServerConfig()));
1257         assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1258
1259         LOG.info("testChangeToVotingWithNoLeaderAndElectionTimeout ending");
1260     }
1261
1262     @Test
1263     public void testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout() {
1264         LOG.info("testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout starting");
1265
1266         final String node1ID = "node1";
1267         final String node2ID = "node2";
1268
1269         PeerAddressResolver peerAddressResolver = peerId -> peerId.equals(node1ID) ? actorFactory.createTestActorPath(node1ID) :
1270             peerId.equals(node2ID) ? actorFactory.createTestActorPath(node2ID) : null;
1271
1272         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1273         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1274         configParams.setElectionTimeoutFactor(3);
1275         configParams.setPeerAddressResolver(peerAddressResolver);
1276
1277         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1278                 new ServerInfo(node1ID, false), new ServerInfo(node2ID, false)));
1279         ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, 1, persistedServerConfig);
1280
1281         InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
1282         InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1283         InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
1284         InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1285         InMemoryJournal.addEntry(node2ID, 3, new ReplicatedLogImplEntry(1, 1,
1286                 new MockRaftActorContext.MockPayload("2")));
1287
1288         TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
1289                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1290                 actorFactory.generateActorId("collector"));
1291         TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1292                 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
1293                         PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1294         CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1295
1296         TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
1297                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1298                 actorFactory.generateActorId("collector"));
1299         TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1300                 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
1301                         PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1302         CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1303
1304         // Wait for snapshot after recovery
1305         MessageCollectorActor.expectFirstMatching(node1Collector, SnapshotComplete.class);
1306
1307         // Send a ChangeServersVotingStatus message to node1 to change mode1 to voting. This should cause
1308         // node1 to try to elect itself as leader in order to apply the new server config. However node1's log
1309         // is behind node2's so node2 should not grant node1's vote. This should cause node1 to time out and
1310         // forward the request to node2.
1311
1312         ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(
1313                 ImmutableMap.of(node1ID, true, node2ID, true));
1314         node1RaftActorRef.tell(changeServers, testKit.getRef());
1315         ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1316         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1317
1318         MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
1319         verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1320                 votingServer(node1ID), votingServer(node2ID));
1321         assertEquals("getRaftState", RaftState.Leader, node2RaftActor.getRaftState());
1322
1323         MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class);
1324         verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1325                 votingServer(node1ID), votingServer(node2ID));
1326         assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
1327         assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1328
1329         LOG.info("testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout ending");
1330     }
1331
1332     @Test
1333     public void testChangeToVotingWithNoLeaderAndOtherLeaderElected() {
1334         LOG.info("testChangeToVotingWithNoLeaderAndOtherLeaderElected starting");
1335
1336         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1337         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1338         configParams.setElectionTimeoutFactor(100000);
1339
1340         final String node1ID = "node1";
1341         final String node2ID = "node2";
1342
1343         configParams.setPeerAddressResolver(peerId -> peerId.equals(node1ID) ? actorFactory.createTestActorPath(node1ID) :
1344             peerId.equals(node2ID) ? actorFactory.createTestActorPath(node2ID) : null);
1345
1346         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1347                 new ServerInfo(node1ID, false), new ServerInfo(node2ID, true)));
1348         ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, 1, persistedServerConfig);
1349
1350         InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
1351         InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1352         InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
1353         InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1354
1355         TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
1356                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1357                 actorFactory.generateActorId("collector"));
1358         TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1359                 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
1360                         PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1361         CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1362
1363         TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
1364                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1365                 actorFactory.generateActorId("collector"));
1366         TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1367                 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
1368                         PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1369         CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1370
1371         // Wait for snapshot after recovery
1372         MessageCollectorActor.expectFirstMatching(node1Collector, SnapshotComplete.class);
1373
1374         // Send a ChangeServersVotingStatus message to node1 to change node1 to voting. This should cause
1375         // node1 to try to elect itself as leader in order to apply the new server config. But we'll drop
1376         // RequestVote messages in node2 and make it the leader so node1 should forward the server change
1377         // request to node2 when node2 is elected.
1378
1379         node2RaftActor.setDropMessageOfType(RequestVote.class);
1380
1381         ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true,
1382                 node2ID, true));
1383         node1RaftActorRef.tell(changeServers, testKit.getRef());
1384
1385         MessageCollectorActor.expectFirstMatching(node2Collector, RequestVote.class);
1386
1387         node2RaftActorRef.tell(ElectionTimeout.INSTANCE, ActorRef.noSender());
1388
1389         ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1390         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1391
1392         MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class);
1393         verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1394                 votingServer(node1ID), votingServer(node2ID));
1395         assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
1396         assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1397
1398         MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
1399         verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1400                 votingServer(node1ID), votingServer(node2ID));
1401         assertEquals("getRaftState", RaftState.Leader, node2RaftActor.getRaftState());
1402
1403         LOG.info("testChangeToVotingWithNoLeaderAndOtherLeaderElected ending");
1404     }
1405
1406     private static void verifyRaftState(RaftState expState, RaftActor... raftActors) {
1407         Stopwatch sw = Stopwatch.createStarted();
1408         while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
1409             for(RaftActor raftActor: raftActors) {
1410                 if(raftActor.getRaftState() == expState) {
1411                     return;
1412                 }
1413             }
1414         }
1415
1416         fail("None of the RaftActors have state " + expState);
1417     }
1418
1419     private static ServerInfo votingServer(String id) {
1420         return new ServerInfo(id, true);
1421     }
1422
1423     private static ServerInfo nonVotingServer(String id) {
1424         return new ServerInfo(id, false);
1425     }
1426
1427     private TestActorRef<MessageCollectorActor> newLeaderCollectorActor(MockLeaderRaftActor leaderRaftActor) {
1428         return newCollectorActor(leaderRaftActor, LEADER_ID);
1429     }
1430
1431     private TestActorRef<MessageCollectorActor> newCollectorActor(AbstractMockRaftActor raftActor, String id) {
1432         TestActorRef<MessageCollectorActor> collectorActor = actorFactory.createTestActor(
1433                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1434                 actorFactory.generateActorId(id + "Collector"));
1435         raftActor.setCollectorActor(collectorActor);
1436         return collectorActor;
1437     }
1438
1439     private static void verifyServerConfigurationPayloadEntry(ReplicatedLog log, ServerInfo... expected) {
1440         ReplicatedLogEntry logEntry = log.get(log.lastIndex());
1441         assertEquals("Last log entry payload class", ServerConfigurationPayload.class, logEntry.getData().getClass());
1442         ServerConfigurationPayload payload = (ServerConfigurationPayload)logEntry.getData();
1443         assertEquals("Server config", Sets.newHashSet(expected), Sets.newHashSet(payload.getServerConfig()));
1444     }
1445
1446     private static RaftActorContextImpl newFollowerContext(String id, TestActorRef<? extends UntypedActor> actor) {
1447         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1448         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1449         configParams.setElectionTimeoutFactor(100000);
1450         NonPersistentDataProvider noPersistence = new NonPersistentDataProvider();
1451         ElectionTermImpl termInfo = new ElectionTermImpl(noPersistence, id, LOG);
1452         termInfo.update(1, LEADER_ID);
1453         return new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
1454                 id, termInfo, -1, -1, ImmutableMap.of(LEADER_ID, ""), configParams, noPersistence, LOG);
1455     }
1456
1457     static abstract class AbstractMockRaftActor extends MockRaftActor {
1458         private volatile TestActorRef<MessageCollectorActor> collectorActor;
1459         private volatile Class<?> dropMessageOfType;
1460
1461         AbstractMockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
1462                 boolean persistent, TestActorRef<MessageCollectorActor> collectorActor) {
1463             super(builder().id(id).peerAddresses(peerAddresses).config(config.get()).
1464                     persistent(Optional.of(persistent)));
1465             this.collectorActor = collectorActor;
1466         }
1467
1468         void setDropMessageOfType(Class<?> dropMessageOfType) {
1469             this.dropMessageOfType = dropMessageOfType;
1470         }
1471
1472         void setCollectorActor(TestActorRef<MessageCollectorActor> collectorActor) {
1473             this.collectorActor = collectorActor;
1474         }
1475
1476         @Override
1477         public void handleCommand(Object message) {
1478             if(dropMessageOfType == null || !dropMessageOfType.equals(message.getClass())) {
1479                 super.handleCommand(message);
1480             }
1481
1482             if(collectorActor != null) {
1483                 collectorActor.tell(message, getSender());
1484             }
1485         }
1486     }
1487
1488     public static class CollectingMockRaftActor extends AbstractMockRaftActor {
1489
1490         CollectingMockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
1491                 boolean persistent, TestActorRef<MessageCollectorActor> collectorActor) {
1492             super(id, peerAddresses, config, persistent, collectorActor);
1493             snapshotCohortDelegate = new RaftActorSnapshotCohort() {
1494                 @Override
1495                 public void createSnapshot(ActorRef actorRef) {
1496                     actorRef.tell(new CaptureSnapshotReply(new byte[0]), actorRef);
1497                 }
1498
1499                 @Override
1500                 public void applySnapshot(byte[] snapshotBytes) {
1501                 }
1502             };
1503         }
1504
1505         public static Props props(final String id, final Map<String, String> peerAddresses,
1506                 ConfigParams config, boolean persistent, TestActorRef<MessageCollectorActor> collectorActor){
1507
1508             return Props.create(CollectingMockRaftActor.class, id, peerAddresses, Optional.of(config),
1509                     persistent, collectorActor);
1510         }
1511
1512     }
1513
1514     public static class MockLeaderRaftActor extends AbstractMockRaftActor {
1515         public MockLeaderRaftActor(Map<String, String> peerAddresses, ConfigParams config,
1516                 RaftActorContext fromContext) {
1517             super(LEADER_ID, peerAddresses, Optional.of(config), NO_PERSISTENCE, null);
1518             setPersistence(false);
1519
1520             RaftActorContext context = getRaftActorContext();
1521             for(int i = 0; i < fromContext.getReplicatedLog().size(); i++) {
1522                 ReplicatedLogEntry entry = fromContext.getReplicatedLog().get(i);
1523                 getState().add(entry.getData());
1524                 context.getReplicatedLog().append(entry);
1525             }
1526
1527             context.setCommitIndex(fromContext.getCommitIndex());
1528             context.setLastApplied(fromContext.getLastApplied());
1529             context.getTermInformation().update(fromContext.getTermInformation().getCurrentTerm(),
1530                     fromContext.getTermInformation().getVotedFor());
1531         }
1532
1533         @Override
1534         protected void initializeBehavior() {
1535             changeCurrentBehavior(new Leader(getRaftActorContext()));
1536             initializeBehaviorComplete.countDown();
1537         }
1538
1539         @Override
1540         public void createSnapshot(ActorRef actorRef) {
1541             try {
1542                 actorRef.tell(new CaptureSnapshotReply(RaftActorTest.fromObject(getState()).toByteArray()), actorRef);
1543             } catch (Exception e) {
1544                 LOG.error("createSnapshot failed", e);
1545             }
1546         }
1547
1548         static Props props(Map<String, String> peerAddresses, RaftActorContext fromContext) {
1549             DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1550             configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1551             configParams.setElectionTimeoutFactor(10);
1552             return Props.create(MockLeaderRaftActor.class, peerAddresses, configParams, fromContext);
1553         }
1554     }
1555
1556     public static class MockNewFollowerRaftActor extends AbstractMockRaftActor {
1557         public MockNewFollowerRaftActor(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
1558             super(NEW_SERVER_ID, Maps.<String, String>newHashMap(), Optional.of(config), NO_PERSISTENCE, collectorActor);
1559             setPersistence(false);
1560         }
1561
1562         static Props props(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
1563             return Props.create(MockNewFollowerRaftActor.class, config, collectorActor);
1564         }
1565     }
1566 }