Implement ChangeServersVotingStatus message in RaftActor
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorServerConfigurationSupportTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertTrue;
12 import static org.junit.Assert.fail;
13 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.assertNoneMatching;
14 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
15 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
16 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
17 import akka.actor.ActorRef;
18 import akka.actor.Props;
19 import akka.actor.UntypedActor;
20 import akka.dispatch.Dispatchers;
21 import akka.testkit.JavaTestKit;
22 import akka.testkit.TestActorRef;
23 import com.google.common.base.Optional;
24 import com.google.common.base.Stopwatch;
25 import com.google.common.collect.ImmutableMap;
26 import com.google.common.collect.Maps;
27 import com.google.common.collect.Sets;
28 import java.util.Collections;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.concurrent.TimeUnit;
32 import org.junit.After;
33 import org.junit.Before;
34 import org.junit.Test;
35 import org.opendaylight.controller.cluster.DataPersistenceProvider;
36 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
37 import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
38 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
39 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
40 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
41 import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
42 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
43 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
44 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
45 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
46 import org.opendaylight.controller.cluster.raft.messages.AddServer;
47 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
48 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
49 import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
50 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
51 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
52 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
53 import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
54 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
55 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
56 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
57 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
58 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
59 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
60 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
61 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
62 import org.slf4j.Logger;
63 import org.slf4j.LoggerFactory;
64 import scala.concurrent.duration.FiniteDuration;
65
66 /**
67  * Unit tests for RaftActorServerConfigurationSupport.
68  *
69  * @author Thomas Pantelis
70  */
71 public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
72     static final String LEADER_ID = "leader";
       // LEADER_ID and the ids below are passed to actorFactory.generateActorId(...) when actors are
       // created and are used as the server ids carried in AddServer messages and expected configs.
73     static final String FOLLOWER_ID = "follower";
74     static final String FOLLOWER_ID2 = "follower2";
75     static final String NEW_SERVER_ID = "new-server";
76     static final String NEW_SERVER_ID2 = "new-server2";
77     private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupportTest.class);
78     private static final DataPersistenceProvider NO_PERSISTENCE = new NonPersistentDataProvider();
79
       // Creates the actors used by each test; closed in tearDown().
80     private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
81
       // Pre-existing follower actor; individual tests install a Follower behavior on it via setBehavior().
82     private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
83             Props.create(ForwardMessageToBehaviorActor.class).withDispatcher(Dispatchers.DefaultDispatcherId()),
84             actorFactory.generateActorId(FOLLOWER_ID));
85
       // The server being added in the tests; these three fields are (re)assigned in setup().
86     private TestActorRef<MockNewFollowerRaftActor> newFollowerRaftActor;
87     private TestActorRef<MessageCollectorActor> newFollowerCollectorActor;
88     private RaftActorContext newFollowerActorContext;
89
       // Used as the sender of AddServer requests so the corresponding replies can be awaited on it.
90     private final JavaTestKit testKit = new JavaTestKit(getSystem());
91
92     @Before
93     public void setup() {
           // Start every test from empty in-memory persistence, then create a fresh new-follower
           // RaftActor together with the MessageCollectorActor that is handed to its props.
94         InMemoryJournal.clear();
95         InMemorySnapshotStore.clear();
96
97         DefaultConfigParamsImpl configParams = newFollowerConfigParams();
98
99         newFollowerCollectorActor = actorFactory.createTestActor(
100                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
101                 actorFactory.generateActorId(NEW_SERVER_ID + "Collector"));
102         newFollowerRaftActor = actorFactory.createTestActor(MockNewFollowerRaftActor.props(
103                 configParams, newFollowerCollectorActor).withDispatcher(Dispatchers.DefaultDispatcherId()),
104                 actorFactory.generateActorId(NEW_SERVER_ID));
105
            // NOTE(review): the catch block repeats exactly the same call as the try block, so a failure
            // of the first attempt is silently retried once and the original exception is discarded.
            // Presumably a workaround for a start-up timing issue - confirm whether it is still needed.
106         try {
107             newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
108         } catch (Exception e) {
109             newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
110         }
111     }
112
113     private static DefaultConfigParamsImpl newFollowerConfigParams() {
114         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
115         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
116         configParams.setElectionTimeoutFactor(100000);
117         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
118         return configParams;
119     }
120
121     @After
122     public void tearDown() throws Exception {
            // Releases the per-test actor factory; presumably this stops the actors created through
            // it - confirm against TestActorFactory.close().
123         actorFactory.close();
124     }
125
126     @Test
127     public void testAddServerWithExistingFollower() throws Exception {
            // Scenario: a leader that already has one follower receives AddServer for NEW_SERVER_ID with
            // the third constructor argument set to true. The test expects a snapshot to be applied on the
            // new server, an OK reply naming the leader, a ServerConfigurationPayload entry at index 3 on
            // all three servers listing them as voting, updated peer sets, and one persisted journal entry.
128         RaftActorContext followerActorContext = newFollowerContext(FOLLOWER_ID, followerActor);
            // presumably creates three entries (indices 0-2) in term 1 - confirm against MockReplicatedLogBuilder
129         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
130                 0, 3, 1).build());
131         followerActorContext.setCommitIndex(2);
132         followerActorContext.setLastApplied(2);
133
134         Follower follower = new Follower(followerActorContext);
135         followerActor.underlyingActor().setBehavior(follower);
136
            // The follower's context is also passed to the leader's props; presumably it seeds the
            // leader's initial log/indices - confirm against MockLeaderRaftActor.
137         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
138                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
139                         followerActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
140                 actorFactory.generateActorId(LEADER_ID));
141
142         // Expect initial heartbeat from the leader.
143         expectFirstMatching(followerActor, AppendEntries.class);
144         clearMessages(followerActor);
145
146         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
147         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
148
149         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
150
151         // Leader should install snapshot - capture and verify ApplySnapshot contents
152
153         ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
154         @SuppressWarnings("unchecked")
155         List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
156         assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
157
158         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
159         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
160         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
161
162         // Verify ServerConfigurationPayload entry in leader's log
163
164         expectFirstMatching(leaderCollectorActor, ApplyState.class);
165         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
166         assertEquals("Leader journal last index", 3, leaderActorContext.getReplicatedLog().lastIndex());
167         assertEquals("Leader commit index", 3, leaderActorContext.getCommitIndex());
168         assertEquals("Leader last applied index", 3, leaderActorContext.getLastApplied());
169         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
170                 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
171
172         // Verify ServerConfigurationPayload entry in both followers
173
174         expectFirstMatching(followerActor, ApplyState.class);
175         assertEquals("Follower journal last index", 3, followerActorContext.getReplicatedLog().lastIndex());
176         verifyServerConfigurationPayloadEntry(followerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
177                 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
178
179         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
180         assertEquals("New follower journal last index", 3, newFollowerActorContext.getReplicatedLog().lastIndex());
181         verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
182                 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
183
184         // Verify new server config was applied in both followers
185
186         assertEquals("Follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID), followerActorContext.getPeerIds());
187
188         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, FOLLOWER_ID), newFollowerActorContext.getPeerIds());
189
190         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
191         expectFirstMatching(followerActor, ApplyState.class);
192
193         assertEquals("Follower commit index", 3, followerActorContext.getCommitIndex());
194         assertEquals("Follower last applied index", 3, followerActorContext.getLastApplied());
195         assertEquals("New follower commit index", 3, newFollowerActorContext.getCommitIndex());
196         assertEquals("New follower last applied index", 3, newFollowerActorContext.getLastApplied());
197
            // Exactly one ReplicatedLogImplEntry is expected in the in-memory journal for the leader and
            // for the new server: the term-1, index-3 entry carrying the ServerConfigurationPayload.
198         List<ReplicatedLogImplEntry> persistedLogEntries = InMemoryJournal.get(LEADER_ID, ReplicatedLogImplEntry.class);
199         assertEquals("Leader ReplicatedLogImplEntry entries", 1, persistedLogEntries.size());
200         ReplicatedLogImplEntry logEntry = persistedLogEntries.get(0);
201         assertEquals("Leader ReplicatedLogImplEntry getTerm", 1, logEntry.getTerm());
202         assertEquals("Leader ReplicatedLogImplEntry getIndex", 3, logEntry.getIndex());
203         assertEquals("Leader ReplicatedLogImplEntry getData", ServerConfigurationPayload.class, logEntry.getData().getClass());
204
205         persistedLogEntries = InMemoryJournal.get(NEW_SERVER_ID, ReplicatedLogImplEntry.class);
206         assertEquals("New follower ReplicatedLogImplEntry entries", 1, persistedLogEntries.size());
207         logEntry = persistedLogEntries.get(0);
208         assertEquals("New follower ReplicatedLogImplEntry getTerm", 1, logEntry.getTerm());
209         assertEquals("New follower ReplicatedLogImplEntry getIndex", 3, logEntry.getIndex());
210         assertEquals("New follower ReplicatedLogImplEntry getData", ServerConfigurationPayload.class,
211                 logEntry.getData().getClass());
212     }
213
214     @Test
215     public void testAddServerWithNoExistingFollower() throws Exception {
            // Scenario: a leader created with an empty peer map receives AddServer for NEW_SERVER_ID
            // (third argument true). The test expects a snapshot on the new server, an OK reply, and a
            // ServerConfigurationPayload entry at index 2 listing leader and new server as voting.
216         RaftActorContext initialActorContext = new MockRaftActorContext();
217         initialActorContext.setCommitIndex(1);
218         initialActorContext.setLastApplied(1);
            // presumably creates two entries (indices 0-1) in term 1 - confirm against MockReplicatedLogBuilder
219         initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
220                 0, 2, 1).build());
221
222         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
223                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
224                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
225                 actorFactory.generateActorId(LEADER_ID));
226
227         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
228         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
229
230         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
231
232         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
233
234         // Leader should install snapshot - capture and verify ApplySnapshot contents
235
236         ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
237         @SuppressWarnings("unchecked")
238         List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
239         assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
240
241         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
242         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
243         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
244
245         // Verify ServerConfigurationPayload entry in leader's log
246
247         expectFirstMatching(leaderCollectorActor, ApplyState.class);
248         assertEquals("Leader journal last index", 2, leaderActorContext.getReplicatedLog().lastIndex());
249         assertEquals("Leader commit index", 2, leaderActorContext.getCommitIndex());
250         assertEquals("Leader last applied index", 2, leaderActorContext.getLastApplied());
251         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
252                 votingServer(NEW_SERVER_ID));
253
254         // Verify ServerConfigurationPayload entry in the new follower
255
256         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
257         assertEquals("New follower journal last index", 2, newFollowerActorContext.getReplicatedLog().lastIndex());
258         verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
259                 votingServer(NEW_SERVER_ID));
260
261         // Verify new server config was applied in the new follower
262
263         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
264     }
265
266     @Test
267     public void testAddServersAsNonVoting() throws Exception {
268         RaftActorContext initialActorContext = new MockRaftActorContext();
269
270         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
271                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
272                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
273                 actorFactory.generateActorId(LEADER_ID));
274
275         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
276         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
277
278         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
279
280         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), false), testKit.getRef());
281
282         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
283         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
284         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
285
286         // Verify ServerConfigurationPayload entry in leader's log
287
288         expectFirstMatching(leaderCollectorActor, ApplyState.class);
289
290         assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
291         assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
292         assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
293         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
294                 nonVotingServer(NEW_SERVER_ID));
295
296         // Verify ServerConfigurationPayload entry in the new follower
297
298         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
299         assertEquals("New follower journal last index", 0, newFollowerActorContext.getReplicatedLog().lastIndex());
300         verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
301                 nonVotingServer(NEW_SERVER_ID));
302
303         // Verify new server config was applied in the new follower
304
305         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
306
307         assertNoneMatching(newFollowerCollectorActor, InstallSnapshot.class, 500);
308
309         // Add another non-voting server.
310
311         clearMessages(leaderCollectorActor);
312
313         RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
314         Follower newFollower2 = new Follower(follower2ActorContext);
315         followerActor.underlyingActor().setBehavior(newFollower2);
316
317         leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit.getRef());
318
319         addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
320         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
321         assertEquals("getLeaderHint", java.util.Optional.of(LEADER_ID), addServerReply.getLeaderHint());
322
323         expectFirstMatching(leaderCollectorActor, ApplyState.class);
324         assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
325         assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
326         assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
327         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
328                 votingServer(LEADER_ID), nonVotingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
329     }
330
331     @Test
332     public void testAddServerWithOperationInProgress() throws Exception {
            // Scenario: a second AddServer arrives while the first one is still waiting for its snapshot
            // to be installed. The first operation is stalled by making the new follower drop
            // InstallSnapshot, then resumed by re-sending the captured message; both requests are
            // expected to complete with OK and to end up in the leader's log.
333         RaftActorContext initialActorContext = new MockRaftActorContext();
334
335         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
336                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
337                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
338                 actorFactory.generateActorId(LEADER_ID));
339
340         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
341         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
342
343         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
344
            // The shared followerActor plays the role of the second server (NEW_SERVER_ID2).
345         RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
346         Follower newFollower2 = new Follower(follower2ActorContext);
347         followerActor.underlyingActor().setBehavior(newFollower2);
348
349         MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
350         newFollowerRaftActorInstance.setDropMessageOfType(InstallSnapshot.SERIALIZABLE_CLASS);
351
352         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
353
354         // Wait for leader's install snapshot and capture it
355
356         InstallSnapshot installSnapshot = expectFirstMatching(newFollowerCollectorActor, InstallSnapshot.class);
357
358         // Send a second AddServer - should get queued
            // A separate test kit is used so the two replies can be awaited independently.
359         JavaTestKit testKit2 = new JavaTestKit(getSystem());
360         leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit2.getRef());
361
362         // Continue the first AddServer
363         newFollowerRaftActorInstance.setDropMessageOfType(null);
364         newFollowerRaftActor.tell(installSnapshot, leaderActor);
365
366         // Verify both complete successfully
367         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
368         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
369
370         addServerReply = testKit2.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
371         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
372
373         // Verify ServerConfigurationPayload entries in leader's log
374
375         expectMatching(leaderCollectorActor, ApplyState.class, 2);
376         assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
377         assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
378         assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
379         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
380                 votingServer(LEADER_ID), votingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
381
382         // Verify ServerConfigurationPayload entry in the new follower
383
384         expectMatching(newFollowerCollectorActor, ApplyState.class, 2);
385         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID2),
386                newFollowerActorContext.getPeerIds());
387     }
388
389     @Test
390     public void testAddServerWithPriorSnapshotInProgress() throws Exception {
            // Scenario: AddServer is sent while a snapshot started by InitiateCaptureSnapshot has not yet
            // completed. The leader is told to drop String messages (the snapshot commit message is
            // captured as a String below); once that message is re-delivered, the AddServer is expected
            // to finish with OK and the new server to receive an ApplySnapshot.
391         RaftActorContext initialActorContext = new MockRaftActorContext();
392
393         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
394                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
395                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
396                 actorFactory.generateActorId(LEADER_ID));
397
398         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
399         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
400
401         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
402
403         // Drop commit message for now to delay snapshot completion
404         leaderRaftActor.setDropMessageOfType(String.class);
405
406         leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
407
            // Capture the dropped commit message so it can be replayed after AddServer has been sent.
408         String commitMsg = expectFirstMatching(leaderCollectorActor, String.class);
409
410         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
411
            // Stop dropping and replay the commit message to let the prior snapshot complete.
412         leaderRaftActor.setDropMessageOfType(null);
413         leaderActor.tell(commitMsg, leaderActor);
414
415         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
416         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
417         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
418
419         expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
420
421         // Verify ServerConfigurationPayload entry in leader's log
422
423         expectFirstMatching(leaderCollectorActor, ApplyState.class);
424         assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
425         assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
426         assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
427         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
428                 votingServer(NEW_SERVER_ID));
429     }
430
431     @Test
432     public void testAddServerWithPriorSnapshotCompleteTimeout() throws Exception {
            // Scenario: AddServer is sent while a prior snapshot never completes because its commit
            // message stays dropped. The reply is expected to carry TIMEOUT and the leader must end up
            // with no peers.
433         RaftActorContext initialActorContext = new MockRaftActorContext();
434
435         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
436                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
437                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
438                 actorFactory.generateActorId(LEADER_ID));
439
440         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
441         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
442
            // presumably shortens the timeout applied to the server operation so that the TIMEOUT reply
            // arrives within the 5 second wait below - confirm against RaftActorServerConfigurationSupport
443         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
444
445         // Drop commit message so the snapshot doesn't complete.
446         leaderRaftActor.setDropMessageOfType(String.class);
447
448         leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
449
450         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
451
452         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
453         assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
454
455         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
456     }
457
458     @Test
459     public void testAddServerWithLeaderChangeBeforePriorSnapshotComplete() throws Exception {
            // Scenario: AddServer is pending behind an unfinished snapshot when the actor is switched to
            // a Follower behavior. After the snapshot commit message is replayed and a
            // ServerOperationTimeout is delivered, the reply is expected to carry NO_LEADER, the peer set
            // must be empty and no snapshot capture may be in progress.
460         RaftActorContext initialActorContext = new MockRaftActorContext();
461
462         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
463                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
464                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
465                 actorFactory.generateActorId(LEADER_ID));
466
467         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
468         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
469         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
470
471         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
472
473         // Drop the commit message so the snapshot doesn't complete yet.
474         leaderRaftActor.setDropMessageOfType(String.class);
475
476         leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
477
478         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
479
480         String commitMsg = expectFirstMatching(leaderCollectorActor, String.class);
481
482         // Change the leader behavior to follower
483         leaderActor.tell(new Follower(leaderActorContext), leaderActor);
484
485         // Drop CaptureSnapshotReply in case install snapshot is incorrectly initiated after the prior
486         // snapshot completes. This will prevent the invalid snapshot from completing and fail the
487         // isCapturing assertion below.
488         leaderRaftActor.setDropMessageOfType(CaptureSnapshotReply.class);
489
490         // Complete the prior snapshot - this should be a no-op b/c it's no longer the leader
491         leaderActor.tell(commitMsg, leaderActor);
492
            // The timeout message is delivered directly; presumably this avoids waiting for the real
            // operation timer - confirm against RaftActorServerConfigurationSupport.
493         leaderActor.tell(new RaftActorServerConfigurationSupport.ServerOperationTimeout(NEW_SERVER_ID), leaderActor);
494
495         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
496         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
497
498         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
499         assertEquals("isCapturing", false, leaderActorContext.getSnapshotManager().isCapturing());
500     }
501
502     @Test
503     public void testAddServerWithLeaderChangeDuringInstallSnapshot() throws Exception {
            // Scenario: the actor is switched to a Follower behavior while the snapshot install for the
            // new server is still outstanding. The UnInitializedFollowerSnapshotReply is held back, then
            // replayed after the switch; the reply is expected to carry NO_LEADER and the peer set must
            // stay empty.
504         RaftActorContext initialActorContext = new MockRaftActorContext();
505
506         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
507                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
508                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
509                 actorFactory.generateActorId(LEADER_ID));
510
511         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
512         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
513
514         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(8);
515
516         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
517
518         // Drop the UnInitializedFollowerSnapshotReply to delay it.
519         leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
520
521         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
522
            // The dropped reply is still recorded by the leader's collector, so it can be captured here.
523         UnInitializedFollowerSnapshotReply snapshotReply = expectFirstMatching(leaderCollectorActor,
524                 UnInitializedFollowerSnapshotReply.class);
525
526         // Prevent election timeout when the leader switches to follower
527         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
528
529         // Change the leader behavior to follower
530         leaderActor.tell(new Follower(leaderActorContext), leaderActor);
531
532         // Send the captured UnInitializedFollowerSnapshotReply - should be a no-op
533         leaderRaftActor.setDropMessageOfType(null);
534         leaderActor.tell(snapshotReply, leaderActor);
535
536         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
537         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
538
539         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
540     }
541
542     @Test
543     public void testAddServerWithInstallSnapshotTimeout() throws Exception {
544         RaftActorContext initialActorContext = new MockRaftActorContext();
545
546         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
547                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
548                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
549                 actorFactory.generateActorId(LEADER_ID));
550
551         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
552         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
553         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
554
555         // Drop the InstallSnapshot message so it times out
556         newFollowerRaftActor.underlyingActor().setDropMessageOfType(InstallSnapshot.SERIALIZABLE_CLASS);
557
558         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
559
560         leaderActor.tell(new UnInitializedFollowerSnapshotReply("bogus"), leaderActor);
561
562         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
563         assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
564
565         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
566         assertEquals("Leader followers size", 0,
567                 ((AbstractLeader)leaderRaftActor.getCurrentBehavior()).getFollowerIds().size());
568     }
569
570     @Test
571     public void testAddServerWithNoLeader() {
572         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
573         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
574
575         TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
576                 MockRaftActor.props(LEADER_ID, ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
577                         configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
578                 actorFactory.generateActorId(LEADER_ID));
579         noLeaderActor.underlyingActor().waitForInitializeBehaviorComplete();
580
581         noLeaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
582         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
583         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
584     }
585
586     @Test
587     public void testAddServerWithNoConsensusReached() {
588         RaftActorContext initialActorContext = new MockRaftActorContext();
589
590         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
591                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
592                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
593                 actorFactory.generateActorId(LEADER_ID));
594
595         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
596         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
597
598         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
599
600         // Drop UnInitializedFollowerSnapshotReply initially
601         leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
602
603         MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
604         TestActorRef<MessageCollectorActor> newFollowerCollectorActor =
605                 newCollectorActor(newFollowerRaftActorInstance, NEW_SERVER_ID);
606
607         // Drop AppendEntries to the new follower so consensus isn't reached
608         newFollowerRaftActorInstance.setDropMessageOfType(AppendEntries.class);
609
610         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
611
612         // Capture the UnInitializedFollowerSnapshotReply
613         Object snapshotReply = expectFirstMatching(leaderCollectorActor, UnInitializedFollowerSnapshotReply.class);
614
615         // Send the UnInitializedFollowerSnapshotReply to resume the first request
616         leaderRaftActor.setDropMessageOfType(null);
617         leaderActor.tell(snapshotReply, leaderActor);
618
619         expectFirstMatching(newFollowerCollectorActor, AppendEntries.class);
620
621         // Send a second AddServer
622         leaderActor.tell(new AddServer(NEW_SERVER_ID2, "", false), testKit.getRef());
623
624         // The first AddServer should succeed with OK even though consensus wasn't reached
625         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
626         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
627         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
628
629         // Verify ServerConfigurationPayload entry in leader's log
630         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
631                 votingServer(NEW_SERVER_ID));
632
633         // The second AddServer should fail since consensus wasn't reached for the first
634         addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
635         assertEquals("getStatus", ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT, addServerReply.getStatus());
636
637         // Re-send the second AddServer - should also fail
638         leaderActor.tell(new AddServer(NEW_SERVER_ID2, "", false), testKit.getRef());
639         addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
640         assertEquals("getStatus", ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT, addServerReply.getStatus());
641     }
642
643     @Test
644     public void testAddServerWithExistingServer() {
645         RaftActorContext initialActorContext = new MockRaftActorContext();
646
647         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
648                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
649                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
650                 actorFactory.generateActorId(LEADER_ID));
651
652         leaderActor.tell(new AddServer(FOLLOWER_ID, followerActor.path().toString(), true), testKit.getRef());
653
654         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
655         assertEquals("getStatus", ServerChangeStatus.ALREADY_EXISTS, addServerReply.getStatus());
656     }
657
658     @Test
659     public void testAddServerForwardedToLeader() {
660         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
661         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
662
663         TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
664                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
665                 actorFactory.generateActorId(LEADER_ID));
666
667         TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
668                 MockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
669                         configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
670                 actorFactory.generateActorId(FOLLOWER_ID));
671         followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
672
673         followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
674                 -1, -1, (short)0), leaderActor);
675
676         followerRaftActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
677         expectFirstMatching(leaderActor, AddServer.class);
678     }
679
680     @Test
681     public void testOnApplyState() {
682         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
683         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
684         TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
685                 MockRaftActor.props(LEADER_ID, ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
686                         configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
687                 actorFactory.generateActorId(LEADER_ID));
688
689         RaftActorServerConfigurationSupport support = new RaftActorServerConfigurationSupport(noLeaderActor.underlyingActor());
690
691         ReplicatedLogEntry serverConfigEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
692                 new ServerConfigurationPayload(Collections.<ServerInfo>emptyList()));
693         boolean handled = support.handleMessage(new ApplyState(null, null, serverConfigEntry), ActorRef.noSender());
694         assertEquals("Message handled", true, handled);
695
696         ReplicatedLogEntry nonServerConfigEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
697                 new MockRaftActorContext.MockPayload("1"));
698         handled = support.handleMessage(new ApplyState(null, null, nonServerConfigEntry), ActorRef.noSender());
699         assertEquals("Message handled", false, handled);
700     }
701
702     @Test
703     public void testRemoveServerWithNoLeader() {
704         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
705         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
706
707         TestActorRef<MockRaftActor> leaderActor = actorFactory.createTestActor(
708                 MockRaftActor.props(LEADER_ID, ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
709                         configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
710                 actorFactory.generateActorId(LEADER_ID));
711         leaderActor.underlyingActor().waitForInitializeBehaviorComplete();
712
713         leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
714         RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
715         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, removeServerReply.getStatus());
716     }
717
718     @Test
719     public void testRemoveServerNonExistentServer() {
720         RaftActorContext initialActorContext = new MockRaftActorContext();
721
722         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
723                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
724                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
725                 actorFactory.generateActorId(LEADER_ID));
726
727         leaderActor.tell(new RemoveServer(NEW_SERVER_ID), testKit.getRef());
728         RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
729         assertEquals("getStatus", ServerChangeStatus.DOES_NOT_EXIST, removeServerReply.getStatus());
730     }
731
732     @Test
733     public void testRemoveServerForwardToLeader() {
734         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
735         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
736
737         TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
738                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
739                 actorFactory.generateActorId(LEADER_ID));
740
741         TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
742                 MockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
743                         configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
744                 actorFactory.generateActorId(FOLLOWER_ID));
745         followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
746
747         followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
748                 -1, -1, (short)0), leaderActor);
749
750         followerRaftActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
751         expectFirstMatching(leaderActor, RemoveServer.class);
752     }
753
754     @Test
755     public void testRemoveServer() {
756         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
757         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
758         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
759
760         final String followerActorId = actorFactory.generateActorId(FOLLOWER_ID);
761         final String followerActorPath = actorFactory.createTestActorPath(followerActorId);
762         RaftActorContext initialActorContext = new MockRaftActorContext();
763
764         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
765                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActorPath),
766                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
767                 actorFactory.generateActorId(LEADER_ID));
768
769         TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
770
771         TestActorRef<CollectingMockRaftActor> followerRaftActor = actorFactory.createTestActor(
772                 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
773                         configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
774                 followerActorId);
775
776         TestActorRef<MessageCollectorActor> collector =
777                 actorFactory.createTestActor(MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId("collector"));
778
779         followerRaftActor.underlyingActor().setCollectorActor(collector);
780
781         leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
782         RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
783         assertEquals("getStatus", ServerChangeStatus.OK, removeServerReply.getStatus());
784
785         final ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
786         assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
787         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(), votingServer(LEADER_ID));
788
789         RaftActorBehavior currentBehavior = leaderActor.underlyingActor().getCurrentBehavior();
790         assertTrue("Expected Leader", currentBehavior instanceof Leader);
791         assertEquals("Follower ids size", 0, ((Leader)currentBehavior).getFollowerIds().size());
792
793         MessageCollectorActor.expectFirstMatching(collector, ServerRemoved.class);
794     }
795
796     @Test
797     public void testRemoveServerLeader() {
798         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
799         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
800         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
801
802         final String followerActorId = actorFactory.generateActorId(FOLLOWER_ID);
803         final String followerActorPath = actorFactory.createTestActorPath(followerActorId);
804         RaftActorContext initialActorContext = new MockRaftActorContext();
805
806         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
807                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActorPath),
808                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
809                 actorFactory.generateActorId(LEADER_ID));
810
811         TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
812
813         TestActorRef<CollectingMockRaftActor> followerRaftActor = actorFactory.createTestActor(
814                 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
815                         configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
816                 followerActorId);
817
818         TestActorRef<MessageCollectorActor> followerCollector = actorFactory.createTestActor(MessageCollectorActor.props().
819                 withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId("collector"));
820         followerRaftActor.underlyingActor().setCollectorActor(followerCollector);
821
822         leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
823         RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
824         assertEquals("getStatus", ServerChangeStatus.OK, removeServerReply.getStatus());
825
826         final ApplyState applyState = MessageCollectorActor.expectFirstMatching(followerCollector, ApplyState.class);
827         assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
828         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
829                 votingServer(FOLLOWER_ID));
830
831         MessageCollectorActor.expectFirstMatching(leaderCollector, ServerRemoved.class);
832     }
833
834     @Test
835     public void testRemoveServerLeaderWithNoFollowers() {
836         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
837                 MockLeaderRaftActor.props(Collections.<String, String>emptyMap(),
838                         new MockRaftActorContext()).withDispatcher(Dispatchers.DefaultDispatcherId()),
839                 actorFactory.generateActorId(LEADER_ID));
840
841         leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
842         RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
843         assertEquals("getStatus", ServerChangeStatus.NOT_SUPPORTED, removeServerReply.getStatus());
844     }
845
846     @Test
847     public void testChangeServersVotingStatus() {
848         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
849         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
850         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
851
852         final String follower1ActorId = actorFactory.generateActorId(FOLLOWER_ID);
853         final String follower1ActorPath = actorFactory.createTestActorPath(follower1ActorId);
854         final String follower2ActorId = actorFactory.generateActorId(FOLLOWER_ID2);
855         final String follower2ActorPath = actorFactory.createTestActorPath(follower2ActorId);
856
857         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
858                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, follower1ActorPath,
859                         FOLLOWER_ID2, follower2ActorPath), new MockRaftActorContext()).
860                         withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
861         TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
862
863         TestActorRef<CollectingMockRaftActor> follower1RaftActor = actorFactory.createTestActor(
864                 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
865                         FOLLOWER_ID2, follower2ActorPath), configParams, NO_PERSISTENCE).
866                         withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId);
867         TestActorRef<MessageCollectorActor> follower1Collector = actorFactory.createTestActor(
868                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
869                 actorFactory.generateActorId("collector"));
870         follower1RaftActor.underlyingActor().setCollectorActor(follower1Collector);
871
872         TestActorRef<CollectingMockRaftActor> follower2RaftActor = actorFactory.createTestActor(
873                 CollectingMockRaftActor.props(FOLLOWER_ID2, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
874                         FOLLOWER_ID, follower1ActorPath), configParams, NO_PERSISTENCE).
875                         withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId);
876         TestActorRef<MessageCollectorActor> follower2Collector = actorFactory.createTestActor(
877                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
878                 actorFactory.generateActorId("collector"));
879         follower2RaftActor.underlyingActor().setCollectorActor(follower2Collector);
880
881         // Send first ChangeServersVotingStatus message
882
883         leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(FOLLOWER_ID, false, FOLLOWER_ID2, false)),
884                 testKit.getRef());
885         ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
886         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
887
888         final ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
889         assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
890         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
891                 votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
892
893         MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
894         verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
895                 votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
896
897         MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
898         verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
899                 votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
900
901         MessageCollectorActor.clearMessages(leaderCollector);
902         MessageCollectorActor.clearMessages(follower1Collector);
903         MessageCollectorActor.clearMessages(follower2Collector);
904
905         // Send second ChangeServersVotingStatus message
906
907         leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(FOLLOWER_ID, true)), testKit.getRef());
908         reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
909         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
910
911         MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
912         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
913                 votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
914
915         MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
916         verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
917                 votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
918
919         MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
920         verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
921                 votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
922     }
923
924     @Test
925     public void testChangeLeaderToNonVoting() {
926         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
927         configParams.setHeartBeatInterval(new FiniteDuration(500, TimeUnit.MILLISECONDS));
928
929         final String follower1ActorId = actorFactory.generateActorId(FOLLOWER_ID);
930         final String follower1ActorPath = actorFactory.createTestActorPath(follower1ActorId);
931         final String follower2ActorId = actorFactory.generateActorId(FOLLOWER_ID2);
932         final String follower2ActorPath = actorFactory.createTestActorPath(follower2ActorId);
933
934         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
935                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, follower1ActorPath,
936                         FOLLOWER_ID2, follower2ActorPath), new MockRaftActorContext()).
937                         withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
938         TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
939
940         TestActorRef<CollectingMockRaftActor> follower1RaftActor = actorFactory.createTestActor(
941                 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
942                         FOLLOWER_ID2, follower2ActorPath), configParams, NO_PERSISTENCE).
943                         withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId);
944         TestActorRef<MessageCollectorActor> follower1Collector = actorFactory.createTestActor(
945                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
946                 actorFactory.generateActorId("collector"));
947         follower1RaftActor.underlyingActor().setCollectorActor(follower1Collector);
948
949         TestActorRef<CollectingMockRaftActor> follower2RaftActor = actorFactory.createTestActor(
950                 CollectingMockRaftActor.props(FOLLOWER_ID2, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
951                         FOLLOWER_ID, follower1ActorPath), configParams, NO_PERSISTENCE).
952                         withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId);
953         TestActorRef<MessageCollectorActor> follower2Collector = actorFactory.createTestActor(
954                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
955                 actorFactory.generateActorId("collector"));
956         follower2RaftActor.underlyingActor().setCollectorActor(follower2Collector);
957
958         // Send ChangeServersVotingStatus message
959
960         leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(LEADER_ID, false)), testKit.getRef());
961         ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
962         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
963
964         MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
965         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
966                 nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
967
968         MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
969         verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
970                 nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
971
972         MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
973         verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
974                 nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
975
976         verifyRaftState(RaftState.Leader, follower1RaftActor.underlyingActor(), follower2RaftActor.underlyingActor());
977         verifyRaftState(RaftState.Follower, leaderActor.underlyingActor());
978
979         MessageCollectorActor.expectMatching(leaderCollector, AppendEntries.class, 2);
980     }
981
982     private void verifyRaftState(RaftState expState, RaftActor... raftActors) {
983         Stopwatch sw = Stopwatch.createStarted();
984         while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
985             for(RaftActor raftActor: raftActors) {
986                 if(raftActor.getRaftState() == expState) {
987                     return;
988                 }
989             }
990         }
991
992         fail("None of the RaftActors have state " + expState);
993     }
994
995     private static ServerInfo votingServer(String id) {
996         return new ServerInfo(id, true);
997     }
998
999     private static ServerInfo nonVotingServer(String id) {
1000         return new ServerInfo(id, false);
1001     }
1002
1003     private TestActorRef<MessageCollectorActor> newLeaderCollectorActor(MockLeaderRaftActor leaderRaftActor) {
1004         return newCollectorActor(leaderRaftActor, LEADER_ID);
1005     }
1006
1007     private TestActorRef<MessageCollectorActor> newCollectorActor(AbstractMockRaftActor raftActor, String id) {
1008         TestActorRef<MessageCollectorActor> collectorActor = actorFactory.createTestActor(
1009                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1010                 actorFactory.generateActorId(id + "Collector"));
1011         raftActor.setCollectorActor(collectorActor);
1012         return collectorActor;
1013     }
1014
1015     private static void verifyServerConfigurationPayloadEntry(ReplicatedLog log, ServerInfo... expected) {
1016         ReplicatedLogEntry logEntry = log.get(log.lastIndex());
1017         assertEquals("Last log entry payload class", ServerConfigurationPayload.class, logEntry.getData().getClass());
1018         ServerConfigurationPayload payload = (ServerConfigurationPayload)logEntry.getData();
1019         assertEquals("getNewServerConfig", Sets.newHashSet(expected), Sets.newHashSet(payload.getServerConfig()));
1020     }
1021
1022     private static RaftActorContext newFollowerContext(String id, TestActorRef<? extends UntypedActor> actor) {
1023         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1024         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1025         configParams.setElectionTimeoutFactor(100000);
1026         ElectionTermImpl termInfo = new ElectionTermImpl(NO_PERSISTENCE, id, LOG);
1027         termInfo.update(1, LEADER_ID);
1028         return new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
1029                 id, termInfo, -1, -1, ImmutableMap.of(LEADER_ID, ""), configParams, NO_PERSISTENCE, LOG);
1030     }
1031
1032     static abstract class AbstractMockRaftActor extends MockRaftActor {
1033         private volatile TestActorRef<MessageCollectorActor> collectorActor;
1034         private volatile Class<?> dropMessageOfType;
1035
1036         AbstractMockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
1037                 DataPersistenceProvider dataPersistenceProvider, TestActorRef<MessageCollectorActor> collectorActor) {
1038             super(builder().id(id).peerAddresses(peerAddresses).config(config.get()).
1039                     dataPersistenceProvider(dataPersistenceProvider));
1040             this.collectorActor = collectorActor;
1041         }
1042
1043         void setDropMessageOfType(Class<?> dropMessageOfType) {
1044             this.dropMessageOfType = dropMessageOfType;
1045         }
1046
1047         void setCollectorActor(TestActorRef<MessageCollectorActor> collectorActor) {
1048             this.collectorActor = collectorActor;
1049         }
1050
1051         @Override
1052         public void handleCommand(Object message) {
1053             if(dropMessageOfType == null || !dropMessageOfType.equals(message.getClass())) {
1054                 super.handleCommand(message);
1055             }
1056
1057             if(collectorActor != null) {
1058                 collectorActor.tell(message, getSender());
1059             }
1060         }
1061     }
1062
1063     public static class CollectingMockRaftActor extends AbstractMockRaftActor {
1064
1065         CollectingMockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider, TestActorRef<MessageCollectorActor> collectorActor) {
1066             super(id, peerAddresses, config, dataPersistenceProvider, collectorActor);
1067         }
1068
1069         public static Props props(final String id, final Map<String, String> peerAddresses,
1070                                   ConfigParams config, DataPersistenceProvider dataPersistenceProvider){
1071
1072             return Props.create(CollectingMockRaftActor.class, id, peerAddresses, Optional.of(config), dataPersistenceProvider, null);
1073         }
1074
1075     }
1076
1077     public static class MockLeaderRaftActor extends AbstractMockRaftActor {
1078         public MockLeaderRaftActor(Map<String, String> peerAddresses, ConfigParams config,
1079                 RaftActorContext fromContext) {
1080             super(LEADER_ID, peerAddresses, Optional.of(config), NO_PERSISTENCE, null);
1081             setPersistence(false);
1082
1083             RaftActorContext context = getRaftActorContext();
1084             for(int i = 0; i < fromContext.getReplicatedLog().size(); i++) {
1085                 ReplicatedLogEntry entry = fromContext.getReplicatedLog().get(i);
1086                 getState().add(entry.getData());
1087                 context.getReplicatedLog().append(entry);
1088             }
1089
1090             context.setCommitIndex(fromContext.getCommitIndex());
1091             context.setLastApplied(fromContext.getLastApplied());
1092             context.getTermInformation().update(fromContext.getTermInformation().getCurrentTerm(),
1093                     fromContext.getTermInformation().getVotedFor());
1094         }
1095
1096         @Override
1097         protected void initializeBehavior() {
1098             changeCurrentBehavior(new Leader(getRaftActorContext()));
1099             initializeBehaviorComplete.countDown();
1100         }
1101
1102         @Override
1103         public void createSnapshot(ActorRef actorRef) {
1104             try {
1105                 actorRef.tell(new CaptureSnapshotReply(RaftActorTest.fromObject(getState()).toByteArray()), actorRef);
1106             } catch (Exception e) {
1107                 LOG.error("createSnapshot failed", e);
1108             }
1109         }
1110
1111         static Props props(Map<String, String> peerAddresses, RaftActorContext fromContext) {
1112             DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1113             configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1114             configParams.setElectionTimeoutFactor(10);
1115             return Props.create(MockLeaderRaftActor.class, peerAddresses, configParams, fromContext);
1116         }
1117     }
1118
1119     public static class MockNewFollowerRaftActor extends AbstractMockRaftActor {
1120         public MockNewFollowerRaftActor(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
1121             super(NEW_SERVER_ID, Maps.<String, String>newHashMap(), Optional.of(config), null, collectorActor);
1122             setPersistence(false);
1123         }
1124
1125         static Props props(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
1126             return Props.create(MockNewFollowerRaftActor.class, config, collectorActor);
1127         }
1128     }
1129 }