2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft;
10 import static org.junit.Assert.assertEquals;
11 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.assertNoneMatching;
12 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
13 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
14 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
15 import akka.actor.ActorRef;
16 import akka.actor.Props;
17 import akka.actor.UntypedActor;
18 import akka.dispatch.Dispatchers;
19 import akka.testkit.JavaTestKit;
20 import akka.testkit.TestActorRef;
21 import com.google.common.base.Optional;
22 import com.google.common.collect.ImmutableMap;
23 import com.google.common.collect.Maps;
24 import com.google.common.collect.Sets;
25 import java.util.Collections;
26 import java.util.List;
28 import java.util.concurrent.TimeUnit;
29 import org.junit.After;
30 import org.junit.Before;
31 import org.junit.Test;
32 import org.opendaylight.controller.cluster.DataPersistenceProvider;
33 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
34 import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
35 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
36 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
37 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
38 import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
39 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
40 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
41 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
42 import org.opendaylight.controller.cluster.raft.messages.AddServer;
43 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
44 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
45 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
46 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
47 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
48 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
49 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
50 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
51 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
52 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
53 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
54 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
55 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
56 import org.slf4j.Logger;
57 import org.slf4j.LoggerFactory;
58 import scala.concurrent.duration.FiniteDuration;
61 * Unit tests for RaftActorServerConfigurationSupport.
63 * @author Thomas Pantelis
65 public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
// Ids of the raft members used throughout these tests.
66 static final String LEADER_ID = "leader";
67 static final String FOLLOWER_ID = "follower";
68 static final String NEW_SERVER_ID = "new-server";
69 static final String NEW_SERVER_ID2 = "new-server2";
70 private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupportTest.class);
// Persistence provider handed to MockRaftActor.props() in testAddServerWithNoLeader.
71 private static final DataPersistenceProvider NO_PERSISTENCE = new NonPersistentDataProvider();
// Creates every actor used by the tests; closed in tearDown().
73 private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
// Actor playing the follower role: tests install a Follower behavior on it via setBehavior().
75 private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
76 Props.create(ForwardMessageToBehaviorActor.class).withDispatcher(Dispatchers.DefaultDispatcherId()),
77 actorFactory.generateActorId(FOLLOWER_ID));
// The raft actor for the server being added, the collector actor passed to its props, and its raft context.
79 private TestActorRef<MockNewFollowerRaftActor> newFollowerRaftActor;
80 private TestActorRef<MessageCollectorActor> newFollowerCollectorActor;
81 private RaftActorContext newFollowerActorContext;
// Its ref is used as the sender of AddServer requests so that the AddServerReply can be awaited on it.
83 private final JavaTestKit testKit = new JavaTestKit(getSystem());
// NOTE(review): the declaration of the enclosing method is not visible in this listing; these
// statements look like per-test setup - confirm against the full file.
// Clear the in-memory journal and snapshot store.
87 InMemoryJournal.clear();
88 InMemorySnapshotStore.clear();
90 DefaultConfigParamsImpl configParams = newFollowerConfigParams();
// The collector actor is created first because it is passed to the new follower's props.
92 newFollowerCollectorActor = actorFactory.createTestActor(
93 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
94 actorFactory.generateActorId(NEW_SERVER_ID + "Collector"));
95 newFollowerRaftActor = actorFactory.createTestActor(MockNewFollowerRaftActor.props(
96 configParams, newFollowerCollectorActor).withDispatcher(Dispatchers.DefaultDispatcherId()),
97 actorFactory.generateActorId(NEW_SERVER_ID));
// Cache the new follower's RaftActorContext. If the lookup throws, the identical lookup is
// attempted once more in the catch block.
// NOTE(review): the opening of the try statement is not visible in this listing.
100 newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
101 } catch (Exception e) {
102 newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
/**
 * Builds the config params used for the new follower actor: a 100 ms heartbeat interval,
 * an election timeout factor of 100000 and DisableElectionsRaftPolicy as the custom raft policy.
 * NOTE(review): the return statement is not visible in this listing.
 */
106 private static DefaultConfigParamsImpl newFollowerConfigParams() {
107 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
108 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
109 configParams.setElectionTimeoutFactor(100000);
110 configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
/**
 * Closes the actor factory that created the test actors.
 */
115 public void tearDown() throws Exception {
116 actorFactory.close();
/**
 * Adds NEW_SERVER_ID as a voting server to a leader that already has FOLLOWER_ID as a peer.
 * Expects an ApplySnapshot on the new server, an OK reply whose leader hint is LEADER_ID, a
 * server configuration entry at index 3 on the leader and both followers, updated peer ids on
 * both followers, and a single persisted ReplicatedLogImplEntry for the leader and for the
 * new server.
 */
120 public void testAddServerWithExistingFollower() throws Exception {
121 RaftActorContext followerActorContext = newFollowerContext(FOLLOWER_ID, followerActor);
// NOTE(review): the createEntries(...) call is cut off in this listing; its arguments are not visible.
122 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
124 followerActorContext.setCommitIndex(2);
125 followerActorContext.setLastApplied(2);
127 Follower follower = new Follower(followerActorContext);
128 followerActor.underlyingActor().setBehavior(follower);
// The leader is created with the follower as its only peer; followerActorContext is also passed
// to MockLeaderRaftActor.props (presumably to seed the leader's initial state - confirm).
130 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
131 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
132 followerActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
133 actorFactory.generateActorId(LEADER_ID));
135 // Expect initial heartbeat from the leader.
136 expectFirstMatching(followerActor, AppendEntries.class);
137 clearMessages(followerActor);
139 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
140 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
142 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
144 // Leader should install snapshot - capture and verify ApplySnapshot contents
146 ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
147 @SuppressWarnings("unchecked")
148 List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
// NOTE(review): JUnit's assertEquals takes (message, expected, actual); the arguments here look
// swapped (the leader's state reads like the reference value). Only the failure message is affected.
149 assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
151 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
152 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
153 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
155 // Verify ServerConfigurationPayload entry in leader's log
157 expectFirstMatching(leaderCollectorActor, ApplyState.class);
158 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
159 assertEquals("Leader journal last index", 3, leaderActorContext.getReplicatedLog().lastIndex());
160 assertEquals("Leader commit index", 3, leaderActorContext.getCommitIndex());
161 assertEquals("Leader last applied index", 3, leaderActorContext.getLastApplied());
162 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
163 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
165 // Verify ServerConfigurationPayload entry in both followers
167 expectFirstMatching(followerActor, ApplyState.class);
168 assertEquals("Follower journal last index", 3, followerActorContext.getReplicatedLog().lastIndex());
169 verifyServerConfigurationPayloadEntry(followerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
170 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
172 expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
173 assertEquals("New follower journal last index", 3, newFollowerActorContext.getReplicatedLog().lastIndex());
174 verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
175 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
177 // Verify new server config was applied in both followers
179 assertEquals("Follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID), followerActorContext.getPeerIds());
181 assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, FOLLOWER_ID), newFollowerActorContext.getPeerIds());
183 expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
184 expectFirstMatching(followerActor, ApplyState.class);
186 assertEquals("Follower commit index", 3, followerActorContext.getCommitIndex());
187 assertEquals("Follower last applied index", 3, followerActorContext.getLastApplied());
188 assertEquals("New follower commit index", 3, newFollowerActorContext.getCommitIndex());
189 assertEquals("New follower last applied index", 3, newFollowerActorContext.getLastApplied());
// The leader's journal is expected to hold exactly one ReplicatedLogImplEntry: the
// ServerConfigurationPayload at index 3, term 1.
191 List<ReplicatedLogImplEntry> persistedLogEntries = InMemoryJournal.get(LEADER_ID, ReplicatedLogImplEntry.class);
192 assertEquals("Leader ReplicatedLogImplEntry entries", 1, persistedLogEntries.size());
193 ReplicatedLogImplEntry logEntry = persistedLogEntries.get(0);
194 assertEquals("Leader ReplicatedLogImplEntry getTerm", 1, logEntry.getTerm());
195 assertEquals("Leader ReplicatedLogImplEntry getIndex", 3, logEntry.getIndex());
196 assertEquals("Leader ReplicatedLogImplEntry getData", ServerConfigurationPayload.class, logEntry.getData().getClass());
// The same single entry is expected in the new server's journal.
198 persistedLogEntries = InMemoryJournal.get(NEW_SERVER_ID, ReplicatedLogImplEntry.class);
199 assertEquals("New follower ReplicatedLogImplEntry entries", 1, persistedLogEntries.size());
200 logEntry = persistedLogEntries.get(0);
201 assertEquals("New follower ReplicatedLogImplEntry getTerm", 1, logEntry.getTerm());
202 assertEquals("New follower ReplicatedLogImplEntry getIndex", 3, logEntry.getIndex());
203 assertEquals("New follower ReplicatedLogImplEntry getData", ServerConfigurationPayload.class,
204 logEntry.getData().getClass());
/**
 * Adds NEW_SERVER_ID as a voting server to a leader that has no peers. Expects an
 * ApplySnapshot on the new server, an OK reply whose leader hint is LEADER_ID, a server
 * configuration entry at index 2 on both the leader and the new server, and LEADER_ID as the
 * new server's only peer.
 */
208 public void testAddServerWithNoExistingFollower() throws Exception {
209 RaftActorContext initialActorContext = new MockRaftActorContext();
210 initialActorContext.setCommitIndex(1);
211 initialActorContext.setLastApplied(1);
// NOTE(review): the createEntries(...) call is cut off in this listing; its arguments are not visible.
212 initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
215 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
216 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
217 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
218 actorFactory.generateActorId(LEADER_ID));
220 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
221 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
223 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
225 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
227 // Leader should install snapshot - capture and verify ApplySnapshot contents
229 ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
230 @SuppressWarnings("unchecked")
231 List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
// NOTE(review): JUnit's assertEquals takes (message, expected, actual); the arguments here look
// swapped (the leader's state reads like the reference value). Only the failure message is affected.
232 assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
234 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
235 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
236 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
238 // Verify ServerConfigurationPayload entry in leader's log
240 expectFirstMatching(leaderCollectorActor, ApplyState.class);
241 assertEquals("Leader journal last index", 2, leaderActorContext.getReplicatedLog().lastIndex());
242 assertEquals("Leader commit index", 2, leaderActorContext.getCommitIndex());
243 assertEquals("Leader last applied index", 2, leaderActorContext.getLastApplied());
244 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
245 votingServer(NEW_SERVER_ID));
247 // Verify ServerConfigurationPayload entry in the new follower
249 expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
250 assertEquals("New follower journal last index", 2, newFollowerActorContext.getReplicatedLog().lastIndex());
251 verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
252 votingServer(NEW_SERVER_ID));
254 // Verify new server config was applied in the new follower
256 assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
/**
 * Adds two servers as non-voting members, one after the other, to a leader created from an
 * empty MockRaftActorContext. After the first add, the configuration entry is expected at
 * index 0 and no InstallSnapshot is expected at the new server's collector; after the second
 * add, the leader's configuration entry is expected at index 1.
 */
260 public void testAddServersAsNonVoting() throws Exception {
261 RaftActorContext initialActorContext = new MockRaftActorContext();
263 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
264 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
265 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
266 actorFactory.generateActorId(LEADER_ID));
268 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
269 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
271 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
// The final 'false' argument is what distinguishes this request from the voting adds in the other tests.
273 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), false), testKit.getRef());
275 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
276 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
277 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
279 // Verify ServerConfigurationPayload entry in leader's log
281 expectFirstMatching(leaderCollectorActor, ApplyState.class);
283 assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
284 assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
285 assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
286 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
287 nonVotingServer(NEW_SERVER_ID));
289 // Verify ServerConfigurationPayload entry in the new follower
291 expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
292 assertEquals("New follower journal last index", 0, newFollowerActorContext.getReplicatedLog().lastIndex());
293 verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
294 nonVotingServer(NEW_SERVER_ID));
296 // Verify new server config was applied in the new follower
298 assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
// No snapshot install is expected here; 500 is the wait passed to assertNoneMatching
// (presumably milliseconds - confirm against MessageCollectorActor).
300 assertNoneMatching(newFollowerCollectorActor, InstallSnapshot.class, 500);
302 // Add another non-voting server.
304 clearMessages(leaderCollectorActor);
// followerActor is reused as the second new server: a Follower behavior built from a context
// for NEW_SERVER_ID2 is installed on it.
306 RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
307 Follower newFollower2 = new Follower(follower2ActorContext);
308 followerActor.underlyingActor().setBehavior(newFollower2);
310 leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit.getRef());
312 addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
313 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
314 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
316 expectFirstMatching(leaderCollectorActor, ApplyState.class);
317 assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
318 assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
319 assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
320 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
321 votingServer(LEADER_ID), nonVotingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
/**
 * Sends a second AddServer (NEW_SERVER_ID2, non-voting) while the first (NEW_SERVER_ID,
 * voting) is held up because the new follower drops the InstallSnapshot message. After the
 * captured InstallSnapshot is re-delivered, both requests are expected to reply OK and the
 * leader's log is expected to end at index 1.
 */
325 public void testAddServerWithOperationInProgress() throws Exception {
326 RaftActorContext initialActorContext = new MockRaftActorContext();
328 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
329 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
330 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
331 actorFactory.generateActorId(LEADER_ID));
333 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
334 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
336 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
// followerActor stands in for the second server (NEW_SERVER_ID2).
338 RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
339 Follower newFollower2 = new Follower(follower2ActorContext);
340 followerActor.underlyingActor().setBehavior(newFollower2);
// Have the new follower drop InstallSnapshot so the first AddServer stays in progress.
342 MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
343 newFollowerRaftActorInstance.setDropMessageOfType(InstallSnapshot.SERIALIZABLE_CLASS);
345 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
347 // Wait for leader's install snapshot and capture it
349 InstallSnapshot installSnapshot = expectFirstMatching(newFollowerCollectorActor, InstallSnapshot.class);
351 // Send a second AddServer - should get queued
// A separate test kit is used as the sender so the two replies can be told apart.
352 JavaTestKit testKit2 = new JavaTestKit(getSystem());
353 leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit2.getRef());
355 // Continue the first AddServer
356 newFollowerRaftActorInstance.setDropMessageOfType(null);
357 newFollowerRaftActor.tell(installSnapshot, leaderActor);
359 // Verify both complete successfully
360 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
361 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
363 addServerReply = testKit2.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
364 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
366 // Verify ServerConfigurationPayload entries in leader's log
368 expectMatching(leaderCollectorActor, ApplyState.class, 2);
369 assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
370 assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
371 assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
372 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
373 votingServer(LEADER_ID), votingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
375 // Verify ServerConfigurationPayload entry in the new follower
377 expectMatching(newFollowerCollectorActor, ApplyState.class, 2);
378 assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID2),
379 newFollowerActorContext.getPeerIds());
/**
 * Issues AddServer while a snapshot started with InitiateCaptureSnapshot is still pending.
 * The leader drops String messages so the captured commit message can be re-delivered after
 * the AddServer request; the request is then expected to reply OK and the configuration
 * entry is expected at index 0 of the leader's log.
 */
383 public void testAddServerWithPriorSnapshotInProgress() throws Exception {
384 RaftActorContext initialActorContext = new MockRaftActorContext();
386 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
387 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
388 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
389 actorFactory.generateActorId(LEADER_ID));
391 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
392 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
394 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
396 // Drop commit message for now to delay snapshot completion
397 leaderRaftActor.setDropMessageOfType(String.class);
399 leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
// Capture the dropped String message from the leader's collector so it can be replayed below.
401 String commitMsg = expectFirstMatching(leaderCollectorActor, String.class);
403 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
// Stop dropping messages and replay the commit message.
405 leaderRaftActor.setDropMessageOfType(null);
406 leaderActor.tell(commitMsg, leaderActor);
408 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
409 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
410 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
412 expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
414 // Verify ServerConfigurationPayload entry in leader's log
416 expectFirstMatching(leaderCollectorActor, ApplyState.class);
417 assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
418 assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
419 assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
420 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
421 votingServer(NEW_SERVER_ID));
/**
 * Issues AddServer while a snapshot started with InitiateCaptureSnapshot never completes
 * (the leader drops String messages). The request is expected to reply TIMEOUT and the
 * leader is expected to have no peers afterwards.
 */
425 public void testAddServerWithPriorSnapshotCompleteTimeout() throws Exception {
426 RaftActorContext initialActorContext = new MockRaftActorContext();
428 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
429 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
430 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
431 actorFactory.generateActorId(LEADER_ID));
433 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
434 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
// NOTE(review): presumably shortens the server-operation timeout so the test fails fast - confirm
// how the timeout is derived from the election timeout factor.
436 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
438 // Drop commit message so the snapshot doesn't complete.
439 leaderRaftActor.setDropMessageOfType(String.class);
441 leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
443 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
445 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
446 assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
448 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
/**
 * Issues AddServer while a prior snapshot is pending, then switches the leader actor to a
 * Follower behavior before the snapshot's commit message is re-delivered. After a
 * ServerOperationTimeout for NEW_SERVER_ID is sent to the actor, the request is expected to
 * reply NO_LEADER, the peer set is expected to be empty and no capture is expected to be in
 * progress.
 */
452 public void testAddServerWithLeaderChangeBeforePriorSnapshotComplete() throws Exception {
453 RaftActorContext initialActorContext = new MockRaftActorContext();
455 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
456 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
457 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
458 actorFactory.generateActorId(LEADER_ID));
460 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
461 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
462 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
464 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
466 // Drop the commit message so the snapshot doesn't complete yet.
467 leaderRaftActor.setDropMessageOfType(String.class);
469 leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
471 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
// Capture the dropped String message so it can be replayed after the behavior change.
473 String commitMsg = expectFirstMatching(leaderCollectorActor, String.class);
475 // Change the leader behavior to follower
476 leaderActor.tell(new Follower(leaderActorContext), leaderActor);
478 // Drop CaptureSnapshotReply in case install snapshot is incorrectly initiated after the prior
479 // snapshot completes. This will prevent the invalid snapshot from completing and fail the
480 // isCapturing assertion below.
481 leaderRaftActor.setDropMessageOfType(CaptureSnapshotReply.class);
483 // Complete the prior snapshot - this should be a no-op b/c it's no longer the leader
484 leaderActor.tell(commitMsg, leaderActor);
// Send the operation timeout message directly to the actor.
486 leaderActor.tell(new RaftActorServerConfigurationSupport.ServerOperationTimeout(NEW_SERVER_ID), leaderActor);
488 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
489 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
491 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
492 assertEquals("isCapturing", false, leaderActorContext.getSnapshotManager().isCapturing());
/**
 * Issues AddServer, holds back the UnInitializedFollowerSnapshotReply by having the leader
 * drop it, switches the leader actor to a Follower behavior and then re-delivers the captured
 * reply. The request is expected to reply NO_LEADER and the peer set is expected to be empty.
 */
496 public void testAddServerWithLeaderChangeDuringInstallSnapshot() throws Exception {
497 RaftActorContext initialActorContext = new MockRaftActorContext();
499 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
500 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
501 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
502 actorFactory.generateActorId(LEADER_ID));
504 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
505 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
507 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(8);
509 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
511 // Drop the UnInitializedFollowerSnapshotReply to delay it.
512 leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
514 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
// Capture the dropped reply from the leader's collector for replay below.
516 UnInitializedFollowerSnapshotReply snapshotReply = expectFirstMatching(leaderCollectorActor,
517 UnInitializedFollowerSnapshotReply.class);
519 // Prevent election timeout when the leader switches to follower
520 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
522 // Change the leader behavior to follower
523 leaderActor.tell(new Follower(leaderActorContext), leaderActor);
525 // Send the captured UnInitializedFollowerSnapshotReply - should be a no-op
526 leaderRaftActor.setDropMessageOfType(null);
527 leaderActor.tell(snapshotReply, leaderActor);
529 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
530 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
532 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
/**
 * Issues AddServer while the new follower drops the InstallSnapshot message. The request is
 * expected to reply TIMEOUT, and afterwards the leader is expected to have neither peers nor
 * follower ids in its current behavior.
 */
536 public void testAddServerWithInstallSnapshotTimeout() throws Exception {
537 RaftActorContext initialActorContext = new MockRaftActorContext();
539 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
540 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
541 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
542 actorFactory.generateActorId(LEADER_ID));
544 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
545 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
546 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
548 // Drop the InstallSnapshot message so it times out
549 newFollowerRaftActor.underlyingActor().setDropMessageOfType(InstallSnapshot.SERIALIZABLE_CLASS);
551 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
// NOTE(review): a reply carrying the id "bogus" is sent to the leader; presumably this checks that
// a reply for an unknown follower does not complete the pending operation - confirm.
553 leaderActor.tell(new UnInitializedFollowerSnapshotReply("bogus"), leaderActor);
555 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
556 assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
558 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
559 assertEquals("Leader followers size", 0,
560 ((AbstractLeader)leaderRaftActor.getCurrentBehavior()).getFollowerIds().size());
/**
 * Sends AddServer to a MockRaftActor configured with a one-day heartbeat interval, a single
 * peer (FOLLOWER_ID) and NO_PERSISTENCE, and expects a NO_LEADER reply.
 */
564 public void testAddServerWithNoLeader() {
565 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
566 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
568 TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
569 MockRaftActor.props(LEADER_ID, ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
570 configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
571 actorFactory.generateActorId(LEADER_ID));
// Wait until the actor has finished initializing its behavior before sending the request.
572 noLeaderActor.underlyingActor().waitForInitializeBehaviorComplete();
574 noLeaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
575 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
576 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
/**
 * Adds NEW_SERVER_ID as a voting server while the new follower drops AppendEntries, then
 * sends AddServer for NEW_SERVER_ID2 twice. The first request is expected to reply OK; both
 * requests for NEW_SERVER_ID2 are expected to reply PRIOR_REQUEST_CONSENSUS_TIMEOUT.
 */
580 public void testAddServerWithNoConsensusReached() {
581 RaftActorContext initialActorContext = new MockRaftActorContext();
583 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
584 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
585 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
586 actorFactory.generateActorId(LEADER_ID));
588 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
589 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
591 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
593 // Drop UnInitializedFollowerSnapshotReply initially
594 leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
596 MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
// NOTE(review): this local shadows the newFollowerCollectorActor field; within this method the
// collector returned by newCollectorActor(...) is the one being observed.
597 TestActorRef<MessageCollectorActor> newFollowerCollectorActor =
598 newCollectorActor(newFollowerRaftActorInstance, NEW_SERVER_ID);
600 // Drop AppendEntries to the new follower so consensus isn't reached
601 newFollowerRaftActorInstance.setDropMessageOfType(AppendEntries.class);
603 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
605 // Capture the UnInitializedFollowerSnapshotReply
606 Object snapshotReply = expectFirstMatching(leaderCollectorActor, UnInitializedFollowerSnapshotReply.class);
608 // Send the UnInitializedFollowerSnapshotReply to resume the first request
609 leaderRaftActor.setDropMessageOfType(null);
610 leaderActor.tell(snapshotReply, leaderActor);
// Wait until an AppendEntries shows up at the new follower's collector before sending the second request.
612 expectFirstMatching(newFollowerCollectorActor, AppendEntries.class);
614 // Send a second AddServer
615 leaderActor.tell(new AddServer(NEW_SERVER_ID2, "", false), testKit.getRef());
617 // The first AddServer should succeed with OK even though consensus wasn't reached
618 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
619 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
620 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
622 // Verify ServerConfigurationPayload entry in leader's log
623 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
624 votingServer(NEW_SERVER_ID));
626 // The second AddServer should fail since consensus wasn't reached for the first
627 addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
628 assertEquals("getStatus", ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT, addServerReply.getStatus());
630 // Re-send the second AddServer - should also fail
631 leaderActor.tell(new AddServer(NEW_SERVER_ID2, "", false), testKit.getRef());
632 addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
633 assertEquals("getStatus", ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT, addServerReply.getStatus());
637 public void testAddServerWithExistingServer() {
638 RaftActorContext initialActorContext = new MockRaftActorContext();
640 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
641 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
642 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
643 actorFactory.generateActorId(LEADER_ID));
645 leaderActor.tell(new AddServer(FOLLOWER_ID, followerActor.path().toString(), true), testKit.getRef());
647 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
648 assertEquals("getStatus", ServerChangeStatus.ALREADY_EXISTS, addServerReply.getStatus());
652 public void testAddServerForwardedToLeader() {
653 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
654 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
656 TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
657 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
658 actorFactory.generateActorId(LEADER_ID));
660 TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
661 MockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
662 configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
663 actorFactory.generateActorId(FOLLOWER_ID));
664 followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
666 followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
667 -1, -1, (short)0), leaderActor);
669 followerRaftActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
670 expectFirstMatching(leaderActor, AddServer.class);
674 public void testOnApplyState() {
675 RaftActorServerConfigurationSupport support = new RaftActorServerConfigurationSupport(new MockRaftActorContext());
677 ReplicatedLogEntry serverConfigEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
678 new ServerConfigurationPayload(Collections.<ServerInfo>emptyList()));
679 boolean handled = support.handleMessage(new ApplyState(null, null, serverConfigEntry), null, ActorRef.noSender());
680 assertEquals("Message handled", true, handled);
682 ReplicatedLogEntry nonServerConfigEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
683 new MockRaftActorContext.MockPayload("1"));
684 handled = support.handleMessage(new ApplyState(null, null, nonServerConfigEntry), null, ActorRef.noSender());
685 assertEquals("Message handled", false, handled);
689 public void testRemoveServerWithNoLeader() {
690 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
691 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
693 TestActorRef<MockRaftActor> leaderActor = actorFactory.createTestActor(
694 MockRaftActor.props(LEADER_ID, ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
695 configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
696 actorFactory.generateActorId(LEADER_ID));
697 leaderActor.underlyingActor().waitForInitializeBehaviorComplete();
699 leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
700 RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
701 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, removeServerReply.getStatus());
705 public void testRemoveServerNonExistentServer() {
706 RaftActorContext initialActorContext = new MockRaftActorContext();
708 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
709 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
710 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
711 actorFactory.generateActorId(LEADER_ID));
713 leaderActor.tell(new RemoveServer(NEW_SERVER_ID), testKit.getRef());
714 RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
715 assertEquals("getStatus", ServerChangeStatus.DOES_NOT_EXIST, removeServerReply.getStatus());
719 public void testRemoveServerSelf() {
720 RaftActorContext initialActorContext = new MockRaftActorContext();
722 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
723 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
724 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
725 actorFactory.generateActorId(LEADER_ID));
727 leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
728 RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
729 assertEquals("getStatus", ServerChangeStatus.NOT_SUPPORTED, removeServerReply.getStatus());
733 public void testRemoveServerForwardToLeader() {
734 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
735 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
736 configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
738 RaftActorContext initialActorContext = new MockRaftActorContext();
740 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
741 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
742 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
743 actorFactory.generateActorId(LEADER_ID));
745 TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
746 MockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
747 configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
748 actorFactory.generateActorId(FOLLOWER_ID));
751 followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
752 -1, -1, (short) 0), leaderActor);
754 followerRaftActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
755 RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
756 assertEquals("getStatus", ServerChangeStatus.NOT_SUPPORTED, removeServerReply.getStatus());
760 public void testRemoveServer() {
761 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
762 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
763 configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
765 final String followerActorId = actorFactory.generateActorId(FOLLOWER_ID);
766 final String followerActorPath = actorFactory.createTestActorPath(followerActorId);
767 RaftActorContext initialActorContext = new MockRaftActorContext();
769 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
770 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActorPath),
771 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
772 actorFactory.generateActorId(LEADER_ID));
774 TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
776 TestActorRef<CollectingMockRaftActor> followerRaftActor = actorFactory.createTestActor(
777 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
778 configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
781 TestActorRef<MessageCollectorActor> collector =
782 actorFactory.createTestActor(MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId("collector"));
784 followerRaftActor.underlyingActor().setCollectorActor(collector);
786 leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
787 RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
789 assertEquals("getStatus", ServerChangeStatus.OK, removeServerReply.getStatus());
791 final ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
792 assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
793 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(), votingServer(LEADER_ID));
795 MessageCollectorActor.expectFirstMatching(collector, ServerRemoved.class);
798 private ServerInfo votingServer(String id) {
799 return new ServerInfo(id, true);
802 private ServerInfo nonVotingServer(String id) {
803 return new ServerInfo(id, false);
806 private TestActorRef<MessageCollectorActor> newLeaderCollectorActor(MockLeaderRaftActor leaderRaftActor) {
807 return newCollectorActor(leaderRaftActor, LEADER_ID);
810 private TestActorRef<MessageCollectorActor> newCollectorActor(AbstractMockRaftActor raftActor, String id) {
811 TestActorRef<MessageCollectorActor> collectorActor = actorFactory.createTestActor(
812 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
813 actorFactory.generateActorId(id + "Collector"));
814 raftActor.setCollectorActor(collectorActor);
815 return collectorActor;
818 private static void verifyServerConfigurationPayloadEntry(ReplicatedLog log, ServerInfo... expected) {
819 ReplicatedLogEntry logEntry = log.get(log.lastIndex());
820 assertEquals("Last log entry payload class", ServerConfigurationPayload.class, logEntry.getData().getClass());
821 ServerConfigurationPayload payload = (ServerConfigurationPayload)logEntry.getData();
822 assertEquals("getNewServerConfig", Sets.newHashSet(expected), Sets.newHashSet(payload.getServerConfig()));
825 private static RaftActorContext newFollowerContext(String id, TestActorRef<? extends UntypedActor> actor) {
826 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
827 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
828 configParams.setElectionTimeoutFactor(100000);
829 ElectionTermImpl termInfo = new ElectionTermImpl(NO_PERSISTENCE, id, LOG);
830 termInfo.update(1, LEADER_ID);
831 return new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
832 id, termInfo, -1, -1, ImmutableMap.of(LEADER_ID, ""), configParams, NO_PERSISTENCE, LOG);
835 static abstract class AbstractMockRaftActor extends MockRaftActor {
836 private volatile TestActorRef<MessageCollectorActor> collectorActor;
837 private volatile Class<?> dropMessageOfType;
839 AbstractMockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
840 DataPersistenceProvider dataPersistenceProvider, TestActorRef<MessageCollectorActor> collectorActor) {
841 super(builder().id(id).peerAddresses(peerAddresses).config(config.get()).
842 dataPersistenceProvider(dataPersistenceProvider));
843 this.collectorActor = collectorActor;
846 void setDropMessageOfType(Class<?> dropMessageOfType) {
847 this.dropMessageOfType = dropMessageOfType;
850 void setCollectorActor(TestActorRef<MessageCollectorActor> collectorActor) {
851 this.collectorActor = collectorActor;
855 public void handleCommand(Object message) {
856 if(dropMessageOfType == null || !dropMessageOfType.equals(message.getClass())) {
857 super.handleCommand(message);
860 if(collectorActor != null) {
861 collectorActor.tell(message, getSender());
866 public static class CollectingMockRaftActor extends AbstractMockRaftActor {
868 CollectingMockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider, TestActorRef<MessageCollectorActor> collectorActor) {
869 super(id, peerAddresses, config, dataPersistenceProvider, collectorActor);
872 public static Props props(final String id, final Map<String, String> peerAddresses,
873 ConfigParams config, DataPersistenceProvider dataPersistenceProvider){
875 return Props.create(CollectingMockRaftActor.class, id, peerAddresses, Optional.of(config), dataPersistenceProvider, null);
880 public static class MockLeaderRaftActor extends AbstractMockRaftActor {
881 public MockLeaderRaftActor(Map<String, String> peerAddresses, ConfigParams config,
882 RaftActorContext fromContext) {
883 super(LEADER_ID, peerAddresses, Optional.of(config), NO_PERSISTENCE, null);
884 setPersistence(false);
886 RaftActorContext context = getRaftActorContext();
887 for(int i = 0; i < fromContext.getReplicatedLog().size(); i++) {
888 ReplicatedLogEntry entry = fromContext.getReplicatedLog().get(i);
889 getState().add(entry.getData());
890 context.getReplicatedLog().append(entry);
893 context.setCommitIndex(fromContext.getCommitIndex());
894 context.setLastApplied(fromContext.getLastApplied());
895 context.getTermInformation().update(fromContext.getTermInformation().getCurrentTerm(),
896 fromContext.getTermInformation().getVotedFor());
900 protected void initializeBehavior() {
901 changeCurrentBehavior(new Leader(getRaftActorContext()));
902 initializeBehaviorComplete.countDown();
906 public void createSnapshot(ActorRef actorRef) {
908 actorRef.tell(new CaptureSnapshotReply(RaftActorTest.fromObject(getState()).toByteArray()), actorRef);
909 } catch (Exception e) {
910 LOG.error("createSnapshot failed", e);
914 static Props props(Map<String, String> peerAddresses, RaftActorContext fromContext) {
915 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
916 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
917 configParams.setElectionTimeoutFactor(10);
918 return Props.create(MockLeaderRaftActor.class, peerAddresses, configParams, fromContext);
922 public static class MockNewFollowerRaftActor extends AbstractMockRaftActor {
923 public MockNewFollowerRaftActor(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
924 super(NEW_SERVER_ID, Maps.<String, String>newHashMap(), Optional.of(config), null, collectorActor);
925 setPersistence(false);
928 static Props props(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
929 return Props.create(MockNewFollowerRaftActor.class, config, collectorActor);