2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertTrue;
12 import static org.junit.Assert.fail;
13 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.assertNoneMatching;
14 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
15 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
16 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
17 import akka.actor.ActorRef;
18 import akka.actor.Props;
19 import akka.actor.UntypedActor;
20 import akka.dispatch.Dispatchers;
21 import akka.testkit.JavaTestKit;
22 import akka.testkit.TestActorRef;
23 import com.google.common.base.Optional;
24 import com.google.common.base.Stopwatch;
25 import com.google.common.collect.ImmutableMap;
26 import com.google.common.collect.Maps;
27 import com.google.common.collect.Sets;
28 import java.util.Arrays;
29 import java.util.Collections;
30 import java.util.List;
32 import java.util.concurrent.TimeUnit;
33 import org.junit.After;
34 import org.junit.Before;
35 import org.junit.Test;
36 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
37 import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
38 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
39 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
40 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
41 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
42 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
43 import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
44 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
45 import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
46 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
47 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
48 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
49 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
50 import org.opendaylight.controller.cluster.raft.messages.AddServer;
51 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
52 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
53 import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
54 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
55 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
56 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
57 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
58 import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
59 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
60 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
61 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
62 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
63 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
64 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
65 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
66 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
67 import org.slf4j.Logger;
68 import org.slf4j.LoggerFactory;
69 import scala.concurrent.duration.FiniteDuration;
72 * Unit tests for RaftActorServerConfigurationSupport.
74 * @author Thomas Pantelis
76 public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
77 static final String LEADER_ID = "leader";
78 static final String FOLLOWER_ID = "follower";
79 static final String FOLLOWER_ID2 = "follower2";
80 static final String NEW_SERVER_ID = "new-server";
81 static final String NEW_SERVER_ID2 = "new-server2";
82 private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupportTest.class);
83 private static final Class<?> COMMIT_MESSAGE_CLASS = RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT.getClass();
84 private static final boolean NO_PERSISTENCE = false;
85 private static final boolean PERSISTENT = true;
87 private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
89 private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
90 Props.create(ForwardMessageToBehaviorActor.class).withDispatcher(Dispatchers.DefaultDispatcherId()),
91 actorFactory.generateActorId(FOLLOWER_ID));
93 private TestActorRef<MockNewFollowerRaftActor> newFollowerRaftActor;
94 private TestActorRef<MessageCollectorActor> newFollowerCollectorActor;
95 private RaftActorContext newFollowerActorContext;
97 private final JavaTestKit testKit = new JavaTestKit(getSystem());
100 public void setup() {
101 InMemoryJournal.clear();
102 InMemorySnapshotStore.clear();
105 private void setupNewFollower() {
106 DefaultConfigParamsImpl configParams = newFollowerConfigParams();
108 newFollowerCollectorActor = actorFactory.createTestActor(
109 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
110 actorFactory.generateActorId(NEW_SERVER_ID + "Collector"));
111 newFollowerRaftActor = actorFactory.createTestActor(MockNewFollowerRaftActor.props(
112 configParams, newFollowerCollectorActor).withDispatcher(Dispatchers.DefaultDispatcherId()),
113 actorFactory.generateActorId(NEW_SERVER_ID));
116 newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
117 } catch (Exception e) {
118 newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
122 private static DefaultConfigParamsImpl newFollowerConfigParams() {
123 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
124 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
125 configParams.setElectionTimeoutFactor(100000);
126 configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
131 public void tearDown() throws Exception {
132 actorFactory.close();
136 public void testAddServerWithExistingFollower() throws Exception {
137 LOG.info("testAddServerWithExistingFollower starting");
139 RaftActorContextImpl followerActorContext = newFollowerContext(FOLLOWER_ID, followerActor);
140 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
142 followerActorContext.setCommitIndex(2);
143 followerActorContext.setLastApplied(2);
145 Follower follower = new Follower(followerActorContext);
146 followerActor.underlyingActor().setBehavior(follower);
147 followerActorContext.setCurrentBehavior(follower);
149 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
150 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
151 followerActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
152 actorFactory.generateActorId(LEADER_ID));
154 // Expect initial heartbeat from the leader.
155 expectFirstMatching(followerActor, AppendEntries.class);
156 clearMessages(followerActor);
158 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
159 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
161 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
163 // Leader should install snapshot - capture and verify ApplySnapshot contents
165 ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
166 @SuppressWarnings("unchecked")
167 List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
168 assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
170 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
171 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
172 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
174 // Verify ServerConfigurationPayload entry in leader's log
176 expectFirstMatching(leaderCollectorActor, ApplyState.class);
177 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
178 assertEquals("Leader journal last index", 3, leaderActorContext.getReplicatedLog().lastIndex());
179 assertEquals("Leader commit index", 3, leaderActorContext.getCommitIndex());
180 assertEquals("Leader last applied index", 3, leaderActorContext.getLastApplied());
181 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
182 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
184 // Verify ServerConfigurationPayload entry in both followers
186 expectFirstMatching(followerActor, ApplyState.class);
187 assertEquals("Follower journal last index", 3, followerActorContext.getReplicatedLog().lastIndex());
188 verifyServerConfigurationPayloadEntry(followerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
189 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
191 expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
192 assertEquals("New follower journal last index", 3, newFollowerActorContext.getReplicatedLog().lastIndex());
193 verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
194 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
196 // Verify new server config was applied in both followers
198 assertEquals("Follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID), followerActorContext.getPeerIds());
200 assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, FOLLOWER_ID), newFollowerActorContext.getPeerIds());
202 expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
203 expectFirstMatching(followerActor, ApplyState.class);
205 assertEquals("Follower commit index", 3, followerActorContext.getCommitIndex());
206 assertEquals("Follower last applied index", 3, followerActorContext.getLastApplied());
207 assertEquals("New follower commit index", 3, newFollowerActorContext.getCommitIndex());
208 assertEquals("New follower last applied index", 3, newFollowerActorContext.getLastApplied());
210 List<ReplicatedLogImplEntry> persistedLogEntries = InMemoryJournal.get(LEADER_ID, ReplicatedLogImplEntry.class);
211 assertEquals("Leader ReplicatedLogImplEntry entries", 1, persistedLogEntries.size());
212 ReplicatedLogImplEntry logEntry = persistedLogEntries.get(0);
213 assertEquals("Leader ReplicatedLogImplEntry getTerm", 1, logEntry.getTerm());
214 assertEquals("Leader ReplicatedLogImplEntry getIndex", 3, logEntry.getIndex());
215 assertEquals("Leader ReplicatedLogImplEntry getData", ServerConfigurationPayload.class, logEntry.getData().getClass());
217 persistedLogEntries = InMemoryJournal.get(NEW_SERVER_ID, ReplicatedLogImplEntry.class);
218 assertEquals("New follower ReplicatedLogImplEntry entries", 1, persistedLogEntries.size());
219 logEntry = persistedLogEntries.get(0);
220 assertEquals("New follower ReplicatedLogImplEntry getTerm", 1, logEntry.getTerm());
221 assertEquals("New follower ReplicatedLogImplEntry getIndex", 3, logEntry.getIndex());
222 assertEquals("New follower ReplicatedLogImplEntry getData", ServerConfigurationPayload.class,
223 logEntry.getData().getClass());
225 LOG.info("testAddServerWithExistingFollower ending");
229 public void testAddServerWithNoExistingFollower() throws Exception {
230 LOG.info("testAddServerWithNoExistingFollower starting");
233 RaftActorContext initialActorContext = new MockRaftActorContext();
234 initialActorContext.setCommitIndex(1);
235 initialActorContext.setLastApplied(1);
236 initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
239 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
240 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
241 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
242 actorFactory.generateActorId(LEADER_ID));
244 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
245 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
247 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
249 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
251 // Leader should install snapshot - capture and verify ApplySnapshot contents
253 ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
254 @SuppressWarnings("unchecked")
255 List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
256 assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
258 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
259 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
260 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
262 // Verify ServerConfigurationPayload entry in leader's log
264 expectFirstMatching(leaderCollectorActor, ApplyState.class);
265 assertEquals("Leader journal last index", 2, leaderActorContext.getReplicatedLog().lastIndex());
266 assertEquals("Leader commit index", 2, leaderActorContext.getCommitIndex());
267 assertEquals("Leader last applied index", 2, leaderActorContext.getLastApplied());
268 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
269 votingServer(NEW_SERVER_ID));
271 // Verify ServerConfigurationPayload entry in the new follower
273 expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
274 assertEquals("New follower journal last index", 2, newFollowerActorContext.getReplicatedLog().lastIndex());
275 verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
276 votingServer(NEW_SERVER_ID));
278 // Verify new server config was applied in the new follower
280 assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
282 LOG.info("testAddServerWithNoExistingFollower ending");
286 public void testAddServersAsNonVoting() throws Exception {
287 LOG.info("testAddServersAsNonVoting starting");
290 RaftActorContext initialActorContext = new MockRaftActorContext();
292 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
293 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
294 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
295 actorFactory.generateActorId(LEADER_ID));
297 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
298 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
300 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
302 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), false), testKit.getRef());
304 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
305 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
306 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
308 // Verify ServerConfigurationPayload entry in leader's log
310 expectFirstMatching(leaderCollectorActor, ApplyState.class);
312 assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
313 assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
314 assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
315 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
316 nonVotingServer(NEW_SERVER_ID));
318 // Verify ServerConfigurationPayload entry in the new follower
320 expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
321 assertEquals("New follower journal last index", 0, newFollowerActorContext.getReplicatedLog().lastIndex());
322 verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
323 nonVotingServer(NEW_SERVER_ID));
325 // Verify new server config was applied in the new follower
327 assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
329 assertNoneMatching(newFollowerCollectorActor, InstallSnapshot.class, 500);
331 // Add another non-voting server.
333 clearMessages(leaderCollectorActor);
335 RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
336 Follower newFollower2 = new Follower(follower2ActorContext);
337 followerActor.underlyingActor().setBehavior(newFollower2);
339 leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit.getRef());
341 addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
342 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
343 assertEquals("getLeaderHint", java.util.Optional.of(LEADER_ID), addServerReply.getLeaderHint());
345 expectFirstMatching(leaderCollectorActor, ApplyState.class);
346 assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
347 assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
348 assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
349 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
350 votingServer(LEADER_ID), nonVotingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
352 LOG.info("testAddServersAsNonVoting ending");
356 public void testAddServerWithOperationInProgress() throws Exception {
357 LOG.info("testAddServerWithOperationInProgress starting");
360 RaftActorContext initialActorContext = new MockRaftActorContext();
362 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
363 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
364 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
365 actorFactory.generateActorId(LEADER_ID));
367 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
368 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
370 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
372 RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
373 Follower newFollower2 = new Follower(follower2ActorContext);
374 followerActor.underlyingActor().setBehavior(newFollower2);
376 MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
377 newFollowerRaftActorInstance.setDropMessageOfType(InstallSnapshot.SERIALIZABLE_CLASS);
379 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
381 // Wait for leader's install snapshot and capture it
383 InstallSnapshot installSnapshot = expectFirstMatching(newFollowerCollectorActor, InstallSnapshot.class);
385 // Send a second AddServer - should get queued
386 JavaTestKit testKit2 = new JavaTestKit(getSystem());
387 leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit2.getRef());
389 // Continue the first AddServer
390 newFollowerRaftActorInstance.setDropMessageOfType(null);
391 newFollowerRaftActor.tell(installSnapshot, leaderActor);
393 // Verify both complete successfully
394 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
395 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
397 addServerReply = testKit2.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
398 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
400 // Verify ServerConfigurationPayload entries in leader's log
402 expectMatching(leaderCollectorActor, ApplyState.class, 2);
403 assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
404 assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
405 assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
406 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
407 votingServer(LEADER_ID), votingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
409 // Verify ServerConfigurationPayload entry in the new follower
411 expectMatching(newFollowerCollectorActor, ApplyState.class, 2);
412 assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID2),
413 newFollowerActorContext.getPeerIds());
415 LOG.info("testAddServerWithOperationInProgress ending");
419 public void testAddServerWithPriorSnapshotInProgress() throws Exception {
420 LOG.info("testAddServerWithPriorSnapshotInProgress starting");
423 RaftActorContext initialActorContext = new MockRaftActorContext();
425 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
426 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
427 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
428 actorFactory.generateActorId(LEADER_ID));
430 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
431 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
433 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
435 // Drop commit message for now to delay snapshot completion
436 leaderRaftActor.setDropMessageOfType(String.class);
438 leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
440 Object commitMsg = expectFirstMatching(leaderCollectorActor, COMMIT_MESSAGE_CLASS);
442 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
444 leaderRaftActor.setDropMessageOfType(null);
445 leaderActor.tell(commitMsg, leaderActor);
447 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
448 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
449 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
451 expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
453 // Verify ServerConfigurationPayload entry in leader's log
455 expectFirstMatching(leaderCollectorActor, ApplyState.class);
456 assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
457 assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
458 assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
459 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
460 votingServer(NEW_SERVER_ID));
462 LOG.info("testAddServerWithPriorSnapshotInProgress ending");
466 public void testAddServerWithPriorSnapshotCompleteTimeout() throws Exception {
467 LOG.info("testAddServerWithPriorSnapshotCompleteTimeout starting");
470 RaftActorContext initialActorContext = new MockRaftActorContext();
472 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
473 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
474 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
475 actorFactory.generateActorId(LEADER_ID));
477 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
478 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
480 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
482 // Drop commit message so the snapshot doesn't complete.
483 leaderRaftActor.setDropMessageOfType(COMMIT_MESSAGE_CLASS);
485 leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
487 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
489 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
490 assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
492 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
494 LOG.info("testAddServerWithPriorSnapshotCompleteTimeout ending");
498 public void testAddServerWithLeaderChangeBeforePriorSnapshotComplete() throws Exception {
499 LOG.info("testAddServerWithLeaderChangeBeforePriorSnapshotComplete starting");
502 RaftActorContext initialActorContext = new MockRaftActorContext();
504 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
505 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
506 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
507 actorFactory.generateActorId(LEADER_ID));
509 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
510 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
511 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
513 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
515 // Drop the commit message so the snapshot doesn't complete yet.
516 leaderRaftActor.setDropMessageOfType(COMMIT_MESSAGE_CLASS);
518 leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
520 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
522 Object commitMsg = expectFirstMatching(leaderCollectorActor, COMMIT_MESSAGE_CLASS);
524 // Change the leader behavior to follower
525 leaderActor.tell(new Follower(leaderActorContext), leaderActor);
527 // Drop CaptureSnapshotReply in case install snapshot is incorrectly initiated after the prior
528 // snapshot completes. This will prevent the invalid snapshot from completing and fail the
529 // isCapturing assertion below.
530 leaderRaftActor.setDropMessageOfType(CaptureSnapshotReply.class);
532 // Complete the prior snapshot - this should be a no-op b/c it's no longer the leader
533 leaderActor.tell(commitMsg, leaderActor);
535 leaderActor.tell(new RaftActorServerConfigurationSupport.ServerOperationTimeout(NEW_SERVER_ID), leaderActor);
537 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
538 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
540 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
541 assertEquals("isCapturing", false, leaderActorContext.getSnapshotManager().isCapturing());
543 LOG.info("testAddServerWithLeaderChangeBeforePriorSnapshotComplete ending");
547 public void testAddServerWithLeaderChangeDuringInstallSnapshot() throws Exception {
548 LOG.info("testAddServerWithLeaderChangeDuringInstallSnapshot starting");
551 RaftActorContext initialActorContext = new MockRaftActorContext();
553 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
554 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
555 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
556 actorFactory.generateActorId(LEADER_ID));
558 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
559 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
561 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(8);
563 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
565 // Drop the UnInitializedFollowerSnapshotReply to delay it.
566 leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
568 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
570 UnInitializedFollowerSnapshotReply snapshotReply = expectFirstMatching(leaderCollectorActor,
571 UnInitializedFollowerSnapshotReply.class);
573 // Prevent election timeout when the leader switches to follower
574 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
576 // Change the leader behavior to follower
577 leaderActor.tell(new Follower(leaderActorContext), leaderActor);
579 // Send the captured UnInitializedFollowerSnapshotReply - should be a no-op
580 leaderRaftActor.setDropMessageOfType(null);
581 leaderActor.tell(snapshotReply, leaderActor);
583 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
584 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
586 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
588 LOG.info("testAddServerWithLeaderChangeDuringInstallSnapshot ending");
592 public void testAddServerWithInstallSnapshotTimeout() throws Exception {
593 LOG.info("testAddServerWithInstallSnapshotTimeout starting");
596 RaftActorContext initialActorContext = new MockRaftActorContext();
598 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
599 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
600 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
601 actorFactory.generateActorId(LEADER_ID));
603 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
604 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
605 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
607 // Drop the InstallSnapshot message so it times out
608 newFollowerRaftActor.underlyingActor().setDropMessageOfType(InstallSnapshot.SERIALIZABLE_CLASS);
610 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
612 leaderActor.tell(new UnInitializedFollowerSnapshotReply("bogus"), leaderActor);
614 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
615 assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
617 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
618 assertEquals("Leader followers size", 0,
619 ((AbstractLeader)leaderRaftActor.getCurrentBehavior()).getFollowerIds().size());
621 LOG.info("testAddServerWithInstallSnapshotTimeout ending");
625 public void testAddServerWithNoLeader() {
626 LOG.info("testAddServerWithNoLeader starting");
629 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
630 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
632 TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
633 MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
634 followerActor.path().toString())).config(configParams).persistent(Optional.of(false)).
635 props().withDispatcher(Dispatchers.DefaultDispatcherId()),
636 actorFactory.generateActorId(LEADER_ID));
637 noLeaderActor.underlyingActor().waitForInitializeBehaviorComplete();
639 noLeaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
640 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
641 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
643 LOG.info("testAddServerWithNoLeader ending");
647 public void testAddServerWithNoConsensusReached() {
648 LOG.info("testAddServerWithNoConsensusReached starting");
651 RaftActorContext initialActorContext = new MockRaftActorContext();
653 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
654 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
655 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
656 actorFactory.generateActorId(LEADER_ID));
658 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
659 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
661 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
663 // Drop UnInitializedFollowerSnapshotReply initially
664 leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
666 MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
667 TestActorRef<MessageCollectorActor> newFollowerCollectorActor =
668 newCollectorActor(newFollowerRaftActorInstance, NEW_SERVER_ID);
670 // Drop AppendEntries to the new follower so consensus isn't reached
671 newFollowerRaftActorInstance.setDropMessageOfType(AppendEntries.class);
673 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
675 // Capture the UnInitializedFollowerSnapshotReply
676 Object snapshotReply = expectFirstMatching(leaderCollectorActor, UnInitializedFollowerSnapshotReply.class);
678 // Send the UnInitializedFollowerSnapshotReply to resume the first request
679 leaderRaftActor.setDropMessageOfType(null);
680 leaderActor.tell(snapshotReply, leaderActor);
682 expectFirstMatching(newFollowerCollectorActor, AppendEntries.class);
684 // Send a second AddServer
685 leaderActor.tell(new AddServer(NEW_SERVER_ID2, "", false), testKit.getRef());
687 // The first AddServer should succeed with OK even though consensus wasn't reached
688 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
689 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
690 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
692 // Verify ServerConfigurationPayload entry in leader's log
693 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
694 votingServer(NEW_SERVER_ID));
696 // The second AddServer should fail since consensus wasn't reached for the first
697 addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
698 assertEquals("getStatus", ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT, addServerReply.getStatus());
700 // Re-send the second AddServer - should also fail
701 leaderActor.tell(new AddServer(NEW_SERVER_ID2, "", false), testKit.getRef());
702 addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
703 assertEquals("getStatus", ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT, addServerReply.getStatus());
705 LOG.info("testAddServerWithNoConsensusReached ending");
709 public void testAddServerWithExistingServer() {
710 LOG.info("testAddServerWithExistingServer starting");
712 RaftActorContext initialActorContext = new MockRaftActorContext();
714 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
715 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
716 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
717 actorFactory.generateActorId(LEADER_ID));
719 leaderActor.tell(new AddServer(FOLLOWER_ID, followerActor.path().toString(), true), testKit.getRef());
721 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
722 assertEquals("getStatus", ServerChangeStatus.ALREADY_EXISTS, addServerReply.getStatus());
724 LOG.info("testAddServerWithExistingServer ending");
728 public void testAddServerForwardedToLeader() {
729 LOG.info("testAddServerForwardedToLeader starting");
732 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
733 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
735 TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
736 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
737 actorFactory.generateActorId(LEADER_ID));
739 TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
740 MockRaftActor.builder().id(FOLLOWER_ID).peerAddresses(ImmutableMap.of(LEADER_ID,
741 leaderActor.path().toString())).config(configParams).persistent(Optional.of(false)).
742 props().withDispatcher(Dispatchers.DefaultDispatcherId()),
743 actorFactory.generateActorId(FOLLOWER_ID));
744 followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
746 followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
747 -1, -1, (short)0), leaderActor);
749 followerRaftActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
750 expectFirstMatching(leaderActor, AddServer.class);
752 LOG.info("testAddServerForwardedToLeader ending");
756 public void testOnApplyState() {
757 LOG.info("testOnApplyState starting");
759 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
760 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
761 TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
762 MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
763 followerActor.path().toString())).config(configParams).persistent(Optional.of(false)).
764 props().withDispatcher(Dispatchers.DefaultDispatcherId()),
765 actorFactory.generateActorId(LEADER_ID));
767 RaftActorServerConfigurationSupport support = new RaftActorServerConfigurationSupport(noLeaderActor.underlyingActor());
769 ReplicatedLogEntry serverConfigEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
770 new ServerConfigurationPayload(Collections.<ServerInfo>emptyList()));
771 boolean handled = support.handleMessage(new ApplyState(null, null, serverConfigEntry), ActorRef.noSender());
772 assertEquals("Message handled", true, handled);
774 ReplicatedLogEntry nonServerConfigEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
775 new MockRaftActorContext.MockPayload("1"));
776 handled = support.handleMessage(new ApplyState(null, null, nonServerConfigEntry), ActorRef.noSender());
777 assertEquals("Message handled", false, handled);
779 LOG.info("testOnApplyState ending");
783 public void testRemoveServerWithNoLeader() {
784 LOG.info("testRemoveServerWithNoLeader starting");
786 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
787 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
789 TestActorRef<MockRaftActor> leaderActor = actorFactory.createTestActor(
790 MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
791 followerActor.path().toString())).config(configParams).persistent(Optional.of(false)).
792 props().withDispatcher(Dispatchers.DefaultDispatcherId()),
793 actorFactory.generateActorId(LEADER_ID));
794 leaderActor.underlyingActor().waitForInitializeBehaviorComplete();
796 leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
797 RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
798 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, removeServerReply.getStatus());
800 LOG.info("testRemoveServerWithNoLeader ending");
804 public void testRemoveServerNonExistentServer() {
805 LOG.info("testRemoveServerNonExistentServer starting");
807 RaftActorContext initialActorContext = new MockRaftActorContext();
809 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
810 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
811 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
812 actorFactory.generateActorId(LEADER_ID));
814 leaderActor.tell(new RemoveServer(NEW_SERVER_ID), testKit.getRef());
815 RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
816 assertEquals("getStatus", ServerChangeStatus.DOES_NOT_EXIST, removeServerReply.getStatus());
818 LOG.info("testRemoveServerNonExistentServer ending");
822 public void testRemoveServerForwardToLeader() {
823 LOG.info("testRemoveServerForwardToLeader starting");
825 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
826 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
828 TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
829 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
830 actorFactory.generateActorId(LEADER_ID));
832 TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
833 MockRaftActor.builder().id(FOLLOWER_ID).peerAddresses(ImmutableMap.of(LEADER_ID,
834 leaderActor.path().toString())).config(configParams).persistent(Optional.of(false)).
835 props().withDispatcher(Dispatchers.DefaultDispatcherId()),
836 actorFactory.generateActorId(FOLLOWER_ID));
837 followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
839 followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
840 -1, -1, (short)0), leaderActor);
842 followerRaftActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
843 expectFirstMatching(leaderActor, RemoveServer.class);
845 LOG.info("testRemoveServerForwardToLeader ending");
849 public void testRemoveServer() {
850 LOG.info("testRemoveServer starting");
852 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
853 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
854 configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
856 final String followerActorId = actorFactory.generateActorId(FOLLOWER_ID);
857 final String followerActorPath = actorFactory.createTestActorPath(followerActorId);
858 RaftActorContext initialActorContext = new MockRaftActorContext();
860 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
861 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActorPath),
862 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
863 actorFactory.generateActorId(LEADER_ID));
865 TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
867 TestActorRef<MessageCollectorActor> collector =
868 actorFactory.createTestActor(MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
869 actorFactory.generateActorId("collector"));
870 TestActorRef<CollectingMockRaftActor> followerRaftActor = actorFactory.createTestActor(
871 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
872 configParams, NO_PERSISTENCE, collector).withDispatcher(Dispatchers.DefaultDispatcherId()),
875 leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
876 RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
877 assertEquals("getStatus", ServerChangeStatus.OK, removeServerReply.getStatus());
879 final ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
880 assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
881 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(), votingServer(LEADER_ID));
883 RaftActorBehavior currentBehavior = leaderActor.underlyingActor().getCurrentBehavior();
884 assertTrue("Expected Leader", currentBehavior instanceof Leader);
885 assertEquals("Follower ids size", 0, ((Leader)currentBehavior).getFollowerIds().size());
887 MessageCollectorActor.expectFirstMatching(collector, ServerRemoved.class);
889 LOG.info("testRemoveServer ending");
893 public void testRemoveServerLeader() {
894 LOG.info("testRemoveServerLeader starting");
896 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
897 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
898 configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
900 final String followerActorId = actorFactory.generateActorId(FOLLOWER_ID);
901 final String followerActorPath = actorFactory.createTestActorPath(followerActorId);
902 RaftActorContext initialActorContext = new MockRaftActorContext();
904 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
905 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActorPath),
906 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
907 actorFactory.generateActorId(LEADER_ID));
909 TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
911 TestActorRef<MessageCollectorActor> followerCollector = actorFactory.createTestActor(MessageCollectorActor.props().
912 withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId("collector"));
913 actorFactory.createTestActor(
914 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
915 configParams, NO_PERSISTENCE, followerCollector).withDispatcher(Dispatchers.DefaultDispatcherId()),
918 leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
919 RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
920 assertEquals("getStatus", ServerChangeStatus.OK, removeServerReply.getStatus());
922 final ApplyState applyState = MessageCollectorActor.expectFirstMatching(followerCollector, ApplyState.class);
923 assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
924 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
925 votingServer(FOLLOWER_ID));
927 MessageCollectorActor.expectFirstMatching(leaderCollector, ServerRemoved.class);
929 LOG.info("testRemoveServerLeader ending");
933 public void testRemoveServerLeaderWithNoFollowers() {
934 LOG.info("testRemoveServerLeaderWithNoFollowers starting");
936 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
937 MockLeaderRaftActor.props(Collections.<String, String>emptyMap(),
938 new MockRaftActorContext()).withDispatcher(Dispatchers.DefaultDispatcherId()),
939 actorFactory.generateActorId(LEADER_ID));
941 leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
942 RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
943 assertEquals("getStatus", ServerChangeStatus.NOT_SUPPORTED, removeServerReply.getStatus());
945 LOG.info("testRemoveServerLeaderWithNoFollowers ending");
949 public void testChangeServersVotingStatus() {
950 LOG.info("testChangeServersVotingStatus starting");
952 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
953 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
954 configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
956 final String follower1ActorId = actorFactory.generateActorId(FOLLOWER_ID);
957 final String follower1ActorPath = actorFactory.createTestActorPath(follower1ActorId);
958 final String follower2ActorId = actorFactory.generateActorId(FOLLOWER_ID2);
959 final String follower2ActorPath = actorFactory.createTestActorPath(follower2ActorId);
961 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
962 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, follower1ActorPath,
963 FOLLOWER_ID2, follower2ActorPath), new MockRaftActorContext()).
964 withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
965 TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
967 TestActorRef<MessageCollectorActor> follower1Collector = actorFactory.createTestActor(
968 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
969 actorFactory.generateActorId("collector"));
970 TestActorRef<CollectingMockRaftActor> follower1RaftActor = actorFactory.createTestActor(
971 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
972 FOLLOWER_ID2, follower2ActorPath), configParams, NO_PERSISTENCE, follower1Collector).
973 withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId);
975 TestActorRef<MessageCollectorActor> follower2Collector = actorFactory.createTestActor(
976 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
977 actorFactory.generateActorId("collector"));
978 TestActorRef<CollectingMockRaftActor> follower2RaftActor = actorFactory.createTestActor(
979 CollectingMockRaftActor.props(FOLLOWER_ID2, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
980 FOLLOWER_ID, follower1ActorPath), configParams, NO_PERSISTENCE, follower2Collector).
981 withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId);
983 // Send first ChangeServersVotingStatus message
985 leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(FOLLOWER_ID, false, FOLLOWER_ID2, false)),
987 ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
988 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
990 final ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
991 assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
992 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
993 votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
995 MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
996 verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
997 votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
999 MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
1000 verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1001 votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1003 MessageCollectorActor.clearMessages(leaderCollector);
1004 MessageCollectorActor.clearMessages(follower1Collector);
1005 MessageCollectorActor.clearMessages(follower2Collector);
1007 // Send second ChangeServersVotingStatus message
1009 leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(FOLLOWER_ID, true)), testKit.getRef());
1010 reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1011 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1013 MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
1014 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1015 votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1017 MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
1018 verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1019 votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1021 MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
1022 verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1023 votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1025 LOG.info("testChangeServersVotingStatus ending");
1029 public void testChangeLeaderToNonVoting() {
1030 LOG.info("testChangeLeaderToNonVoting starting");
1032 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1033 configParams.setHeartBeatInterval(new FiniteDuration(500, TimeUnit.MILLISECONDS));
1035 final String follower1ActorId = actorFactory.generateActorId(FOLLOWER_ID);
1036 final String follower1ActorPath = actorFactory.createTestActorPath(follower1ActorId);
1037 final String follower2ActorId = actorFactory.generateActorId(FOLLOWER_ID2);
1038 final String follower2ActorPath = actorFactory.createTestActorPath(follower2ActorId);
1040 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
1041 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, follower1ActorPath,
1042 FOLLOWER_ID2, follower2ActorPath), new MockRaftActorContext()).
1043 withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
1044 TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
1046 TestActorRef<MessageCollectorActor> follower1Collector = actorFactory.createTestActor(
1047 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1048 actorFactory.generateActorId("collector"));
1049 TestActorRef<CollectingMockRaftActor> follower1RaftActor = actorFactory.createTestActor(
1050 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
1051 FOLLOWER_ID2, follower2ActorPath), configParams, NO_PERSISTENCE, follower1Collector).
1052 withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId);
1054 TestActorRef<MessageCollectorActor> follower2Collector = actorFactory.createTestActor(
1055 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1056 actorFactory.generateActorId("collector"));
1057 TestActorRef<CollectingMockRaftActor> follower2RaftActor = actorFactory.createTestActor(
1058 CollectingMockRaftActor.props(FOLLOWER_ID2, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
1059 FOLLOWER_ID, follower1ActorPath), configParams, NO_PERSISTENCE, follower2Collector).
1060 withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId);
1062 // Send ChangeServersVotingStatus message
1064 leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(LEADER_ID, false)), testKit.getRef());
1065 ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1066 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1068 MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
1069 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1070 nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
1072 MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
1073 verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1074 nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
1076 MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
1077 verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1078 nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
1080 verifyRaftState(RaftState.Leader, follower1RaftActor.underlyingActor(), follower2RaftActor.underlyingActor());
1081 verifyRaftState(RaftState.Follower, leaderActor.underlyingActor());
1083 MessageCollectorActor.expectMatching(leaderCollector, AppendEntries.class, 2);
1085 LOG.info("testChangeLeaderToNonVoting ending");
1089 public void testChangeLeaderToNonVotingInSingleNode() {
1090 LOG.info("testChangeLeaderToNonVotingInSingleNode starting");
1092 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
1093 MockLeaderRaftActor.props(ImmutableMap.of(), new MockRaftActorContext()).
1094 withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
1096 leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(LEADER_ID, false)), testKit.getRef());
1097 ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1098 assertEquals("getStatus", ServerChangeStatus.INVALID_REQUEST, reply.getStatus());
1100 LOG.info("testChangeLeaderToNonVotingInSingleNode ending");
1104 public void testChangeToVotingWithNoLeader() {
1105 LOG.info("testChangeToVotingWithNoLeader starting");
1107 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1108 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1109 configParams.setElectionTimeoutFactor(5);
1111 final String node1ID = "node1";
1112 final String node2ID = "node2";
1114 // Set up a persisted ServerConfigurationPayload. Initially node1 and node2 will come up as non-voting.
1115 // via the server config. The server config will also contain 2 voting peers that are down (ie no
1118 ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1119 new ServerInfo(node1ID, false), new ServerInfo(node2ID, false),
1120 new ServerInfo("downNode1", true), new ServerInfo("downNode2", true)));
1121 ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, 1, persistedServerConfig);
1123 InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "downNode1"));
1124 InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1125 InMemoryJournal.addEntry(node1ID, 3, new ApplyJournalEntries(0));
1126 InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "downNode2"));
1127 InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1128 InMemoryJournal.addEntry(node2ID, 3, new ApplyJournalEntries(0));
1130 TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
1131 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1132 actorFactory.generateActorId("collector"));
1133 TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1134 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
1135 PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1136 CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1138 TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
1139 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1140 actorFactory.generateActorId("collector"));
1141 TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1142 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
1143 PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1144 CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1146 node1RaftActor.waitForInitializeBehaviorComplete();
1147 node2RaftActor.waitForInitializeBehaviorComplete();
1149 // Verify the intended server config was loaded and applied.
1150 verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1151 nonVotingServer(node1ID), nonVotingServer(node2ID), votingServer("downNode1"),
1152 votingServer("downNode2"));
1153 assertEquals("isVotingMember", false, node1RaftActor.getRaftActorContext().isVotingMember());
1154 assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1155 assertEquals("getLeaderId", null, node1RaftActor.getLeaderId());
1157 verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1158 nonVotingServer(node1ID), nonVotingServer(node2ID), votingServer("downNode1"),
1159 votingServer("downNode2"));
1160 assertEquals("isVotingMember", false, node2RaftActor.getRaftActorContext().isVotingMember());
1162 // For the test, we send a ChangeServersVotingStatus message to node1 to flip the voting states for
1163 // each server, ie node1 and node2 to voting and the 2 down nodes to non-voting. This should cause
1164 // node1 to try to elect itself as leader in order to apply the new server config. Since the 2
1165 // down nodes are switched to non-voting, node1 should only need a vote from node2.
1167 // First send the message such that node1 has no peer address for node2 - should fail.
1169 ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true,
1170 node2ID, true, "downNode1", false, "downNode2", false));
1171 node1RaftActorRef.tell(changeServers, testKit.getRef());
1172 ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1173 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, reply.getStatus());
1175 // Send an AppendEntries so node1 has a leaderId
1177 MessageCollectorActor.clearMessages(node1Collector);
1179 long term = node1RaftActor.getRaftActorContext().getTermInformation().getCurrentTerm();
1180 node1RaftActorRef.tell(new AppendEntries(term, "downNode1", -1L, -1L,
1181 Collections.<ReplicatedLogEntry>emptyList(), 0, -1, (short)1), ActorRef.noSender());
1183 // Wait for the ElectionTimeout to clear the leaderId. he leaderId must be null so on the
1184 // ChangeServersVotingStatus message, it will try to elect a leader.
1186 MessageCollectorActor.expectFirstMatching(node1Collector, ElectionTimeout.class);
1188 // Update node2's peer address and send the message again
1190 node1RaftActor.setPeerAddress(node2ID, node2RaftActorRef.path().toString());
1192 node1RaftActorRef.tell(changeServers, testKit.getRef());
1193 reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1194 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1196 ApplyJournalEntries apply = MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class);
1197 assertEquals("getToIndex", 1, apply.getToIndex());
1198 verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1199 votingServer(node1ID), votingServer(node2ID), nonVotingServer("downNode1"),
1200 nonVotingServer("downNode2"));
1201 assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
1202 assertEquals("getRaftState", RaftState.Leader, node1RaftActor.getRaftState());
1204 apply = MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
1205 assertEquals("getToIndex", 1, apply.getToIndex());
1206 verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1207 votingServer(node1ID), votingServer(node2ID), nonVotingServer("downNode1"),
1208 nonVotingServer("downNode2"));
1209 assertEquals("isVotingMember", true, node2RaftActor.getRaftActorContext().isVotingMember());
1210 assertEquals("getRaftState", RaftState.Follower, node2RaftActor.getRaftState());
1212 LOG.info("testChangeToVotingWithNoLeader ending");
1216 public void testChangeToVotingWithNoLeaderAndElectionTimeout() {
1217 LOG.info("testChangeToVotingWithNoLeaderAndElectionTimeout starting");
1219 final String node1ID = "node1";
1220 final String node2ID = "node2";
1222 PeerAddressResolver peerAddressResolver = peerId -> peerId.equals(node1ID) ? actorFactory.createTestActorPath(node1ID) :
1223 peerId.equals(node2ID) ? actorFactory.createTestActorPath(node2ID) : null;
1225 ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1226 new ServerInfo(node1ID, false), new ServerInfo(node2ID, true)));
1227 ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, 1, persistedServerConfig);
1229 InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
1230 InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1231 InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
1232 InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1234 DefaultConfigParamsImpl configParams1 = new DefaultConfigParamsImpl();
1235 configParams1.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1236 configParams1.setElectionTimeoutFactor(1);
1237 configParams1.setPeerAddressResolver(peerAddressResolver);
1238 TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
1239 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1240 actorFactory.generateActorId("collector"));
1241 TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1242 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams1,
1243 PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1244 CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1246 DefaultConfigParamsImpl configParams2 = new DefaultConfigParamsImpl();
1247 configParams2.setElectionTimeoutFactor(1000000);
1248 configParams2.setPeerAddressResolver(peerAddressResolver);
1249 TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
1250 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1251 actorFactory.generateActorId("collector"));
1252 TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1253 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams2,
1254 PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1255 CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1257 // Send a ChangeServersVotingStatus message to node1 to change mode1 to voting. This should cause
1258 // node1 to try to elect itself as leader in order to apply the new server config. But we'll drop
1259 // RequestVote messages in node2 which should cause node1 to time out and revert back to the previous
1260 // server config and fail with NO_LEADER. Note that node1 shouldn't forward the request to node2 b/c
1261 // node2 was previously voting.
1263 node2RaftActor.setDropMessageOfType(RequestVote.class);
1265 ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true));
1266 node1RaftActorRef.tell(changeServers, testKit.getRef());
1267 ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1268 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, reply.getStatus());
1270 assertEquals("Server config", Sets.newHashSet(nonVotingServer(node1ID), votingServer(node2ID)),
1271 Sets.newHashSet(node1RaftActor.getRaftActorContext().getPeerServerInfo(true).getServerConfig()));
1272 assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1274 LOG.info("testChangeToVotingWithNoLeaderAndElectionTimeout ending");
1278 public void testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout() {
1279 LOG.info("testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout starting");
1281 final String node1ID = "node1";
1282 final String node2ID = "node2";
1284 PeerAddressResolver peerAddressResolver = peerId -> peerId.equals(node1ID) ? actorFactory.createTestActorPath(node1ID) :
1285 peerId.equals(node2ID) ? actorFactory.createTestActorPath(node2ID) : null;
1287 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1288 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1289 configParams.setElectionTimeoutFactor(3);
1290 configParams.setPeerAddressResolver(peerAddressResolver);
1292 ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1293 new ServerInfo(node1ID, false), new ServerInfo(node2ID, false)));
1294 ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, 1, persistedServerConfig);
1296 InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
1297 InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1298 InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
1299 InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1300 InMemoryJournal.addEntry(node2ID, 3, new ReplicatedLogImplEntry(1, 1,
1301 new MockRaftActorContext.MockPayload("2")));
1303 TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
1304 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1305 actorFactory.generateActorId("collector"));
1306 TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1307 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
1308 PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1309 CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1311 TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
1312 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1313 actorFactory.generateActorId("collector"));
1314 TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1315 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
1316 PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1317 CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1319 // Send a ChangeServersVotingStatus message to node1 to change mode1 to voting. This should cause
1320 // node1 to try to elect itself as leader in order to apply the new server config. However node1's log
1321 // is behind node2's so node2 should not grant node1's vote. This should cause node1 to time out and
1322 // forward the request to node2.
1324 ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(
1325 ImmutableMap.of(node1ID, true, node2ID, true));
1326 node1RaftActorRef.tell(changeServers, testKit.getRef());
1327 ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1328 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1330 MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
1331 verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1332 votingServer(node1ID), votingServer(node2ID));
1333 assertEquals("getRaftState", RaftState.Leader, node2RaftActor.getRaftState());
1335 MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class);
1336 verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1337 votingServer(node1ID), votingServer(node2ID));
1338 assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
1339 assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1341 LOG.info("testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout ending");
1345 public void testChangeToVotingWithNoLeaderAndOtherLeaderElected() {
1346 LOG.info("testChangeToVotingWithNoLeaderAndOtherLeaderElected starting");
1348 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1349 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1350 configParams.setElectionTimeoutFactor(100000);
1352 final String node1ID = "node1";
1353 final String node2ID = "node2";
1355 configParams.setPeerAddressResolver(peerId -> peerId.equals(node1ID) ? actorFactory.createTestActorPath(node1ID) :
1356 peerId.equals(node2ID) ? actorFactory.createTestActorPath(node2ID) : null);
1358 ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1359 new ServerInfo(node1ID, false), new ServerInfo(node2ID, true)));
1360 ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, 1, persistedServerConfig);
1362 InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
1363 InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1364 InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
1365 InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1367 TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
1368 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1369 actorFactory.generateActorId("collector"));
1370 TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1371 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
1372 PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1373 CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1375 TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
1376 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1377 actorFactory.generateActorId("collector"));
1378 TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1379 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
1380 PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1381 CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1383 // Send a ChangeServersVotingStatus message to node1 to change node1 to voting. This should cause
1384 // node1 to try to elect itself as leader in order to apply the new server config. But we'll drop
1385 // RequestVote messages in node2 and make it the leader so node1 should forward the server change
1386 // request to node2 when node2 is elected.
1388 node2RaftActor.setDropMessageOfType(RequestVote.class);
1390 ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true,
1392 node1RaftActorRef.tell(changeServers, testKit.getRef());
1394 MessageCollectorActor.expectFirstMatching(node2Collector, RequestVote.class);
1396 node2RaftActorRef.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
1398 ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1399 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1401 MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class);
1402 verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1403 votingServer(node1ID), votingServer(node2ID));
1404 assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
1405 assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1407 MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
1408 verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1409 votingServer(node1ID), votingServer(node2ID));
1410 assertEquals("getRaftState", RaftState.Leader, node2RaftActor.getRaftState());
1412 LOG.info("testChangeToVotingWithNoLeaderAndOtherLeaderElected ending");
1415 private static void verifyRaftState(RaftState expState, RaftActor... raftActors) {
1416 Stopwatch sw = Stopwatch.createStarted();
1417 while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
1418 for(RaftActor raftActor: raftActors) {
1419 if(raftActor.getRaftState() == expState) {
1425 fail("None of the RaftActors have state " + expState);
1428 private static ServerInfo votingServer(String id) {
1429 return new ServerInfo(id, true);
1432 private static ServerInfo nonVotingServer(String id) {
1433 return new ServerInfo(id, false);
1436 private TestActorRef<MessageCollectorActor> newLeaderCollectorActor(MockLeaderRaftActor leaderRaftActor) {
1437 return newCollectorActor(leaderRaftActor, LEADER_ID);
1440 private TestActorRef<MessageCollectorActor> newCollectorActor(AbstractMockRaftActor raftActor, String id) {
1441 TestActorRef<MessageCollectorActor> collectorActor = actorFactory.createTestActor(
1442 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1443 actorFactory.generateActorId(id + "Collector"));
1444 raftActor.setCollectorActor(collectorActor);
1445 return collectorActor;
1448 private static void verifyServerConfigurationPayloadEntry(ReplicatedLog log, ServerInfo... expected) {
1449 ReplicatedLogEntry logEntry = log.get(log.lastIndex());
1450 assertEquals("Last log entry payload class", ServerConfigurationPayload.class, logEntry.getData().getClass());
1451 ServerConfigurationPayload payload = (ServerConfigurationPayload)logEntry.getData();
1452 assertEquals("Server config", Sets.newHashSet(expected), Sets.newHashSet(payload.getServerConfig()));
1455 private static RaftActorContextImpl newFollowerContext(String id, TestActorRef<? extends UntypedActor> actor) {
1456 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1457 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1458 configParams.setElectionTimeoutFactor(100000);
1459 NonPersistentDataProvider noPersistence = new NonPersistentDataProvider();
1460 ElectionTermImpl termInfo = new ElectionTermImpl(noPersistence, id, LOG);
1461 termInfo.update(1, LEADER_ID);
1462 return new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
1463 id, termInfo, -1, -1, ImmutableMap.of(LEADER_ID, ""), configParams, noPersistence, LOG);
1466 static abstract class AbstractMockRaftActor extends MockRaftActor {
1467 private volatile TestActorRef<MessageCollectorActor> collectorActor;
1468 private volatile Class<?> dropMessageOfType;
1470 AbstractMockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
1471 boolean persistent, TestActorRef<MessageCollectorActor> collectorActor) {
1472 super(builder().id(id).peerAddresses(peerAddresses).config(config.get()).
1473 persistent(Optional.of(persistent)));
1474 this.collectorActor = collectorActor;
1477 void setDropMessageOfType(Class<?> dropMessageOfType) {
1478 this.dropMessageOfType = dropMessageOfType;
1481 void setCollectorActor(TestActorRef<MessageCollectorActor> collectorActor) {
1482 this.collectorActor = collectorActor;
1486 public void handleCommand(Object message) {
1487 if(dropMessageOfType == null || !dropMessageOfType.equals(message.getClass())) {
1488 super.handleCommand(message);
1491 if(collectorActor != null) {
1492 collectorActor.tell(message, getSender());
1497 public static class CollectingMockRaftActor extends AbstractMockRaftActor {
1499 CollectingMockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
1500 boolean persistent, TestActorRef<MessageCollectorActor> collectorActor) {
1501 super(id, peerAddresses, config, persistent, collectorActor);
1502 snapshotCohortDelegate = new RaftActorSnapshotCohort() {
1504 public void createSnapshot(ActorRef actorRef) {
1505 actorRef.tell(new CaptureSnapshotReply(new byte[0]), actorRef);
1509 public void applySnapshot(byte[] snapshotBytes) {
1514 public static Props props(final String id, final Map<String, String> peerAddresses,
1515 ConfigParams config, boolean persistent, TestActorRef<MessageCollectorActor> collectorActor){
1517 return Props.create(CollectingMockRaftActor.class, id, peerAddresses, Optional.of(config),
1518 persistent, collectorActor);
1523 public static class MockLeaderRaftActor extends AbstractMockRaftActor {
1524 public MockLeaderRaftActor(Map<String, String> peerAddresses, ConfigParams config,
1525 RaftActorContext fromContext) {
1526 super(LEADER_ID, peerAddresses, Optional.of(config), NO_PERSISTENCE, null);
1527 setPersistence(false);
1529 RaftActorContext context = getRaftActorContext();
1530 for(int i = 0; i < fromContext.getReplicatedLog().size(); i++) {
1531 ReplicatedLogEntry entry = fromContext.getReplicatedLog().get(i);
1532 getState().add(entry.getData());
1533 context.getReplicatedLog().append(entry);
1536 context.setCommitIndex(fromContext.getCommitIndex());
1537 context.setLastApplied(fromContext.getLastApplied());
1538 context.getTermInformation().update(fromContext.getTermInformation().getCurrentTerm(),
1539 fromContext.getTermInformation().getVotedFor());
1543 protected void initializeBehavior() {
1544 changeCurrentBehavior(new Leader(getRaftActorContext()));
1545 initializeBehaviorComplete.countDown();
1549 public void createSnapshot(ActorRef actorRef) {
1551 actorRef.tell(new CaptureSnapshotReply(RaftActorTest.fromObject(getState()).toByteArray()), actorRef);
1552 } catch (Exception e) {
1553 LOG.error("createSnapshot failed", e);
1557 static Props props(Map<String, String> peerAddresses, RaftActorContext fromContext) {
1558 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1559 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1560 configParams.setElectionTimeoutFactor(10);
1561 return Props.create(MockLeaderRaftActor.class, peerAddresses, configParams, fromContext);
1565 public static class MockNewFollowerRaftActor extends AbstractMockRaftActor {
1566 public MockNewFollowerRaftActor(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
1567 super(NEW_SERVER_ID, Maps.<String, String>newHashMap(), Optional.of(config), NO_PERSISTENCE, collectorActor);
1568 setPersistence(false);
1571 static Props props(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
1572 return Props.create(MockNewFollowerRaftActor.class, config, collectorActor);