2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertTrue;
12 import static org.junit.Assert.fail;
13 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.assertNoneMatching;
14 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
15 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
16 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
17 import akka.actor.ActorRef;
18 import akka.actor.Props;
19 import akka.actor.UntypedActor;
20 import akka.dispatch.Dispatchers;
21 import akka.testkit.JavaTestKit;
22 import akka.testkit.TestActorRef;
23 import com.google.common.base.Optional;
24 import com.google.common.base.Stopwatch;
25 import com.google.common.collect.ImmutableMap;
26 import com.google.common.collect.Maps;
27 import com.google.common.collect.Sets;
28 import java.util.Collections;
29 import java.util.List;
31 import java.util.concurrent.TimeUnit;
32 import org.junit.After;
33 import org.junit.Before;
34 import org.junit.Test;
35 import org.opendaylight.controller.cluster.DataPersistenceProvider;
36 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
37 import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
38 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
39 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
40 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
41 import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
42 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
43 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
44 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
45 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
46 import org.opendaylight.controller.cluster.raft.messages.AddServer;
47 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
48 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
49 import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
50 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
51 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
52 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
53 import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
54 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
55 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
56 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
57 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
58 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
59 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
60 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
61 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
62 import org.slf4j.Logger;
63 import org.slf4j.LoggerFactory;
64 import scala.concurrent.duration.FiniteDuration;
67 * Unit tests for RaftActorServerConfigurationSupport.
69 * @author Thomas Pantelis
71 public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
72 static final String LEADER_ID = "leader";
73 static final String FOLLOWER_ID = "follower";
74 static final String FOLLOWER_ID2 = "follower2";
75 static final String NEW_SERVER_ID = "new-server";
76 static final String NEW_SERVER_ID2 = "new-server2";
77 private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupportTest.class);
78 private static final DataPersistenceProvider NO_PERSISTENCE = new NonPersistentDataProvider();
80 private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
82 private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
83 Props.create(ForwardMessageToBehaviorActor.class).withDispatcher(Dispatchers.DefaultDispatcherId()),
84 actorFactory.generateActorId(FOLLOWER_ID));
86 private TestActorRef<MockNewFollowerRaftActor> newFollowerRaftActor;
87 private TestActorRef<MessageCollectorActor> newFollowerCollectorActor;
88 private RaftActorContext newFollowerActorContext;
90 private final JavaTestKit testKit = new JavaTestKit(getSystem());
94 InMemoryJournal.clear();
95 InMemorySnapshotStore.clear();
97 DefaultConfigParamsImpl configParams = newFollowerConfigParams();
99 newFollowerCollectorActor = actorFactory.createTestActor(
100 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
101 actorFactory.generateActorId(NEW_SERVER_ID + "Collector"));
102 newFollowerRaftActor = actorFactory.createTestActor(MockNewFollowerRaftActor.props(
103 configParams, newFollowerCollectorActor).withDispatcher(Dispatchers.DefaultDispatcherId()),
104 actorFactory.generateActorId(NEW_SERVER_ID));
107 newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
108 } catch (Exception e) {
109 newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
113 private static DefaultConfigParamsImpl newFollowerConfigParams() {
114 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
115 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
116 configParams.setElectionTimeoutFactor(100000);
117 configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
122 public void tearDown() throws Exception {
123 actorFactory.close();
127 public void testAddServerWithExistingFollower() throws Exception {
128 RaftActorContext followerActorContext = newFollowerContext(FOLLOWER_ID, followerActor);
129 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
131 followerActorContext.setCommitIndex(2);
132 followerActorContext.setLastApplied(2);
134 Follower follower = new Follower(followerActorContext);
135 followerActor.underlyingActor().setBehavior(follower);
137 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
138 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
139 followerActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
140 actorFactory.generateActorId(LEADER_ID));
142 // Expect initial heartbeat from the leader.
143 expectFirstMatching(followerActor, AppendEntries.class);
144 clearMessages(followerActor);
146 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
147 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
149 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
151 // Leader should install snapshot - capture and verify ApplySnapshot contents
153 ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
154 @SuppressWarnings("unchecked")
155 List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
156 assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
158 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
159 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
160 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
162 // Verify ServerConfigurationPayload entry in leader's log
164 expectFirstMatching(leaderCollectorActor, ApplyState.class);
165 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
166 assertEquals("Leader journal last index", 3, leaderActorContext.getReplicatedLog().lastIndex());
167 assertEquals("Leader commit index", 3, leaderActorContext.getCommitIndex());
168 assertEquals("Leader last applied index", 3, leaderActorContext.getLastApplied());
169 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
170 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
172 // Verify ServerConfigurationPayload entry in both followers
174 expectFirstMatching(followerActor, ApplyState.class);
175 assertEquals("Follower journal last index", 3, followerActorContext.getReplicatedLog().lastIndex());
176 verifyServerConfigurationPayloadEntry(followerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
177 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
179 expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
180 assertEquals("New follower journal last index", 3, newFollowerActorContext.getReplicatedLog().lastIndex());
181 verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
182 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
184 // Verify new server config was applied in both followers
186 assertEquals("Follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID), followerActorContext.getPeerIds());
188 assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, FOLLOWER_ID), newFollowerActorContext.getPeerIds());
190 expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
191 expectFirstMatching(followerActor, ApplyState.class);
193 assertEquals("Follower commit index", 3, followerActorContext.getCommitIndex());
194 assertEquals("Follower last applied index", 3, followerActorContext.getLastApplied());
195 assertEquals("New follower commit index", 3, newFollowerActorContext.getCommitIndex());
196 assertEquals("New follower last applied index", 3, newFollowerActorContext.getLastApplied());
198 List<ReplicatedLogImplEntry> persistedLogEntries = InMemoryJournal.get(LEADER_ID, ReplicatedLogImplEntry.class);
199 assertEquals("Leader ReplicatedLogImplEntry entries", 1, persistedLogEntries.size());
200 ReplicatedLogImplEntry logEntry = persistedLogEntries.get(0);
201 assertEquals("Leader ReplicatedLogImplEntry getTerm", 1, logEntry.getTerm());
202 assertEquals("Leader ReplicatedLogImplEntry getIndex", 3, logEntry.getIndex());
203 assertEquals("Leader ReplicatedLogImplEntry getData", ServerConfigurationPayload.class, logEntry.getData().getClass());
205 persistedLogEntries = InMemoryJournal.get(NEW_SERVER_ID, ReplicatedLogImplEntry.class);
206 assertEquals("New follower ReplicatedLogImplEntry entries", 1, persistedLogEntries.size());
207 logEntry = persistedLogEntries.get(0);
208 assertEquals("New follower ReplicatedLogImplEntry getTerm", 1, logEntry.getTerm());
209 assertEquals("New follower ReplicatedLogImplEntry getIndex", 3, logEntry.getIndex());
210 assertEquals("New follower ReplicatedLogImplEntry getData", ServerConfigurationPayload.class,
211 logEntry.getData().getClass());
215 public void testAddServerWithNoExistingFollower() throws Exception {
216 RaftActorContext initialActorContext = new MockRaftActorContext();
217 initialActorContext.setCommitIndex(1);
218 initialActorContext.setLastApplied(1);
219 initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
222 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
223 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
224 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
225 actorFactory.generateActorId(LEADER_ID));
227 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
228 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
230 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
232 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
234 // Leader should install snapshot - capture and verify ApplySnapshot contents
236 ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
237 @SuppressWarnings("unchecked")
238 List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
239 assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
241 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
242 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
243 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
245 // Verify ServerConfigurationPayload entry in leader's log
247 expectFirstMatching(leaderCollectorActor, ApplyState.class);
248 assertEquals("Leader journal last index", 2, leaderActorContext.getReplicatedLog().lastIndex());
249 assertEquals("Leader commit index", 2, leaderActorContext.getCommitIndex());
250 assertEquals("Leader last applied index", 2, leaderActorContext.getLastApplied());
251 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
252 votingServer(NEW_SERVER_ID));
254 // Verify ServerConfigurationPayload entry in the new follower
256 expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
257 assertEquals("New follower journal last index", 2, newFollowerActorContext.getReplicatedLog().lastIndex());
258 verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
259 votingServer(NEW_SERVER_ID));
261 // Verify new server config was applied in the new follower
263 assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
267 public void testAddServersAsNonVoting() throws Exception {
268 RaftActorContext initialActorContext = new MockRaftActorContext();
270 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
271 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
272 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
273 actorFactory.generateActorId(LEADER_ID));
275 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
276 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
278 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
280 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), false), testKit.getRef());
282 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
283 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
284 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
286 // Verify ServerConfigurationPayload entry in leader's log
288 expectFirstMatching(leaderCollectorActor, ApplyState.class);
290 assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
291 assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
292 assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
293 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
294 nonVotingServer(NEW_SERVER_ID));
296 // Verify ServerConfigurationPayload entry in the new follower
298 expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
299 assertEquals("New follower journal last index", 0, newFollowerActorContext.getReplicatedLog().lastIndex());
300 verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
301 nonVotingServer(NEW_SERVER_ID));
303 // Verify new server config was applied in the new follower
305 assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
307 assertNoneMatching(newFollowerCollectorActor, InstallSnapshot.class, 500);
309 // Add another non-voting server.
311 clearMessages(leaderCollectorActor);
313 RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
314 Follower newFollower2 = new Follower(follower2ActorContext);
315 followerActor.underlyingActor().setBehavior(newFollower2);
317 leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit.getRef());
319 addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
320 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
321 assertEquals("getLeaderHint", java.util.Optional.of(LEADER_ID), addServerReply.getLeaderHint());
323 expectFirstMatching(leaderCollectorActor, ApplyState.class);
324 assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
325 assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
326 assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
327 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
328 votingServer(LEADER_ID), nonVotingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
332 public void testAddServerWithOperationInProgress() throws Exception {
333 RaftActorContext initialActorContext = new MockRaftActorContext();
335 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
336 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
337 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
338 actorFactory.generateActorId(LEADER_ID));
340 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
341 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
343 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
345 RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
346 Follower newFollower2 = new Follower(follower2ActorContext);
347 followerActor.underlyingActor().setBehavior(newFollower2);
349 MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
350 newFollowerRaftActorInstance.setDropMessageOfType(InstallSnapshot.SERIALIZABLE_CLASS);
352 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
354 // Wait for leader's install snapshot and capture it
356 InstallSnapshot installSnapshot = expectFirstMatching(newFollowerCollectorActor, InstallSnapshot.class);
358 // Send a second AddServer - should get queued
359 JavaTestKit testKit2 = new JavaTestKit(getSystem());
360 leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit2.getRef());
362 // Continue the first AddServer
363 newFollowerRaftActorInstance.setDropMessageOfType(null);
364 newFollowerRaftActor.tell(installSnapshot, leaderActor);
366 // Verify both complete successfully
367 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
368 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
370 addServerReply = testKit2.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
371 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
373 // Verify ServerConfigurationPayload entries in leader's log
375 expectMatching(leaderCollectorActor, ApplyState.class, 2);
376 assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
377 assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
378 assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
379 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
380 votingServer(LEADER_ID), votingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
382 // Verify ServerConfigurationPayload entry in the new follower
384 expectMatching(newFollowerCollectorActor, ApplyState.class, 2);
385 assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID2),
386 newFollowerActorContext.getPeerIds());
390 public void testAddServerWithPriorSnapshotInProgress() throws Exception {
391 RaftActorContext initialActorContext = new MockRaftActorContext();
393 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
394 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
395 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
396 actorFactory.generateActorId(LEADER_ID));
398 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
399 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
401 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
403 // Drop commit message for now to delay snapshot completion
404 leaderRaftActor.setDropMessageOfType(String.class);
406 leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
408 String commitMsg = expectFirstMatching(leaderCollectorActor, String.class);
410 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
412 leaderRaftActor.setDropMessageOfType(null);
413 leaderActor.tell(commitMsg, leaderActor);
415 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
416 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
417 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
419 expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
421 // Verify ServerConfigurationPayload entry in leader's log
423 expectFirstMatching(leaderCollectorActor, ApplyState.class);
424 assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
425 assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
426 assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
427 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
428 votingServer(NEW_SERVER_ID));
432 public void testAddServerWithPriorSnapshotCompleteTimeout() throws Exception {
433 RaftActorContext initialActorContext = new MockRaftActorContext();
435 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
436 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
437 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
438 actorFactory.generateActorId(LEADER_ID));
440 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
441 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
443 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
445 // Drop commit message so the snapshot doesn't complete.
446 leaderRaftActor.setDropMessageOfType(String.class);
448 leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
450 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
452 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
453 assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
455 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
459 public void testAddServerWithLeaderChangeBeforePriorSnapshotComplete() throws Exception {
460 RaftActorContext initialActorContext = new MockRaftActorContext();
462 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
463 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
464 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
465 actorFactory.generateActorId(LEADER_ID));
467 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
468 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
469 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
471 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
473 // Drop the commit message so the snapshot doesn't complete yet.
474 leaderRaftActor.setDropMessageOfType(String.class);
476 leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
478 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
480 String commitMsg = expectFirstMatching(leaderCollectorActor, String.class);
482 // Change the leader behavior to follower
483 leaderActor.tell(new Follower(leaderActorContext), leaderActor);
485 // Drop CaptureSnapshotReply in case install snapshot is incorrectly initiated after the prior
486 // snapshot completes. This will prevent the invalid snapshot from completing and fail the
487 // isCapturing assertion below.
488 leaderRaftActor.setDropMessageOfType(CaptureSnapshotReply.class);
490 // Complete the prior snapshot - this should be a no-op b/c it's no longer the leader
491 leaderActor.tell(commitMsg, leaderActor);
493 leaderActor.tell(new RaftActorServerConfigurationSupport.ServerOperationTimeout(NEW_SERVER_ID), leaderActor);
495 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
496 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
498 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
499 assertEquals("isCapturing", false, leaderActorContext.getSnapshotManager().isCapturing());
503 public void testAddServerWithLeaderChangeDuringInstallSnapshot() throws Exception {
504 RaftActorContext initialActorContext = new MockRaftActorContext();
506 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
507 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
508 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
509 actorFactory.generateActorId(LEADER_ID));
511 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
512 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
514 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(8);
516 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
518 // Drop the UnInitializedFollowerSnapshotReply to delay it.
519 leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
521 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
523 UnInitializedFollowerSnapshotReply snapshotReply = expectFirstMatching(leaderCollectorActor,
524 UnInitializedFollowerSnapshotReply.class);
526 // Prevent election timeout when the leader switches to follower
527 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
529 // Change the leader behavior to follower
530 leaderActor.tell(new Follower(leaderActorContext), leaderActor);
532 // Send the captured UnInitializedFollowerSnapshotReply - should be a no-op
533 leaderRaftActor.setDropMessageOfType(null);
534 leaderActor.tell(snapshotReply, leaderActor);
536 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
537 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
539 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
543 public void testAddServerWithInstallSnapshotTimeout() throws Exception {
544 RaftActorContext initialActorContext = new MockRaftActorContext();
546 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
547 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
548 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
549 actorFactory.generateActorId(LEADER_ID));
551 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
552 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
553 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
555 // Drop the InstallSnapshot message so it times out
556 newFollowerRaftActor.underlyingActor().setDropMessageOfType(InstallSnapshot.SERIALIZABLE_CLASS);
558 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
560 leaderActor.tell(new UnInitializedFollowerSnapshotReply("bogus"), leaderActor);
562 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
563 assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
565 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
566 assertEquals("Leader followers size", 0,
567 ((AbstractLeader)leaderRaftActor.getCurrentBehavior()).getFollowerIds().size());
571 public void testAddServerWithNoLeader() {
572 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
573 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
575 TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
576 MockRaftActor.props(LEADER_ID, ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
577 configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
578 actorFactory.generateActorId(LEADER_ID));
579 noLeaderActor.underlyingActor().waitForInitializeBehaviorComplete();
581 noLeaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
582 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
583 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
587 public void testAddServerWithNoConsensusReached() {
588 RaftActorContext initialActorContext = new MockRaftActorContext();
590 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
591 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
592 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
593 actorFactory.generateActorId(LEADER_ID));
595 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
596 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
598 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
600 // Drop UnInitializedFollowerSnapshotReply initially
601 leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
603 MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
604 TestActorRef<MessageCollectorActor> newFollowerCollectorActor =
605 newCollectorActor(newFollowerRaftActorInstance, NEW_SERVER_ID);
607 // Drop AppendEntries to the new follower so consensus isn't reached
608 newFollowerRaftActorInstance.setDropMessageOfType(AppendEntries.class);
610 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
612 // Capture the UnInitializedFollowerSnapshotReply
613 Object snapshotReply = expectFirstMatching(leaderCollectorActor, UnInitializedFollowerSnapshotReply.class);
615 // Send the UnInitializedFollowerSnapshotReply to resume the first request
616 leaderRaftActor.setDropMessageOfType(null);
617 leaderActor.tell(snapshotReply, leaderActor);
619 expectFirstMatching(newFollowerCollectorActor, AppendEntries.class);
621 // Send a second AddServer
622 leaderActor.tell(new AddServer(NEW_SERVER_ID2, "", false), testKit.getRef());
624 // The first AddServer should succeed with OK even though consensus wasn't reached
625 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
626 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
627 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
629 // Verify ServerConfigurationPayload entry in leader's log
630 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
631 votingServer(NEW_SERVER_ID));
633 // The second AddServer should fail since consensus wasn't reached for the first
634 addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
635 assertEquals("getStatus", ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT, addServerReply.getStatus());
637 // Re-send the second AddServer - should also fail
638 leaderActor.tell(new AddServer(NEW_SERVER_ID2, "", false), testKit.getRef());
639 addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
640 assertEquals("getStatus", ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT, addServerReply.getStatus());
644 public void testAddServerWithExistingServer() {
645 RaftActorContext initialActorContext = new MockRaftActorContext();
647 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
648 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
649 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
650 actorFactory.generateActorId(LEADER_ID));
652 leaderActor.tell(new AddServer(FOLLOWER_ID, followerActor.path().toString(), true), testKit.getRef());
654 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
655 assertEquals("getStatus", ServerChangeStatus.ALREADY_EXISTS, addServerReply.getStatus());
659 public void testAddServerForwardedToLeader() {
660 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
661 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
663 TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
664 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
665 actorFactory.generateActorId(LEADER_ID));
667 TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
668 MockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
669 configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
670 actorFactory.generateActorId(FOLLOWER_ID));
671 followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
673 followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
674 -1, -1, (short)0), leaderActor);
676 followerRaftActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
677 expectFirstMatching(leaderActor, AddServer.class);
681 public void testOnApplyState() {
682 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
683 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
684 TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
685 MockRaftActor.props(LEADER_ID, ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
686 configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
687 actorFactory.generateActorId(LEADER_ID));
689 RaftActorServerConfigurationSupport support = new RaftActorServerConfigurationSupport(noLeaderActor.underlyingActor());
691 ReplicatedLogEntry serverConfigEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
692 new ServerConfigurationPayload(Collections.<ServerInfo>emptyList()));
693 boolean handled = support.handleMessage(new ApplyState(null, null, serverConfigEntry), ActorRef.noSender());
694 assertEquals("Message handled", true, handled);
696 ReplicatedLogEntry nonServerConfigEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
697 new MockRaftActorContext.MockPayload("1"));
698 handled = support.handleMessage(new ApplyState(null, null, nonServerConfigEntry), ActorRef.noSender());
699 assertEquals("Message handled", false, handled);
703 public void testRemoveServerWithNoLeader() {
704 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
705 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
707 TestActorRef<MockRaftActor> leaderActor = actorFactory.createTestActor(
708 MockRaftActor.props(LEADER_ID, ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
709 configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
710 actorFactory.generateActorId(LEADER_ID));
711 leaderActor.underlyingActor().waitForInitializeBehaviorComplete();
713 leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
714 RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
715 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, removeServerReply.getStatus());
719 public void testRemoveServerNonExistentServer() {
720 RaftActorContext initialActorContext = new MockRaftActorContext();
722 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
723 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
724 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
725 actorFactory.generateActorId(LEADER_ID));
727 leaderActor.tell(new RemoveServer(NEW_SERVER_ID), testKit.getRef());
728 RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
729 assertEquals("getStatus", ServerChangeStatus.DOES_NOT_EXIST, removeServerReply.getStatus());
733 public void testRemoveServerForwardToLeader() {
734 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
735 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
737 TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
738 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
739 actorFactory.generateActorId(LEADER_ID));
741 TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
742 MockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
743 configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
744 actorFactory.generateActorId(FOLLOWER_ID));
745 followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
747 followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
748 -1, -1, (short)0), leaderActor);
750 followerRaftActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
751 expectFirstMatching(leaderActor, RemoveServer.class);
755 public void testRemoveServer() {
756 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
757 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
758 configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
760 final String followerActorId = actorFactory.generateActorId(FOLLOWER_ID);
761 final String followerActorPath = actorFactory.createTestActorPath(followerActorId);
762 RaftActorContext initialActorContext = new MockRaftActorContext();
764 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
765 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActorPath),
766 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
767 actorFactory.generateActorId(LEADER_ID));
769 TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
771 TestActorRef<CollectingMockRaftActor> followerRaftActor = actorFactory.createTestActor(
772 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
773 configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
776 TestActorRef<MessageCollectorActor> collector =
777 actorFactory.createTestActor(MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId("collector"));
779 followerRaftActor.underlyingActor().setCollectorActor(collector);
781 leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
782 RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
783 assertEquals("getStatus", ServerChangeStatus.OK, removeServerReply.getStatus());
785 final ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
786 assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
787 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(), votingServer(LEADER_ID));
789 RaftActorBehavior currentBehavior = leaderActor.underlyingActor().getCurrentBehavior();
790 assertTrue("Expected Leader", currentBehavior instanceof Leader);
791 assertEquals("Follower ids size", 0, ((Leader)currentBehavior).getFollowerIds().size());
793 MessageCollectorActor.expectFirstMatching(collector, ServerRemoved.class);
797 public void testRemoveServerLeader() {
798 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
799 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
800 configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
802 final String followerActorId = actorFactory.generateActorId(FOLLOWER_ID);
803 final String followerActorPath = actorFactory.createTestActorPath(followerActorId);
804 RaftActorContext initialActorContext = new MockRaftActorContext();
806 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
807 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActorPath),
808 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
809 actorFactory.generateActorId(LEADER_ID));
811 TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
813 TestActorRef<CollectingMockRaftActor> followerRaftActor = actorFactory.createTestActor(
814 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
815 configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
818 TestActorRef<MessageCollectorActor> followerCollector = actorFactory.createTestActor(MessageCollectorActor.props().
819 withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId("collector"));
820 followerRaftActor.underlyingActor().setCollectorActor(followerCollector);
822 leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
823 RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
824 assertEquals("getStatus", ServerChangeStatus.OK, removeServerReply.getStatus());
826 final ApplyState applyState = MessageCollectorActor.expectFirstMatching(followerCollector, ApplyState.class);
827 assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
828 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
829 votingServer(FOLLOWER_ID));
831 MessageCollectorActor.expectFirstMatching(leaderCollector, ServerRemoved.class);
835 public void testRemoveServerLeaderWithNoFollowers() {
836 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
837 MockLeaderRaftActor.props(Collections.<String, String>emptyMap(),
838 new MockRaftActorContext()).withDispatcher(Dispatchers.DefaultDispatcherId()),
839 actorFactory.generateActorId(LEADER_ID));
841 leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
842 RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
843 assertEquals("getStatus", ServerChangeStatus.NOT_SUPPORTED, removeServerReply.getStatus());
847 public void testChangeServersVotingStatus() {
848 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
849 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
850 configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
852 final String follower1ActorId = actorFactory.generateActorId(FOLLOWER_ID);
853 final String follower1ActorPath = actorFactory.createTestActorPath(follower1ActorId);
854 final String follower2ActorId = actorFactory.generateActorId(FOLLOWER_ID2);
855 final String follower2ActorPath = actorFactory.createTestActorPath(follower2ActorId);
857 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
858 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, follower1ActorPath,
859 FOLLOWER_ID2, follower2ActorPath), new MockRaftActorContext()).
860 withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
861 TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
863 TestActorRef<CollectingMockRaftActor> follower1RaftActor = actorFactory.createTestActor(
864 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
865 FOLLOWER_ID2, follower2ActorPath), configParams, NO_PERSISTENCE).
866 withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId);
867 TestActorRef<MessageCollectorActor> follower1Collector = actorFactory.createTestActor(
868 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
869 actorFactory.generateActorId("collector"));
870 follower1RaftActor.underlyingActor().setCollectorActor(follower1Collector);
872 TestActorRef<CollectingMockRaftActor> follower2RaftActor = actorFactory.createTestActor(
873 CollectingMockRaftActor.props(FOLLOWER_ID2, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
874 FOLLOWER_ID, follower1ActorPath), configParams, NO_PERSISTENCE).
875 withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId);
876 TestActorRef<MessageCollectorActor> follower2Collector = actorFactory.createTestActor(
877 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
878 actorFactory.generateActorId("collector"));
879 follower2RaftActor.underlyingActor().setCollectorActor(follower2Collector);
881 // Send first ChangeServersVotingStatus message
883 leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(FOLLOWER_ID, false, FOLLOWER_ID2, false)),
885 ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
886 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
888 final ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
889 assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
890 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
891 votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
893 MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
894 verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
895 votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
897 MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
898 verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
899 votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
901 MessageCollectorActor.clearMessages(leaderCollector);
902 MessageCollectorActor.clearMessages(follower1Collector);
903 MessageCollectorActor.clearMessages(follower2Collector);
905 // Send second ChangeServersVotingStatus message
907 leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(FOLLOWER_ID, true)), testKit.getRef());
908 reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
909 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
911 MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
912 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
913 votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
915 MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
916 verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
917 votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
919 MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
920 verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
921 votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
925 public void testChangeLeaderToNonVoting() {
926 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
927 configParams.setHeartBeatInterval(new FiniteDuration(500, TimeUnit.MILLISECONDS));
929 final String follower1ActorId = actorFactory.generateActorId(FOLLOWER_ID);
930 final String follower1ActorPath = actorFactory.createTestActorPath(follower1ActorId);
931 final String follower2ActorId = actorFactory.generateActorId(FOLLOWER_ID2);
932 final String follower2ActorPath = actorFactory.createTestActorPath(follower2ActorId);
934 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
935 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, follower1ActorPath,
936 FOLLOWER_ID2, follower2ActorPath), new MockRaftActorContext()).
937 withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
938 TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
940 TestActorRef<CollectingMockRaftActor> follower1RaftActor = actorFactory.createTestActor(
941 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
942 FOLLOWER_ID2, follower2ActorPath), configParams, NO_PERSISTENCE).
943 withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId);
944 TestActorRef<MessageCollectorActor> follower1Collector = actorFactory.createTestActor(
945 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
946 actorFactory.generateActorId("collector"));
947 follower1RaftActor.underlyingActor().setCollectorActor(follower1Collector);
949 TestActorRef<CollectingMockRaftActor> follower2RaftActor = actorFactory.createTestActor(
950 CollectingMockRaftActor.props(FOLLOWER_ID2, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
951 FOLLOWER_ID, follower1ActorPath), configParams, NO_PERSISTENCE).
952 withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId);
953 TestActorRef<MessageCollectorActor> follower2Collector = actorFactory.createTestActor(
954 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
955 actorFactory.generateActorId("collector"));
956 follower2RaftActor.underlyingActor().setCollectorActor(follower2Collector);
958 // Send ChangeServersVotingStatus message
960 leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(LEADER_ID, false)), testKit.getRef());
961 ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
962 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
964 MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
965 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
966 nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
968 MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
969 verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
970 nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
972 MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
973 verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
974 nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
976 verifyRaftState(RaftState.Leader, follower1RaftActor.underlyingActor(), follower2RaftActor.underlyingActor());
977 verifyRaftState(RaftState.Follower, leaderActor.underlyingActor());
979 MessageCollectorActor.expectMatching(leaderCollector, AppendEntries.class, 2);
982 private void verifyRaftState(RaftState expState, RaftActor... raftActors) {
983 Stopwatch sw = Stopwatch.createStarted();
984 while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
985 for(RaftActor raftActor: raftActors) {
986 if(raftActor.getRaftState() == expState) {
992 fail("None of the RaftActors have state " + expState);
995 private static ServerInfo votingServer(String id) {
996 return new ServerInfo(id, true);
999 private static ServerInfo nonVotingServer(String id) {
1000 return new ServerInfo(id, false);
1003 private TestActorRef<MessageCollectorActor> newLeaderCollectorActor(MockLeaderRaftActor leaderRaftActor) {
1004 return newCollectorActor(leaderRaftActor, LEADER_ID);
1007 private TestActorRef<MessageCollectorActor> newCollectorActor(AbstractMockRaftActor raftActor, String id) {
1008 TestActorRef<MessageCollectorActor> collectorActor = actorFactory.createTestActor(
1009 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1010 actorFactory.generateActorId(id + "Collector"));
1011 raftActor.setCollectorActor(collectorActor);
1012 return collectorActor;
1015 private static void verifyServerConfigurationPayloadEntry(ReplicatedLog log, ServerInfo... expected) {
1016 ReplicatedLogEntry logEntry = log.get(log.lastIndex());
1017 assertEquals("Last log entry payload class", ServerConfigurationPayload.class, logEntry.getData().getClass());
1018 ServerConfigurationPayload payload = (ServerConfigurationPayload)logEntry.getData();
1019 assertEquals("getNewServerConfig", Sets.newHashSet(expected), Sets.newHashSet(payload.getServerConfig()));
1022 private static RaftActorContext newFollowerContext(String id, TestActorRef<? extends UntypedActor> actor) {
1023 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1024 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1025 configParams.setElectionTimeoutFactor(100000);
1026 ElectionTermImpl termInfo = new ElectionTermImpl(NO_PERSISTENCE, id, LOG);
1027 termInfo.update(1, LEADER_ID);
1028 return new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
1029 id, termInfo, -1, -1, ImmutableMap.of(LEADER_ID, ""), configParams, NO_PERSISTENCE, LOG);
1032 static abstract class AbstractMockRaftActor extends MockRaftActor {
1033 private volatile TestActorRef<MessageCollectorActor> collectorActor;
1034 private volatile Class<?> dropMessageOfType;
1036 AbstractMockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
1037 DataPersistenceProvider dataPersistenceProvider, TestActorRef<MessageCollectorActor> collectorActor) {
1038 super(builder().id(id).peerAddresses(peerAddresses).config(config.get()).
1039 dataPersistenceProvider(dataPersistenceProvider));
1040 this.collectorActor = collectorActor;
1043 void setDropMessageOfType(Class<?> dropMessageOfType) {
1044 this.dropMessageOfType = dropMessageOfType;
1047 void setCollectorActor(TestActorRef<MessageCollectorActor> collectorActor) {
1048 this.collectorActor = collectorActor;
1052 public void handleCommand(Object message) {
1053 if(dropMessageOfType == null || !dropMessageOfType.equals(message.getClass())) {
1054 super.handleCommand(message);
1057 if(collectorActor != null) {
1058 collectorActor.tell(message, getSender());
1063 public static class CollectingMockRaftActor extends AbstractMockRaftActor {
1065 CollectingMockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider, TestActorRef<MessageCollectorActor> collectorActor) {
1066 super(id, peerAddresses, config, dataPersistenceProvider, collectorActor);
1069 public static Props props(final String id, final Map<String, String> peerAddresses,
1070 ConfigParams config, DataPersistenceProvider dataPersistenceProvider){
1072 return Props.create(CollectingMockRaftActor.class, id, peerAddresses, Optional.of(config), dataPersistenceProvider, null);
1077 public static class MockLeaderRaftActor extends AbstractMockRaftActor {
1078 public MockLeaderRaftActor(Map<String, String> peerAddresses, ConfigParams config,
1079 RaftActorContext fromContext) {
1080 super(LEADER_ID, peerAddresses, Optional.of(config), NO_PERSISTENCE, null);
1081 setPersistence(false);
1083 RaftActorContext context = getRaftActorContext();
1084 for(int i = 0; i < fromContext.getReplicatedLog().size(); i++) {
1085 ReplicatedLogEntry entry = fromContext.getReplicatedLog().get(i);
1086 getState().add(entry.getData());
1087 context.getReplicatedLog().append(entry);
1090 context.setCommitIndex(fromContext.getCommitIndex());
1091 context.setLastApplied(fromContext.getLastApplied());
1092 context.getTermInformation().update(fromContext.getTermInformation().getCurrentTerm(),
1093 fromContext.getTermInformation().getVotedFor());
1097 protected void initializeBehavior() {
1098 changeCurrentBehavior(new Leader(getRaftActorContext()));
1099 initializeBehaviorComplete.countDown();
1103 public void createSnapshot(ActorRef actorRef) {
1105 actorRef.tell(new CaptureSnapshotReply(RaftActorTest.fromObject(getState()).toByteArray()), actorRef);
1106 } catch (Exception e) {
1107 LOG.error("createSnapshot failed", e);
1111 static Props props(Map<String, String> peerAddresses, RaftActorContext fromContext) {
1112 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1113 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1114 configParams.setElectionTimeoutFactor(10);
1115 return Props.create(MockLeaderRaftActor.class, peerAddresses, configParams, fromContext);
1119 public static class MockNewFollowerRaftActor extends AbstractMockRaftActor {
1120 public MockNewFollowerRaftActor(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
1121 super(NEW_SERVER_ID, Maps.<String, String>newHashMap(), Optional.of(config), null, collectorActor);
1122 setPersistence(false);
1125 static Props props(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
1126 return Props.create(MockNewFollowerRaftActor.class, config, collectorActor);