2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertTrue;
12 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.assertNoneMatching;
13 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
14 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
15 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
16 import akka.actor.ActorRef;
17 import akka.actor.Props;
18 import akka.actor.UntypedActor;
19 import akka.dispatch.Dispatchers;
20 import akka.testkit.JavaTestKit;
21 import akka.testkit.TestActorRef;
22 import com.google.common.base.Optional;
23 import com.google.common.collect.ImmutableMap;
24 import com.google.common.collect.Maps;
25 import com.google.common.collect.Sets;
26 import java.util.Collections;
27 import java.util.List;
29 import java.util.concurrent.TimeUnit;
30 import org.junit.After;
31 import org.junit.Before;
32 import org.junit.Test;
33 import org.opendaylight.controller.cluster.DataPersistenceProvider;
34 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
35 import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
36 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
37 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
38 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
39 import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
40 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
41 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
42 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
43 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
44 import org.opendaylight.controller.cluster.raft.messages.AddServer;
45 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
46 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
47 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
48 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
49 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
50 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
51 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
52 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
53 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
54 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
55 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
56 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
57 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
58 import org.slf4j.Logger;
59 import org.slf4j.LoggerFactory;
60 import scala.concurrent.duration.FiniteDuration;
63 * Unit tests for RaftActorServerConfigurationSupport.
65 * @author Thomas Pantelis
67 public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
68 static final String LEADER_ID = "leader";
69 static final String FOLLOWER_ID = "follower";
70 static final String NEW_SERVER_ID = "new-server";
71 static final String NEW_SERVER_ID2 = "new-server2";
72 private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupportTest.class);
73 private static final DataPersistenceProvider NO_PERSISTENCE = new NonPersistentDataProvider();
75 private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
77 private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
78 Props.create(ForwardMessageToBehaviorActor.class).withDispatcher(Dispatchers.DefaultDispatcherId()),
79 actorFactory.generateActorId(FOLLOWER_ID));
81 private TestActorRef<MockNewFollowerRaftActor> newFollowerRaftActor;
82 private TestActorRef<MessageCollectorActor> newFollowerCollectorActor;
83 private RaftActorContext newFollowerActorContext;
85 private final JavaTestKit testKit = new JavaTestKit(getSystem());
89 InMemoryJournal.clear();
90 InMemorySnapshotStore.clear();
92 DefaultConfigParamsImpl configParams = newFollowerConfigParams();
94 newFollowerCollectorActor = actorFactory.createTestActor(
95 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
96 actorFactory.generateActorId(NEW_SERVER_ID + "Collector"));
97 newFollowerRaftActor = actorFactory.createTestActor(MockNewFollowerRaftActor.props(
98 configParams, newFollowerCollectorActor).withDispatcher(Dispatchers.DefaultDispatcherId()),
99 actorFactory.generateActorId(NEW_SERVER_ID));
102 newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
103 } catch (Exception e) {
104 newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
108 private static DefaultConfigParamsImpl newFollowerConfigParams() {
109 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
110 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
111 configParams.setElectionTimeoutFactor(100000);
112 configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
117 public void tearDown() throws Exception {
118 actorFactory.close();
122 public void testAddServerWithExistingFollower() throws Exception {
123 RaftActorContext followerActorContext = newFollowerContext(FOLLOWER_ID, followerActor);
124 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
126 followerActorContext.setCommitIndex(2);
127 followerActorContext.setLastApplied(2);
129 Follower follower = new Follower(followerActorContext);
130 followerActor.underlyingActor().setBehavior(follower);
132 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
133 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
134 followerActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
135 actorFactory.generateActorId(LEADER_ID));
137 // Expect initial heartbeat from the leader.
138 expectFirstMatching(followerActor, AppendEntries.class);
139 clearMessages(followerActor);
141 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
142 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
144 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
146 // Leader should install snapshot - capture and verify ApplySnapshot contents
148 ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
149 @SuppressWarnings("unchecked")
150 List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
151 assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
153 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
154 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
155 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
157 // Verify ServerConfigurationPayload entry in leader's log
159 expectFirstMatching(leaderCollectorActor, ApplyState.class);
160 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
161 assertEquals("Leader journal last index", 3, leaderActorContext.getReplicatedLog().lastIndex());
162 assertEquals("Leader commit index", 3, leaderActorContext.getCommitIndex());
163 assertEquals("Leader last applied index", 3, leaderActorContext.getLastApplied());
164 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
165 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
167 // Verify ServerConfigurationPayload entry in both followers
169 expectFirstMatching(followerActor, ApplyState.class);
170 assertEquals("Follower journal last index", 3, followerActorContext.getReplicatedLog().lastIndex());
171 verifyServerConfigurationPayloadEntry(followerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
172 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
174 expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
175 assertEquals("New follower journal last index", 3, newFollowerActorContext.getReplicatedLog().lastIndex());
176 verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
177 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
179 // Verify new server config was applied in both followers
181 assertEquals("Follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID), followerActorContext.getPeerIds());
183 assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, FOLLOWER_ID), newFollowerActorContext.getPeerIds());
185 expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
186 expectFirstMatching(followerActor, ApplyState.class);
188 assertEquals("Follower commit index", 3, followerActorContext.getCommitIndex());
189 assertEquals("Follower last applied index", 3, followerActorContext.getLastApplied());
190 assertEquals("New follower commit index", 3, newFollowerActorContext.getCommitIndex());
191 assertEquals("New follower last applied index", 3, newFollowerActorContext.getLastApplied());
193 List<ReplicatedLogImplEntry> persistedLogEntries = InMemoryJournal.get(LEADER_ID, ReplicatedLogImplEntry.class);
194 assertEquals("Leader ReplicatedLogImplEntry entries", 1, persistedLogEntries.size());
195 ReplicatedLogImplEntry logEntry = persistedLogEntries.get(0);
196 assertEquals("Leader ReplicatedLogImplEntry getTerm", 1, logEntry.getTerm());
197 assertEquals("Leader ReplicatedLogImplEntry getIndex", 3, logEntry.getIndex());
198 assertEquals("Leader ReplicatedLogImplEntry getData", ServerConfigurationPayload.class, logEntry.getData().getClass());
200 persistedLogEntries = InMemoryJournal.get(NEW_SERVER_ID, ReplicatedLogImplEntry.class);
201 assertEquals("New follower ReplicatedLogImplEntry entries", 1, persistedLogEntries.size());
202 logEntry = persistedLogEntries.get(0);
203 assertEquals("New follower ReplicatedLogImplEntry getTerm", 1, logEntry.getTerm());
204 assertEquals("New follower ReplicatedLogImplEntry getIndex", 3, logEntry.getIndex());
205 assertEquals("New follower ReplicatedLogImplEntry getData", ServerConfigurationPayload.class,
206 logEntry.getData().getClass());
210 public void testAddServerWithNoExistingFollower() throws Exception {
211 RaftActorContext initialActorContext = new MockRaftActorContext();
212 initialActorContext.setCommitIndex(1);
213 initialActorContext.setLastApplied(1);
214 initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
217 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
218 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
219 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
220 actorFactory.generateActorId(LEADER_ID));
222 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
223 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
225 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
227 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
229 // Leader should install snapshot - capture and verify ApplySnapshot contents
231 ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
232 @SuppressWarnings("unchecked")
233 List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
234 assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
236 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
237 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
238 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
240 // Verify ServerConfigurationPayload entry in leader's log
242 expectFirstMatching(leaderCollectorActor, ApplyState.class);
243 assertEquals("Leader journal last index", 2, leaderActorContext.getReplicatedLog().lastIndex());
244 assertEquals("Leader commit index", 2, leaderActorContext.getCommitIndex());
245 assertEquals("Leader last applied index", 2, leaderActorContext.getLastApplied());
246 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
247 votingServer(NEW_SERVER_ID));
249 // Verify ServerConfigurationPayload entry in the new follower
251 expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
252 assertEquals("New follower journal last index", 2, newFollowerActorContext.getReplicatedLog().lastIndex());
253 verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
254 votingServer(NEW_SERVER_ID));
256 // Verify new server config was applied in the new follower
258 assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
262 public void testAddServersAsNonVoting() throws Exception {
263 RaftActorContext initialActorContext = new MockRaftActorContext();
265 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
266 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
267 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
268 actorFactory.generateActorId(LEADER_ID));
270 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
271 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
273 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
275 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), false), testKit.getRef());
277 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
278 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
279 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
281 // Verify ServerConfigurationPayload entry in leader's log
283 expectFirstMatching(leaderCollectorActor, ApplyState.class);
285 assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
286 assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
287 assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
288 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
289 nonVotingServer(NEW_SERVER_ID));
291 // Verify ServerConfigurationPayload entry in the new follower
293 expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
294 assertEquals("New follower journal last index", 0, newFollowerActorContext.getReplicatedLog().lastIndex());
295 verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
296 nonVotingServer(NEW_SERVER_ID));
298 // Verify new server config was applied in the new follower
300 assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
302 assertNoneMatching(newFollowerCollectorActor, InstallSnapshot.class, 500);
304 // Add another non-voting server.
306 clearMessages(leaderCollectorActor);
308 RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
309 Follower newFollower2 = new Follower(follower2ActorContext);
310 followerActor.underlyingActor().setBehavior(newFollower2);
312 leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit.getRef());
314 addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
315 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
316 assertEquals("getLeaderHint", java.util.Optional.of(LEADER_ID), addServerReply.getLeaderHint());
318 expectFirstMatching(leaderCollectorActor, ApplyState.class);
319 assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
320 assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
321 assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
322 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
323 votingServer(LEADER_ID), nonVotingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
327 public void testAddServerWithOperationInProgress() throws Exception {
328 RaftActorContext initialActorContext = new MockRaftActorContext();
330 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
331 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
332 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
333 actorFactory.generateActorId(LEADER_ID));
335 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
336 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
338 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
340 RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
341 Follower newFollower2 = new Follower(follower2ActorContext);
342 followerActor.underlyingActor().setBehavior(newFollower2);
344 MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
345 newFollowerRaftActorInstance.setDropMessageOfType(InstallSnapshot.SERIALIZABLE_CLASS);
347 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
349 // Wait for leader's install snapshot and capture it
351 InstallSnapshot installSnapshot = expectFirstMatching(newFollowerCollectorActor, InstallSnapshot.class);
353 // Send a second AddServer - should get queued
354 JavaTestKit testKit2 = new JavaTestKit(getSystem());
355 leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit2.getRef());
357 // Continue the first AddServer
358 newFollowerRaftActorInstance.setDropMessageOfType(null);
359 newFollowerRaftActor.tell(installSnapshot, leaderActor);
361 // Verify both complete successfully
362 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
363 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
365 addServerReply = testKit2.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
366 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
368 // Verify ServerConfigurationPayload entries in leader's log
370 expectMatching(leaderCollectorActor, ApplyState.class, 2);
371 assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
372 assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
373 assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
374 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
375 votingServer(LEADER_ID), votingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
377 // Verify ServerConfigurationPayload entry in the new follower
379 expectMatching(newFollowerCollectorActor, ApplyState.class, 2);
380 assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID2),
381 newFollowerActorContext.getPeerIds());
385 public void testAddServerWithPriorSnapshotInProgress() throws Exception {
386 RaftActorContext initialActorContext = new MockRaftActorContext();
388 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
389 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
390 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
391 actorFactory.generateActorId(LEADER_ID));
393 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
394 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
396 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
398 // Drop commit message for now to delay snapshot completion
399 leaderRaftActor.setDropMessageOfType(String.class);
401 leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
403 String commitMsg = expectFirstMatching(leaderCollectorActor, String.class);
405 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
407 leaderRaftActor.setDropMessageOfType(null);
408 leaderActor.tell(commitMsg, leaderActor);
410 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
411 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
412 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
414 expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
416 // Verify ServerConfigurationPayload entry in leader's log
418 expectFirstMatching(leaderCollectorActor, ApplyState.class);
419 assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
420 assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
421 assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
422 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
423 votingServer(NEW_SERVER_ID));
427 public void testAddServerWithPriorSnapshotCompleteTimeout() throws Exception {
428 RaftActorContext initialActorContext = new MockRaftActorContext();
430 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
431 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
432 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
433 actorFactory.generateActorId(LEADER_ID));
435 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
436 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
438 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
440 // Drop commit message so the snapshot doesn't complete.
441 leaderRaftActor.setDropMessageOfType(String.class);
443 leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
445 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
447 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
448 assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
450 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
454 public void testAddServerWithLeaderChangeBeforePriorSnapshotComplete() throws Exception {
455 RaftActorContext initialActorContext = new MockRaftActorContext();
457 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
458 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
459 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
460 actorFactory.generateActorId(LEADER_ID));
462 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
463 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
464 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
466 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
468 // Drop the commit message so the snapshot doesn't complete yet.
469 leaderRaftActor.setDropMessageOfType(String.class);
471 leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
473 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
475 String commitMsg = expectFirstMatching(leaderCollectorActor, String.class);
477 // Change the leader behavior to follower
478 leaderActor.tell(new Follower(leaderActorContext), leaderActor);
480 // Drop CaptureSnapshotReply in case install snapshot is incorrectly initiated after the prior
481 // snapshot completes. This will prevent the invalid snapshot from completing and fail the
482 // isCapturing assertion below.
483 leaderRaftActor.setDropMessageOfType(CaptureSnapshotReply.class);
485 // Complete the prior snapshot - this should be a no-op b/c it's no longer the leader
486 leaderActor.tell(commitMsg, leaderActor);
488 leaderActor.tell(new RaftActorServerConfigurationSupport.ServerOperationTimeout(NEW_SERVER_ID), leaderActor);
490 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
491 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
493 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
494 assertEquals("isCapturing", false, leaderActorContext.getSnapshotManager().isCapturing());
498 public void testAddServerWithLeaderChangeDuringInstallSnapshot() throws Exception {
499 RaftActorContext initialActorContext = new MockRaftActorContext();
501 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
502 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
503 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
504 actorFactory.generateActorId(LEADER_ID));
506 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
507 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
509 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(8);
511 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
513 // Drop the UnInitializedFollowerSnapshotReply to delay it.
514 leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
516 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
518 UnInitializedFollowerSnapshotReply snapshotReply = expectFirstMatching(leaderCollectorActor,
519 UnInitializedFollowerSnapshotReply.class);
521 // Prevent election timeout when the leader switches to follower
522 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
524 // Change the leader behavior to follower
525 leaderActor.tell(new Follower(leaderActorContext), leaderActor);
527 // Send the captured UnInitializedFollowerSnapshotReply - should be a no-op
528 leaderRaftActor.setDropMessageOfType(null);
529 leaderActor.tell(snapshotReply, leaderActor);
531 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
532 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
534 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
538 public void testAddServerWithInstallSnapshotTimeout() throws Exception {
539 RaftActorContext initialActorContext = new MockRaftActorContext();
541 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
542 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
543 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
544 actorFactory.generateActorId(LEADER_ID));
546 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
547 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
548 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
550 // Drop the InstallSnapshot message so it times out
551 newFollowerRaftActor.underlyingActor().setDropMessageOfType(InstallSnapshot.SERIALIZABLE_CLASS);
553 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
555 leaderActor.tell(new UnInitializedFollowerSnapshotReply("bogus"), leaderActor);
557 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
558 assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
560 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
561 assertEquals("Leader followers size", 0,
562 ((AbstractLeader)leaderRaftActor.getCurrentBehavior()).getFollowerIds().size());
566 public void testAddServerWithNoLeader() {
567 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
568 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
570 TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
571 MockRaftActor.props(LEADER_ID, ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
572 configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
573 actorFactory.generateActorId(LEADER_ID));
574 noLeaderActor.underlyingActor().waitForInitializeBehaviorComplete();
576 noLeaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
577 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
578 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
582 public void testAddServerWithNoConsensusReached() {
583 RaftActorContext initialActorContext = new MockRaftActorContext();
585 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
586 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
587 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
588 actorFactory.generateActorId(LEADER_ID));
590 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
591 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
593 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
595 // Drop UnInitializedFollowerSnapshotReply initially
596 leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
598 MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
599 TestActorRef<MessageCollectorActor> newFollowerCollectorActor =
600 newCollectorActor(newFollowerRaftActorInstance, NEW_SERVER_ID);
602 // Drop AppendEntries to the new follower so consensus isn't reached
603 newFollowerRaftActorInstance.setDropMessageOfType(AppendEntries.class);
605 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
607 // Capture the UnInitializedFollowerSnapshotReply
608 Object snapshotReply = expectFirstMatching(leaderCollectorActor, UnInitializedFollowerSnapshotReply.class);
610 // Send the UnInitializedFollowerSnapshotReply to resume the first request
611 leaderRaftActor.setDropMessageOfType(null);
612 leaderActor.tell(snapshotReply, leaderActor);
614 expectFirstMatching(newFollowerCollectorActor, AppendEntries.class);
616 // Send a second AddServer
617 leaderActor.tell(new AddServer(NEW_SERVER_ID2, "", false), testKit.getRef());
619 // The first AddServer should succeed with OK even though consensus wasn't reached
620 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
621 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
622 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
624 // Verify ServerConfigurationPayload entry in leader's log
625 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
626 votingServer(NEW_SERVER_ID));
628 // The second AddServer should fail since consensus wasn't reached for the first
629 addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
630 assertEquals("getStatus", ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT, addServerReply.getStatus());
632 // Re-send the second AddServer - should also fail
633 leaderActor.tell(new AddServer(NEW_SERVER_ID2, "", false), testKit.getRef());
634 addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
635 assertEquals("getStatus", ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT, addServerReply.getStatus());
639 public void testAddServerWithExistingServer() {
640 RaftActorContext initialActorContext = new MockRaftActorContext();
642 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
643 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
644 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
645 actorFactory.generateActorId(LEADER_ID));
647 leaderActor.tell(new AddServer(FOLLOWER_ID, followerActor.path().toString(), true), testKit.getRef());
649 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
650 assertEquals("getStatus", ServerChangeStatus.ALREADY_EXISTS, addServerReply.getStatus());
654 public void testAddServerForwardedToLeader() {
655 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
656 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
658 TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
659 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
660 actorFactory.generateActorId(LEADER_ID));
662 TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
663 MockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
664 configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
665 actorFactory.generateActorId(FOLLOWER_ID));
666 followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
668 followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
669 -1, -1, (short)0), leaderActor);
671 followerRaftActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
672 expectFirstMatching(leaderActor, AddServer.class);
676 public void testOnApplyState() {
677 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
678 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
679 TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
680 MockRaftActor.props(LEADER_ID, ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
681 configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
682 actorFactory.generateActorId(LEADER_ID));
684 RaftActorServerConfigurationSupport support = new RaftActorServerConfigurationSupport(noLeaderActor.underlyingActor());
686 ReplicatedLogEntry serverConfigEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
687 new ServerConfigurationPayload(Collections.<ServerInfo>emptyList()));
688 boolean handled = support.handleMessage(new ApplyState(null, null, serverConfigEntry), ActorRef.noSender());
689 assertEquals("Message handled", true, handled);
691 ReplicatedLogEntry nonServerConfigEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
692 new MockRaftActorContext.MockPayload("1"));
693 handled = support.handleMessage(new ApplyState(null, null, nonServerConfigEntry), ActorRef.noSender());
694 assertEquals("Message handled", false, handled);
698 public void testRemoveServerWithNoLeader() {
699 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
700 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
702 TestActorRef<MockRaftActor> leaderActor = actorFactory.createTestActor(
703 MockRaftActor.props(LEADER_ID, ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
704 configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
705 actorFactory.generateActorId(LEADER_ID));
706 leaderActor.underlyingActor().waitForInitializeBehaviorComplete();
708 leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
709 RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
710 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, removeServerReply.getStatus());
714 public void testRemoveServerNonExistentServer() {
715 RaftActorContext initialActorContext = new MockRaftActorContext();
717 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
718 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
719 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
720 actorFactory.generateActorId(LEADER_ID));
722 leaderActor.tell(new RemoveServer(NEW_SERVER_ID), testKit.getRef());
723 RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
724 assertEquals("getStatus", ServerChangeStatus.DOES_NOT_EXIST, removeServerReply.getStatus());
728 public void testRemoveServerForwardToLeader() {
729 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
730 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
732 TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
733 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
734 actorFactory.generateActorId(LEADER_ID));
736 TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
737 MockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
738 configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
739 actorFactory.generateActorId(FOLLOWER_ID));
740 followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
742 followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
743 -1, -1, (short)0), leaderActor);
745 followerRaftActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
746 expectFirstMatching(leaderActor, RemoveServer.class);
750 public void testRemoveServer() {
751 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
752 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
753 configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
755 final String followerActorId = actorFactory.generateActorId(FOLLOWER_ID);
756 final String followerActorPath = actorFactory.createTestActorPath(followerActorId);
757 RaftActorContext initialActorContext = new MockRaftActorContext();
759 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
760 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActorPath),
761 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
762 actorFactory.generateActorId(LEADER_ID));
764 TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
766 TestActorRef<CollectingMockRaftActor> followerRaftActor = actorFactory.createTestActor(
767 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
768 configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
771 TestActorRef<MessageCollectorActor> collector =
772 actorFactory.createTestActor(MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId("collector"));
774 followerRaftActor.underlyingActor().setCollectorActor(collector);
776 leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
777 RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
778 assertEquals("getStatus", ServerChangeStatus.OK, removeServerReply.getStatus());
780 final ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
781 assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
782 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(), votingServer(LEADER_ID));
784 RaftActorBehavior currentBehavior = leaderActor.underlyingActor().getCurrentBehavior();
785 assertTrue("Expected Leader", currentBehavior instanceof Leader);
786 assertEquals("Follower ids size", 0, ((Leader)currentBehavior).getFollowerIds().size());
788 MessageCollectorActor.expectFirstMatching(collector, ServerRemoved.class);
792 public void testRemoveServerLeader() {
793 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
794 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
795 configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
797 final String followerActorId = actorFactory.generateActorId(FOLLOWER_ID);
798 final String followerActorPath = actorFactory.createTestActorPath(followerActorId);
799 RaftActorContext initialActorContext = new MockRaftActorContext();
801 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
802 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActorPath),
803 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
804 actorFactory.generateActorId(LEADER_ID));
806 TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
808 TestActorRef<CollectingMockRaftActor> followerRaftActor = actorFactory.createTestActor(
809 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
810 configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
813 TestActorRef<MessageCollectorActor> followerCollector = actorFactory.createTestActor(MessageCollectorActor.props().
814 withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId("collector"));
815 followerRaftActor.underlyingActor().setCollectorActor(followerCollector);
817 leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
818 RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
819 assertEquals("getStatus", ServerChangeStatus.OK, removeServerReply.getStatus());
821 final ApplyState applyState = MessageCollectorActor.expectFirstMatching(followerCollector, ApplyState.class);
822 assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
823 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
824 votingServer(FOLLOWER_ID));
826 MessageCollectorActor.expectFirstMatching(leaderCollector, ServerRemoved.class);
830 public void testRemoveServerLeaderWithNoFollowers() {
831 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
832 MockLeaderRaftActor.props(Collections.<String, String>emptyMap(),
833 new MockRaftActorContext()).withDispatcher(Dispatchers.DefaultDispatcherId()),
834 actorFactory.generateActorId(LEADER_ID));
836 leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
837 RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
838 assertEquals("getStatus", ServerChangeStatus.NOT_SUPPORTED, removeServerReply.getStatus());
841 private static ServerInfo votingServer(String id) {
842 return new ServerInfo(id, true);
845 private static ServerInfo nonVotingServer(String id) {
846 return new ServerInfo(id, false);
849 private TestActorRef<MessageCollectorActor> newLeaderCollectorActor(MockLeaderRaftActor leaderRaftActor) {
850 return newCollectorActor(leaderRaftActor, LEADER_ID);
853 private TestActorRef<MessageCollectorActor> newCollectorActor(AbstractMockRaftActor raftActor, String id) {
854 TestActorRef<MessageCollectorActor> collectorActor = actorFactory.createTestActor(
855 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
856 actorFactory.generateActorId(id + "Collector"));
857 raftActor.setCollectorActor(collectorActor);
858 return collectorActor;
861 private static void verifyServerConfigurationPayloadEntry(ReplicatedLog log, ServerInfo... expected) {
862 ReplicatedLogEntry logEntry = log.get(log.lastIndex());
863 assertEquals("Last log entry payload class", ServerConfigurationPayload.class, logEntry.getData().getClass());
864 ServerConfigurationPayload payload = (ServerConfigurationPayload)logEntry.getData();
865 assertEquals("getNewServerConfig", Sets.newHashSet(expected), Sets.newHashSet(payload.getServerConfig()));
868 private static RaftActorContext newFollowerContext(String id, TestActorRef<? extends UntypedActor> actor) {
869 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
870 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
871 configParams.setElectionTimeoutFactor(100000);
872 ElectionTermImpl termInfo = new ElectionTermImpl(NO_PERSISTENCE, id, LOG);
873 termInfo.update(1, LEADER_ID);
874 return new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
875 id, termInfo, -1, -1, ImmutableMap.of(LEADER_ID, ""), configParams, NO_PERSISTENCE, LOG);
878 static abstract class AbstractMockRaftActor extends MockRaftActor {
879 private volatile TestActorRef<MessageCollectorActor> collectorActor;
880 private volatile Class<?> dropMessageOfType;
882 AbstractMockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
883 DataPersistenceProvider dataPersistenceProvider, TestActorRef<MessageCollectorActor> collectorActor) {
884 super(builder().id(id).peerAddresses(peerAddresses).config(config.get()).
885 dataPersistenceProvider(dataPersistenceProvider));
886 this.collectorActor = collectorActor;
889 void setDropMessageOfType(Class<?> dropMessageOfType) {
890 this.dropMessageOfType = dropMessageOfType;
893 void setCollectorActor(TestActorRef<MessageCollectorActor> collectorActor) {
894 this.collectorActor = collectorActor;
898 public void handleCommand(Object message) {
899 if(dropMessageOfType == null || !dropMessageOfType.equals(message.getClass())) {
900 super.handleCommand(message);
903 if(collectorActor != null) {
904 collectorActor.tell(message, getSender());
909 public static class CollectingMockRaftActor extends AbstractMockRaftActor {
911 CollectingMockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider, TestActorRef<MessageCollectorActor> collectorActor) {
912 super(id, peerAddresses, config, dataPersistenceProvider, collectorActor);
915 public static Props props(final String id, final Map<String, String> peerAddresses,
916 ConfigParams config, DataPersistenceProvider dataPersistenceProvider){
918 return Props.create(CollectingMockRaftActor.class, id, peerAddresses, Optional.of(config), dataPersistenceProvider, null);
923 public static class MockLeaderRaftActor extends AbstractMockRaftActor {
924 public MockLeaderRaftActor(Map<String, String> peerAddresses, ConfigParams config,
925 RaftActorContext fromContext) {
926 super(LEADER_ID, peerAddresses, Optional.of(config), NO_PERSISTENCE, null);
927 setPersistence(false);
929 RaftActorContext context = getRaftActorContext();
930 for(int i = 0; i < fromContext.getReplicatedLog().size(); i++) {
931 ReplicatedLogEntry entry = fromContext.getReplicatedLog().get(i);
932 getState().add(entry.getData());
933 context.getReplicatedLog().append(entry);
936 context.setCommitIndex(fromContext.getCommitIndex());
937 context.setLastApplied(fromContext.getLastApplied());
938 context.getTermInformation().update(fromContext.getTermInformation().getCurrentTerm(),
939 fromContext.getTermInformation().getVotedFor());
943 protected void initializeBehavior() {
944 changeCurrentBehavior(new Leader(getRaftActorContext()));
945 initializeBehaviorComplete.countDown();
949 public void createSnapshot(ActorRef actorRef) {
951 actorRef.tell(new CaptureSnapshotReply(RaftActorTest.fromObject(getState()).toByteArray()), actorRef);
952 } catch (Exception e) {
953 LOG.error("createSnapshot failed", e);
957 static Props props(Map<String, String> peerAddresses, RaftActorContext fromContext) {
958 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
959 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
960 configParams.setElectionTimeoutFactor(10);
961 return Props.create(MockLeaderRaftActor.class, peerAddresses, configParams, fromContext);
965 public static class MockNewFollowerRaftActor extends AbstractMockRaftActor {
966 public MockNewFollowerRaftActor(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
967 super(NEW_SERVER_ID, Maps.<String, String>newHashMap(), Optional.of(config), null, collectorActor);
968 setPersistence(false);
971 static Props props(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
972 return Props.create(MockNewFollowerRaftActor.class, config, collectorActor);