/*
 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.cluster.raft;
10 import static org.junit.Assert.assertEquals;
11 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.assertNoneMatching;
12 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
13 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
14 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
15 import akka.actor.ActorRef;
16 import akka.actor.Props;
17 import akka.actor.UntypedActor;
18 import akka.dispatch.Dispatchers;
19 import akka.testkit.JavaTestKit;
20 import akka.testkit.TestActorRef;
21 import com.google.common.base.Optional;
22 import com.google.common.collect.ImmutableMap;
23 import com.google.common.collect.Maps;
24 import com.google.common.collect.Sets;
25 import java.util.Collections;
26 import java.util.List;
28 import java.util.concurrent.TimeUnit;
29 import org.junit.After;
30 import org.junit.Before;
31 import org.junit.Test;
32 import org.opendaylight.controller.cluster.DataPersistenceProvider;
33 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
34 import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
35 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
36 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
37 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
38 import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
39 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
40 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
41 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
42 import org.opendaylight.controller.cluster.raft.messages.AddServer;
43 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
44 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
45 import org.opendaylight.controller.cluster.raft.messages.FollowerCatchUpTimeout;
46 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
47 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
48 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
49 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
50 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
51 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
52 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
53 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
54 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
55 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
56 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
57 import org.slf4j.Logger;
58 import org.slf4j.LoggerFactory;
59 import scala.concurrent.duration.FiniteDuration;
/**
 * Unit tests for RaftActorServerConfigurationSupport.
 *
 * @author Thomas Pantelis
 */
66 public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
67 static final String LEADER_ID = "leader";
68 static final String FOLLOWER_ID = "follower";
69 static final String NEW_SERVER_ID = "new-server";
70 static final String NEW_SERVER_ID2 = "new-server2";
71 private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupportTest.class);
72 private static final DataPersistenceProvider NO_PERSISTENCE = new NonPersistentDataProvider();
74 private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
76 private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
77 Props.create(ForwardMessageToBehaviorActor.class).withDispatcher(Dispatchers.DefaultDispatcherId()),
78 actorFactory.generateActorId(FOLLOWER_ID));
80 private TestActorRef<MockNewFollowerRaftActor> newFollowerRaftActor;
81 private TestActorRef<MessageCollectorActor> newFollowerCollectorActor;
82 private RaftActorContext newFollowerActorContext;
84 private final JavaTestKit testKit = new JavaTestKit(getSystem());
88 InMemoryJournal.clear();
89 InMemorySnapshotStore.clear();
91 DefaultConfigParamsImpl configParams = newFollowerConfigParams();
93 newFollowerCollectorActor = actorFactory.createTestActor(
94 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
95 actorFactory.generateActorId(NEW_SERVER_ID + "Collector"));
96 newFollowerRaftActor = actorFactory.createTestActor(MockNewFollowerRaftActor.props(
97 configParams, newFollowerCollectorActor).withDispatcher(Dispatchers.DefaultDispatcherId()),
98 actorFactory.generateActorId(NEW_SERVER_ID));
101 newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
102 } catch (Exception e) {
103 newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
107 private static DefaultConfigParamsImpl newFollowerConfigParams() {
108 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
109 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
110 configParams.setElectionTimeoutFactor(100000);
111 configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
116 public void tearDown() throws Exception {
117 actorFactory.close();
// Sends AddServer for NEW_SERVER_ID to a leader that already has FOLLOWER_ID as a peer, then checks
// the ApplySnapshot received by the new follower, the AddServerReply, the ServerConfigurationPayload
// entry and indices on the leader and both followers, and the persisted journal entries.
// NOTE(review): this copy of the file has listing line numbers fused onto every line and is missing
// the @Test annotation and closing brace - restore from version control before compiling.
121 public void testAddServerWithExistingFollower() throws Exception {
122 RaftActorContext followerActorContext = newFollowerContext(FOLLOWER_ID, followerActor);
// NOTE(review): the continuation of the createEntries(...) call below (its arguments and the end of
// the statement) is missing from this copy - do not guess it, restore the original line.
123 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
125 followerActorContext.setCommitIndex(2);
126 followerActorContext.setLastApplied(2);
128 Follower follower = new Follower(followerActorContext);
129 followerActor.underlyingActor().setBehavior(follower);
131 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
132 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
133 followerActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
134 actorFactory.generateActorId(LEADER_ID));
136 // Expect initial heartbeat from the leader.
137 expectFirstMatching(followerActor, AppendEntries.class);
138 clearMessages(followerActor);
140 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
141 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
143 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
145 // Leader should install snapshot - capture and verify ApplySnapshot contents
147 ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
148 @SuppressWarnings("unchecked")
149 List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
// NOTE(review): JUnit's assertEquals is (message, expected, actual); the leader state looks like the
// expected value here, so the two arguments appear swapped (affects only the failure message).
150 assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
152 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
153 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
154 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
156 // Verify ServerConfigurationPayload entry in leader's log
158 expectFirstMatching(leaderCollectorActor, ApplyState.class);
159 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
160 assertEquals("Leader journal last index", 3, leaderActorContext.getReplicatedLog().lastIndex());
161 assertEquals("Leader commit index", 3, leaderActorContext.getCommitIndex());
162 assertEquals("Leader last applied index", 3, leaderActorContext.getLastApplied());
163 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
164 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
166 // Verify ServerConfigurationPayload entry in both followers
168 expectFirstMatching(followerActor, ApplyState.class);
169 assertEquals("Follower journal last index", 3, followerActorContext.getReplicatedLog().lastIndex());
170 verifyServerConfigurationPayloadEntry(followerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
171 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
173 expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
174 assertEquals("New follower journal last index", 3, newFollowerActorContext.getReplicatedLog().lastIndex());
175 verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
176 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
178 // Verify new server config was applied in both followers
180 assertEquals("Follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID), followerActorContext.getPeerIds());
182 assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, FOLLOWER_ID), newFollowerActorContext.getPeerIds());
// The two waits below repeat the ApplyState waits already performed above for the same actors.
184 expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
185 expectFirstMatching(followerActor, ApplyState.class);
187 assertEquals("Follower commit index", 3, followerActorContext.getCommitIndex());
188 assertEquals("Follower last applied index", 3, followerActorContext.getLastApplied());
189 assertEquals("New follower commit index", 3, newFollowerActorContext.getCommitIndex());
190 assertEquals("New follower last applied index", 3, newFollowerActorContext.getLastApplied());
// Check what was written to the in-memory journal for the leader and for the new server.
192 List<ReplicatedLogImplEntry> persistedLogEntries = InMemoryJournal.get(LEADER_ID, ReplicatedLogImplEntry.class);
193 assertEquals("Leader ReplicatedLogImplEntry entries", 1, persistedLogEntries.size());
194 ReplicatedLogImplEntry logEntry = persistedLogEntries.get(0);
195 assertEquals("Leader ReplicatedLogImplEntry getTerm", 1, logEntry.getTerm());
196 assertEquals("Leader ReplicatedLogImplEntry getIndex", 3, logEntry.getIndex());
197 assertEquals("Leader ReplicatedLogImplEntry getData", ServerConfigurationPayload.class, logEntry.getData().getClass());
199 persistedLogEntries = InMemoryJournal.get(NEW_SERVER_ID, ReplicatedLogImplEntry.class);
200 assertEquals("New follower ReplicatedLogImplEntry entries", 1, persistedLogEntries.size());
201 logEntry = persistedLogEntries.get(0);
202 assertEquals("New follower ReplicatedLogImplEntry getTerm", 1, logEntry.getTerm());
203 assertEquals("New follower ReplicatedLogImplEntry getIndex", 3, logEntry.getIndex());
204 assertEquals("New follower ReplicatedLogImplEntry getData", ServerConfigurationPayload.class,
205 logEntry.getData().getClass());
// Sends AddServer for NEW_SERVER_ID to a leader created with no peers, then checks the ApplySnapshot
// received by the new follower, the AddServerReply, and the ServerConfigurationPayload entry, indices
// and peer ids on the leader and the new follower.
// NOTE(review): this copy of the file has listing line numbers fused onto every line and is missing
// the @Test annotation and closing brace - restore from version control before compiling.
209 public void testAddServerWithNoExistingFollower() throws Exception {
210 RaftActorContext initialActorContext = new MockRaftActorContext();
211 initialActorContext.setCommitIndex(1);
212 initialActorContext.setLastApplied(1);
// NOTE(review): the continuation of the createEntries(...) call below (its arguments and the end of
// the statement) is missing from this copy - do not guess it, restore the original line.
213 initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
216 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
217 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
218 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
219 actorFactory.generateActorId(LEADER_ID));
221 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
222 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
224 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
226 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
228 // Leader should install snapshot - capture and verify ApplySnapshot contents
230 ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
231 @SuppressWarnings("unchecked")
232 List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
// NOTE(review): JUnit's assertEquals is (message, expected, actual); the leader state looks like the
// expected value here, so the two arguments appear swapped (affects only the failure message).
233 assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
235 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
236 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
237 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
239 // Verify ServerConfigurationPayload entry in leader's log
241 expectFirstMatching(leaderCollectorActor, ApplyState.class);
242 assertEquals("Leader journal last index", 2, leaderActorContext.getReplicatedLog().lastIndex());
243 assertEquals("Leader commit index", 2, leaderActorContext.getCommitIndex());
244 assertEquals("Leader last applied index", 2, leaderActorContext.getLastApplied());
245 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
246 votingServer(NEW_SERVER_ID));
248 // Verify ServerConfigurationPayload entry in the new follower
250 expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
251 assertEquals("New follower journal last index", 2, newFollowerActorContext.getReplicatedLog().lastIndex());
252 verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
253 votingServer(NEW_SERVER_ID));
255 // Verify new server config was applied in the new follower
257 assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
261 public void testAddServersAsNonVoting() throws Exception {
262 RaftActorContext initialActorContext = new MockRaftActorContext();
264 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
265 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
266 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
267 actorFactory.generateActorId(LEADER_ID));
269 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
270 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
272 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
274 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), false), testKit.getRef());
276 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
277 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
278 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
280 // Verify ServerConfigurationPayload entry in leader's log
282 expectFirstMatching(leaderCollectorActor, ApplyState.class);
284 assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
285 assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
286 assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
287 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
288 nonVotingServer(NEW_SERVER_ID));
290 // Verify ServerConfigurationPayload entry in the new follower
292 expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
293 assertEquals("New follower journal last index", 0, newFollowerActorContext.getReplicatedLog().lastIndex());
294 verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
295 nonVotingServer(NEW_SERVER_ID));
297 // Verify new server config was applied in the new follower
299 assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
301 assertNoneMatching(newFollowerCollectorActor, InstallSnapshot.class, 500);
303 // Add another non-voting server.
305 clearMessages(leaderCollectorActor);
307 RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
308 Follower newFollower2 = new Follower(follower2ActorContext);
309 followerActor.underlyingActor().setBehavior(newFollower2);
311 leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit.getRef());
313 addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
314 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
315 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
317 expectFirstMatching(leaderCollectorActor, ApplyState.class);
318 assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
319 assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
320 assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
321 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
322 votingServer(LEADER_ID), nonVotingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
326 public void testAddServerWithOperationInProgress() throws Exception {
327 RaftActorContext initialActorContext = new MockRaftActorContext();
329 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
330 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
331 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
332 actorFactory.generateActorId(LEADER_ID));
334 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
335 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
337 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
339 RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
340 Follower newFollower2 = new Follower(follower2ActorContext);
341 followerActor.underlyingActor().setBehavior(newFollower2);
343 MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
344 newFollowerRaftActorInstance.setDropMessageOfType(InstallSnapshot.SERIALIZABLE_CLASS);
346 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
348 // Wait for leader's install snapshot and capture it
350 Object installSnapshot = expectFirstMatching(newFollowerCollectorActor, InstallSnapshot.class);
352 // Send a second AddServer - should get queued
353 JavaTestKit testKit2 = new JavaTestKit(getSystem());
354 leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit2.getRef());
356 // Continue the first AddServer
357 newFollowerRaftActorInstance.setDropMessageOfType(null);
358 newFollowerRaftActor.tell(installSnapshot, leaderActor);
360 // Verify both complete successfully
361 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
362 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
364 addServerReply = testKit2.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
365 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
367 // Verify ServerConfigurationPayload entries in leader's log
369 expectMatching(leaderCollectorActor, ApplyState.class, 2);
370 assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
371 assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
372 assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
373 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
374 votingServer(LEADER_ID), votingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
376 // Verify ServerConfigurationPayload entry in the new follower
378 expectMatching(newFollowerCollectorActor, ApplyState.class, 2);
379 assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID2),
380 newFollowerActorContext.getPeerIds());
384 public void testAddServerWithPriorSnapshotInProgress() throws Exception {
385 RaftActorContext initialActorContext = new MockRaftActorContext();
387 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
388 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
389 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
390 actorFactory.generateActorId(LEADER_ID));
392 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
393 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
395 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
397 // Drop commit message for now to delay snapshot completion
398 leaderRaftActor.setDropMessageOfType(String.class);
400 leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
402 String commitMsg = expectFirstMatching(leaderCollectorActor, String.class);
404 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
406 leaderRaftActor.setDropMessageOfType(null);
407 leaderActor.tell(commitMsg, leaderActor);
409 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
410 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
411 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
413 expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
415 // Verify ServerConfigurationPayload entry in leader's log
417 expectFirstMatching(leaderCollectorActor, ApplyState.class);
418 assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
419 assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
420 assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
421 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
422 votingServer(NEW_SERVER_ID));
426 public void testAddServerWithPriorSnapshotCompleteTimeout() throws Exception {
427 RaftActorContext initialActorContext = new MockRaftActorContext();
429 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
430 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
431 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
432 actorFactory.generateActorId(LEADER_ID));
434 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
435 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
437 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
439 // Drop commit message so the snapshot doesn't complete.
440 leaderRaftActor.setDropMessageOfType(String.class);
442 leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
444 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
446 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
447 assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
449 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
453 public void testAddServerWithLeaderChangeBeforePriorSnapshotComplete() throws Exception {
454 RaftActorContext initialActorContext = new MockRaftActorContext();
456 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
457 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
458 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
459 actorFactory.generateActorId(LEADER_ID));
461 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
462 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
463 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
465 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
467 // Drop the commit message so the snapshot doesn't complete yet.
468 leaderRaftActor.setDropMessageOfType(String.class);
470 leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
472 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
474 String commitMsg = expectFirstMatching(leaderCollectorActor, String.class);
476 // Change the leader behavior to follower
477 leaderActor.tell(new Follower(leaderActorContext), leaderActor);
479 // Drop CaptureSnapshotReply in case install snapshot is incorrectly initiated after the prior
480 // snapshot completes. This will prevent the invalid snapshot from completing and fail the
481 // isCapturing assertion below.
482 leaderRaftActor.setDropMessageOfType(CaptureSnapshotReply.class);
484 // Complete the prior snapshot - this should be a no-op b/c it's no longer the leader
485 leaderActor.tell(commitMsg, leaderActor);
487 leaderActor.tell(new FollowerCatchUpTimeout(NEW_SERVER_ID), leaderActor);
489 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
490 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
492 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
493 assertEquals("isCapturing", false, leaderActorContext.getSnapshotManager().isCapturing());
497 public void testAddServerWithLeaderChangeDuringInstallSnapshot() throws Exception {
498 RaftActorContext initialActorContext = new MockRaftActorContext();
500 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
501 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
502 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
503 actorFactory.generateActorId(LEADER_ID));
505 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
506 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
508 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(8);
510 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
512 // Drop the UnInitializedFollowerSnapshotReply to delay it.
513 leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
515 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
517 UnInitializedFollowerSnapshotReply snapshotReply = expectFirstMatching(leaderCollectorActor,
518 UnInitializedFollowerSnapshotReply.class);
520 // Prevent election timeout when the leader switches to follower
521 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
523 // Change the leader behavior to follower
524 leaderActor.tell(new Follower(leaderActorContext), leaderActor);
526 // Send the captured UnInitializedFollowerSnapshotReply - should be a no-op
527 leaderRaftActor.setDropMessageOfType(null);
528 leaderActor.tell(snapshotReply, leaderActor);
530 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
531 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
533 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
537 public void testAddServerWithInstallSnapshotTimeout() throws Exception {
538 RaftActorContext initialActorContext = new MockRaftActorContext();
540 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
541 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
542 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
543 actorFactory.generateActorId(LEADER_ID));
545 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
546 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
547 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
549 // Drop the InstallSnapshot message so it times out
550 newFollowerRaftActor.underlyingActor().setDropMessageOfType(InstallSnapshot.SERIALIZABLE_CLASS);
552 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
554 leaderActor.tell(new UnInitializedFollowerSnapshotReply("bogus"), leaderActor);
556 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
557 assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
559 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
560 assertEquals("Leader followers size", 0,
561 ((AbstractLeader)leaderRaftActor.getCurrentBehavior()).getFollowerIds().size());
565 public void testAddServerWithNoLeader() {
566 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
567 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
569 TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
570 MockRaftActor.props(LEADER_ID, ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
571 configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
572 actorFactory.generateActorId(LEADER_ID));
573 noLeaderActor.underlyingActor().waitForInitializeBehaviorComplete();
575 noLeaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
576 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
577 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
581 public void testAddServerWithNoConsensusReached() {
582 RaftActorContext initialActorContext = new MockRaftActorContext();
584 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
585 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
586 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
587 actorFactory.generateActorId(LEADER_ID));
589 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
590 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
592 newFollowerRaftActor.underlyingActor().setDropMessageOfType(AppendEntries.class);
594 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
596 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
597 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
598 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
600 // Verify ServerConfigurationPayload entry in leader's log
602 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
603 votingServer(NEW_SERVER_ID));
607 public void testAddServerWithExistingServer() {
608 RaftActorContext initialActorContext = new MockRaftActorContext();
610 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
611 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
612 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
613 actorFactory.generateActorId(LEADER_ID));
615 leaderActor.tell(new AddServer(FOLLOWER_ID, followerActor.path().toString(), true), testKit.getRef());
617 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
618 assertEquals("getStatus", ServerChangeStatus.ALREADY_EXISTS, addServerReply.getStatus());
622 public void testAddServerForwardedToLeader() {
623 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
624 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
626 TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
627 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
628 actorFactory.generateActorId(LEADER_ID));
630 TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
631 MockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
632 configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
633 actorFactory.generateActorId(FOLLOWER_ID));
634 followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
636 followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
637 -1, -1, (short)0), leaderActor);
639 followerRaftActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
640 expectFirstMatching(leaderActor, AddServer.class);
644 public void testOnApplyState() {
645 RaftActorServerConfigurationSupport support = new RaftActorServerConfigurationSupport(new MockRaftActorContext());
647 ReplicatedLogEntry serverConfigEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
648 new ServerConfigurationPayload(Collections.<ServerInfo>emptyList()));
649 boolean handled = support.handleMessage(new ApplyState(null, null, serverConfigEntry), null, ActorRef.noSender());
650 assertEquals("Message handled", true, handled);
652 ReplicatedLogEntry nonServerConfigEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
653 new MockRaftActorContext.MockPayload("1"));
654 handled = support.handleMessage(new ApplyState(null, null, nonServerConfigEntry), null, ActorRef.noSender());
655 assertEquals("Message handled", false, handled);
659 public void testRemoveServerWithNoLeader() {
660 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
661 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
663 TestActorRef<MockRaftActor> leaderActor = actorFactory.createTestActor(
664 MockRaftActor.props(LEADER_ID, ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
665 configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
666 actorFactory.generateActorId(LEADER_ID));
667 leaderActor.underlyingActor().waitForInitializeBehaviorComplete();
669 leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
670 RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
671 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, removeServerReply.getStatus());
675 public void testRemoveServerNonExistentServer() {
676 RaftActorContext initialActorContext = new MockRaftActorContext();
678 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
679 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
680 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
681 actorFactory.generateActorId(LEADER_ID));
683 leaderActor.tell(new RemoveServer(NEW_SERVER_ID), testKit.getRef());
684 RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
685 assertEquals("getStatus", ServerChangeStatus.DOES_NOT_EXIST, removeServerReply.getStatus());
689 public void testRemoveServerSelf() {
690 RaftActorContext initialActorContext = new MockRaftActorContext();
692 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
693 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
694 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
695 actorFactory.generateActorId(LEADER_ID));
697 leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
698 RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
699 assertEquals("getStatus", ServerChangeStatus.NOT_SUPPORTED, removeServerReply.getStatus());
703 public void testRemoveServerForwardToLeader() {
704 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
705 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
706 configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
708 RaftActorContext initialActorContext = new MockRaftActorContext();
710 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
711 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
712 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
713 actorFactory.generateActorId(LEADER_ID));
715 TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
716 MockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
717 configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
718 actorFactory.generateActorId(FOLLOWER_ID));
721 followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
722 -1, -1, (short) 0), leaderActor);
724 followerRaftActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
725 RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
726 assertEquals("getStatus", ServerChangeStatus.NOT_SUPPORTED, removeServerReply.getStatus());
730 public void testRemoveServer() {
731 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
732 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
733 configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
735 final String followerActorId = actorFactory.generateActorId(FOLLOWER_ID);
736 final String followerActorPath = actorFactory.createTestActorPath(followerActorId);
737 RaftActorContext initialActorContext = new MockRaftActorContext();
739 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
740 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActorPath),
741 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
742 actorFactory.generateActorId(LEADER_ID));
744 TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
746 TestActorRef<CollectingMockRaftActor> followerRaftActor = actorFactory.createTestActor(
747 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
748 configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
751 TestActorRef<MessageCollectorActor> collector =
752 actorFactory.createTestActor(MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId("collector"));
754 followerRaftActor.underlyingActor().setCollectorActor(collector);
756 leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
757 RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
759 assertEquals("getStatus", ServerChangeStatus.OK, removeServerReply.getStatus());
761 final ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
762 assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
763 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(), votingServer(LEADER_ID));
765 MessageCollectorActor.expectFirstMatching(collector, ServerRemoved.class);
768 private ServerInfo votingServer(String id) {
769 return new ServerInfo(id, true);
772 private ServerInfo nonVotingServer(String id) {
773 return new ServerInfo(id, false);
776 private TestActorRef<MessageCollectorActor> newLeaderCollectorActor(MockLeaderRaftActor leaderRaftActor) {
777 TestActorRef<MessageCollectorActor> leaderCollectorActor = actorFactory.createTestActor(
778 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
779 actorFactory.generateActorId(LEADER_ID + "Collector"));
780 leaderRaftActor.setCollectorActor(leaderCollectorActor);
781 return leaderCollectorActor;
784 private static void verifyServerConfigurationPayloadEntry(ReplicatedLog log, ServerInfo... expected) {
785 ReplicatedLogEntry logEntry = log.get(log.lastIndex());
786 assertEquals("Last log entry payload class", ServerConfigurationPayload.class, logEntry.getData().getClass());
787 ServerConfigurationPayload payload = (ServerConfigurationPayload)logEntry.getData();
788 assertEquals("getNewServerConfig", Sets.newHashSet(expected), Sets.newHashSet(payload.getServerConfig()));
791 private static RaftActorContext newFollowerContext(String id, TestActorRef<? extends UntypedActor> actor) {
792 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
793 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
794 configParams.setElectionTimeoutFactor(100000);
795 ElectionTermImpl termInfo = new ElectionTermImpl(NO_PERSISTENCE, id, LOG);
796 termInfo.update(1, LEADER_ID);
797 return new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
798 id, termInfo, -1, -1, ImmutableMap.of(LEADER_ID, ""), configParams, NO_PERSISTENCE, LOG);
801 static abstract class AbstractMockRaftActor extends MockRaftActor {
802 private volatile TestActorRef<MessageCollectorActor> collectorActor;
803 private volatile Class<?> dropMessageOfType;
805 AbstractMockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
806 DataPersistenceProvider dataPersistenceProvider, TestActorRef<MessageCollectorActor> collectorActor) {
807 super(builder().id(id).peerAddresses(peerAddresses).config(config.get()).
808 dataPersistenceProvider(dataPersistenceProvider));
809 this.collectorActor = collectorActor;
812 void setDropMessageOfType(Class<?> dropMessageOfType) {
813 this.dropMessageOfType = dropMessageOfType;
816 void setCollectorActor(TestActorRef<MessageCollectorActor> collectorActor) {
817 this.collectorActor = collectorActor;
821 public void handleCommand(Object message) {
822 if(dropMessageOfType == null || !dropMessageOfType.equals(message.getClass())) {
823 super.handleCommand(message);
826 if(collectorActor != null) {
827 collectorActor.tell(message, getSender());
832 public static class CollectingMockRaftActor extends AbstractMockRaftActor {
834 CollectingMockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider, TestActorRef<MessageCollectorActor> collectorActor) {
835 super(id, peerAddresses, config, dataPersistenceProvider, collectorActor);
838 public static Props props(final String id, final Map<String, String> peerAddresses,
839 ConfigParams config, DataPersistenceProvider dataPersistenceProvider){
841 return Props.create(CollectingMockRaftActor.class, id, peerAddresses, Optional.of(config), dataPersistenceProvider, null);
846 public static class MockLeaderRaftActor extends AbstractMockRaftActor {
847 public MockLeaderRaftActor(Map<String, String> peerAddresses, ConfigParams config,
848 RaftActorContext fromContext) {
849 super(LEADER_ID, peerAddresses, Optional.of(config), NO_PERSISTENCE, null);
850 setPersistence(false);
852 RaftActorContext context = getRaftActorContext();
853 for(int i = 0; i < fromContext.getReplicatedLog().size(); i++) {
854 ReplicatedLogEntry entry = fromContext.getReplicatedLog().get(i);
855 getState().add(entry.getData());
856 context.getReplicatedLog().append(entry);
859 context.setCommitIndex(fromContext.getCommitIndex());
860 context.setLastApplied(fromContext.getLastApplied());
861 context.getTermInformation().update(fromContext.getTermInformation().getCurrentTerm(),
862 fromContext.getTermInformation().getVotedFor());
866 protected void initializeBehavior() {
867 changeCurrentBehavior(new Leader(getRaftActorContext()));
868 initializeBehaviorComplete.countDown();
872 public void createSnapshot(ActorRef actorRef) {
874 actorRef.tell(new CaptureSnapshotReply(RaftActorTest.fromObject(getState()).toByteArray()), actorRef);
875 } catch (Exception e) {
876 LOG.error("createSnapshot failed", e);
880 static Props props(Map<String, String> peerAddresses, RaftActorContext fromContext) {
881 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
882 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
883 configParams.setElectionTimeoutFactor(10);
884 return Props.create(MockLeaderRaftActor.class, peerAddresses, configParams, fromContext);
888 public static class MockNewFollowerRaftActor extends AbstractMockRaftActor {
889 public MockNewFollowerRaftActor(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
890 super(NEW_SERVER_ID, Maps.<String, String>newHashMap(), Optional.of(config), null, collectorActor);
891 setPersistence(false);
894 static Props props(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
895 return Props.create(MockNewFollowerRaftActor.class, config, collectorActor);