Add voting state to ServerConfigurationPayload
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorServerConfigurationSupportTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
12 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
13 import akka.actor.ActorRef;
14 import akka.actor.Props;
15 import akka.actor.UntypedActor;
16 import akka.dispatch.Dispatchers;
17 import akka.testkit.JavaTestKit;
18 import akka.testkit.TestActorRef;
19 import com.google.common.base.Optional;
20 import com.google.common.collect.ImmutableMap;
21 import com.google.common.collect.Maps;
22 import com.google.common.collect.Sets;
23 import java.util.Collections;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.concurrent.TimeUnit;
27 import org.junit.After;
28 import org.junit.Before;
29 import org.junit.Test;
30 import org.opendaylight.controller.cluster.DataPersistenceProvider;
31 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
32 import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
33 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
34 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
35 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
36 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
37 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
38 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
39 import org.opendaylight.controller.cluster.raft.messages.AddServer;
40 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
41 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
42 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
43 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
44 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
45 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
46 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
47 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
48 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
51 import scala.concurrent.duration.FiniteDuration;
52
53 /**
54  * Unit tests for RaftActorServerConfigurationSupport.
55  *
56  * @author Thomas Pantelis
57  */
58 public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
59     static final String LEADER_ID = "leader";
60     static final String FOLLOWER_ID = "follower";
61     static final String NEW_SERVER_ID = "new-server";
62     static final String NEW_SERVER_ID2 = "new-server2";
63     private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupportTest.class);
64     private static final DataPersistenceProvider NO_PERSISTENCE = new NonPersistentDataProvider();
65
66     private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
67
68     private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
69             Props.create(ForwardMessageToBehaviorActor.class).withDispatcher(Dispatchers.DefaultDispatcherId()),
70             actorFactory.generateActorId(FOLLOWER_ID));
71
72     private TestActorRef<MockNewFollowerRaftActor> newFollowerRaftActor;
73     private TestActorRef<MessageCollectorActor> newFollowerCollectorActor;
74     private RaftActorContext newFollowerActorContext;
75
76     private final JavaTestKit testKit = new JavaTestKit(getSystem());
77
78     @Before
79     public void setup() {
80         InMemoryJournal.clear();
81         InMemorySnapshotStore.clear();
82
83         DefaultConfigParamsImpl configParams = newFollowerConfigParams();
84
85         newFollowerCollectorActor = actorFactory.createTestActor(
86                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
87                 actorFactory.generateActorId(NEW_SERVER_ID + "Collector"));
88         newFollowerRaftActor = actorFactory.createTestActor(MockNewFollowerRaftActor.props(
89                 configParams, newFollowerCollectorActor).withDispatcher(Dispatchers.DefaultDispatcherId()),
90                 actorFactory.generateActorId(NEW_SERVER_ID));
91
92         try {
93             newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
94         } catch (Exception e) {
95             newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
96         }
97     }
98
99     private static DefaultConfigParamsImpl newFollowerConfigParams() {
100         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
101         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
102         configParams.setElectionTimeoutFactor(100000);
103         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
104         return configParams;
105     }
106
107     @After
108     public void tearDown() throws Exception {
109         actorFactory.close();
110     }
111
112     @Test
113     public void testAddServerWithExistingFollower() throws Exception {
114         RaftActorContext followerActorContext = newFollowerContext(FOLLOWER_ID, followerActor);
115         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
116                 0, 3, 1).build());
117         followerActorContext.setCommitIndex(2);
118         followerActorContext.setLastApplied(2);
119
120         Follower follower = new Follower(followerActorContext);
121         followerActor.underlyingActor().setBehavior(follower);
122
123         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
124                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
125                         followerActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
126                 actorFactory.generateActorId(LEADER_ID));
127
128         // Expect initial heartbeat from the leader.
129         expectFirstMatching(followerActor, AppendEntries.class);
130         clearMessages(followerActor);
131
132         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
133
134         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
135
136         // Leader should install snapshot - capture and verify ApplySnapshot contents
137
138         ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
139         List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
140         assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
141
142         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
143         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
144         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
145
146         // Verify ServerConfigurationPayload entry in leader's log
147
148         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
149         assertEquals("Leader journal last index", 3, leaderActorContext.getReplicatedLog().lastIndex());
150         assertEquals("Leader commit index", 3, leaderActorContext.getCommitIndex());
151         assertEquals("Leader last applied index", 3, leaderActorContext.getLastApplied());
152         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
153                 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
154
155         // Verify ServerConfigurationPayload entry in both followers
156
157         assertEquals("Follower journal last index", 3, followerActorContext.getReplicatedLog().lastIndex());
158         verifyServerConfigurationPayloadEntry(followerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
159                 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
160
161         assertEquals("New follower journal last index", 3, newFollowerActorContext.getReplicatedLog().lastIndex());
162         verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
163                 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
164
165         // Verify new server config was applied in both followers
166
167         assertEquals("Follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID), followerActorContext.getPeerIds());
168
169         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, FOLLOWER_ID), newFollowerActorContext.getPeerIds());
170
171         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
172         expectFirstMatching(followerActor, ApplyState.class);
173
174         assertEquals("Follower commit index", 3, followerActorContext.getCommitIndex());
175         assertEquals("Follower last applied index", 3, followerActorContext.getLastApplied());
176         assertEquals("New follower commit index", 3, newFollowerActorContext.getCommitIndex());
177         assertEquals("New follower last applied index", 3, newFollowerActorContext.getLastApplied());
178     }
179
180     @Test
181     public void testAddServerWithNoExistingFollower() throws Exception {
182         RaftActorContext initialActorContext = new MockRaftActorContext();
183         initialActorContext.setCommitIndex(1);
184         initialActorContext.setLastApplied(1);
185         initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
186                 0, 2, 1).build());
187
188         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
189                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
190                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
191                 actorFactory.generateActorId(LEADER_ID));
192
193         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
194         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
195
196         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
197
198         // Leader should install snapshot - capture and verify ApplySnapshot contents
199
200         ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
201         List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
202         assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
203
204         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
205         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
206         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
207
208         // Verify ServerConfigurationPayload entry in leader's log
209
210         assertEquals("Leader journal last index", 2, leaderActorContext.getReplicatedLog().lastIndex());
211         assertEquals("Leader commit index", 2, leaderActorContext.getCommitIndex());
212         assertEquals("Leader last applied index", 2, leaderActorContext.getLastApplied());
213         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
214                 votingServer(NEW_SERVER_ID));
215
216         // Verify ServerConfigurationPayload entry in the new follower
217
218         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
219         assertEquals("New follower journal last index", 2, newFollowerActorContext.getReplicatedLog().lastIndex());
220         verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
221                 votingServer(NEW_SERVER_ID));
222
223         // Verify new server config was applied in the new follower
224
225         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
226     }
227
228     @Test
229     public void testAddServersAsNonVoting() throws Exception {
230         RaftActorContext initialActorContext = new MockRaftActorContext();
231         initialActorContext.setCommitIndex(-1);
232         initialActorContext.setLastApplied(-1);
233         initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
234
235         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
236                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
237                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
238                 actorFactory.generateActorId(LEADER_ID));
239
240         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
241         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
242
243         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), false), testKit.getRef());
244
245         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
246         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
247         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
248
249         // Verify ServerConfigurationPayload entry in leader's log
250
251         assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
252         assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
253         assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
254         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
255                 nonVotingServer(NEW_SERVER_ID));
256
257         // Verify ServerConfigurationPayload entry in the new follower
258
259         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
260         assertEquals("New follower journal last index", 0, newFollowerActorContext.getReplicatedLog().lastIndex());
261         verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
262                 nonVotingServer(NEW_SERVER_ID));
263
264         // Verify new server config was applied in the new follower
265
266         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
267
268         MessageCollectorActor.assertNoneMatching(newFollowerCollectorActor, InstallSnapshot.class, 500);
269
270         // Add another non-voting server.
271
272         RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
273         Follower newFollower2 = new Follower(follower2ActorContext);
274         followerActor.underlyingActor().setBehavior(newFollower2);
275
276         leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit.getRef());
277
278         addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
279         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
280         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
281
282         assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
283         assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
284         assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
285         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
286                 votingServer(LEADER_ID), nonVotingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
287     }
288
289     @Test
290     public void testAddServerWithOperationInProgress() throws Exception {
291         RaftActorContext initialActorContext = new MockRaftActorContext();
292         initialActorContext.setCommitIndex(-1);
293         initialActorContext.setLastApplied(-1);
294         initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
295
296         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
297                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
298                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
299                 actorFactory.generateActorId(LEADER_ID));
300
301         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
302         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
303
304         RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
305         Follower newFollower2 = new Follower(follower2ActorContext);
306         followerActor.underlyingActor().setBehavior(newFollower2);
307
308         MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
309         newFollowerRaftActorInstance.setDropMessageOfType(InstallSnapshot.SERIALIZABLE_CLASS);
310
311         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
312
313         // Wait for leader's install snapshot and capture it
314
315         Object installSnapshot = expectFirstMatching(newFollowerCollectorActor, InstallSnapshot.class);
316
317         JavaTestKit testKit2 = new JavaTestKit(getSystem());
318         leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit2.getRef());
319
320         newFollowerRaftActorInstance.setDropMessageOfType(null);
321         newFollowerRaftActor.tell(installSnapshot, leaderActor);
322
323         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
324         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
325
326         addServerReply = testKit2.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
327         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
328
329         // Verify ServerConfigurationPayload entries in leader's log
330
331         assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
332         assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
333         assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
334         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
335                 votingServer(LEADER_ID), votingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
336
337         // Verify ServerConfigurationPayload entry in the new follower
338
339         MessageCollectorActor.expectMatching(newFollowerCollectorActor, ApplyState.class, 2);
340
341         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID2),
342                newFollowerActorContext.getPeerIds());
343     }
344
345     @Test
346     public void testAddServerWithInstallSnapshotTimeout() throws Exception {
347         newFollowerRaftActor.underlyingActor().setDropMessageOfType(InstallSnapshot.SERIALIZABLE_CLASS);
348
349         RaftActorContext initialActorContext = new MockRaftActorContext();
350         initialActorContext.setCommitIndex(-1);
351         initialActorContext.setLastApplied(-1);
352         initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
353
354         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
355                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
356                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
357                 actorFactory.generateActorId(LEADER_ID));
358
359         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
360         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
361         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
362
363         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
364
365         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
366         assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
367
368         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
369         assertEquals("Leader followers size", 0,
370                 ((AbstractLeader)leaderRaftActor.getCurrentBehavior()).getFollowerIds().size());
371     }
372
373     @Test
374     public void testAddServerWithNoLeader() {
375         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
376         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
377
378         TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
379                 MockRaftActor.props(LEADER_ID, ImmutableMap.<String,String>of(FOLLOWER_ID, followerActor.path().toString()),
380                         Optional.<ConfigParams>of(configParams), NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
381                 actorFactory.generateActorId(LEADER_ID));
382         noLeaderActor.underlyingActor().waitForInitializeBehaviorComplete();
383
384         noLeaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
385         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
386         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
387     }
388
389     @Test
390     public void testAddServerForwardedToLeader() {
391         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
392         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
393
394         TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
395                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
396                 actorFactory.generateActorId(LEADER_ID));
397
398         TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
399                 MockRaftActor.props(FOLLOWER_ID, ImmutableMap.<String,String>of(LEADER_ID, leaderActor.path().toString()),
400                         Optional.<ConfigParams>of(configParams), NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
401                 actorFactory.generateActorId(FOLLOWER_ID));
402         followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
403
404         followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
405                 -1, -1, (short)0), leaderActor);
406
407         followerRaftActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
408         expectFirstMatching(leaderActor, AddServer.class);
409     }
410
411     private ServerInfo votingServer(String id) {
412         return new ServerInfo(id, true);
413     }
414
415     private ServerInfo nonVotingServer(String id) {
416         return new ServerInfo(id, false);
417     }
418
419     private static void verifyServerConfigurationPayloadEntry(ReplicatedLog log, ServerInfo... expected) {
420         ReplicatedLogEntry logEntry = log.get(log.lastIndex());
421         assertEquals("Last log entry payload class", ServerConfigurationPayload.class, logEntry.getData().getClass());
422         ServerConfigurationPayload payload = (ServerConfigurationPayload)logEntry.getData();
423         assertEquals("getNewServerConfig", Sets.newHashSet(expected), Sets.newHashSet(payload.getServerConfig()));
424     }
425
426     private static RaftActorContext newFollowerContext(String id, TestActorRef<? extends UntypedActor> actor) {
427         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
428         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
429         configParams.setElectionTimeoutFactor(100000);
430         ElectionTermImpl termInfo = new ElectionTermImpl(NO_PERSISTENCE, id, LOG);
431         termInfo.update(1, LEADER_ID);
432         RaftActorContext followerActorContext = new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
433                 id, termInfo, -1, -1,
434                 ImmutableMap.of(LEADER_ID, ""), configParams, NO_PERSISTENCE, LOG);
435         followerActorContext.setCommitIndex(-1);
436         followerActorContext.setLastApplied(-1);
437         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
438
439         return followerActorContext;
440     }
441
442     public static class MockLeaderRaftActor extends MockRaftActor {
443         public MockLeaderRaftActor(Map<String, String> peerAddresses, ConfigParams config,
444                 RaftActorContext fromContext) {
445             super(LEADER_ID, peerAddresses, Optional.of(config), NO_PERSISTENCE);
446
447             RaftActorContext context = getRaftActorContext();
448             for(int i = 0; i < fromContext.getReplicatedLog().size(); i++) {
449                 ReplicatedLogEntry entry = fromContext.getReplicatedLog().get(i);
450                 getState().add(entry.getData());
451                 context.getReplicatedLog().append(entry);
452             }
453
454             context.setCommitIndex(fromContext.getCommitIndex());
455             context.setLastApplied(fromContext.getLastApplied());
456             context.getTermInformation().update(fromContext.getTermInformation().getCurrentTerm(),
457                     fromContext.getTermInformation().getVotedFor());
458         }
459
460         @Override
461         protected void initializeBehavior() {
462             changeCurrentBehavior(new Leader(getRaftActorContext()));
463             initializeBehaviorComplete.countDown();
464         }
465
466         @Override
467         public void createSnapshot(ActorRef actorRef) {
468             try {
469                 actorRef.tell(new CaptureSnapshotReply(RaftActorTest.fromObject(getState()).toByteArray()), actorRef);
470             } catch (Exception e) {
471                 LOG.error("createSnapshot failed", e);
472             }
473         }
474
475         static Props props(Map<String, String> peerAddresses, RaftActorContext fromContext) {
476             DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
477             configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
478             configParams.setElectionTimeoutFactor(10);
479             return Props.create(MockLeaderRaftActor.class, peerAddresses, configParams, fromContext);
480         }
481     }
482
483     public static class MockNewFollowerRaftActor extends MockRaftActor {
484         private final TestActorRef<MessageCollectorActor> collectorActor;
485         private volatile Class<?> dropMessageOfType;
486
487         public MockNewFollowerRaftActor(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
488             super(NEW_SERVER_ID, Maps.<String, String>newHashMap(), Optional.of(config), null);
489             this.collectorActor = collectorActor;
490         }
491
492         void setDropMessageOfType(Class<?> dropMessageOfType) {
493             this.dropMessageOfType = dropMessageOfType;
494         }
495
496         @Override
497         public void handleCommand(Object message) {
498             if(dropMessageOfType == null || !dropMessageOfType.equals(message.getClass())) {
499                 super.handleCommand(message);
500             }
501
502             collectorActor.tell(message, getSender());
503         }
504
505         static Props props(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
506             return Props.create(MockNewFollowerRaftActor.class, config, collectorActor);
507         }
508     }
509 }