Remove the leader's FollowerLogInformation on RemoveServer
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorServerConfigurationSupportTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertTrue;
12 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.assertNoneMatching;
13 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
14 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
15 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
16 import akka.actor.ActorRef;
17 import akka.actor.Props;
18 import akka.actor.UntypedActor;
19 import akka.dispatch.Dispatchers;
20 import akka.testkit.JavaTestKit;
21 import akka.testkit.TestActorRef;
22 import com.google.common.base.Optional;
23 import com.google.common.collect.ImmutableMap;
24 import com.google.common.collect.Maps;
25 import com.google.common.collect.Sets;
26 import java.util.Collections;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.concurrent.TimeUnit;
30 import org.junit.After;
31 import org.junit.Before;
32 import org.junit.Test;
33 import org.opendaylight.controller.cluster.DataPersistenceProvider;
34 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
35 import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
36 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
37 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
38 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
39 import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
40 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
41 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
42 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
43 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
44 import org.opendaylight.controller.cluster.raft.messages.AddServer;
45 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
46 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
47 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
48 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
49 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
50 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
51 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
52 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
53 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
54 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
55 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
56 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
57 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
58 import org.slf4j.Logger;
59 import org.slf4j.LoggerFactory;
60 import scala.concurrent.duration.FiniteDuration;
61
62 /**
63  * Unit tests for RaftActorServerConfigurationSupport.
64  *
65  * @author Thomas Pantelis
66  */
67 public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
68     static final String LEADER_ID = "leader";
69     static final String FOLLOWER_ID = "follower";
70     static final String NEW_SERVER_ID = "new-server";
71     static final String NEW_SERVER_ID2 = "new-server2";
72     private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupportTest.class);
73     private static final DataPersistenceProvider NO_PERSISTENCE = new NonPersistentDataProvider();
74
75     private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
76
77     private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
78             Props.create(ForwardMessageToBehaviorActor.class).withDispatcher(Dispatchers.DefaultDispatcherId()),
79             actorFactory.generateActorId(FOLLOWER_ID));
80
81     private TestActorRef<MockNewFollowerRaftActor> newFollowerRaftActor;
82     private TestActorRef<MessageCollectorActor> newFollowerCollectorActor;
83     private RaftActorContext newFollowerActorContext;
84
85     private final JavaTestKit testKit = new JavaTestKit(getSystem());
86
87     @Before
88     public void setup() {
89         InMemoryJournal.clear();
90         InMemorySnapshotStore.clear();
91
92         DefaultConfigParamsImpl configParams = newFollowerConfigParams();
93
94         newFollowerCollectorActor = actorFactory.createTestActor(
95                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
96                 actorFactory.generateActorId(NEW_SERVER_ID + "Collector"));
97         newFollowerRaftActor = actorFactory.createTestActor(MockNewFollowerRaftActor.props(
98                 configParams, newFollowerCollectorActor).withDispatcher(Dispatchers.DefaultDispatcherId()),
99                 actorFactory.generateActorId(NEW_SERVER_ID));
100
101         try {
102             newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
103         } catch (Exception e) {
104             newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
105         }
106     }
107
108     private static DefaultConfigParamsImpl newFollowerConfigParams() {
109         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
110         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
111         configParams.setElectionTimeoutFactor(100000);
112         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
113         return configParams;
114     }
115
116     @After
117     public void tearDown() throws Exception {
118         actorFactory.close();
119     }
120
121     @Test
122     public void testAddServerWithExistingFollower() throws Exception {
123         RaftActorContext followerActorContext = newFollowerContext(FOLLOWER_ID, followerActor);
124         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
125                 0, 3, 1).build());
126         followerActorContext.setCommitIndex(2);
127         followerActorContext.setLastApplied(2);
128
129         Follower follower = new Follower(followerActorContext);
130         followerActor.underlyingActor().setBehavior(follower);
131
132         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
133                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
134                         followerActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
135                 actorFactory.generateActorId(LEADER_ID));
136
137         // Expect initial heartbeat from the leader.
138         expectFirstMatching(followerActor, AppendEntries.class);
139         clearMessages(followerActor);
140
141         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
142         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
143
144         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
145
146         // Leader should install snapshot - capture and verify ApplySnapshot contents
147
148         ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
149         @SuppressWarnings("unchecked")
150         List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
151         assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
152
153         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
154         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
155         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
156
157         // Verify ServerConfigurationPayload entry in leader's log
158
159         expectFirstMatching(leaderCollectorActor, ApplyState.class);
160         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
161         assertEquals("Leader journal last index", 3, leaderActorContext.getReplicatedLog().lastIndex());
162         assertEquals("Leader commit index", 3, leaderActorContext.getCommitIndex());
163         assertEquals("Leader last applied index", 3, leaderActorContext.getLastApplied());
164         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
165                 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
166
167         // Verify ServerConfigurationPayload entry in both followers
168
169         expectFirstMatching(followerActor, ApplyState.class);
170         assertEquals("Follower journal last index", 3, followerActorContext.getReplicatedLog().lastIndex());
171         verifyServerConfigurationPayloadEntry(followerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
172                 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
173
174         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
175         assertEquals("New follower journal last index", 3, newFollowerActorContext.getReplicatedLog().lastIndex());
176         verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
177                 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
178
179         // Verify new server config was applied in both followers
180
181         assertEquals("Follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID), followerActorContext.getPeerIds());
182
183         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, FOLLOWER_ID), newFollowerActorContext.getPeerIds());
184
185         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
186         expectFirstMatching(followerActor, ApplyState.class);
187
188         assertEquals("Follower commit index", 3, followerActorContext.getCommitIndex());
189         assertEquals("Follower last applied index", 3, followerActorContext.getLastApplied());
190         assertEquals("New follower commit index", 3, newFollowerActorContext.getCommitIndex());
191         assertEquals("New follower last applied index", 3, newFollowerActorContext.getLastApplied());
192
193         List<ReplicatedLogImplEntry> persistedLogEntries = InMemoryJournal.get(LEADER_ID, ReplicatedLogImplEntry.class);
194         assertEquals("Leader ReplicatedLogImplEntry entries", 1, persistedLogEntries.size());
195         ReplicatedLogImplEntry logEntry = persistedLogEntries.get(0);
196         assertEquals("Leader ReplicatedLogImplEntry getTerm", 1, logEntry.getTerm());
197         assertEquals("Leader ReplicatedLogImplEntry getIndex", 3, logEntry.getIndex());
198         assertEquals("Leader ReplicatedLogImplEntry getData", ServerConfigurationPayload.class, logEntry.getData().getClass());
199
200         persistedLogEntries = InMemoryJournal.get(NEW_SERVER_ID, ReplicatedLogImplEntry.class);
201         assertEquals("New follower ReplicatedLogImplEntry entries", 1, persistedLogEntries.size());
202         logEntry = persistedLogEntries.get(0);
203         assertEquals("New follower ReplicatedLogImplEntry getTerm", 1, logEntry.getTerm());
204         assertEquals("New follower ReplicatedLogImplEntry getIndex", 3, logEntry.getIndex());
205         assertEquals("New follower ReplicatedLogImplEntry getData", ServerConfigurationPayload.class,
206                 logEntry.getData().getClass());
207     }
208
209     @Test
210     public void testAddServerWithNoExistingFollower() throws Exception {
211         RaftActorContext initialActorContext = new MockRaftActorContext();
212         initialActorContext.setCommitIndex(1);
213         initialActorContext.setLastApplied(1);
214         initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
215                 0, 2, 1).build());
216
217         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
218                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
219                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
220                 actorFactory.generateActorId(LEADER_ID));
221
222         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
223         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
224
225         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
226
227         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
228
229         // Leader should install snapshot - capture and verify ApplySnapshot contents
230
231         ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
232         @SuppressWarnings("unchecked")
233         List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
234         assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
235
236         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
237         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
238         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
239
240         // Verify ServerConfigurationPayload entry in leader's log
241
242         expectFirstMatching(leaderCollectorActor, ApplyState.class);
243         assertEquals("Leader journal last index", 2, leaderActorContext.getReplicatedLog().lastIndex());
244         assertEquals("Leader commit index", 2, leaderActorContext.getCommitIndex());
245         assertEquals("Leader last applied index", 2, leaderActorContext.getLastApplied());
246         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
247                 votingServer(NEW_SERVER_ID));
248
249         // Verify ServerConfigurationPayload entry in the new follower
250
251         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
252         assertEquals("New follower journal last index", 2, newFollowerActorContext.getReplicatedLog().lastIndex());
253         verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
254                 votingServer(NEW_SERVER_ID));
255
256         // Verify new server config was applied in the new follower
257
258         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
259     }
260
261     @Test
262     public void testAddServersAsNonVoting() throws Exception {
263         RaftActorContext initialActorContext = new MockRaftActorContext();
264
265         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
266                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
267                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
268                 actorFactory.generateActorId(LEADER_ID));
269
270         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
271         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
272
273         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
274
275         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), false), testKit.getRef());
276
277         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
278         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
279         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
280
281         // Verify ServerConfigurationPayload entry in leader's log
282
283         expectFirstMatching(leaderCollectorActor, ApplyState.class);
284
285         assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
286         assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
287         assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
288         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
289                 nonVotingServer(NEW_SERVER_ID));
290
291         // Verify ServerConfigurationPayload entry in the new follower
292
293         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
294         assertEquals("New follower journal last index", 0, newFollowerActorContext.getReplicatedLog().lastIndex());
295         verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
296                 nonVotingServer(NEW_SERVER_ID));
297
298         // Verify new server config was applied in the new follower
299
300         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
301
302         assertNoneMatching(newFollowerCollectorActor, InstallSnapshot.class, 500);
303
304         // Add another non-voting server.
305
306         clearMessages(leaderCollectorActor);
307
308         RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
309         Follower newFollower2 = new Follower(follower2ActorContext);
310         followerActor.underlyingActor().setBehavior(newFollower2);
311
312         leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit.getRef());
313
314         addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
315         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
316         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
317
318         expectFirstMatching(leaderCollectorActor, ApplyState.class);
319         assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
320         assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
321         assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
322         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
323                 votingServer(LEADER_ID), nonVotingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
324     }
325
326     @Test
327     public void testAddServerWithOperationInProgress() throws Exception {
328         RaftActorContext initialActorContext = new MockRaftActorContext();
329
330         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
331                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
332                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
333                 actorFactory.generateActorId(LEADER_ID));
334
335         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
336         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
337
338         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
339
340         RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
341         Follower newFollower2 = new Follower(follower2ActorContext);
342         followerActor.underlyingActor().setBehavior(newFollower2);
343
344         MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
345         newFollowerRaftActorInstance.setDropMessageOfType(InstallSnapshot.SERIALIZABLE_CLASS);
346
347         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
348
349         // Wait for leader's install snapshot and capture it
350
351         InstallSnapshot installSnapshot = expectFirstMatching(newFollowerCollectorActor, InstallSnapshot.class);
352
353         // Send a second AddServer - should get queued
354         JavaTestKit testKit2 = new JavaTestKit(getSystem());
355         leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit2.getRef());
356
357         // Continue the first AddServer
358         newFollowerRaftActorInstance.setDropMessageOfType(null);
359         newFollowerRaftActor.tell(installSnapshot, leaderActor);
360
361         // Verify both complete successfully
362         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
363         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
364
365         addServerReply = testKit2.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
366         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
367
368         // Verify ServerConfigurationPayload entries in leader's log
369
370         expectMatching(leaderCollectorActor, ApplyState.class, 2);
371         assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
372         assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
373         assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
374         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
375                 votingServer(LEADER_ID), votingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
376
377         // Verify ServerConfigurationPayload entry in the new follower
378
379         expectMatching(newFollowerCollectorActor, ApplyState.class, 2);
380         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID2),
381                newFollowerActorContext.getPeerIds());
382     }
383
384     @Test
385     public void testAddServerWithPriorSnapshotInProgress() throws Exception {
386         RaftActorContext initialActorContext = new MockRaftActorContext();
387
388         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
389                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
390                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
391                 actorFactory.generateActorId(LEADER_ID));
392
393         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
394         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
395
396         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
397
398         // Drop commit message for now to delay snapshot completion
399         leaderRaftActor.setDropMessageOfType(String.class);
400
401         leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
402
403         String commitMsg = expectFirstMatching(leaderCollectorActor, String.class);
404
405         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
406
407         leaderRaftActor.setDropMessageOfType(null);
408         leaderActor.tell(commitMsg, leaderActor);
409
410         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
411         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
412         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
413
414         expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
415
416         // Verify ServerConfigurationPayload entry in leader's log
417
418         expectFirstMatching(leaderCollectorActor, ApplyState.class);
419         assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
420         assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
421         assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
422         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
423                 votingServer(NEW_SERVER_ID));
424     }
425
426     @Test
427     public void testAddServerWithPriorSnapshotCompleteTimeout() throws Exception {
428         RaftActorContext initialActorContext = new MockRaftActorContext();
429
430         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
431                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
432                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
433                 actorFactory.generateActorId(LEADER_ID));
434
435         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
436         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
437
438         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
439
440         // Drop commit message so the snapshot doesn't complete.
441         leaderRaftActor.setDropMessageOfType(String.class);
442
443         leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
444
445         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
446
447         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
448         assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
449
450         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
451     }
452
453     @Test
454     public void testAddServerWithLeaderChangeBeforePriorSnapshotComplete() throws Exception {
455         RaftActorContext initialActorContext = new MockRaftActorContext();
456
457         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
458                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
459                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
460                 actorFactory.generateActorId(LEADER_ID));
461
462         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
463         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
464         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
465
466         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
467
468         // Drop the commit message so the snapshot doesn't complete yet.
469         leaderRaftActor.setDropMessageOfType(String.class);
470
471         leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
472
473         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
474
475         String commitMsg = expectFirstMatching(leaderCollectorActor, String.class);
476
477         // Change the leader behavior to follower
478         leaderActor.tell(new Follower(leaderActorContext), leaderActor);
479
480         // Drop CaptureSnapshotReply in case install snapshot is incorrectly initiated after the prior
481         // snapshot completes. This will prevent the invalid snapshot from completing and fail the
482         // isCapturing assertion below.
483         leaderRaftActor.setDropMessageOfType(CaptureSnapshotReply.class);
484
485         // Complete the prior snapshot - this should be a no-op b/c it's no longer the leader
486         leaderActor.tell(commitMsg, leaderActor);
487
488         leaderActor.tell(new RaftActorServerConfigurationSupport.ServerOperationTimeout(NEW_SERVER_ID), leaderActor);
489
490         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
491         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
492
493         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
494         assertEquals("isCapturing", false, leaderActorContext.getSnapshotManager().isCapturing());
495     }
496
497     @Test
498     public void testAddServerWithLeaderChangeDuringInstallSnapshot() throws Exception {
499         RaftActorContext initialActorContext = new MockRaftActorContext();
500
501         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
502                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
503                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
504                 actorFactory.generateActorId(LEADER_ID));
505
506         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
507         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
508
509         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(8);
510
511         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
512
513         // Drop the UnInitializedFollowerSnapshotReply to delay it.
514         leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
515
516         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
517
518         UnInitializedFollowerSnapshotReply snapshotReply = expectFirstMatching(leaderCollectorActor,
519                 UnInitializedFollowerSnapshotReply.class);
520
521         // Prevent election timeout when the leader switches to follower
522         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
523
524         // Change the leader behavior to follower
525         leaderActor.tell(new Follower(leaderActorContext), leaderActor);
526
527         // Send the captured UnInitializedFollowerSnapshotReply - should be a no-op
528         leaderRaftActor.setDropMessageOfType(null);
529         leaderActor.tell(snapshotReply, leaderActor);
530
531         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
532         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
533
534         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
535     }
536
537     @Test
538     public void testAddServerWithInstallSnapshotTimeout() throws Exception {
539         RaftActorContext initialActorContext = new MockRaftActorContext();
540
541         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
542                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
543                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
544                 actorFactory.generateActorId(LEADER_ID));
545
546         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
547         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
548         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
549
550         // Drop the InstallSnapshot message so it times out
551         newFollowerRaftActor.underlyingActor().setDropMessageOfType(InstallSnapshot.SERIALIZABLE_CLASS);
552
553         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
554
555         leaderActor.tell(new UnInitializedFollowerSnapshotReply("bogus"), leaderActor);
556
557         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
558         assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
559
560         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
561         assertEquals("Leader followers size", 0,
562                 ((AbstractLeader)leaderRaftActor.getCurrentBehavior()).getFollowerIds().size());
563     }
564
565     @Test
566     public void testAddServerWithNoLeader() {
567         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
568         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
569
570         TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
571                 MockRaftActor.props(LEADER_ID, ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
572                         configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
573                 actorFactory.generateActorId(LEADER_ID));
574         noLeaderActor.underlyingActor().waitForInitializeBehaviorComplete();
575
576         noLeaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
577         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
578         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
579     }
580
581     @Test
582     public void testAddServerWithNoConsensusReached() {
583         RaftActorContext initialActorContext = new MockRaftActorContext();
584
585         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
586                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
587                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
588                 actorFactory.generateActorId(LEADER_ID));
589
590         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
591         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
592
593         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
594
595         // Drop UnInitializedFollowerSnapshotReply initially
596         leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
597
598         MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
599         TestActorRef<MessageCollectorActor> newFollowerCollectorActor =
600                 newCollectorActor(newFollowerRaftActorInstance, NEW_SERVER_ID);
601
602         // Drop AppendEntries to the new follower so consensus isn't reached
603         newFollowerRaftActorInstance.setDropMessageOfType(AppendEntries.class);
604
605         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
606
607         // Capture the UnInitializedFollowerSnapshotReply
608         Object snapshotReply = expectFirstMatching(leaderCollectorActor, UnInitializedFollowerSnapshotReply.class);
609
610         // Send the UnInitializedFollowerSnapshotReply to resume the first request
611         leaderRaftActor.setDropMessageOfType(null);
612         leaderActor.tell(snapshotReply, leaderActor);
613
614         expectFirstMatching(newFollowerCollectorActor, AppendEntries.class);
615
616         // Send a second AddServer
617         leaderActor.tell(new AddServer(NEW_SERVER_ID2, "", false), testKit.getRef());
618
619         // The first AddServer should succeed with OK even though consensus wasn't reached
620         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
621         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
622         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
623
624         // Verify ServerConfigurationPayload entry in leader's log
625         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
626                 votingServer(NEW_SERVER_ID));
627
628         // The second AddServer should fail since consensus wasn't reached for the first
629         addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
630         assertEquals("getStatus", ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT, addServerReply.getStatus());
631
632         // Re-send the second AddServer - should also fail
633         leaderActor.tell(new AddServer(NEW_SERVER_ID2, "", false), testKit.getRef());
634         addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
635         assertEquals("getStatus", ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT, addServerReply.getStatus());
636     }
637
638     @Test
639     public void testAddServerWithExistingServer() {
640         RaftActorContext initialActorContext = new MockRaftActorContext();
641
642         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
643                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
644                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
645                 actorFactory.generateActorId(LEADER_ID));
646
647         leaderActor.tell(new AddServer(FOLLOWER_ID, followerActor.path().toString(), true), testKit.getRef());
648
649         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
650         assertEquals("getStatus", ServerChangeStatus.ALREADY_EXISTS, addServerReply.getStatus());
651     }
652
653     @Test
654     public void testAddServerForwardedToLeader() {
655         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
656         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
657
658         TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
659                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
660                 actorFactory.generateActorId(LEADER_ID));
661
662         TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
663                 MockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
664                         configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
665                 actorFactory.generateActorId(FOLLOWER_ID));
666         followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
667
668         followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
669                 -1, -1, (short)0), leaderActor);
670
671         followerRaftActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
672         expectFirstMatching(leaderActor, AddServer.class);
673     }
674
675     @Test
676     public void testOnApplyState() {
677         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
678         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
679         TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
680                 MockRaftActor.props(LEADER_ID, ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
681                         configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
682                 actorFactory.generateActorId(LEADER_ID));
683
684         RaftActorServerConfigurationSupport support = new RaftActorServerConfigurationSupport(noLeaderActor.underlyingActor());
685
686         ReplicatedLogEntry serverConfigEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
687                 new ServerConfigurationPayload(Collections.<ServerInfo>emptyList()));
688         boolean handled = support.handleMessage(new ApplyState(null, null, serverConfigEntry), ActorRef.noSender());
689         assertEquals("Message handled", true, handled);
690
691         ReplicatedLogEntry nonServerConfigEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
692                 new MockRaftActorContext.MockPayload("1"));
693         handled = support.handleMessage(new ApplyState(null, null, nonServerConfigEntry), ActorRef.noSender());
694         assertEquals("Message handled", false, handled);
695     }
696
697     @Test
698     public void testRemoveServerWithNoLeader() {
699         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
700         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
701
702         TestActorRef<MockRaftActor> leaderActor = actorFactory.createTestActor(
703                 MockRaftActor.props(LEADER_ID, ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
704                         configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
705                 actorFactory.generateActorId(LEADER_ID));
706         leaderActor.underlyingActor().waitForInitializeBehaviorComplete();
707
708         leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
709         RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
710         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, removeServerReply.getStatus());
711     }
712
713     @Test
714     public void testRemoveServerNonExistentServer() {
715         RaftActorContext initialActorContext = new MockRaftActorContext();
716
717         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
718                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
719                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
720                 actorFactory.generateActorId(LEADER_ID));
721
722         leaderActor.tell(new RemoveServer(NEW_SERVER_ID), testKit.getRef());
723         RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
724         assertEquals("getStatus", ServerChangeStatus.DOES_NOT_EXIST, removeServerReply.getStatus());
725     }
726
727     @Test
728     public void testRemoveServerForwardToLeader() {
729         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
730         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
731
732         TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
733                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
734                 actorFactory.generateActorId(LEADER_ID));
735
736         TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
737                 MockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
738                         configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
739                 actorFactory.generateActorId(FOLLOWER_ID));
740         followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
741
742         followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
743                 -1, -1, (short)0), leaderActor);
744
745         followerRaftActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
746         expectFirstMatching(leaderActor, RemoveServer.class);
747     }
748
749     @Test
750     public void testRemoveServer() {
751         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
752         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
753         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
754
755         final String followerActorId = actorFactory.generateActorId(FOLLOWER_ID);
756         final String followerActorPath = actorFactory.createTestActorPath(followerActorId);
757         RaftActorContext initialActorContext = new MockRaftActorContext();
758
759         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
760                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActorPath),
761                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
762                 actorFactory.generateActorId(LEADER_ID));
763
764         TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
765
766         TestActorRef<CollectingMockRaftActor> followerRaftActor = actorFactory.createTestActor(
767                 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
768                         configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
769                 followerActorId);
770
771         TestActorRef<MessageCollectorActor> collector =
772                 actorFactory.createTestActor(MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId("collector"));
773
774         followerRaftActor.underlyingActor().setCollectorActor(collector);
775
776         leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
777         RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
778         assertEquals("getStatus", ServerChangeStatus.OK, removeServerReply.getStatus());
779
780         final ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
781         assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
782         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(), votingServer(LEADER_ID));
783
784         RaftActorBehavior currentBehavior = leaderActor.underlyingActor().getCurrentBehavior();
785         assertTrue("Expected Leader", currentBehavior instanceof Leader);
786         assertEquals("Follower ids size", 0, ((Leader)currentBehavior).getFollowerIds().size());
787
788         MessageCollectorActor.expectFirstMatching(collector, ServerRemoved.class);
789     }
790
791     @Test
792     public void testRemoveServerLeader() {
793         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
794         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
795         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
796
797         final String followerActorId = actorFactory.generateActorId(FOLLOWER_ID);
798         final String followerActorPath = actorFactory.createTestActorPath(followerActorId);
799         RaftActorContext initialActorContext = new MockRaftActorContext();
800
801         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
802                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActorPath),
803                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
804                 actorFactory.generateActorId(LEADER_ID));
805
806         TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
807
808         TestActorRef<CollectingMockRaftActor> followerRaftActor = actorFactory.createTestActor(
809                 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
810                         configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
811                 followerActorId);
812
813         TestActorRef<MessageCollectorActor> followerCollector = actorFactory.createTestActor(MessageCollectorActor.props().
814                 withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId("collector"));
815         followerRaftActor.underlyingActor().setCollectorActor(followerCollector);
816
817         leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
818         RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
819         assertEquals("getStatus", ServerChangeStatus.OK, removeServerReply.getStatus());
820
821         final ApplyState applyState = MessageCollectorActor.expectFirstMatching(followerCollector, ApplyState.class);
822         assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
823         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
824                 votingServer(FOLLOWER_ID));
825
826         MessageCollectorActor.expectFirstMatching(leaderCollector, ServerRemoved.class);
827     }
828
829     @Test
830     public void testRemoveServerLeaderWithNoFollowers() {
831         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
832                 MockLeaderRaftActor.props(Collections.<String, String>emptyMap(),
833                         new MockRaftActorContext()).withDispatcher(Dispatchers.DefaultDispatcherId()),
834                 actorFactory.generateActorId(LEADER_ID));
835
836         leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
837         RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
838         assertEquals("getStatus", ServerChangeStatus.NOT_SUPPORTED, removeServerReply.getStatus());
839     }
840
841     private ServerInfo votingServer(String id) {
842         return new ServerInfo(id, true);
843     }
844
845     private ServerInfo nonVotingServer(String id) {
846         return new ServerInfo(id, false);
847     }
848
849     private TestActorRef<MessageCollectorActor> newLeaderCollectorActor(MockLeaderRaftActor leaderRaftActor) {
850         return newCollectorActor(leaderRaftActor, LEADER_ID);
851     }
852
853     private TestActorRef<MessageCollectorActor> newCollectorActor(AbstractMockRaftActor raftActor, String id) {
854         TestActorRef<MessageCollectorActor> collectorActor = actorFactory.createTestActor(
855                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
856                 actorFactory.generateActorId(id + "Collector"));
857         raftActor.setCollectorActor(collectorActor);
858         return collectorActor;
859     }
860
861     private static void verifyServerConfigurationPayloadEntry(ReplicatedLog log, ServerInfo... expected) {
862         ReplicatedLogEntry logEntry = log.get(log.lastIndex());
863         assertEquals("Last log entry payload class", ServerConfigurationPayload.class, logEntry.getData().getClass());
864         ServerConfigurationPayload payload = (ServerConfigurationPayload)logEntry.getData();
865         assertEquals("getNewServerConfig", Sets.newHashSet(expected), Sets.newHashSet(payload.getServerConfig()));
866     }
867
868     private static RaftActorContext newFollowerContext(String id, TestActorRef<? extends UntypedActor> actor) {
869         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
870         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
871         configParams.setElectionTimeoutFactor(100000);
872         ElectionTermImpl termInfo = new ElectionTermImpl(NO_PERSISTENCE, id, LOG);
873         termInfo.update(1, LEADER_ID);
874         return new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
875                 id, termInfo, -1, -1, ImmutableMap.of(LEADER_ID, ""), configParams, NO_PERSISTENCE, LOG);
876     }
877
878     static abstract class AbstractMockRaftActor extends MockRaftActor {
879         private volatile TestActorRef<MessageCollectorActor> collectorActor;
880         private volatile Class<?> dropMessageOfType;
881
882         AbstractMockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
883                 DataPersistenceProvider dataPersistenceProvider, TestActorRef<MessageCollectorActor> collectorActor) {
884             super(builder().id(id).peerAddresses(peerAddresses).config(config.get()).
885                     dataPersistenceProvider(dataPersistenceProvider));
886             this.collectorActor = collectorActor;
887         }
888
889         void setDropMessageOfType(Class<?> dropMessageOfType) {
890             this.dropMessageOfType = dropMessageOfType;
891         }
892
893         void setCollectorActor(TestActorRef<MessageCollectorActor> collectorActor) {
894             this.collectorActor = collectorActor;
895         }
896
897         @Override
898         public void handleCommand(Object message) {
899             if(dropMessageOfType == null || !dropMessageOfType.equals(message.getClass())) {
900                 super.handleCommand(message);
901             }
902
903             if(collectorActor != null) {
904                 collectorActor.tell(message, getSender());
905             }
906         }
907     }
908
909     public static class CollectingMockRaftActor extends AbstractMockRaftActor {
910
911         CollectingMockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider, TestActorRef<MessageCollectorActor> collectorActor) {
912             super(id, peerAddresses, config, dataPersistenceProvider, collectorActor);
913         }
914
915         public static Props props(final String id, final Map<String, String> peerAddresses,
916                                   ConfigParams config, DataPersistenceProvider dataPersistenceProvider){
917
918             return Props.create(CollectingMockRaftActor.class, id, peerAddresses, Optional.of(config), dataPersistenceProvider, null);
919         }
920
921     }
922
923     public static class MockLeaderRaftActor extends AbstractMockRaftActor {
924         public MockLeaderRaftActor(Map<String, String> peerAddresses, ConfigParams config,
925                 RaftActorContext fromContext) {
926             super(LEADER_ID, peerAddresses, Optional.of(config), NO_PERSISTENCE, null);
927             setPersistence(false);
928
929             RaftActorContext context = getRaftActorContext();
930             for(int i = 0; i < fromContext.getReplicatedLog().size(); i++) {
931                 ReplicatedLogEntry entry = fromContext.getReplicatedLog().get(i);
932                 getState().add(entry.getData());
933                 context.getReplicatedLog().append(entry);
934             }
935
936             context.setCommitIndex(fromContext.getCommitIndex());
937             context.setLastApplied(fromContext.getLastApplied());
938             context.getTermInformation().update(fromContext.getTermInformation().getCurrentTerm(),
939                     fromContext.getTermInformation().getVotedFor());
940         }
941
942         @Override
943         protected void initializeBehavior() {
944             changeCurrentBehavior(new Leader(getRaftActorContext()));
945             initializeBehaviorComplete.countDown();
946         }
947
948         @Override
949         public void createSnapshot(ActorRef actorRef) {
950             try {
951                 actorRef.tell(new CaptureSnapshotReply(RaftActorTest.fromObject(getState()).toByteArray()), actorRef);
952             } catch (Exception e) {
953                 LOG.error("createSnapshot failed", e);
954             }
955         }
956
957         static Props props(Map<String, String> peerAddresses, RaftActorContext fromContext) {
958             DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
959             configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
960             configParams.setElectionTimeoutFactor(10);
961             return Props.create(MockLeaderRaftActor.class, peerAddresses, configParams, fromContext);
962         }
963     }
964
965     public static class MockNewFollowerRaftActor extends AbstractMockRaftActor {
966         public MockNewFollowerRaftActor(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
967             super(NEW_SERVER_ID, Maps.<String, String>newHashMap(), Optional.of(config), null, collectorActor);
968             setPersistence(false);
969         }
970
971         static Props props(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
972             return Props.create(MockNewFollowerRaftActor.class, config, collectorActor);
973         }
974     }
975 }