/*
 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.cluster.raft;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertTrue;
12 import static org.junit.Assert.fail;
13 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.assertNoneMatching;
14 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
15 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
16 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
17 import akka.actor.ActorRef;
18 import akka.actor.Props;
19 import akka.actor.UntypedActor;
20 import akka.dispatch.Dispatchers;
21 import akka.testkit.JavaTestKit;
22 import akka.testkit.TestActorRef;
23 import com.google.common.base.Optional;
24 import com.google.common.base.Stopwatch;
25 import com.google.common.collect.ImmutableMap;
26 import com.google.common.collect.Maps;
27 import com.google.common.collect.Sets;
28 import java.util.Arrays;
29 import java.util.Collections;
30 import java.util.List;
32 import java.util.concurrent.TimeUnit;
33 import org.junit.After;
34 import org.junit.Before;
35 import org.junit.Test;
36 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
37 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
38 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
39 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
40 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
41 import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
42 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
43 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
44 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
45 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
46 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
47 import org.opendaylight.controller.cluster.raft.messages.AddServer;
48 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
49 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
50 import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
51 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
52 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
53 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
54 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
55 import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
56 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
57 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
58 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
59 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
60 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
61 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
62 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
63 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
64 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
65 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
66 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
67 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
68 import org.slf4j.Logger;
69 import org.slf4j.LoggerFactory;
70 import scala.concurrent.duration.FiniteDuration;
/**
 * Unit tests for RaftActorServerConfigurationSupport.
 *
 * @author Thomas Pantelis
 */
77 public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
78 static final String LEADER_ID = "leader";
79 static final String FOLLOWER_ID = "follower";
80 static final String FOLLOWER_ID2 = "follower2";
81 static final String NEW_SERVER_ID = "new-server";
82 static final String NEW_SERVER_ID2 = "new-server2";
83 private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupportTest.class);
84 private static final Class<?> COMMIT_MESSAGE_CLASS = RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT.getClass();
85 private static final boolean NO_PERSISTENCE = false;
86 private static final boolean PERSISTENT = true;
88 private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
90 private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
91 Props.create(ForwardMessageToBehaviorActor.class).withDispatcher(Dispatchers.DefaultDispatcherId()),
92 actorFactory.generateActorId(FOLLOWER_ID));
94 private TestActorRef<MockNewFollowerRaftActor> newFollowerRaftActor;
95 private TestActorRef<MessageCollectorActor> newFollowerCollectorActor;
96 private RaftActorContext newFollowerActorContext;
98 private final JavaTestKit testKit = new JavaTestKit(getSystem());
101 public void setup() {
102 InMemoryJournal.clear();
103 InMemorySnapshotStore.clear();
106 private void setupNewFollower() {
107 DefaultConfigParamsImpl configParams = newFollowerConfigParams();
109 newFollowerCollectorActor = actorFactory.createTestActor(
110 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
111 actorFactory.generateActorId(NEW_SERVER_ID + "Collector"));
112 newFollowerRaftActor = actorFactory.createTestActor(MockNewFollowerRaftActor.props(
113 configParams, newFollowerCollectorActor).withDispatcher(Dispatchers.DefaultDispatcherId()),
114 actorFactory.generateActorId(NEW_SERVER_ID));
117 newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
118 } catch (Exception e) {
119 newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
123 private static DefaultConfigParamsImpl newFollowerConfigParams() {
124 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
125 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
126 configParams.setElectionTimeoutFactor(100000);
127 configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
132 public void tearDown() throws Exception {
133 actorFactory.close();
137 public void testAddServerWithExistingFollower() throws Exception {
138 LOG.info("testAddServerWithExistingFollower starting");
140 RaftActorContextImpl followerActorContext = newFollowerContext(FOLLOWER_ID, followerActor);
141 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
143 followerActorContext.setCommitIndex(2);
144 followerActorContext.setLastApplied(2);
146 Follower follower = new Follower(followerActorContext);
147 followerActor.underlyingActor().setBehavior(follower);
148 followerActorContext.setCurrentBehavior(follower);
150 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
151 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
152 followerActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
153 actorFactory.generateActorId(LEADER_ID));
155 // Expect initial heartbeat from the leader.
156 expectFirstMatching(followerActor, AppendEntries.class);
157 clearMessages(followerActor);
159 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
160 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
162 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
164 // Leader should install snapshot - capture and verify ApplySnapshot contents
166 ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
167 @SuppressWarnings("unchecked")
168 List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
169 assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
171 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
172 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
173 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
175 // Verify ServerConfigurationPayload entry in leader's log
177 expectFirstMatching(leaderCollectorActor, ApplyState.class);
178 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
179 assertEquals("Leader journal last index", 3, leaderActorContext.getReplicatedLog().lastIndex());
180 assertEquals("Leader commit index", 3, leaderActorContext.getCommitIndex());
181 assertEquals("Leader last applied index", 3, leaderActorContext.getLastApplied());
182 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
183 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
185 // Verify ServerConfigurationPayload entry in both followers
187 expectFirstMatching(followerActor, ApplyState.class);
188 assertEquals("Follower journal last index", 3, followerActorContext.getReplicatedLog().lastIndex());
189 verifyServerConfigurationPayloadEntry(followerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
190 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
192 expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
193 assertEquals("New follower journal last index", 3, newFollowerActorContext.getReplicatedLog().lastIndex());
194 verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
195 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
197 // Verify new server config was applied in both followers
199 assertEquals("Follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID), followerActorContext.getPeerIds());
201 assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, FOLLOWER_ID), newFollowerActorContext.getPeerIds());
203 expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
204 expectFirstMatching(followerActor, ApplyState.class);
206 assertEquals("Follower commit index", 3, followerActorContext.getCommitIndex());
207 assertEquals("Follower last applied index", 3, followerActorContext.getLastApplied());
208 assertEquals("New follower commit index", 3, newFollowerActorContext.getCommitIndex());
209 assertEquals("New follower last applied index", 3, newFollowerActorContext.getLastApplied());
211 assertEquals("Leader persisted ReplicatedLogImplEntry entries", 0,
212 InMemoryJournal.get(LEADER_ID, ReplicatedLogImplEntry.class).size());
213 assertEquals("Leader persisted ServerConfigurationPayload entries", 1,
214 InMemoryJournal.get(LEADER_ID, ServerConfigurationPayload.class).size());
216 assertEquals("New follower persisted ReplicatedLogImplEntry entries", 0,
217 InMemoryJournal.get(NEW_SERVER_ID, ReplicatedLogImplEntry.class).size());
218 assertEquals("New follower persisted ServerConfigurationPayload entries", 1,
219 InMemoryJournal.get(NEW_SERVER_ID, ServerConfigurationPayload.class).size());
221 LOG.info("testAddServerWithExistingFollower ending");
225 public void testAddServerWithNoExistingFollower() throws Exception {
226 LOG.info("testAddServerWithNoExistingFollower starting");
229 RaftActorContext initialActorContext = new MockRaftActorContext();
230 initialActorContext.setCommitIndex(1);
231 initialActorContext.setLastApplied(1);
232 initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
235 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
236 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
237 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
238 actorFactory.generateActorId(LEADER_ID));
240 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
241 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
243 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
245 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
247 // Leader should install snapshot - capture and verify ApplySnapshot contents
249 ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
250 @SuppressWarnings("unchecked")
251 List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
252 assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
254 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
255 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
256 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
258 // Verify ServerConfigurationPayload entry in leader's log
260 expectFirstMatching(leaderCollectorActor, ApplyState.class);
261 assertEquals("Leader journal last index", 2, leaderActorContext.getReplicatedLog().lastIndex());
262 assertEquals("Leader commit index", 2, leaderActorContext.getCommitIndex());
263 assertEquals("Leader last applied index", 2, leaderActorContext.getLastApplied());
264 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
265 votingServer(NEW_SERVER_ID));
267 // Verify ServerConfigurationPayload entry in the new follower
269 expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
270 assertEquals("New follower journal last index", 2, newFollowerActorContext.getReplicatedLog().lastIndex());
271 verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
272 votingServer(NEW_SERVER_ID));
274 // Verify new server config was applied in the new follower
276 assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
278 LOG.info("testAddServerWithNoExistingFollower ending");
282 public void testAddServersAsNonVoting() throws Exception {
283 LOG.info("testAddServersAsNonVoting starting");
286 RaftActorContext initialActorContext = new MockRaftActorContext();
288 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
289 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
290 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
291 actorFactory.generateActorId(LEADER_ID));
293 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
294 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
296 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
298 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), false), testKit.getRef());
300 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
301 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
302 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
304 // Verify ServerConfigurationPayload entry in leader's log
306 expectFirstMatching(leaderCollectorActor, ApplyState.class);
308 assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
309 assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
310 assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
311 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
312 nonVotingServer(NEW_SERVER_ID));
314 // Verify ServerConfigurationPayload entry in the new follower
316 expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
317 assertEquals("New follower journal last index", 0, newFollowerActorContext.getReplicatedLog().lastIndex());
318 verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
319 nonVotingServer(NEW_SERVER_ID));
321 // Verify new server config was applied in the new follower
323 assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
325 assertNoneMatching(newFollowerCollectorActor, InstallSnapshot.class, 500);
327 // Add another non-voting server.
329 clearMessages(leaderCollectorActor);
331 RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
332 Follower newFollower2 = new Follower(follower2ActorContext);
333 followerActor.underlyingActor().setBehavior(newFollower2);
335 leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit.getRef());
337 addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
338 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
339 assertEquals("getLeaderHint", java.util.Optional.of(LEADER_ID), addServerReply.getLeaderHint());
341 expectFirstMatching(leaderCollectorActor, ApplyState.class);
342 assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
343 assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
344 assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
345 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
346 votingServer(LEADER_ID), nonVotingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
348 LOG.info("testAddServersAsNonVoting ending");
352 public void testAddServerWithOperationInProgress() throws Exception {
353 LOG.info("testAddServerWithOperationInProgress starting");
356 RaftActorContext initialActorContext = new MockRaftActorContext();
358 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
359 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
360 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
361 actorFactory.generateActorId(LEADER_ID));
363 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
364 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
366 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
368 RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
369 Follower newFollower2 = new Follower(follower2ActorContext);
370 followerActor.underlyingActor().setBehavior(newFollower2);
372 MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
373 newFollowerRaftActorInstance.setDropMessageOfType(InstallSnapshot.class);
375 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
377 // Wait for leader's install snapshot and capture it
379 InstallSnapshot installSnapshot = expectFirstMatching(newFollowerCollectorActor, InstallSnapshot.class);
381 // Send a second AddServer - should get queued
382 JavaTestKit testKit2 = new JavaTestKit(getSystem());
383 leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit2.getRef());
385 // Continue the first AddServer
386 newFollowerRaftActorInstance.setDropMessageOfType(null);
387 newFollowerRaftActor.tell(installSnapshot, leaderActor);
389 // Verify both complete successfully
390 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
391 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
393 addServerReply = testKit2.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
394 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
396 // Verify ServerConfigurationPayload entries in leader's log
398 expectMatching(leaderCollectorActor, ApplyState.class, 2);
399 assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
400 assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
401 assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
402 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
403 votingServer(LEADER_ID), votingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
405 // Verify ServerConfigurationPayload entry in the new follower
407 expectMatching(newFollowerCollectorActor, ApplyState.class, 2);
408 assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID2),
409 newFollowerActorContext.getPeerIds());
411 LOG.info("testAddServerWithOperationInProgress ending");
415 public void testAddServerWithPriorSnapshotInProgress() throws Exception {
416 LOG.info("testAddServerWithPriorSnapshotInProgress starting");
419 RaftActorContext initialActorContext = new MockRaftActorContext();
421 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
422 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
423 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
424 actorFactory.generateActorId(LEADER_ID));
426 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
427 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
429 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
431 // Drop commit message for now to delay snapshot completion
432 leaderRaftActor.setDropMessageOfType(String.class);
434 leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
436 Object commitMsg = expectFirstMatching(leaderCollectorActor, COMMIT_MESSAGE_CLASS);
438 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
440 leaderRaftActor.setDropMessageOfType(null);
441 leaderActor.tell(commitMsg, leaderActor);
443 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
444 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
445 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
447 expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
449 // Verify ServerConfigurationPayload entry in leader's log
451 expectFirstMatching(leaderCollectorActor, ApplyState.class);
452 assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
453 assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
454 assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
455 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
456 votingServer(NEW_SERVER_ID));
458 LOG.info("testAddServerWithPriorSnapshotInProgress ending");
462 public void testAddServerWithPriorSnapshotCompleteTimeout() throws Exception {
463 LOG.info("testAddServerWithPriorSnapshotCompleteTimeout starting");
466 RaftActorContext initialActorContext = new MockRaftActorContext();
468 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
469 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
470 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
471 actorFactory.generateActorId(LEADER_ID));
473 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
474 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
476 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
478 // Drop commit message so the snapshot doesn't complete.
479 leaderRaftActor.setDropMessageOfType(COMMIT_MESSAGE_CLASS);
481 leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
483 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
485 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
486 assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
488 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
490 LOG.info("testAddServerWithPriorSnapshotCompleteTimeout ending");
494 public void testAddServerWithLeaderChangeBeforePriorSnapshotComplete() throws Exception {
495 LOG.info("testAddServerWithLeaderChangeBeforePriorSnapshotComplete starting");
498 RaftActorContext initialActorContext = new MockRaftActorContext();
500 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
501 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
502 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
503 actorFactory.generateActorId(LEADER_ID));
505 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
506 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
507 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
509 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
511 // Drop the commit message so the snapshot doesn't complete yet.
512 leaderRaftActor.setDropMessageOfType(COMMIT_MESSAGE_CLASS);
514 leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
516 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
518 Object commitMsg = expectFirstMatching(leaderCollectorActor, COMMIT_MESSAGE_CLASS);
520 // Change the leader behavior to follower
521 leaderActor.tell(new Follower(leaderActorContext), leaderActor);
523 // Drop CaptureSnapshotReply in case install snapshot is incorrectly initiated after the prior
524 // snapshot completes. This will prevent the invalid snapshot from completing and fail the
525 // isCapturing assertion below.
526 leaderRaftActor.setDropMessageOfType(CaptureSnapshotReply.class);
528 // Complete the prior snapshot - this should be a no-op b/c it's no longer the leader
529 leaderActor.tell(commitMsg, leaderActor);
531 leaderActor.tell(new RaftActorServerConfigurationSupport.ServerOperationTimeout(NEW_SERVER_ID), leaderActor);
533 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
534 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
536 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
537 assertEquals("isCapturing", false, leaderActorContext.getSnapshotManager().isCapturing());
539 LOG.info("testAddServerWithLeaderChangeBeforePriorSnapshotComplete ending");
543 public void testAddServerWithLeaderChangeDuringInstallSnapshot() throws Exception {
544 LOG.info("testAddServerWithLeaderChangeDuringInstallSnapshot starting");
547 RaftActorContext initialActorContext = new MockRaftActorContext();
549 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
550 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
551 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
552 actorFactory.generateActorId(LEADER_ID));
554 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
555 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
557 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(8);
559 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
561 // Drop the UnInitializedFollowerSnapshotReply to delay it.
562 leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
564 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
566 UnInitializedFollowerSnapshotReply snapshotReply = expectFirstMatching(leaderCollectorActor,
567 UnInitializedFollowerSnapshotReply.class);
569 // Prevent election timeout when the leader switches to follower
570 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
572 // Change the leader behavior to follower
573 leaderActor.tell(new Follower(leaderActorContext), leaderActor);
575 // Send the captured UnInitializedFollowerSnapshotReply - should be a no-op
576 leaderRaftActor.setDropMessageOfType(null);
577 leaderActor.tell(snapshotReply, leaderActor);
579 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
580 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
582 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
584 LOG.info("testAddServerWithLeaderChangeDuringInstallSnapshot ending");
588 public void testAddServerWithInstallSnapshotTimeout() throws Exception {
589 LOG.info("testAddServerWithInstallSnapshotTimeout starting");
592 RaftActorContext initialActorContext = new MockRaftActorContext();
594 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
595 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
596 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
597 actorFactory.generateActorId(LEADER_ID));
599 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
600 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
601 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
603 // Drop the InstallSnapshot message so it times out
604 newFollowerRaftActor.underlyingActor().setDropMessageOfType(InstallSnapshot.class);
606 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
608 leaderActor.tell(new UnInitializedFollowerSnapshotReply("bogus"), leaderActor);
610 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
611 assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
613 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
614 assertEquals("Leader followers size", 0,
615 ((AbstractLeader)leaderRaftActor.getCurrentBehavior()).getFollowerIds().size());
617 LOG.info("testAddServerWithInstallSnapshotTimeout ending");
/**
 * Sends an AddServer request to a raft actor that is started with a one-day heartbeat interval and
 * a single configured peer, and asserts that the AddServerReply carries ServerChangeStatus.NO_LEADER.
 */
621 public void testAddServerWithNoLeader() {
622 LOG.info("testAddServerWithNoLeader starting");
// NOTE(review): the very long heartbeat interval presumably keeps this actor from starting an
// election during the test so that it never has a leader - confirm.
625 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
626 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
628 TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
629 MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
630 followerActor.path().toString())).config(configParams).persistent(Optional.of(false)).
631 props().withDispatcher(Dispatchers.DefaultDispatcherId()),
632 actorFactory.generateActorId(LEADER_ID));
633 noLeaderActor.underlyingActor().waitForInitializeBehaviorComplete();
// The reply is delivered to the test kit because it is passed as the sender of the request.
635 noLeaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
636 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
637 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
639 LOG.info("testAddServerWithNoLeader ending");
/**
 * Exercises AddServer when the new follower drops every AppendEntries message. The first AddServer
 * is expected to be answered with OK and to leave a ServerConfigurationPayload entry listing
 * LEADER_ID and NEW_SERVER_ID as voting in the leader's log. A second AddServer, and a re-send of
 * it, are both expected to be answered with PRIOR_REQUEST_CONSENSUS_TIMEOUT.
 */
643 public void testAddServerWithNoConsensusReached() {
644 LOG.info("testAddServerWithNoConsensusReached starting");
647 RaftActorContext initialActorContext = new MockRaftActorContext();
// The leader is created with an empty peer map.
649 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
650 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
651 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
652 actorFactory.generateActorId(LEADER_ID));
654 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
655 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
657 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
659 // Drop UnInitializedFollowerSnapshotReply initially
660 leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
662 MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
663 TestActorRef<MessageCollectorActor> newFollowerCollectorActor =
664 newCollectorActor(newFollowerRaftActorInstance, NEW_SERVER_ID);
666 // Drop AppendEntries to the new follower so consensus isn't reached
667 newFollowerRaftActorInstance.setDropMessageOfType(AppendEntries.class);
669 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
671 // Capture the UnInitializedFollowerSnapshotReply
672 Object snapshotReply = expectFirstMatching(leaderCollectorActor, UnInitializedFollowerSnapshotReply.class);
674 // Send the UnInitializedFollowerSnapshotReply to resume the first request
675 leaderRaftActor.setDropMessageOfType(null);
676 leaderActor.tell(snapshotReply, leaderActor);
// Wait until the new follower's collector has seen an AppendEntries before issuing the second request.
678 expectFirstMatching(newFollowerCollectorActor, AppendEntries.class);
680 // Send a second AddServer
681 leaderActor.tell(new AddServer(NEW_SERVER_ID2, "", false), testKit.getRef());
683 // The first AddServer should succeed with OK even though consensus wasn't reached
684 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
685 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
686 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
688 // Verify ServerConfigurationPayload entry in leader's log
689 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
690 votingServer(NEW_SERVER_ID));
692 // The second AddServer should fail since consensus wasn't reached for the first
693 addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
694 assertEquals("getStatus", ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT, addServerReply.getStatus());
696 // Re-send the second AddServer - should also fail
697 leaderActor.tell(new AddServer(NEW_SERVER_ID2, "", false), testKit.getRef());
698 addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
699 assertEquals("getStatus", ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT, addServerReply.getStatus());
701 LOG.info("testAddServerWithNoConsensusReached ending");
/**
 * Sends an AddServer request for FOLLOWER_ID, which is already in the leader's initial peer map,
 * and asserts that the reply status is ALREADY_EXISTS.
 */
705 public void testAddServerWithExistingServer() {
706 LOG.info("testAddServerWithExistingServer starting");
708 RaftActorContext initialActorContext = new MockRaftActorContext();
// The leader starts out with FOLLOWER_ID as a known peer.
710 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
711 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
712 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
713 actorFactory.generateActorId(LEADER_ID));
715 leaderActor.tell(new AddServer(FOLLOWER_ID, followerActor.path().toString(), true), testKit.getRef());
717 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
718 assertEquals("getStatus", ServerChangeStatus.ALREADY_EXISTS, addServerReply.getStatus());
720 LOG.info("testAddServerWithExistingServer ending");
/**
 * Sends an AddServer request to a follower raft actor whose LEADER_ID peer address points at a
 * MessageCollectorActor, and asserts that the collector receives an AddServer message.
 */
724 public void testAddServerForwardedToLeader() {
725 LOG.info("testAddServerForwardedToLeader starting");
728 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
729 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
// The "leader" here is only a message collector, so anything sent to it can be inspected.
731 TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
732 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
733 actorFactory.generateActorId(LEADER_ID));
735 TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
736 MockRaftActor.builder().id(FOLLOWER_ID).peerAddresses(ImmutableMap.of(LEADER_ID,
737 leaderActor.path().toString())).config(configParams).persistent(Optional.of(false)).
738 props().withDispatcher(Dispatchers.DefaultDispatcherId()),
739 actorFactory.generateActorId(FOLLOWER_ID));
740 followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
// NOTE(review): this empty AppendEntries naming LEADER_ID presumably makes the follower record
// LEADER_ID as its current leader before the AddServer arrives - confirm.
742 followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
743 -1, -1, (short)0), leaderActor);
745 followerRaftActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
746 expectFirstMatching(leaderActor, AddServer.class);
748 LOG.info("testAddServerForwardedToLeader ending");
/**
 * Calls RaftActorServerConfigurationSupport.handleMessage directly with two ApplyState messages and
 * asserts that it returns true for a log entry carrying a ServerConfigurationPayload and false for
 * a log entry carrying a MockPayload.
 */
752 public void testOnApplyState() {
753 LOG.info("testOnApplyState starting");
755 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
756 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
757 TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
758 MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
759 followerActor.path().toString())).config(configParams).persistent(Optional.of(false)).
760 props().withDispatcher(Dispatchers.DefaultDispatcherId()),
761 actorFactory.generateActorId(LEADER_ID));
// The support object is built around the underlying actor and invoked directly rather than via messages.
763 RaftActorServerConfigurationSupport support = new RaftActorServerConfigurationSupport(noLeaderActor.underlyingActor());
// Case 1: the applied entry holds a ServerConfigurationPayload (with an empty server list).
765 ReplicatedLogEntry serverConfigEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
766 new ServerConfigurationPayload(Collections.<ServerInfo>emptyList()));
767 boolean handled = support.handleMessage(new ApplyState(null, null, serverConfigEntry), ActorRef.noSender());
768 assertEquals("Message handled", true, handled);
// Case 2: the applied entry holds a payload that is not a ServerConfigurationPayload.
770 ReplicatedLogEntry nonServerConfigEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
771 new MockRaftActorContext.MockPayload("1"));
772 handled = support.handleMessage(new ApplyState(null, null, nonServerConfigEntry), ActorRef.noSender());
773 assertEquals("Message handled", false, handled);
775 LOG.info("testOnApplyState ending");
/**
 * Sends a RemoveServer request for FOLLOWER_ID to a raft actor that is started with a one-day
 * heartbeat interval, and asserts that the RemoveServerReply carries ServerChangeStatus.NO_LEADER.
 */
779 public void testRemoveServerWithNoLeader() {
780 LOG.info("testRemoveServerWithNoLeader starting");
// NOTE(review): the very long heartbeat interval presumably keeps this actor from starting an
// election during the test so that it never has a leader - confirm.
782 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
783 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
// Despite the variable name, this is a plain MockRaftActor, not a MockLeaderRaftActor.
785 TestActorRef<MockRaftActor> leaderActor = actorFactory.createTestActor(
786 MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
787 followerActor.path().toString())).config(configParams).persistent(Optional.of(false)).
788 props().withDispatcher(Dispatchers.DefaultDispatcherId()),
789 actorFactory.generateActorId(LEADER_ID));
790 leaderActor.underlyingActor().waitForInitializeBehaviorComplete();
792 leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
793 RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
794 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, removeServerReply.getStatus());
796 LOG.info("testRemoveServerWithNoLeader ending");
/**
 * Sends a RemoveServer request for NEW_SERVER_ID to a leader whose initial peer map contains only
 * FOLLOWER_ID, and asserts that the reply status is DOES_NOT_EXIST.
 */
800 public void testRemoveServerNonExistentServer() {
801 LOG.info("testRemoveServerNonExistentServer starting");
803 RaftActorContext initialActorContext = new MockRaftActorContext();
805 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
806 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
807 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
808 actorFactory.generateActorId(LEADER_ID));
// NEW_SERVER_ID is not among the peers given to the leader above.
810 leaderActor.tell(new RemoveServer(NEW_SERVER_ID), testKit.getRef());
811 RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
812 assertEquals("getStatus", ServerChangeStatus.DOES_NOT_EXIST, removeServerReply.getStatus());
814 LOG.info("testRemoveServerNonExistentServer ending");
/**
 * Sends a RemoveServer request to a follower raft actor whose LEADER_ID peer address points at a
 * MessageCollectorActor, and asserts that the collector receives a RemoveServer message.
 */
818 public void testRemoveServerForwardToLeader() {
819 LOG.info("testRemoveServerForwardToLeader starting");
821 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
822 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
// The "leader" here is only a message collector, so anything sent to it can be inspected.
824 TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
825 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
826 actorFactory.generateActorId(LEADER_ID));
828 TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
829 MockRaftActor.builder().id(FOLLOWER_ID).peerAddresses(ImmutableMap.of(LEADER_ID,
830 leaderActor.path().toString())).config(configParams).persistent(Optional.of(false)).
831 props().withDispatcher(Dispatchers.DefaultDispatcherId()),
832 actorFactory.generateActorId(FOLLOWER_ID));
833 followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
// NOTE(review): this empty AppendEntries naming LEADER_ID presumably makes the follower record
// LEADER_ID as its current leader before the RemoveServer arrives - confirm.
835 followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
836 -1, -1, (short)0), leaderActor);
838 followerRaftActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
839 expectFirstMatching(leaderActor, RemoveServer.class);
841 LOG.info("testRemoveServerForwardToLeader ending");
/**
 * Sends RemoveServer for FOLLOWER_ID to a leader that has that single follower and asserts that:
 * the reply status is OK; the leader's collector sees an ApplyState for log index 0; the leader's
 * log holds a ServerConfigurationPayload listing only LEADER_ID as voting; the leader's current
 * behavior is still Leader with no follower ids; and the follower's collector receives ServerRemoved.
 */
845 public void testRemoveServer() {
846 LOG.info("testRemoveServer starting");
// Follower configuration: one-day heartbeat interval and the DisableElectionsRaftPolicy.
848 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
849 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
850 configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
// The follower's id and path are generated up front so the leader can be given the peer address
// before the follower actor itself is created.
852 final String followerActorId = actorFactory.generateActorId(FOLLOWER_ID);
853 final String followerActorPath = actorFactory.createTestActorPath(followerActorId);
854 RaftActorContext initialActorContext = new MockRaftActorContext();
856 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
857 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActorPath),
858 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
859 actorFactory.generateActorId(LEADER_ID));
861 TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
// "collector" is handed to the follower's CollectingMockRaftActor and is inspected at the end of the test.
863 TestActorRef<MessageCollectorActor> collector =
864 actorFactory.createTestActor(MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
865 actorFactory.generateActorId("collector"));
866 TestActorRef<CollectingMockRaftActor> followerRaftActor = actorFactory.createTestActor(
867 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
868 configParams, NO_PERSISTENCE, collector).withDispatcher(Dispatchers.DefaultDispatcherId()),
871 leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
872 RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
873 assertEquals("getStatus", ServerChangeStatus.OK, removeServerReply.getStatus());
875 final ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
876 assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
877 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(), votingServer(LEADER_ID));
// The leader must remain leader and must no longer track any followers.
879 RaftActorBehavior currentBehavior = leaderActor.underlyingActor().getCurrentBehavior();
880 assertTrue("Expected Leader", currentBehavior instanceof Leader);
881 assertEquals("Follower ids size", 0, ((Leader)currentBehavior).getFollowerIds().size());
883 MessageCollectorActor.expectFirstMatching(collector, ServerRemoved.class);
885 LOG.info("testRemoveServer ending");
/**
 * Sends RemoveServer for LEADER_ID to the leader itself (which has one follower) and asserts that:
 * the reply status is OK; the follower's collector sees an ApplyState for log index 0; the leader's
 * log holds a ServerConfigurationPayload listing only FOLLOWER_ID as voting; and the leader's
 * collector receives ServerRemoved.
 */
889 public void testRemoveServerLeader() {
890 LOG.info("testRemoveServerLeader starting");
// Follower configuration: one-day heartbeat interval and the DisableElectionsRaftPolicy.
892 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
893 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
894 configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
// The follower's id and path are generated up front so the leader can be given the peer address
// before the follower actor itself is created.
896 final String followerActorId = actorFactory.generateActorId(FOLLOWER_ID);
897 final String followerActorPath = actorFactory.createTestActorPath(followerActorId);
898 RaftActorContext initialActorContext = new MockRaftActorContext();
900 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
901 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActorPath),
902 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
903 actorFactory.generateActorId(LEADER_ID));
905 TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
907 TestActorRef<MessageCollectorActor> followerCollector = actorFactory.createTestActor(MessageCollectorActor.props().
908 withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId("collector"));
// The follower actor reference is not kept; the follower is observed only through followerCollector.
909 actorFactory.createTestActor(
910 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
911 configParams, NO_PERSISTENCE, followerCollector).withDispatcher(Dispatchers.DefaultDispatcherId()),
914 leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
915 RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
916 assertEquals("getStatus", ServerChangeStatus.OK, removeServerReply.getStatus());
918 final ApplyState applyState = MessageCollectorActor.expectFirstMatching(followerCollector, ApplyState.class);
919 assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
920 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
921 votingServer(FOLLOWER_ID));
923 MessageCollectorActor.expectFirstMatching(leaderCollector, ServerRemoved.class);
925 LOG.info("testRemoveServerLeader ending");
/**
 * Sends RemoveServer for LEADER_ID to a leader created with an empty peer map and asserts that the
 * reply status is NOT_SUPPORTED.
 */
929 public void testRemoveServerLeaderWithNoFollowers() {
930 LOG.info("testRemoveServerLeaderWithNoFollowers starting");
// The leader is the only server: its peer map is empty.
932 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
933 MockLeaderRaftActor.props(Collections.<String, String>emptyMap(),
934 new MockRaftActorContext()).withDispatcher(Dispatchers.DefaultDispatcherId()),
935 actorFactory.generateActorId(LEADER_ID));
937 leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
938 RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
939 assertEquals("getStatus", ServerChangeStatus.NOT_SUPPORTED, removeServerReply.getStatus());
941 LOG.info("testRemoveServerLeaderWithNoFollowers ending");
/**
 * Sends two ChangeServersVotingStatus requests to a leader with two followers. The first marks
 * both followers as non-voting; the second marks FOLLOWER_ID as voting again. After each request
 * the test asserts an OK reply, waits for an ApplyState on the leader's and on each follower's
 * collector, and verifies the ServerConfigurationPayload entry in all three replicated logs.
 */
945 public void testChangeServersVotingStatus() {
946 LOG.info("testChangeServersVotingStatus starting");
// Follower configuration: one-day heartbeat interval and the DisableElectionsRaftPolicy.
948 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
949 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
950 configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
// Ids and paths are generated first so that every actor can be given its peers' addresses at creation.
952 final String follower1ActorId = actorFactory.generateActorId(FOLLOWER_ID);
953 final String follower1ActorPath = actorFactory.createTestActorPath(follower1ActorId);
954 final String follower2ActorId = actorFactory.generateActorId(FOLLOWER_ID2);
955 final String follower2ActorPath = actorFactory.createTestActorPath(follower2ActorId);
957 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
958 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, follower1ActorPath,
959 FOLLOWER_ID2, follower2ActorPath), new MockRaftActorContext()).
960 withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
961 TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
963 TestActorRef<MessageCollectorActor> follower1Collector = actorFactory.createTestActor(
964 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
965 actorFactory.generateActorId("collector"));
966 TestActorRef<CollectingMockRaftActor> follower1RaftActor = actorFactory.createTestActor(
967 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
968 FOLLOWER_ID2, follower2ActorPath), configParams, NO_PERSISTENCE, follower1Collector).
969 withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId);
971 TestActorRef<MessageCollectorActor> follower2Collector = actorFactory.createTestActor(
972 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
973 actorFactory.generateActorId("collector"));
974 TestActorRef<CollectingMockRaftActor> follower2RaftActor = actorFactory.createTestActor(
975 CollectingMockRaftActor.props(FOLLOWER_ID2, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
976 FOLLOWER_ID, follower1ActorPath), configParams, NO_PERSISTENCE, follower2Collector).
977 withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId);
979 // Send first ChangeServersVotingStatus message
981 leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(FOLLOWER_ID, false, FOLLOWER_ID2, false)),
983 ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
984 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
986 final ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
987 assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
988 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
989 votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
991 MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
992 verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
993 votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
995 MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
996 verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
997 votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
// Clear the collectors so the expectFirstMatching calls below only match messages from the second request.
999 MessageCollectorActor.clearMessages(leaderCollector);
1000 MessageCollectorActor.clearMessages(follower1Collector);
1001 MessageCollectorActor.clearMessages(follower2Collector);
1003 // Send second ChangeServersVotingStatus message
1005 leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(FOLLOWER_ID, true)), testKit.getRef());
1006 reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1007 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
// Only FOLLOWER_ID was named in the second request; FOLLOWER_ID2 is expected to stay non-voting.
1009 MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
1010 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1011 votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1013 MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
1014 verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1015 votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1017 MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
1018 verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1019 votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1021 LOG.info("testChangeServersVotingStatus ending");
/**
 * Sends ChangeServersVotingStatus marking LEADER_ID as non-voting to a leader with two followers.
 * Asserts an OK reply, verifies that all three replicated logs contain a ServerConfigurationPayload
 * with LEADER_ID non-voting and both followers voting, then checks the resulting raft states and
 * that the original leader's collector receives AppendEntries messages.
 */
1025 public void testChangeLeaderToNonVoting() {
1026 LOG.info("testChangeLeaderToNonVoting starting");
// Unlike the neighbouring tests, the followers use a 500 ms heartbeat interval and no custom raft policy.
1028 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1029 configParams.setHeartBeatInterval(new FiniteDuration(500, TimeUnit.MILLISECONDS));
// Ids and paths are generated first so that every actor can be given its peers' addresses at creation.
1031 final String follower1ActorId = actorFactory.generateActorId(FOLLOWER_ID);
1032 final String follower1ActorPath = actorFactory.createTestActorPath(follower1ActorId);
1033 final String follower2ActorId = actorFactory.generateActorId(FOLLOWER_ID2);
1034 final String follower2ActorPath = actorFactory.createTestActorPath(follower2ActorId);
1036 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
1037 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, follower1ActorPath,
1038 FOLLOWER_ID2, follower2ActorPath), new MockRaftActorContext()).
1039 withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
1040 TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
1042 TestActorRef<MessageCollectorActor> follower1Collector = actorFactory.createTestActor(
1043 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1044 actorFactory.generateActorId("collector"));
1045 TestActorRef<CollectingMockRaftActor> follower1RaftActor = actorFactory.createTestActor(
1046 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
1047 FOLLOWER_ID2, follower2ActorPath), configParams, NO_PERSISTENCE, follower1Collector).
1048 withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId);
1050 TestActorRef<MessageCollectorActor> follower2Collector = actorFactory.createTestActor(
1051 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1052 actorFactory.generateActorId("collector"));
1053 TestActorRef<CollectingMockRaftActor> follower2RaftActor = actorFactory.createTestActor(
1054 CollectingMockRaftActor.props(FOLLOWER_ID2, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
1055 FOLLOWER_ID, follower1ActorPath), configParams, NO_PERSISTENCE, follower2Collector).
1056 withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId);
1058 // Send ChangeServersVotingStatus message
1060 leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(LEADER_ID, false)), testKit.getRef());
1061 ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1062 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1064 MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
1065 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1066 nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
1068 MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
1069 verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1070 nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
1072 MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
1073 verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1074 nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
// NOTE(review): verifyRaftState is defined elsewhere; with two actors passed it presumably succeeds
// when either follower reaches the Leader state - confirm against the helper.
1076 verifyRaftState(RaftState.Leader, follower1RaftActor.underlyingActor(), follower2RaftActor.underlyingActor());
1077 verifyRaftState(RaftState.Follower, leaderActor.underlyingActor());
// The former leader is expected to receive AppendEntries itself (the third argument, 2, is a count).
1079 MessageCollectorActor.expectMatching(leaderCollector, AppendEntries.class, 2);
1081 LOG.info("testChangeLeaderToNonVoting ending");
/**
 * Sends ChangeServersVotingStatus marking LEADER_ID as non-voting to a leader created with an empty
 * peer map, and asserts that the reply status is INVALID_REQUEST.
 */
1085 public void testChangeLeaderToNonVotingInSingleNode() {
1086 LOG.info("testChangeLeaderToNonVotingInSingleNode starting");
// The leader is the only server: its peer map is empty.
1088 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
1089 MockLeaderRaftActor.props(ImmutableMap.of(), new MockRaftActorContext()).
1090 withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
1092 leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(LEADER_ID, false)), testKit.getRef());
1093 ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1094 assertEquals("getStatus", ServerChangeStatus.INVALID_REQUEST, reply.getStatus());
1096 LOG.info("testChangeLeaderToNonVotingInSingleNode ending");
/**
 * Starts two persistent raft actors (node1, node2) whose journals contain a server configuration in
 * which both are non-voting and two other servers (downNode1, downNode2) are voting. A
 * ChangeServersVotingStatus request that flips all four voting states is sent to node1 twice:
 * the first time, before node1 has a peer address for node2, the expected reply is NO_LEADER; the
 * second time, after the peer address is set, the expected reply is OK. The test then verifies the
 * new configuration in both logs, that both nodes are voting members, and that node1 is Leader and
 * node2 is Follower.
 */
1100 public void testChangeToVotingWithNoLeader() {
1101 LOG.info("testChangeToVotingWithNoLeader starting");
1103 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1104 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1105 configParams.setElectionTimeoutFactor(5);
1107 final String node1ID = "node1";
1108 final String node2ID = "node2";
1110 // Set up a persisted ServerConfigurationPayload. Initially node1 and node2 will come up as non-voting
1111 // via the server config. The server config will also contain 2 voting peers that are down (i.e. no actors are created for them).
1114 ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1115 new ServerInfo(node1ID, false), new ServerInfo(node2ID, false),
1116 new ServerInfo("downNode1", true), new ServerInfo("downNode2", true)));
1117 ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, 1, persistedServerConfig);
// Seed each node's in-memory journal with an election term, the server config entry and an
// ApplyJournalEntries for index 0, so the config is present when the actors are created.
1119 InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "downNode1"));
1120 InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1121 InMemoryJournal.addEntry(node1ID, 3, new ApplyJournalEntries(0));
1122 InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "downNode2"));
1123 InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1124 InMemoryJournal.addEntry(node2ID, 3, new ApplyJournalEntries(0));
// Both nodes are created with an empty peer address map; node2's address is supplied to node1 later.
1126 TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
1127 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1128 actorFactory.generateActorId("collector"));
1129 TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1130 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
1131 PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1132 CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1134 TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
1135 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1136 actorFactory.generateActorId("collector"));
1137 TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1138 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
1139 PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1140 CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1142 node1RaftActor.waitForInitializeBehaviorComplete();
1143 node2RaftActor.waitForInitializeBehaviorComplete();
1145 // Verify the intended server config was loaded and applied.
1146 verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1147 nonVotingServer(node1ID), nonVotingServer(node2ID), votingServer("downNode1"),
1148 votingServer("downNode2"));
1149 assertEquals("isVotingMember", false, node1RaftActor.getRaftActorContext().isVotingMember());
1150 assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1151 assertEquals("getLeaderId", null, node1RaftActor.getLeaderId());
1153 verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1154 nonVotingServer(node1ID), nonVotingServer(node2ID), votingServer("downNode1"),
1155 votingServer("downNode2"));
1156 assertEquals("isVotingMember", false, node2RaftActor.getRaftActorContext().isVotingMember());
1158 // For the test, we send a ChangeServersVotingStatus message to node1 to flip the voting states for
1159 // each server, ie node1 and node2 to voting and the 2 down nodes to non-voting. This should cause
1160 // node1 to try to elect itself as leader in order to apply the new server config. Since the 2
1161 // down nodes are switched to non-voting, node1 should only need a vote from node2.
1163 // First send the message such that node1 has no peer address for node2 - should fail.
1165 ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true,
1166 node2ID, true, "downNode1", false, "downNode2", false));
1167 node1RaftActorRef.tell(changeServers, testKit.getRef());
1168 ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1169 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, reply.getStatus());
1171 // Send an AppendEntries so node1 has a leaderId
1173 MessageCollectorActor.clearMessages(node1Collector);
1175 long term = node1RaftActor.getRaftActorContext().getTermInformation().getCurrentTerm();
1176 node1RaftActorRef.tell(new AppendEntries(term, "downNode1", -1L, -1L,
1177 Collections.<ReplicatedLogEntry>emptyList(), 0, -1, (short)1), ActorRef.noSender());
1179 // Wait for the ElectionTimeout to clear the leaderId. The leaderId must be null so on the
1180 // ChangeServersVotingStatus message, it will try to elect a leader.
1182 MessageCollectorActor.expectFirstMatching(node1Collector, ElectionTimeout.class);
1184 // Update node2's peer address and send the message again
1186 node1RaftActor.setPeerAddress(node2ID, node2RaftActorRef.path().toString());
1188 node1RaftActorRef.tell(changeServers, testKit.getRef());
1189 reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1190 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
// The new server config is expected at log index 1 (the persisted config occupies index 0).
1192 ApplyJournalEntries apply = MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class);
1193 assertEquals("getToIndex", 1, apply.getToIndex());
1194 verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1195 votingServer(node1ID), votingServer(node2ID), nonVotingServer("downNode1"),
1196 nonVotingServer("downNode2"));
1197 assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
1198 assertEquals("getRaftState", RaftState.Leader, node1RaftActor.getRaftState());
1200 apply = MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
1201 assertEquals("getToIndex", 1, apply.getToIndex());
1202 verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1203 votingServer(node1ID), votingServer(node2ID), nonVotingServer("downNode1"),
1204 nonVotingServer("downNode2"));
1205 assertEquals("isVotingMember", true, node2RaftActor.getRaftActorContext().isVotingMember());
1206 assertEquals("getRaftState", RaftState.Follower, node2RaftActor.getRaftState());
1208 LOG.info("testChangeToVotingWithNoLeader ending");
1212 public void testChangeToVotingWithNoLeaderAndElectionTimeout() {
1213 LOG.info("testChangeToVotingWithNoLeaderAndElectionTimeout starting");
1215 final String node1ID = "node1";
1216 final String node2ID = "node2";
1218 PeerAddressResolver peerAddressResolver = peerId -> peerId.equals(node1ID) ? actorFactory.createTestActorPath(node1ID) :
1219 peerId.equals(node2ID) ? actorFactory.createTestActorPath(node2ID) : null;
1221 ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1222 new ServerInfo(node1ID, false), new ServerInfo(node2ID, true)));
1223 ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, 1, persistedServerConfig);
1225 InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
1226 InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1227 InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
1228 InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1230 DefaultConfigParamsImpl configParams1 = new DefaultConfigParamsImpl();
1231 configParams1.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1232 configParams1.setElectionTimeoutFactor(1);
1233 configParams1.setPeerAddressResolver(peerAddressResolver);
1234 TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
1235 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1236 actorFactory.generateActorId("collector"));
1237 TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1238 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams1,
1239 PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1240 CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1242 DefaultConfigParamsImpl configParams2 = new DefaultConfigParamsImpl();
1243 configParams2.setElectionTimeoutFactor(1000000);
1244 configParams2.setPeerAddressResolver(peerAddressResolver);
1245 TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
1246 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1247 actorFactory.generateActorId("collector"));
1248 TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1249 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams2,
1250 PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1251 CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1253 // Send a ChangeServersVotingStatus message to node1 to change mode1 to voting. This should cause
1254 // node1 to try to elect itself as leader in order to apply the new server config. But we'll drop
1255 // RequestVote messages in node2 which should cause node1 to time out and revert back to the previous
1256 // server config and fail with NO_LEADER. Note that node1 shouldn't forward the request to node2 b/c
1257 // node2 was previously voting.
1259 node2RaftActor.setDropMessageOfType(RequestVote.class);
1261 ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true));
1262 node1RaftActorRef.tell(changeServers, testKit.getRef());
1263 ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1264 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, reply.getStatus());
1266 assertEquals("Server config", Sets.newHashSet(nonVotingServer(node1ID), votingServer(node2ID)),
1267 Sets.newHashSet(node1RaftActor.getRaftActorContext().getPeerServerInfo(true).getServerConfig()));
1268 assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1270 LOG.info("testChangeToVotingWithNoLeaderAndElectionTimeout ending");
1274 public void testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout() {
1275 LOG.info("testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout starting");
1277 final String node1ID = "node1";
1278 final String node2ID = "node2";
1280 PeerAddressResolver peerAddressResolver = peerId -> peerId.equals(node1ID) ? actorFactory.createTestActorPath(node1ID) :
1281 peerId.equals(node2ID) ? actorFactory.createTestActorPath(node2ID) : null;
1283 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1284 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1285 configParams.setElectionTimeoutFactor(3);
1286 configParams.setPeerAddressResolver(peerAddressResolver);
1288 ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1289 new ServerInfo(node1ID, false), new ServerInfo(node2ID, false)));
1290 ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, 1, persistedServerConfig);
1292 InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
1293 InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1294 InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
1295 InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1296 InMemoryJournal.addEntry(node2ID, 3, new ReplicatedLogImplEntry(1, 1,
1297 new MockRaftActorContext.MockPayload("2")));
1298 InMemoryJournal.addEntry(node2ID, 4, new ApplyJournalEntries(1));
1300 TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
1301 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1302 actorFactory.generateActorId("collector"));
1303 TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1304 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
1305 PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1306 CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1308 TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
1309 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1310 actorFactory.generateActorId("collector"));
1311 TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1312 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
1313 PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1314 CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1316 // Send a ChangeServersVotingStatus message to node1 to change mode1 to voting. This should cause
1317 // node1 to try to elect itself as leader in order to apply the new server config. However node1's log
1318 // is behind node2's so node2 should not grant node1's vote. This should cause node1 to time out and
1319 // forward the request to node2.
1321 ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(
1322 ImmutableMap.of(node1ID, true, node2ID, true));
1323 node1RaftActorRef.tell(changeServers, testKit.getRef());
1324 ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1325 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1327 MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
1328 verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1329 votingServer(node1ID), votingServer(node2ID));
1330 assertEquals("getRaftState", RaftState.Leader, node2RaftActor.getRaftState());
1332 MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class);
1333 verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1334 votingServer(node1ID), votingServer(node2ID));
1335 assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
1336 assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1338 LOG.info("testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout ending");
1342 public void testChangeToVotingWithNoLeaderAndOtherLeaderElected() {
1343 LOG.info("testChangeToVotingWithNoLeaderAndOtherLeaderElected starting");
1345 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1346 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1347 configParams.setElectionTimeoutFactor(100000);
1349 final String node1ID = "node1";
1350 final String node2ID = "node2";
1352 configParams.setPeerAddressResolver(peerId -> peerId.equals(node1ID) ? actorFactory.createTestActorPath(node1ID) :
1353 peerId.equals(node2ID) ? actorFactory.createTestActorPath(node2ID) : null);
1355 ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1356 new ServerInfo(node1ID, false), new ServerInfo(node2ID, true)));
1357 ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, 1, persistedServerConfig);
1359 InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
1360 InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1361 InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
1362 InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1364 TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
1365 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1366 actorFactory.generateActorId("collector"));
1367 TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1368 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
1369 PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1370 CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1372 TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
1373 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1374 actorFactory.generateActorId("collector"));
1375 TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1376 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
1377 PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1378 CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1380 // Send a ChangeServersVotingStatus message to node1 to change node1 to voting. This should cause
1381 // node1 to try to elect itself as leader in order to apply the new server config. But we'll drop
1382 // RequestVote messages in node2 and make it the leader so node1 should forward the server change
1383 // request to node2 when node2 is elected.
1385 node2RaftActor.setDropMessageOfType(RequestVote.class);
1387 ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true,
1389 node1RaftActorRef.tell(changeServers, testKit.getRef());
1391 MessageCollectorActor.expectFirstMatching(node2Collector, RequestVote.class);
1393 node2RaftActorRef.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
1395 ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1396 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1398 MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class);
1399 verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1400 votingServer(node1ID), votingServer(node2ID));
1401 assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
1402 assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1404 MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
1405 verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1406 votingServer(node1ID), votingServer(node2ID));
1407 assertEquals("getRaftState", RaftState.Leader, node2RaftActor.getRaftState());
1409 LOG.info("testChangeToVotingWithNoLeaderAndOtherLeaderElected ending");
1412 private static void verifyRaftState(RaftState expState, RaftActor... raftActors) {
1413 Stopwatch sw = Stopwatch.createStarted();
1414 while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
1415 for(RaftActor raftActor: raftActors) {
1416 if(raftActor.getRaftState() == expState) {
1422 fail("None of the RaftActors have state " + expState);
1425 private static ServerInfo votingServer(String id) {
1426 return new ServerInfo(id, true);
1429 private static ServerInfo nonVotingServer(String id) {
1430 return new ServerInfo(id, false);
1433 private TestActorRef<MessageCollectorActor> newLeaderCollectorActor(MockLeaderRaftActor leaderRaftActor) {
1434 return newCollectorActor(leaderRaftActor, LEADER_ID);
1437 private TestActorRef<MessageCollectorActor> newCollectorActor(AbstractMockRaftActor raftActor, String id) {
1438 TestActorRef<MessageCollectorActor> collectorActor = actorFactory.createTestActor(
1439 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1440 actorFactory.generateActorId(id + "Collector"));
1441 raftActor.setCollectorActor(collectorActor);
1442 return collectorActor;
1445 private static void verifyServerConfigurationPayloadEntry(ReplicatedLog log, ServerInfo... expected) {
1446 ReplicatedLogEntry logEntry = log.get(log.lastIndex());
1447 assertEquals("Last log entry payload class", ServerConfigurationPayload.class, logEntry.getData().getClass());
1448 ServerConfigurationPayload payload = (ServerConfigurationPayload)logEntry.getData();
1449 assertEquals("Server config", Sets.newHashSet(expected), Sets.newHashSet(payload.getServerConfig()));
1452 private static RaftActorContextImpl newFollowerContext(String id, TestActorRef<? extends UntypedActor> actor) {
1453 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1454 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1455 configParams.setElectionTimeoutFactor(100000);
1456 NonPersistentDataProvider noPersistence = new NonPersistentDataProvider();
1457 ElectionTermImpl termInfo = new ElectionTermImpl(noPersistence, id, LOG);
1458 termInfo.update(1, LEADER_ID);
1459 return new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
1460 id, termInfo, -1, -1, ImmutableMap.of(LEADER_ID, ""), configParams, noPersistence, LOG);
1463 static abstract class AbstractMockRaftActor extends MockRaftActor {
1464 private volatile TestActorRef<MessageCollectorActor> collectorActor;
1465 private volatile Class<?> dropMessageOfType;
1467 AbstractMockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
1468 boolean persistent, TestActorRef<MessageCollectorActor> collectorActor) {
1469 super(builder().id(id).peerAddresses(peerAddresses).config(config.get()).
1470 persistent(Optional.of(persistent)));
1471 this.collectorActor = collectorActor;
1474 void setDropMessageOfType(Class<?> dropMessageOfType) {
1475 this.dropMessageOfType = dropMessageOfType;
1478 void setCollectorActor(TestActorRef<MessageCollectorActor> collectorActor) {
1479 this.collectorActor = collectorActor;
1483 public void handleCommand(Object message) {
1484 if(dropMessageOfType == null || !dropMessageOfType.equals(message.getClass())) {
1485 super.handleCommand(message);
1488 if(collectorActor != null) {
1489 collectorActor.tell(message, getSender());
1494 public static class CollectingMockRaftActor extends AbstractMockRaftActor {
1496 CollectingMockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
1497 boolean persistent, TestActorRef<MessageCollectorActor> collectorActor) {
1498 super(id, peerAddresses, config, persistent, collectorActor);
1499 snapshotCohortDelegate = new RaftActorSnapshotCohort() {
1501 public void createSnapshot(ActorRef actorRef) {
1502 actorRef.tell(new CaptureSnapshotReply(new byte[0]), actorRef);
1506 public void applySnapshot(byte[] snapshotBytes) {
1511 public static Props props(final String id, final Map<String, String> peerAddresses,
1512 ConfigParams config, boolean persistent, TestActorRef<MessageCollectorActor> collectorActor){
1514 return Props.create(CollectingMockRaftActor.class, id, peerAddresses, Optional.of(config),
1515 persistent, collectorActor);
1520 public static class MockLeaderRaftActor extends AbstractMockRaftActor {
1521 public MockLeaderRaftActor(Map<String, String> peerAddresses, ConfigParams config,
1522 RaftActorContext fromContext) {
1523 super(LEADER_ID, peerAddresses, Optional.of(config), NO_PERSISTENCE, null);
1524 setPersistence(false);
1526 RaftActorContext context = getRaftActorContext();
1527 for(int i = 0; i < fromContext.getReplicatedLog().size(); i++) {
1528 ReplicatedLogEntry entry = fromContext.getReplicatedLog().get(i);
1529 getState().add(entry.getData());
1530 context.getReplicatedLog().append(entry);
1533 context.setCommitIndex(fromContext.getCommitIndex());
1534 context.setLastApplied(fromContext.getLastApplied());
1535 context.getTermInformation().update(fromContext.getTermInformation().getCurrentTerm(),
1536 fromContext.getTermInformation().getVotedFor());
1540 protected void initializeBehavior() {
1541 changeCurrentBehavior(new Leader(getRaftActorContext()));
1542 initializeBehaviorComplete.countDown();
1546 public void createSnapshot(ActorRef actorRef) {
1548 actorRef.tell(new CaptureSnapshotReply(RaftActorTest.fromObject(getState()).toByteArray()), actorRef);
1549 } catch (Exception e) {
1550 LOG.error("createSnapshot failed", e);
1554 static Props props(Map<String, String> peerAddresses, RaftActorContext fromContext) {
1555 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1556 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1557 configParams.setElectionTimeoutFactor(10);
1558 return Props.create(MockLeaderRaftActor.class, peerAddresses, configParams, fromContext);
1562 public static class MockNewFollowerRaftActor extends AbstractMockRaftActor {
1563 public MockNewFollowerRaftActor(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
1564 super(NEW_SERVER_ID, Maps.<String, String>newHashMap(), Optional.of(config), NO_PERSISTENCE, collectorActor);
1565 setPersistence(false);
1568 static Props props(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
1569 return Props.create(MockNewFollowerRaftActor.class, config, collectorActor);