d6cd98218f3964f989985dbc38416006bb70774b
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorServerConfigurationSupportTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.assertNoneMatching;
12 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
13 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
14 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
15 import akka.actor.ActorRef;
16 import akka.actor.Props;
17 import akka.actor.UntypedActor;
18 import akka.dispatch.Dispatchers;
19 import akka.testkit.JavaTestKit;
20 import akka.testkit.TestActorRef;
21 import com.google.common.base.Optional;
22 import com.google.common.collect.ImmutableMap;
23 import com.google.common.collect.Maps;
24 import com.google.common.collect.Sets;
25 import java.util.Collections;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.concurrent.TimeUnit;
29 import org.junit.After;
30 import org.junit.Before;
31 import org.junit.Test;
32 import org.opendaylight.controller.cluster.DataPersistenceProvider;
33 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
34 import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
35 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
36 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
37 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
38 import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
39 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
40 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
41 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
42 import org.opendaylight.controller.cluster.raft.messages.AddServer;
43 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
44 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
45 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
46 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
47 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
48 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
49 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
50 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
51 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
52 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
53 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
54 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
55 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
56 import org.slf4j.Logger;
57 import org.slf4j.LoggerFactory;
58 import scala.concurrent.duration.FiniteDuration;
59
60 /**
61  * Unit tests for RaftActorServerConfigurationSupport.
62  *
63  * @author Thomas Pantelis
64  */
65 public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
66     static final String LEADER_ID = "leader";
67     static final String FOLLOWER_ID = "follower";
68     static final String NEW_SERVER_ID = "new-server";
69     static final String NEW_SERVER_ID2 = "new-server2";
70     private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupportTest.class);
71     private static final DataPersistenceProvider NO_PERSISTENCE = new NonPersistentDataProvider();
72
73     private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
74
75     private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
76             Props.create(ForwardMessageToBehaviorActor.class).withDispatcher(Dispatchers.DefaultDispatcherId()),
77             actorFactory.generateActorId(FOLLOWER_ID));
78
79     private TestActorRef<MockNewFollowerRaftActor> newFollowerRaftActor;
80     private TestActorRef<MessageCollectorActor> newFollowerCollectorActor;
81     private RaftActorContext newFollowerActorContext;
82
83     private final JavaTestKit testKit = new JavaTestKit(getSystem());
84
85     @Before
86     public void setup() {
87         InMemoryJournal.clear();
88         InMemorySnapshotStore.clear();
89
90         DefaultConfigParamsImpl configParams = newFollowerConfigParams();
91
92         newFollowerCollectorActor = actorFactory.createTestActor(
93                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
94                 actorFactory.generateActorId(NEW_SERVER_ID + "Collector"));
95         newFollowerRaftActor = actorFactory.createTestActor(MockNewFollowerRaftActor.props(
96                 configParams, newFollowerCollectorActor).withDispatcher(Dispatchers.DefaultDispatcherId()),
97                 actorFactory.generateActorId(NEW_SERVER_ID));
98
99         try {
100             newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
101         } catch (Exception e) {
102             newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
103         }
104     }
105
106     private static DefaultConfigParamsImpl newFollowerConfigParams() {
107         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
108         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
109         configParams.setElectionTimeoutFactor(100000);
110         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
111         return configParams;
112     }
113
114     @After
115     public void tearDown() throws Exception {
116         actorFactory.close();
117     }
118
119     @Test
120     public void testAddServerWithExistingFollower() throws Exception {
121         RaftActorContext followerActorContext = newFollowerContext(FOLLOWER_ID, followerActor);
122         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
123                 0, 3, 1).build());
124         followerActorContext.setCommitIndex(2);
125         followerActorContext.setLastApplied(2);
126
127         Follower follower = new Follower(followerActorContext);
128         followerActor.underlyingActor().setBehavior(follower);
129
130         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
131                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
132                         followerActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
133                 actorFactory.generateActorId(LEADER_ID));
134
135         // Expect initial heartbeat from the leader.
136         expectFirstMatching(followerActor, AppendEntries.class);
137         clearMessages(followerActor);
138
139         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
140         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
141
142         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
143
144         // Leader should install snapshot - capture and verify ApplySnapshot contents
145
146         ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
147         @SuppressWarnings("unchecked")
148         List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
149         assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
150
151         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
152         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
153         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
154
155         // Verify ServerConfigurationPayload entry in leader's log
156
157         expectFirstMatching(leaderCollectorActor, ApplyState.class);
158         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
159         assertEquals("Leader journal last index", 3, leaderActorContext.getReplicatedLog().lastIndex());
160         assertEquals("Leader commit index", 3, leaderActorContext.getCommitIndex());
161         assertEquals("Leader last applied index", 3, leaderActorContext.getLastApplied());
162         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
163                 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
164
165         // Verify ServerConfigurationPayload entry in both followers
166
167         expectFirstMatching(followerActor, ApplyState.class);
168         assertEquals("Follower journal last index", 3, followerActorContext.getReplicatedLog().lastIndex());
169         verifyServerConfigurationPayloadEntry(followerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
170                 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
171
172         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
173         assertEquals("New follower journal last index", 3, newFollowerActorContext.getReplicatedLog().lastIndex());
174         verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
175                 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
176
177         // Verify new server config was applied in both followers
178
179         assertEquals("Follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID), followerActorContext.getPeerIds());
180
181         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, FOLLOWER_ID), newFollowerActorContext.getPeerIds());
182
183         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
184         expectFirstMatching(followerActor, ApplyState.class);
185
186         assertEquals("Follower commit index", 3, followerActorContext.getCommitIndex());
187         assertEquals("Follower last applied index", 3, followerActorContext.getLastApplied());
188         assertEquals("New follower commit index", 3, newFollowerActorContext.getCommitIndex());
189         assertEquals("New follower last applied index", 3, newFollowerActorContext.getLastApplied());
190
191         List<ReplicatedLogImplEntry> persistedLogEntries = InMemoryJournal.get(LEADER_ID, ReplicatedLogImplEntry.class);
192         assertEquals("Leader ReplicatedLogImplEntry entries", 1, persistedLogEntries.size());
193         ReplicatedLogImplEntry logEntry = persistedLogEntries.get(0);
194         assertEquals("Leader ReplicatedLogImplEntry getTerm", 1, logEntry.getTerm());
195         assertEquals("Leader ReplicatedLogImplEntry getIndex", 3, logEntry.getIndex());
196         assertEquals("Leader ReplicatedLogImplEntry getData", ServerConfigurationPayload.class, logEntry.getData().getClass());
197
198         persistedLogEntries = InMemoryJournal.get(NEW_SERVER_ID, ReplicatedLogImplEntry.class);
199         assertEquals("New follower ReplicatedLogImplEntry entries", 1, persistedLogEntries.size());
200         logEntry = persistedLogEntries.get(0);
201         assertEquals("New follower ReplicatedLogImplEntry getTerm", 1, logEntry.getTerm());
202         assertEquals("New follower ReplicatedLogImplEntry getIndex", 3, logEntry.getIndex());
203         assertEquals("New follower ReplicatedLogImplEntry getData", ServerConfigurationPayload.class,
204                 logEntry.getData().getClass());
205     }
206
207     @Test
208     public void testAddServerWithNoExistingFollower() throws Exception {
209         RaftActorContext initialActorContext = new MockRaftActorContext();
210         initialActorContext.setCommitIndex(1);
211         initialActorContext.setLastApplied(1);
212         initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
213                 0, 2, 1).build());
214
215         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
216                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
217                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
218                 actorFactory.generateActorId(LEADER_ID));
219
220         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
221         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
222
223         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
224
225         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
226
227         // Leader should install snapshot - capture and verify ApplySnapshot contents
228
229         ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
230         @SuppressWarnings("unchecked")
231         List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
232         assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
233
234         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
235         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
236         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
237
238         // Verify ServerConfigurationPayload entry in leader's log
239
240         expectFirstMatching(leaderCollectorActor, ApplyState.class);
241         assertEquals("Leader journal last index", 2, leaderActorContext.getReplicatedLog().lastIndex());
242         assertEquals("Leader commit index", 2, leaderActorContext.getCommitIndex());
243         assertEquals("Leader last applied index", 2, leaderActorContext.getLastApplied());
244         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
245                 votingServer(NEW_SERVER_ID));
246
247         // Verify ServerConfigurationPayload entry in the new follower
248
249         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
250         assertEquals("New follower journal last index", 2, newFollowerActorContext.getReplicatedLog().lastIndex());
251         verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
252                 votingServer(NEW_SERVER_ID));
253
254         // Verify new server config was applied in the new follower
255
256         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
257     }
258
259     @Test
260     public void testAddServersAsNonVoting() throws Exception {
261         RaftActorContext initialActorContext = new MockRaftActorContext();
262
263         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
264                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
265                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
266                 actorFactory.generateActorId(LEADER_ID));
267
268         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
269         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
270
271         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
272
273         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), false), testKit.getRef());
274
275         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
276         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
277         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
278
279         // Verify ServerConfigurationPayload entry in leader's log
280
281         expectFirstMatching(leaderCollectorActor, ApplyState.class);
282
283         assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
284         assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
285         assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
286         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
287                 nonVotingServer(NEW_SERVER_ID));
288
289         // Verify ServerConfigurationPayload entry in the new follower
290
291         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
292         assertEquals("New follower journal last index", 0, newFollowerActorContext.getReplicatedLog().lastIndex());
293         verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
294                 nonVotingServer(NEW_SERVER_ID));
295
296         // Verify new server config was applied in the new follower
297
298         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
299
300         assertNoneMatching(newFollowerCollectorActor, InstallSnapshot.class, 500);
301
302         // Add another non-voting server.
303
304         clearMessages(leaderCollectorActor);
305
306         RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
307         Follower newFollower2 = new Follower(follower2ActorContext);
308         followerActor.underlyingActor().setBehavior(newFollower2);
309
310         leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit.getRef());
311
312         addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
313         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
314         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
315
316         expectFirstMatching(leaderCollectorActor, ApplyState.class);
317         assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
318         assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
319         assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
320         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
321                 votingServer(LEADER_ID), nonVotingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
322     }
323
324     @Test
325     public void testAddServerWithOperationInProgress() throws Exception {
326         RaftActorContext initialActorContext = new MockRaftActorContext();
327
328         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
329                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
330                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
331                 actorFactory.generateActorId(LEADER_ID));
332
333         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
334         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
335
336         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
337
338         RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
339         Follower newFollower2 = new Follower(follower2ActorContext);
340         followerActor.underlyingActor().setBehavior(newFollower2);
341
342         MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
343         newFollowerRaftActorInstance.setDropMessageOfType(InstallSnapshot.SERIALIZABLE_CLASS);
344
345         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
346
347         // Wait for leader's install snapshot and capture it
348
349         InstallSnapshot installSnapshot = expectFirstMatching(newFollowerCollectorActor, InstallSnapshot.class);
350
351         // Send a second AddServer - should get queued
352         JavaTestKit testKit2 = new JavaTestKit(getSystem());
353         leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit2.getRef());
354
355         // Continue the first AddServer
356         newFollowerRaftActorInstance.setDropMessageOfType(null);
357         newFollowerRaftActor.tell(installSnapshot, leaderActor);
358
359         // Verify both complete successfully
360         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
361         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
362
363         addServerReply = testKit2.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
364         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
365
366         // Verify ServerConfigurationPayload entries in leader's log
367
368         expectMatching(leaderCollectorActor, ApplyState.class, 2);
369         assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
370         assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
371         assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
372         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
373                 votingServer(LEADER_ID), votingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
374
375         // Verify ServerConfigurationPayload entry in the new follower
376
377         expectMatching(newFollowerCollectorActor, ApplyState.class, 2);
378         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID2),
379                newFollowerActorContext.getPeerIds());
380     }
381
382     @Test
383     public void testAddServerWithPriorSnapshotInProgress() throws Exception {
384         RaftActorContext initialActorContext = new MockRaftActorContext();
385
386         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
387                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
388                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
389                 actorFactory.generateActorId(LEADER_ID));
390
391         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
392         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
393
394         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
395
396         // Drop commit message for now to delay snapshot completion
397         leaderRaftActor.setDropMessageOfType(String.class);
398
399         leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
400
401         String commitMsg = expectFirstMatching(leaderCollectorActor, String.class);
402
403         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
404
405         leaderRaftActor.setDropMessageOfType(null);
406         leaderActor.tell(commitMsg, leaderActor);
407
408         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
409         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
410         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
411
412         expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
413
414         // Verify ServerConfigurationPayload entry in leader's log
415
416         expectFirstMatching(leaderCollectorActor, ApplyState.class);
417         assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
418         assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
419         assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
420         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
421                 votingServer(NEW_SERVER_ID));
422     }
423
424     @Test
425     public void testAddServerWithPriorSnapshotCompleteTimeout() throws Exception {
426         RaftActorContext initialActorContext = new MockRaftActorContext();
427
428         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
429                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
430                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
431                 actorFactory.generateActorId(LEADER_ID));
432
433         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
434         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
435
436         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
437
438         // Drop commit message so the snapshot doesn't complete.
439         leaderRaftActor.setDropMessageOfType(String.class);
440
441         leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
442
443         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
444
445         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
446         assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
447
448         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
449     }
450
451     @Test
452     public void testAddServerWithLeaderChangeBeforePriorSnapshotComplete() throws Exception {
453         RaftActorContext initialActorContext = new MockRaftActorContext();
454
455         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
456                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
457                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
458                 actorFactory.generateActorId(LEADER_ID));
459
460         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
461         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
462         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
463
464         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
465
466         // Drop the commit message so the snapshot doesn't complete yet.
467         leaderRaftActor.setDropMessageOfType(String.class);
468
469         leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
470
471         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
472
473         String commitMsg = expectFirstMatching(leaderCollectorActor, String.class);
474
475         // Change the leader behavior to follower
476         leaderActor.tell(new Follower(leaderActorContext), leaderActor);
477
478         // Drop CaptureSnapshotReply in case install snapshot is incorrectly initiated after the prior
479         // snapshot completes. This will prevent the invalid snapshot from completing and fail the
480         // isCapturing assertion below.
481         leaderRaftActor.setDropMessageOfType(CaptureSnapshotReply.class);
482
483         // Complete the prior snapshot - this should be a no-op b/c it's no longer the leader
484         leaderActor.tell(commitMsg, leaderActor);
485
486         leaderActor.tell(new RaftActorServerConfigurationSupport.ServerOperationTimeout(NEW_SERVER_ID), leaderActor);
487
488         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
489         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
490
491         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
492         assertEquals("isCapturing", false, leaderActorContext.getSnapshotManager().isCapturing());
493     }
494
495     @Test
496     public void testAddServerWithLeaderChangeDuringInstallSnapshot() throws Exception {
497         RaftActorContext initialActorContext = new MockRaftActorContext();
498
499         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
500                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
501                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
502                 actorFactory.generateActorId(LEADER_ID));
503
504         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
505         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
506
507         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(8);
508
509         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
510
511         // Drop the UnInitializedFollowerSnapshotReply to delay it.
512         leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
513
514         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
515
516         UnInitializedFollowerSnapshotReply snapshotReply = expectFirstMatching(leaderCollectorActor,
517                 UnInitializedFollowerSnapshotReply.class);
518
519         // Prevent election timeout when the leader switches to follower
520         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
521
522         // Change the leader behavior to follower
523         leaderActor.tell(new Follower(leaderActorContext), leaderActor);
524
525         // Send the captured UnInitializedFollowerSnapshotReply - should be a no-op
526         leaderRaftActor.setDropMessageOfType(null);
527         leaderActor.tell(snapshotReply, leaderActor);
528
529         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
530         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
531
532         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
533     }
534
535     @Test
536     public void testAddServerWithInstallSnapshotTimeout() throws Exception {
537         RaftActorContext initialActorContext = new MockRaftActorContext();
538
539         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
540                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
541                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
542                 actorFactory.generateActorId(LEADER_ID));
543
544         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
545         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
546         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
547
548         // Drop the InstallSnapshot message so it times out
549         newFollowerRaftActor.underlyingActor().setDropMessageOfType(InstallSnapshot.SERIALIZABLE_CLASS);
550
551         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
552
553         leaderActor.tell(new UnInitializedFollowerSnapshotReply("bogus"), leaderActor);
554
555         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
556         assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
557
558         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
559         assertEquals("Leader followers size", 0,
560                 ((AbstractLeader)leaderRaftActor.getCurrentBehavior()).getFollowerIds().size());
561     }
562
563     @Test
564     public void testAddServerWithNoLeader() {
565         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
566         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
567
568         TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
569                 MockRaftActor.props(LEADER_ID, ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
570                         configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
571                 actorFactory.generateActorId(LEADER_ID));
572         noLeaderActor.underlyingActor().waitForInitializeBehaviorComplete();
573
574         noLeaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
575         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
576         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
577     }
578
579     @Test
580     public void testAddServerWithNoConsensusReached() {
581         RaftActorContext initialActorContext = new MockRaftActorContext();
582
583         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
584                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
585                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
586                 actorFactory.generateActorId(LEADER_ID));
587
588         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
589         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
590
591         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
592
593         // Drop UnInitializedFollowerSnapshotReply initially
594         leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
595
596         MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
597         TestActorRef<MessageCollectorActor> newFollowerCollectorActor =
598                 newCollectorActor(newFollowerRaftActorInstance, NEW_SERVER_ID);
599
600         // Drop AppendEntries to the new follower so consensus isn't reached
601         newFollowerRaftActorInstance.setDropMessageOfType(AppendEntries.class);
602
603         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
604
605         // Capture the UnInitializedFollowerSnapshotReply
606         Object snapshotReply = expectFirstMatching(leaderCollectorActor, UnInitializedFollowerSnapshotReply.class);
607
608         // Send the UnInitializedFollowerSnapshotReply to resume the first request
609         leaderRaftActor.setDropMessageOfType(null);
610         leaderActor.tell(snapshotReply, leaderActor);
611
612         expectFirstMatching(newFollowerCollectorActor, AppendEntries.class);
613
614         // Send a second AddServer
615         leaderActor.tell(new AddServer(NEW_SERVER_ID2, "", false), testKit.getRef());
616
617         // The first AddServer should succeed with OK even though consensus wasn't reached
618         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
619         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
620         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
621
622         // Verify ServerConfigurationPayload entry in leader's log
623         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
624                 votingServer(NEW_SERVER_ID));
625
626         // The second AddServer should fail since consensus wasn't reached for the first
627         addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
628         assertEquals("getStatus", ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT, addServerReply.getStatus());
629
630         // Re-send the second AddServer - should also fail
631         leaderActor.tell(new AddServer(NEW_SERVER_ID2, "", false), testKit.getRef());
632         addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
633         assertEquals("getStatus", ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT, addServerReply.getStatus());
634     }
635
636     @Test
637     public void testAddServerWithExistingServer() {
638         RaftActorContext initialActorContext = new MockRaftActorContext();
639
640         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
641                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
642                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
643                 actorFactory.generateActorId(LEADER_ID));
644
645         leaderActor.tell(new AddServer(FOLLOWER_ID, followerActor.path().toString(), true), testKit.getRef());
646
647         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
648         assertEquals("getStatus", ServerChangeStatus.ALREADY_EXISTS, addServerReply.getStatus());
649     }
650
651     @Test
652     public void testAddServerForwardedToLeader() {
653         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
654         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
655
656         TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
657                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
658                 actorFactory.generateActorId(LEADER_ID));
659
660         TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
661                 MockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
662                         configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
663                 actorFactory.generateActorId(FOLLOWER_ID));
664         followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
665
666         followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
667                 -1, -1, (short)0), leaderActor);
668
669         followerRaftActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
670         expectFirstMatching(leaderActor, AddServer.class);
671     }
672
673     @Test
674     public void testOnApplyState() {
675         RaftActorServerConfigurationSupport support = new RaftActorServerConfigurationSupport(new MockRaftActorContext());
676
677         ReplicatedLogEntry serverConfigEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
678                 new ServerConfigurationPayload(Collections.<ServerInfo>emptyList()));
679         boolean handled = support.handleMessage(new ApplyState(null, null, serverConfigEntry), null, ActorRef.noSender());
680         assertEquals("Message handled", true, handled);
681
682         ReplicatedLogEntry nonServerConfigEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
683                 new MockRaftActorContext.MockPayload("1"));
684         handled = support.handleMessage(new ApplyState(null, null, nonServerConfigEntry), null, ActorRef.noSender());
685         assertEquals("Message handled", false, handled);
686     }
687
688     @Test
689     public void testRemoveServerWithNoLeader() {
690         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
691         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
692
693         TestActorRef<MockRaftActor> leaderActor = actorFactory.createTestActor(
694                 MockRaftActor.props(LEADER_ID, ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
695                         configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
696                 actorFactory.generateActorId(LEADER_ID));
697         leaderActor.underlyingActor().waitForInitializeBehaviorComplete();
698
699         leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
700         RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
701         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, removeServerReply.getStatus());
702     }
703
704     @Test
705     public void testRemoveServerNonExistentServer() {
706         RaftActorContext initialActorContext = new MockRaftActorContext();
707
708         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
709                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
710                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
711                 actorFactory.generateActorId(LEADER_ID));
712
713         leaderActor.tell(new RemoveServer(NEW_SERVER_ID), testKit.getRef());
714         RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
715         assertEquals("getStatus", ServerChangeStatus.DOES_NOT_EXIST, removeServerReply.getStatus());
716     }
717
718     @Test
719     public void testRemoveServerSelf() {
720         RaftActorContext initialActorContext = new MockRaftActorContext();
721
722         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
723                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
724                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
725                 actorFactory.generateActorId(LEADER_ID));
726
727         leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
728         RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
729         assertEquals("getStatus", ServerChangeStatus.NOT_SUPPORTED, removeServerReply.getStatus());
730     }
731
732     @Test
733     public void testRemoveServerForwardToLeader() {
734         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
735         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
736         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
737
738         RaftActorContext initialActorContext = new MockRaftActorContext();
739
740         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
741                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
742                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
743                 actorFactory.generateActorId(LEADER_ID));
744
745         TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
746                 MockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
747                         configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
748                 actorFactory.generateActorId(FOLLOWER_ID));
749
750
751         followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
752                 -1, -1, (short) 0), leaderActor);
753
754         followerRaftActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
755         RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
756         assertEquals("getStatus", ServerChangeStatus.NOT_SUPPORTED, removeServerReply.getStatus());
757     }
758
759     @Test
760     public void testRemoveServer() {
761         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
762         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
763         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
764
765         final String followerActorId = actorFactory.generateActorId(FOLLOWER_ID);
766         final String followerActorPath = actorFactory.createTestActorPath(followerActorId);
767         RaftActorContext initialActorContext = new MockRaftActorContext();
768
769         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
770                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActorPath),
771                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
772                 actorFactory.generateActorId(LEADER_ID));
773
774         TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
775
776         TestActorRef<CollectingMockRaftActor> followerRaftActor = actorFactory.createTestActor(
777                 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
778                         configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
779                 followerActorId);
780
781         TestActorRef<MessageCollectorActor> collector =
782                 actorFactory.createTestActor(MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId("collector"));
783
784         followerRaftActor.underlyingActor().setCollectorActor(collector);
785
786         leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
787         RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
788
789         assertEquals("getStatus", ServerChangeStatus.OK, removeServerReply.getStatus());
790
791         final ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
792         assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
793         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(), votingServer(LEADER_ID));
794
795         MessageCollectorActor.expectFirstMatching(collector, ServerRemoved.class);
796     }
797
798     private ServerInfo votingServer(String id) {
799         return new ServerInfo(id, true);
800     }
801
802     private ServerInfo nonVotingServer(String id) {
803         return new ServerInfo(id, false);
804     }
805
806     private TestActorRef<MessageCollectorActor> newLeaderCollectorActor(MockLeaderRaftActor leaderRaftActor) {
807         return newCollectorActor(leaderRaftActor, LEADER_ID);
808     }
809
810     private TestActorRef<MessageCollectorActor> newCollectorActor(AbstractMockRaftActor raftActor, String id) {
811         TestActorRef<MessageCollectorActor> collectorActor = actorFactory.createTestActor(
812                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
813                 actorFactory.generateActorId(id + "Collector"));
814         raftActor.setCollectorActor(collectorActor);
815         return collectorActor;
816     }
817
818     private static void verifyServerConfigurationPayloadEntry(ReplicatedLog log, ServerInfo... expected) {
819         ReplicatedLogEntry logEntry = log.get(log.lastIndex());
820         assertEquals("Last log entry payload class", ServerConfigurationPayload.class, logEntry.getData().getClass());
821         ServerConfigurationPayload payload = (ServerConfigurationPayload)logEntry.getData();
822         assertEquals("getNewServerConfig", Sets.newHashSet(expected), Sets.newHashSet(payload.getServerConfig()));
823     }
824
825     private static RaftActorContext newFollowerContext(String id, TestActorRef<? extends UntypedActor> actor) {
826         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
827         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
828         configParams.setElectionTimeoutFactor(100000);
829         ElectionTermImpl termInfo = new ElectionTermImpl(NO_PERSISTENCE, id, LOG);
830         termInfo.update(1, LEADER_ID);
831         return new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
832                 id, termInfo, -1, -1, ImmutableMap.of(LEADER_ID, ""), configParams, NO_PERSISTENCE, LOG);
833     }
834
835     static abstract class AbstractMockRaftActor extends MockRaftActor {
836         private volatile TestActorRef<MessageCollectorActor> collectorActor;
837         private volatile Class<?> dropMessageOfType;
838
839         AbstractMockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
840                 DataPersistenceProvider dataPersistenceProvider, TestActorRef<MessageCollectorActor> collectorActor) {
841             super(builder().id(id).peerAddresses(peerAddresses).config(config.get()).
842                     dataPersistenceProvider(dataPersistenceProvider));
843             this.collectorActor = collectorActor;
844         }
845
846         void setDropMessageOfType(Class<?> dropMessageOfType) {
847             this.dropMessageOfType = dropMessageOfType;
848         }
849
850         void setCollectorActor(TestActorRef<MessageCollectorActor> collectorActor) {
851             this.collectorActor = collectorActor;
852         }
853
854         @Override
855         public void handleCommand(Object message) {
856             if(dropMessageOfType == null || !dropMessageOfType.equals(message.getClass())) {
857                 super.handleCommand(message);
858             }
859
860             if(collectorActor != null) {
861                 collectorActor.tell(message, getSender());
862             }
863         }
864     }
865
866     public static class CollectingMockRaftActor extends AbstractMockRaftActor {
867
868         CollectingMockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider, TestActorRef<MessageCollectorActor> collectorActor) {
869             super(id, peerAddresses, config, dataPersistenceProvider, collectorActor);
870         }
871
872         public static Props props(final String id, final Map<String, String> peerAddresses,
873                                   ConfigParams config, DataPersistenceProvider dataPersistenceProvider){
874
875             return Props.create(CollectingMockRaftActor.class, id, peerAddresses, Optional.of(config), dataPersistenceProvider, null);
876         }
877
878     }
879
880     public static class MockLeaderRaftActor extends AbstractMockRaftActor {
881         public MockLeaderRaftActor(Map<String, String> peerAddresses, ConfigParams config,
882                 RaftActorContext fromContext) {
883             super(LEADER_ID, peerAddresses, Optional.of(config), NO_PERSISTENCE, null);
884             setPersistence(false);
885
886             RaftActorContext context = getRaftActorContext();
887             for(int i = 0; i < fromContext.getReplicatedLog().size(); i++) {
888                 ReplicatedLogEntry entry = fromContext.getReplicatedLog().get(i);
889                 getState().add(entry.getData());
890                 context.getReplicatedLog().append(entry);
891             }
892
893             context.setCommitIndex(fromContext.getCommitIndex());
894             context.setLastApplied(fromContext.getLastApplied());
895             context.getTermInformation().update(fromContext.getTermInformation().getCurrentTerm(),
896                     fromContext.getTermInformation().getVotedFor());
897         }
898
899         @Override
900         protected void initializeBehavior() {
901             changeCurrentBehavior(new Leader(getRaftActorContext()));
902             initializeBehaviorComplete.countDown();
903         }
904
905         @Override
906         public void createSnapshot(ActorRef actorRef) {
907             try {
908                 actorRef.tell(new CaptureSnapshotReply(RaftActorTest.fromObject(getState()).toByteArray()), actorRef);
909             } catch (Exception e) {
910                 LOG.error("createSnapshot failed", e);
911             }
912         }
913
914         static Props props(Map<String, String> peerAddresses, RaftActorContext fromContext) {
915             DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
916             configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
917             configParams.setElectionTimeoutFactor(10);
918             return Props.create(MockLeaderRaftActor.class, peerAddresses, configParams, fromContext);
919         }
920     }
921
922     public static class MockNewFollowerRaftActor extends AbstractMockRaftActor {
923         public MockNewFollowerRaftActor(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
924             super(NEW_SERVER_ID, Maps.<String, String>newHashMap(), Optional.of(config), null, collectorActor);
925             setPersistence(false);
926         }
927
928         static Props props(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
929             return Props.create(MockNewFollowerRaftActor.class, config, collectorActor);
930         }
931     }
932 }