BUG 2817 - Basic implementation of RemoveServer in the Raft code
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorServerConfigurationSupportTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.assertNoneMatching;
12 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
13 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
14 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
15 import akka.actor.ActorRef;
16 import akka.actor.Props;
17 import akka.actor.UntypedActor;
18 import akka.dispatch.Dispatchers;
19 import akka.testkit.JavaTestKit;
20 import akka.testkit.TestActorRef;
21 import com.google.common.base.Optional;
22 import com.google.common.collect.ImmutableMap;
23 import com.google.common.collect.Maps;
24 import com.google.common.collect.Sets;
25 import java.util.Collections;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.concurrent.TimeUnit;
29 import org.junit.After;
30 import org.junit.Before;
31 import org.junit.Test;
32 import org.opendaylight.controller.cluster.DataPersistenceProvider;
33 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
34 import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
35 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
36 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
37 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
38 import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
39 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
40 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
41 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
42 import org.opendaylight.controller.cluster.raft.messages.AddServer;
43 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
44 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
45 import org.opendaylight.controller.cluster.raft.messages.FollowerCatchUpTimeout;
46 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
47 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
48 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
49 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
50 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
51 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
52 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
53 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
54 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
55 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
56 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
57 import org.slf4j.Logger;
58 import org.slf4j.LoggerFactory;
59 import scala.concurrent.duration.FiniteDuration;
60
61 /**
62  * Unit tests for RaftActorServerConfigurationSupport.
63  *
64  * @author Thomas Pantelis
65  */
66 public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
67     static final String LEADER_ID = "leader";
68     static final String FOLLOWER_ID = "follower";
69     static final String NEW_SERVER_ID = "new-server";
70     static final String NEW_SERVER_ID2 = "new-server2";
71     private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupportTest.class);
72     private static final DataPersistenceProvider NO_PERSISTENCE = new NonPersistentDataProvider();
73
74     private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
75
76     private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
77             Props.create(ForwardMessageToBehaviorActor.class).withDispatcher(Dispatchers.DefaultDispatcherId()),
78             actorFactory.generateActorId(FOLLOWER_ID));
79
80     private TestActorRef<MockNewFollowerRaftActor> newFollowerRaftActor;
81     private TestActorRef<MessageCollectorActor> newFollowerCollectorActor;
82     private RaftActorContext newFollowerActorContext;
83
84     private final JavaTestKit testKit = new JavaTestKit(getSystem());
85
86     @Before
87     public void setup() {
88         InMemoryJournal.clear();
89         InMemorySnapshotStore.clear();
90
91         DefaultConfigParamsImpl configParams = newFollowerConfigParams();
92
93         newFollowerCollectorActor = actorFactory.createTestActor(
94                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
95                 actorFactory.generateActorId(NEW_SERVER_ID + "Collector"));
96         newFollowerRaftActor = actorFactory.createTestActor(MockNewFollowerRaftActor.props(
97                 configParams, newFollowerCollectorActor).withDispatcher(Dispatchers.DefaultDispatcherId()),
98                 actorFactory.generateActorId(NEW_SERVER_ID));
99
100         try {
101             newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
102         } catch (Exception e) {
103             newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
104         }
105     }
106
107     private static DefaultConfigParamsImpl newFollowerConfigParams() {
108         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
109         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
110         configParams.setElectionTimeoutFactor(100000);
111         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
112         return configParams;
113     }
114
115     @After
116     public void tearDown() throws Exception {
117         actorFactory.close();
118     }
119
120     @Test
121     public void testAddServerWithExistingFollower() throws Exception {
122         RaftActorContext followerActorContext = newFollowerContext(FOLLOWER_ID, followerActor);
123         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
124                 0, 3, 1).build());
125         followerActorContext.setCommitIndex(2);
126         followerActorContext.setLastApplied(2);
127
128         Follower follower = new Follower(followerActorContext);
129         followerActor.underlyingActor().setBehavior(follower);
130
131         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
132                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
133                         followerActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
134                 actorFactory.generateActorId(LEADER_ID));
135
136         // Expect initial heartbeat from the leader.
137         expectFirstMatching(followerActor, AppendEntries.class);
138         clearMessages(followerActor);
139
140         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
141         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
142
143         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
144
145         // Leader should install snapshot - capture and verify ApplySnapshot contents
146
147         ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
148         @SuppressWarnings("unchecked")
149         List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
150         assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
151
152         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
153         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
154         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
155
156         // Verify ServerConfigurationPayload entry in leader's log
157
158         expectFirstMatching(leaderCollectorActor, ApplyState.class);
159         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
160         assertEquals("Leader journal last index", 3, leaderActorContext.getReplicatedLog().lastIndex());
161         assertEquals("Leader commit index", 3, leaderActorContext.getCommitIndex());
162         assertEquals("Leader last applied index", 3, leaderActorContext.getLastApplied());
163         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
164                 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
165
166         // Verify ServerConfigurationPayload entry in both followers
167
168         expectFirstMatching(followerActor, ApplyState.class);
169         assertEquals("Follower journal last index", 3, followerActorContext.getReplicatedLog().lastIndex());
170         verifyServerConfigurationPayloadEntry(followerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
171                 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
172
173         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
174         assertEquals("New follower journal last index", 3, newFollowerActorContext.getReplicatedLog().lastIndex());
175         verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
176                 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
177
178         // Verify new server config was applied in both followers
179
180         assertEquals("Follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID), followerActorContext.getPeerIds());
181
182         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, FOLLOWER_ID), newFollowerActorContext.getPeerIds());
183
184         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
185         expectFirstMatching(followerActor, ApplyState.class);
186
187         assertEquals("Follower commit index", 3, followerActorContext.getCommitIndex());
188         assertEquals("Follower last applied index", 3, followerActorContext.getLastApplied());
189         assertEquals("New follower commit index", 3, newFollowerActorContext.getCommitIndex());
190         assertEquals("New follower last applied index", 3, newFollowerActorContext.getLastApplied());
191
192         List<ReplicatedLogImplEntry> persistedLogEntries = InMemoryJournal.get(LEADER_ID, ReplicatedLogImplEntry.class);
193         assertEquals("Leader ReplicatedLogImplEntry entries", 1, persistedLogEntries.size());
194         ReplicatedLogImplEntry logEntry = persistedLogEntries.get(0);
195         assertEquals("Leader ReplicatedLogImplEntry getTerm", 1, logEntry.getTerm());
196         assertEquals("Leader ReplicatedLogImplEntry getIndex", 3, logEntry.getIndex());
197         assertEquals("Leader ReplicatedLogImplEntry getData", ServerConfigurationPayload.class, logEntry.getData().getClass());
198
199         persistedLogEntries = InMemoryJournal.get(NEW_SERVER_ID, ReplicatedLogImplEntry.class);
200         assertEquals("New follower ReplicatedLogImplEntry entries", 1, persistedLogEntries.size());
201         logEntry = persistedLogEntries.get(0);
202         assertEquals("New follower ReplicatedLogImplEntry getTerm", 1, logEntry.getTerm());
203         assertEquals("New follower ReplicatedLogImplEntry getIndex", 3, logEntry.getIndex());
204         assertEquals("New follower ReplicatedLogImplEntry getData", ServerConfigurationPayload.class,
205                 logEntry.getData().getClass());
206     }
207
208     @Test
209     public void testAddServerWithNoExistingFollower() throws Exception {
210         RaftActorContext initialActorContext = new MockRaftActorContext();
211         initialActorContext.setCommitIndex(1);
212         initialActorContext.setLastApplied(1);
213         initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
214                 0, 2, 1).build());
215
216         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
217                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
218                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
219                 actorFactory.generateActorId(LEADER_ID));
220
221         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
222         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
223
224         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
225
226         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
227
228         // Leader should install snapshot - capture and verify ApplySnapshot contents
229
230         ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
231         @SuppressWarnings("unchecked")
232         List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
233         assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
234
235         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
236         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
237         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
238
239         // Verify ServerConfigurationPayload entry in leader's log
240
241         expectFirstMatching(leaderCollectorActor, ApplyState.class);
242         assertEquals("Leader journal last index", 2, leaderActorContext.getReplicatedLog().lastIndex());
243         assertEquals("Leader commit index", 2, leaderActorContext.getCommitIndex());
244         assertEquals("Leader last applied index", 2, leaderActorContext.getLastApplied());
245         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
246                 votingServer(NEW_SERVER_ID));
247
248         // Verify ServerConfigurationPayload entry in the new follower
249
250         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
251         assertEquals("New follower journal last index", 2, newFollowerActorContext.getReplicatedLog().lastIndex());
252         verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
253                 votingServer(NEW_SERVER_ID));
254
255         // Verify new server config was applied in the new follower
256
257         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
258     }
259
260     @Test
261     public void testAddServersAsNonVoting() throws Exception {
262         RaftActorContext initialActorContext = new MockRaftActorContext();
263
264         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
265                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
266                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
267                 actorFactory.generateActorId(LEADER_ID));
268
269         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
270         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
271
272         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
273
274         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), false), testKit.getRef());
275
276         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
277         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
278         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
279
280         // Verify ServerConfigurationPayload entry in leader's log
281
282         expectFirstMatching(leaderCollectorActor, ApplyState.class);
283
284         assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
285         assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
286         assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
287         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
288                 nonVotingServer(NEW_SERVER_ID));
289
290         // Verify ServerConfigurationPayload entry in the new follower
291
292         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
293         assertEquals("New follower journal last index", 0, newFollowerActorContext.getReplicatedLog().lastIndex());
294         verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
295                 nonVotingServer(NEW_SERVER_ID));
296
297         // Verify new server config was applied in the new follower
298
299         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
300
301         assertNoneMatching(newFollowerCollectorActor, InstallSnapshot.class, 500);
302
303         // Add another non-voting server.
304
305         clearMessages(leaderCollectorActor);
306
307         RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
308         Follower newFollower2 = new Follower(follower2ActorContext);
309         followerActor.underlyingActor().setBehavior(newFollower2);
310
311         leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit.getRef());
312
313         addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
314         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
315         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
316
317         expectFirstMatching(leaderCollectorActor, ApplyState.class);
318         assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
319         assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
320         assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
321         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
322                 votingServer(LEADER_ID), nonVotingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
323     }
324
325     @Test
326     public void testAddServerWithOperationInProgress() throws Exception {
327         RaftActorContext initialActorContext = new MockRaftActorContext();
328
329         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
330                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
331                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
332                 actorFactory.generateActorId(LEADER_ID));
333
334         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
335         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
336
337         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
338
339         RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
340         Follower newFollower2 = new Follower(follower2ActorContext);
341         followerActor.underlyingActor().setBehavior(newFollower2);
342
343         MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
344         newFollowerRaftActorInstance.setDropMessageOfType(InstallSnapshot.SERIALIZABLE_CLASS);
345
346         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
347
348         // Wait for leader's install snapshot and capture it
349
350         Object installSnapshot = expectFirstMatching(newFollowerCollectorActor, InstallSnapshot.class);
351
352         // Send a second AddServer - should get queued
353         JavaTestKit testKit2 = new JavaTestKit(getSystem());
354         leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit2.getRef());
355
356         // Continue the first AddServer
357         newFollowerRaftActorInstance.setDropMessageOfType(null);
358         newFollowerRaftActor.tell(installSnapshot, leaderActor);
359
360         // Verify both complete successfully
361         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
362         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
363
364         addServerReply = testKit2.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
365         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
366
367         // Verify ServerConfigurationPayload entries in leader's log
368
369         expectMatching(leaderCollectorActor, ApplyState.class, 2);
370         assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
371         assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
372         assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
373         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
374                 votingServer(LEADER_ID), votingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
375
376         // Verify ServerConfigurationPayload entry in the new follower
377
378         expectMatching(newFollowerCollectorActor, ApplyState.class, 2);
379         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID2),
380                newFollowerActorContext.getPeerIds());
381     }
382
383     @Test
384     public void testAddServerWithPriorSnapshotInProgress() throws Exception {
385         RaftActorContext initialActorContext = new MockRaftActorContext();
386
387         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
388                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
389                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
390                 actorFactory.generateActorId(LEADER_ID));
391
392         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
393         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
394
395         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
396
397         // Drop commit message for now to delay snapshot completion
398         leaderRaftActor.setDropMessageOfType(String.class);
399
400         leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
401
402         String commitMsg = expectFirstMatching(leaderCollectorActor, String.class);
403
404         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
405
406         leaderRaftActor.setDropMessageOfType(null);
407         leaderActor.tell(commitMsg, leaderActor);
408
409         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
410         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
411         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
412
413         expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
414
415         // Verify ServerConfigurationPayload entry in leader's log
416
417         expectFirstMatching(leaderCollectorActor, ApplyState.class);
418         assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
419         assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
420         assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
421         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
422                 votingServer(NEW_SERVER_ID));
423     }
424
425     @Test
426     public void testAddServerWithPriorSnapshotCompleteTimeout() throws Exception {
427         RaftActorContext initialActorContext = new MockRaftActorContext();
428
429         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
430                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
431                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
432                 actorFactory.generateActorId(LEADER_ID));
433
434         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
435         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
436
437         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
438
439         // Drop commit message so the snapshot doesn't complete.
440         leaderRaftActor.setDropMessageOfType(String.class);
441
442         leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
443
444         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
445
446         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
447         assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
448
449         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
450     }
451
452     @Test
453     public void testAddServerWithLeaderChangeBeforePriorSnapshotComplete() throws Exception {
454         RaftActorContext initialActorContext = new MockRaftActorContext();
455
456         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
457                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
458                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
459                 actorFactory.generateActorId(LEADER_ID));
460
461         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
462         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
463         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
464
465         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
466
467         // Drop the commit message so the snapshot doesn't complete yet.
468         leaderRaftActor.setDropMessageOfType(String.class);
469
470         leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
471
472         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
473
474         String commitMsg = expectFirstMatching(leaderCollectorActor, String.class);
475
476         // Change the leader behavior to follower
477         leaderActor.tell(new Follower(leaderActorContext), leaderActor);
478
479         // Drop CaptureSnapshotReply in case install snapshot is incorrectly initiated after the prior
480         // snapshot completes. This will prevent the invalid snapshot from completing and fail the
481         // isCapturing assertion below.
482         leaderRaftActor.setDropMessageOfType(CaptureSnapshotReply.class);
483
484         // Complete the prior snapshot - this should be a no-op b/c it's no longer the leader
485         leaderActor.tell(commitMsg, leaderActor);
486
487         leaderActor.tell(new FollowerCatchUpTimeout(NEW_SERVER_ID), leaderActor);
488
489         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
490         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
491
492         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
493         assertEquals("isCapturing", false, leaderActorContext.getSnapshotManager().isCapturing());
494     }
495
496     @Test
497     public void testAddServerWithLeaderChangeDuringInstallSnapshot() throws Exception {
498         RaftActorContext initialActorContext = new MockRaftActorContext();
499
500         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
501                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
502                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
503                 actorFactory.generateActorId(LEADER_ID));
504
505         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
506         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
507
508         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(8);
509
510         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
511
512         // Drop the UnInitializedFollowerSnapshotReply to delay it.
513         leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
514
515         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
516
517         UnInitializedFollowerSnapshotReply snapshotReply = expectFirstMatching(leaderCollectorActor,
518                 UnInitializedFollowerSnapshotReply.class);
519
520         // Prevent election timeout when the leader switches to follower
521         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
522
523         // Change the leader behavior to follower
524         leaderActor.tell(new Follower(leaderActorContext), leaderActor);
525
526         // Send the captured UnInitializedFollowerSnapshotReply - should be a no-op
527         leaderRaftActor.setDropMessageOfType(null);
528         leaderActor.tell(snapshotReply, leaderActor);
529
530         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
531         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
532
533         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
534     }
535
536     @Test
537     public void testAddServerWithInstallSnapshotTimeout() throws Exception {
538         RaftActorContext initialActorContext = new MockRaftActorContext();
539
540         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
541                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
542                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
543                 actorFactory.generateActorId(LEADER_ID));
544
545         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
546         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
547         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
548
549         // Drop the InstallSnapshot message so it times out
550         newFollowerRaftActor.underlyingActor().setDropMessageOfType(InstallSnapshot.SERIALIZABLE_CLASS);
551
552         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
553
554         leaderActor.tell(new UnInitializedFollowerSnapshotReply("bogus"), leaderActor);
555
556         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
557         assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
558
559         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
560         assertEquals("Leader followers size", 0,
561                 ((AbstractLeader)leaderRaftActor.getCurrentBehavior()).getFollowerIds().size());
562     }
563
564     @Test
565     public void testAddServerWithNoLeader() {
566         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
567         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
568
569         TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
570                 MockRaftActor.props(LEADER_ID, ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
571                         configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
572                 actorFactory.generateActorId(LEADER_ID));
573         noLeaderActor.underlyingActor().waitForInitializeBehaviorComplete();
574
575         noLeaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
576         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
577         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
578     }
579
580     @Test
581     public void testAddServerWithNoConsensusReached() {
582         RaftActorContext initialActorContext = new MockRaftActorContext();
583
584         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
585                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
586                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
587                 actorFactory.generateActorId(LEADER_ID));
588
589         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
590         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
591
592         newFollowerRaftActor.underlyingActor().setDropMessageOfType(AppendEntries.class);
593
594         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
595
596         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
597         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
598         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
599
600         // Verify ServerConfigurationPayload entry in leader's log
601
602         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
603                 votingServer(NEW_SERVER_ID));
604     }
605
606     @Test
607     public void testAddServerWithExistingServer() {
608         RaftActorContext initialActorContext = new MockRaftActorContext();
609
610         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
611                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
612                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
613                 actorFactory.generateActorId(LEADER_ID));
614
615         leaderActor.tell(new AddServer(FOLLOWER_ID, followerActor.path().toString(), true), testKit.getRef());
616
617         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
618         assertEquals("getStatus", ServerChangeStatus.ALREADY_EXISTS, addServerReply.getStatus());
619     }
620
621     @Test
622     public void testAddServerForwardedToLeader() {
623         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
624         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
625
626         TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
627                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
628                 actorFactory.generateActorId(LEADER_ID));
629
630         TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
631                 MockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
632                         configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
633                 actorFactory.generateActorId(FOLLOWER_ID));
634         followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
635
636         followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
637                 -1, -1, (short)0), leaderActor);
638
639         followerRaftActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
640         expectFirstMatching(leaderActor, AddServer.class);
641     }
642
643     @Test
644     public void testOnApplyState() {
645         RaftActorServerConfigurationSupport support = new RaftActorServerConfigurationSupport(new MockRaftActorContext());
646
647         ReplicatedLogEntry serverConfigEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
648                 new ServerConfigurationPayload(Collections.<ServerInfo>emptyList()));
649         boolean handled = support.handleMessage(new ApplyState(null, null, serverConfigEntry), null, ActorRef.noSender());
650         assertEquals("Message handled", true, handled);
651
652         ReplicatedLogEntry nonServerConfigEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
653                 new MockRaftActorContext.MockPayload("1"));
654         handled = support.handleMessage(new ApplyState(null, null, nonServerConfigEntry), null, ActorRef.noSender());
655         assertEquals("Message handled", false, handled);
656     }
657
658     @Test
659     public void testRemoveServerWithNoLeader() {
660         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
661         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
662
663         TestActorRef<MockRaftActor> leaderActor = actorFactory.createTestActor(
664                 MockRaftActor.props(LEADER_ID, ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
665                         configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
666                 actorFactory.generateActorId(LEADER_ID));
667         leaderActor.underlyingActor().waitForInitializeBehaviorComplete();
668
669         leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
670         RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
671         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, removeServerReply.getStatus());
672     }
673
674     @Test
675     public void testRemoveServerNonExistentServer() {
676         RaftActorContext initialActorContext = new MockRaftActorContext();
677
678         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
679                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
680                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
681                 actorFactory.generateActorId(LEADER_ID));
682
683         leaderActor.tell(new RemoveServer(NEW_SERVER_ID), testKit.getRef());
684         RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
685         assertEquals("getStatus", ServerChangeStatus.DOES_NOT_EXIST, removeServerReply.getStatus());
686     }
687
688     @Test
689     public void testRemoveServerSelf() {
690         RaftActorContext initialActorContext = new MockRaftActorContext();
691
692         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
693                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
694                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
695                 actorFactory.generateActorId(LEADER_ID));
696
697         leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
698         RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
699         assertEquals("getStatus", ServerChangeStatus.NOT_SUPPORTED, removeServerReply.getStatus());
700     }
701
702     @Test
703     public void testRemoveServerForwardToLeader() {
704         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
705         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
706         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
707
708         RaftActorContext initialActorContext = new MockRaftActorContext();
709
710         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
711                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
712                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
713                 actorFactory.generateActorId(LEADER_ID));
714
715         TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
716                 MockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
717                         configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
718                 actorFactory.generateActorId(FOLLOWER_ID));
719
720
721         followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
722                 -1, -1, (short) 0), leaderActor);
723
724         followerRaftActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
725         RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
726         assertEquals("getStatus", ServerChangeStatus.NOT_SUPPORTED, removeServerReply.getStatus());
727     }
728
729     @Test
730     public void testRemoveServer() {
731         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
732         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
733         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
734
735         final String followerActorId = actorFactory.generateActorId(FOLLOWER_ID);
736         final String followerActorPath = actorFactory.createTestActorPath(followerActorId);
737         RaftActorContext initialActorContext = new MockRaftActorContext();
738
739         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
740                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActorPath),
741                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
742                 actorFactory.generateActorId(LEADER_ID));
743
744         TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
745
746         TestActorRef<CollectingMockRaftActor> followerRaftActor = actorFactory.createTestActor(
747                 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
748                         configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
749                 followerActorId);
750
751         TestActorRef<MessageCollectorActor> collector =
752                 actorFactory.createTestActor(MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId("collector"));
753
754         followerRaftActor.underlyingActor().setCollectorActor(collector);
755
756         leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
757         RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
758
759         assertEquals("getStatus", ServerChangeStatus.OK, removeServerReply.getStatus());
760
761         final ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
762         assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
763         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(), votingServer(LEADER_ID));
764
765         MessageCollectorActor.expectFirstMatching(collector, ServerRemoved.class);
766     }
767
768     private ServerInfo votingServer(String id) {
769         return new ServerInfo(id, true);
770     }
771
772     private ServerInfo nonVotingServer(String id) {
773         return new ServerInfo(id, false);
774     }
775
776     private TestActorRef<MessageCollectorActor> newLeaderCollectorActor(MockLeaderRaftActor leaderRaftActor) {
777         TestActorRef<MessageCollectorActor> leaderCollectorActor = actorFactory.createTestActor(
778                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
779                 actorFactory.generateActorId(LEADER_ID + "Collector"));
780         leaderRaftActor.setCollectorActor(leaderCollectorActor);
781         return leaderCollectorActor;
782     }
783
784     private static void verifyServerConfigurationPayloadEntry(ReplicatedLog log, ServerInfo... expected) {
785         ReplicatedLogEntry logEntry = log.get(log.lastIndex());
786         assertEquals("Last log entry payload class", ServerConfigurationPayload.class, logEntry.getData().getClass());
787         ServerConfigurationPayload payload = (ServerConfigurationPayload)logEntry.getData();
788         assertEquals("getNewServerConfig", Sets.newHashSet(expected), Sets.newHashSet(payload.getServerConfig()));
789     }
790
791     private static RaftActorContext newFollowerContext(String id, TestActorRef<? extends UntypedActor> actor) {
792         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
793         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
794         configParams.setElectionTimeoutFactor(100000);
795         ElectionTermImpl termInfo = new ElectionTermImpl(NO_PERSISTENCE, id, LOG);
796         termInfo.update(1, LEADER_ID);
797         return new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
798                 id, termInfo, -1, -1, ImmutableMap.of(LEADER_ID, ""), configParams, NO_PERSISTENCE, LOG);
799     }
800
801     static abstract class AbstractMockRaftActor extends MockRaftActor {
802         private volatile TestActorRef<MessageCollectorActor> collectorActor;
803         private volatile Class<?> dropMessageOfType;
804
805         AbstractMockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
806                 DataPersistenceProvider dataPersistenceProvider, TestActorRef<MessageCollectorActor> collectorActor) {
807             super(builder().id(id).peerAddresses(peerAddresses).config(config.get()).
808                     dataPersistenceProvider(dataPersistenceProvider));
809             this.collectorActor = collectorActor;
810         }
811
812         void setDropMessageOfType(Class<?> dropMessageOfType) {
813             this.dropMessageOfType = dropMessageOfType;
814         }
815
816         void setCollectorActor(TestActorRef<MessageCollectorActor> collectorActor) {
817             this.collectorActor = collectorActor;
818         }
819
820         @Override
821         public void handleCommand(Object message) {
822             if(dropMessageOfType == null || !dropMessageOfType.equals(message.getClass())) {
823                 super.handleCommand(message);
824             }
825
826             if(collectorActor != null) {
827                 collectorActor.tell(message, getSender());
828             }
829         }
830     }
831
832     public static class CollectingMockRaftActor extends AbstractMockRaftActor {
833
834         CollectingMockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider, TestActorRef<MessageCollectorActor> collectorActor) {
835             super(id, peerAddresses, config, dataPersistenceProvider, collectorActor);
836         }
837
838         public static Props props(final String id, final Map<String, String> peerAddresses,
839                                   ConfigParams config, DataPersistenceProvider dataPersistenceProvider){
840
841             return Props.create(CollectingMockRaftActor.class, id, peerAddresses, Optional.of(config), dataPersistenceProvider, null);
842         }
843
844     }
845
846     public static class MockLeaderRaftActor extends AbstractMockRaftActor {
847         public MockLeaderRaftActor(Map<String, String> peerAddresses, ConfigParams config,
848                 RaftActorContext fromContext) {
849             super(LEADER_ID, peerAddresses, Optional.of(config), NO_PERSISTENCE, null);
850             setPersistence(false);
851
852             RaftActorContext context = getRaftActorContext();
853             for(int i = 0; i < fromContext.getReplicatedLog().size(); i++) {
854                 ReplicatedLogEntry entry = fromContext.getReplicatedLog().get(i);
855                 getState().add(entry.getData());
856                 context.getReplicatedLog().append(entry);
857             }
858
859             context.setCommitIndex(fromContext.getCommitIndex());
860             context.setLastApplied(fromContext.getLastApplied());
861             context.getTermInformation().update(fromContext.getTermInformation().getCurrentTerm(),
862                     fromContext.getTermInformation().getVotedFor());
863         }
864
865         @Override
866         protected void initializeBehavior() {
867             changeCurrentBehavior(new Leader(getRaftActorContext()));
868             initializeBehaviorComplete.countDown();
869         }
870
871         @Override
872         public void createSnapshot(ActorRef actorRef) {
873             try {
874                 actorRef.tell(new CaptureSnapshotReply(RaftActorTest.fromObject(getState()).toByteArray()), actorRef);
875             } catch (Exception e) {
876                 LOG.error("createSnapshot failed", e);
877             }
878         }
879
880         static Props props(Map<String, String> peerAddresses, RaftActorContext fromContext) {
881             DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
882             configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
883             configParams.setElectionTimeoutFactor(10);
884             return Props.create(MockLeaderRaftActor.class, peerAddresses, configParams, fromContext);
885         }
886     }
887
888     public static class MockNewFollowerRaftActor extends AbstractMockRaftActor {
889         public MockNewFollowerRaftActor(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
890             super(NEW_SERVER_ID, Maps.<String, String>newHashMap(), Optional.of(config), null, collectorActor);
891             setPersistence(false);
892         }
893
894         static Props props(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
895             return Props.create(MockNewFollowerRaftActor.class, config, collectorActor);
896         }
897     }
898 }