2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertTrue;
12 import static org.junit.Assert.fail;
13 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.assertNoneMatching;
14 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
15 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
16 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
18 import akka.actor.ActorRef;
19 import akka.actor.Props;
20 import akka.actor.UntypedActor;
21 import akka.dispatch.Dispatchers;
22 import akka.testkit.JavaTestKit;
23 import akka.testkit.TestActorRef;
24 import com.google.common.base.Optional;
25 import com.google.common.base.Stopwatch;
26 import com.google.common.collect.ImmutableMap;
27 import com.google.common.collect.Maps;
28 import com.google.common.collect.Sets;
29 import java.util.Arrays;
30 import java.util.Collections;
31 import java.util.List;
33 import java.util.concurrent.TimeUnit;
34 import org.junit.After;
35 import org.junit.Before;
36 import org.junit.Test;
37 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
38 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
39 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
40 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
41 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
42 import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
43 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
44 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
45 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
46 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
47 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
48 import org.opendaylight.controller.cluster.raft.messages.AddServer;
49 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
50 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
51 import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
52 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
53 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
54 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
55 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
56 import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
57 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
58 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
59 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
60 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
61 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
62 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
63 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
64 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
65 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
66 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
67 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
68 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
69 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
70 import org.slf4j.Logger;
71 import org.slf4j.LoggerFactory;
72 import scala.concurrent.duration.FiniteDuration;
75 * Unit tests for RaftActorServerConfigurationSupport.
77 * @author Thomas Pantelis
79 public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
80 static final String LEADER_ID = "leader";
81 static final String FOLLOWER_ID = "follower";
82 static final String FOLLOWER_ID2 = "follower2";
83 static final String NEW_SERVER_ID = "new-server";
84 static final String NEW_SERVER_ID2 = "new-server2";
85 private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupportTest.class);
86 private static final Class<?> COMMIT_MESSAGE_CLASS = RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT.getClass();
87 private static final boolean NO_PERSISTENCE = false;
88 private static final boolean PERSISTENT = true;
90 private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
92 private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
93 Props.create(ForwardMessageToBehaviorActor.class).withDispatcher(Dispatchers.DefaultDispatcherId()),
94 actorFactory.generateActorId(FOLLOWER_ID));
96 private TestActorRef<MockNewFollowerRaftActor> newFollowerRaftActor;
97 private TestActorRef<MessageCollectorActor> newFollowerCollectorActor;
98 private RaftActorContext newFollowerActorContext;
100 private final JavaTestKit testKit = new JavaTestKit(getSystem());
103 public void setup() {
104 InMemoryJournal.clear();
105 InMemorySnapshotStore.clear();
108 @SuppressWarnings("checkstyle:IllegalCatch")
109 private void setupNewFollower() {
110 DefaultConfigParamsImpl configParams = newFollowerConfigParams();
112 newFollowerCollectorActor = actorFactory.createTestActor(
113 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
114 actorFactory.generateActorId(NEW_SERVER_ID + "Collector"));
115 newFollowerRaftActor = actorFactory.createTestActor(MockNewFollowerRaftActor.props(
116 configParams, newFollowerCollectorActor).withDispatcher(Dispatchers.DefaultDispatcherId()),
117 actorFactory.generateActorId(NEW_SERVER_ID));
120 newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
121 } catch (Exception e) {
122 newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
126 private static DefaultConfigParamsImpl newFollowerConfigParams() {
127 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
128 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
129 configParams.setElectionTimeoutFactor(100000);
130 configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
135 public void tearDown() throws Exception {
136 actorFactory.close();
140 public void testAddServerWithExistingFollower() throws Exception {
141 LOG.info("testAddServerWithExistingFollower starting");
143 RaftActorContextImpl followerActorContext = newFollowerContext(FOLLOWER_ID, followerActor);
144 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
146 followerActorContext.setCommitIndex(2);
147 followerActorContext.setLastApplied(2);
149 Follower follower = new Follower(followerActorContext);
150 followerActor.underlyingActor().setBehavior(follower);
151 followerActorContext.setCurrentBehavior(follower);
153 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
154 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
155 followerActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
156 actorFactory.generateActorId(LEADER_ID));
158 // Expect initial heartbeat from the leader.
159 expectFirstMatching(followerActor, AppendEntries.class);
160 clearMessages(followerActor);
162 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
163 final TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
165 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
167 // Leader should install snapshot - capture and verify ApplySnapshot contents
169 ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
170 @SuppressWarnings("unchecked")
171 List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
172 assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
174 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
175 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
176 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
178 // Verify ServerConfigurationPayload entry in leader's log
180 expectFirstMatching(leaderCollectorActor, ApplyState.class);
181 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
182 assertEquals("Leader journal last index", 3, leaderActorContext.getReplicatedLog().lastIndex());
183 assertEquals("Leader commit index", 3, leaderActorContext.getCommitIndex());
184 assertEquals("Leader last applied index", 3, leaderActorContext.getLastApplied());
185 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
186 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
188 // Verify ServerConfigurationPayload entry in both followers
190 expectFirstMatching(followerActor, ApplyState.class);
191 assertEquals("Follower journal last index", 3, followerActorContext.getReplicatedLog().lastIndex());
192 verifyServerConfigurationPayloadEntry(followerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
193 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
195 expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
196 assertEquals("New follower journal last index", 3, newFollowerActorContext.getReplicatedLog().lastIndex());
197 verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
198 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
200 // Verify new server config was applied in both followers
202 assertEquals("Follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID), followerActorContext.getPeerIds());
204 assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, FOLLOWER_ID),
205 newFollowerActorContext.getPeerIds());
207 assertEquals("Follower commit index", 3, followerActorContext.getCommitIndex());
208 assertEquals("Follower last applied index", 3, followerActorContext.getLastApplied());
209 assertEquals("New follower commit index", 3, newFollowerActorContext.getCommitIndex());
210 assertEquals("New follower last applied index", 3, newFollowerActorContext.getLastApplied());
212 assertEquals("Leader persisted ReplicatedLogImplEntry entries", 0,
213 InMemoryJournal.get(LEADER_ID, SimpleReplicatedLogEntry.class).size());
214 assertEquals("Leader persisted ServerConfigurationPayload entries", 1,
215 InMemoryJournal.get(LEADER_ID, ServerConfigurationPayload.class).size());
217 assertEquals("New follower persisted ReplicatedLogImplEntry entries", 0,
218 InMemoryJournal.get(NEW_SERVER_ID, SimpleReplicatedLogEntry.class).size());
219 assertEquals("New follower persisted ServerConfigurationPayload entries", 1,
220 InMemoryJournal.get(NEW_SERVER_ID, ServerConfigurationPayload.class).size());
222 LOG.info("testAddServerWithExistingFollower ending");
226 public void testAddServerWithNoExistingFollower() throws Exception {
227 LOG.info("testAddServerWithNoExistingFollower starting");
230 RaftActorContext initialActorContext = new MockRaftActorContext();
231 initialActorContext.setCommitIndex(1);
232 initialActorContext.setLastApplied(1);
233 initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
236 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
237 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
238 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
239 actorFactory.generateActorId(LEADER_ID));
241 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
242 final RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
244 final TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
246 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
248 // Leader should install snapshot - capture and verify ApplySnapshot contents
250 ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
251 @SuppressWarnings("unchecked")
252 List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
253 assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
255 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
256 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
257 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
259 // Verify ServerConfigurationPayload entry in leader's log
261 expectFirstMatching(leaderCollectorActor, ApplyState.class);
262 assertEquals("Leader journal last index", 2, leaderActorContext.getReplicatedLog().lastIndex());
263 assertEquals("Leader commit index", 2, leaderActorContext.getCommitIndex());
264 assertEquals("Leader last applied index", 2, leaderActorContext.getLastApplied());
265 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
266 votingServer(NEW_SERVER_ID));
268 // Verify ServerConfigurationPayload entry in the new follower
270 expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
271 assertEquals("New follower journal last index", 2, newFollowerActorContext.getReplicatedLog().lastIndex());
272 verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
273 votingServer(NEW_SERVER_ID));
275 // Verify new server config was applied in the new follower
277 assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
279 LOG.info("testAddServerWithNoExistingFollower ending");
283 public void testAddServersAsNonVoting() throws Exception {
284 LOG.info("testAddServersAsNonVoting starting");
287 RaftActorContext initialActorContext = new MockRaftActorContext();
289 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
290 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
291 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
292 actorFactory.generateActorId(LEADER_ID));
294 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
295 final RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
297 final TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
299 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), false), testKit.getRef());
301 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
302 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
303 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
305 // Verify ServerConfigurationPayload entry in leader's log
307 expectFirstMatching(leaderCollectorActor, ApplyState.class);
309 assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
310 assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
311 assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
312 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
313 nonVotingServer(NEW_SERVER_ID));
315 // Verify ServerConfigurationPayload entry in the new follower
317 expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
318 assertEquals("New follower journal last index", 0, newFollowerActorContext.getReplicatedLog().lastIndex());
319 verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
320 nonVotingServer(NEW_SERVER_ID));
322 // Verify new server config was applied in the new follower
324 assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
326 assertNoneMatching(newFollowerCollectorActor, InstallSnapshot.class, 500);
328 // Add another non-voting server.
330 clearMessages(leaderCollectorActor);
332 RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
333 Follower newFollower2 = new Follower(follower2ActorContext);
334 followerActor.underlyingActor().setBehavior(newFollower2);
336 leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit.getRef());
338 addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
339 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
340 assertEquals("getLeaderHint", java.util.Optional.of(LEADER_ID), addServerReply.getLeaderHint());
342 expectFirstMatching(leaderCollectorActor, ApplyState.class);
343 assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
344 assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
345 assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
346 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
347 votingServer(LEADER_ID), nonVotingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
349 LOG.info("testAddServersAsNonVoting ending");
353 public void testAddServerWithOperationInProgress() throws Exception {
354 LOG.info("testAddServerWithOperationInProgress starting");
357 RaftActorContext initialActorContext = new MockRaftActorContext();
359 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
360 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
361 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
362 actorFactory.generateActorId(LEADER_ID));
364 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
365 final RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
367 final TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
369 RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
370 Follower newFollower2 = new Follower(follower2ActorContext);
371 followerActor.underlyingActor().setBehavior(newFollower2);
373 MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
374 newFollowerRaftActorInstance.setDropMessageOfType(InstallSnapshot.class);
376 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
378 // Wait for leader's install snapshot and capture it
380 InstallSnapshot installSnapshot = expectFirstMatching(newFollowerCollectorActor, InstallSnapshot.class);
382 // Send a second AddServer - should get queued
383 JavaTestKit testKit2 = new JavaTestKit(getSystem());
384 leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit2.getRef());
386 // Continue the first AddServer
387 newFollowerRaftActorInstance.setDropMessageOfType(null);
388 newFollowerRaftActor.tell(installSnapshot, leaderActor);
390 // Verify both complete successfully
391 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
392 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
394 addServerReply = testKit2.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
395 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
397 // Verify ServerConfigurationPayload entries in leader's log
399 expectMatching(leaderCollectorActor, ApplyState.class, 2);
400 assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
401 assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
402 assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
403 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
404 votingServer(LEADER_ID), votingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
406 // Verify ServerConfigurationPayload entry in the new follower
408 expectMatching(newFollowerCollectorActor, ApplyState.class, 2);
409 assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID2),
410 newFollowerActorContext.getPeerIds());
412 LOG.info("testAddServerWithOperationInProgress ending");
416 public void testAddServerWithPriorSnapshotInProgress() throws Exception {
417 LOG.info("testAddServerWithPriorSnapshotInProgress starting");
420 RaftActorContext initialActorContext = new MockRaftActorContext();
422 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
423 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
424 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
425 actorFactory.generateActorId(LEADER_ID));
427 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
428 final RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
430 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
432 // Drop commit message for now to delay snapshot completion
433 leaderRaftActor.setDropMessageOfType(String.class);
435 leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
437 Object commitMsg = expectFirstMatching(leaderCollectorActor, COMMIT_MESSAGE_CLASS);
439 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
441 leaderRaftActor.setDropMessageOfType(null);
442 leaderActor.tell(commitMsg, leaderActor);
444 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
445 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
446 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
448 expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
450 // Verify ServerConfigurationPayload entry in leader's log
452 expectFirstMatching(leaderCollectorActor, ApplyState.class);
453 assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
454 assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
455 assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
456 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
457 votingServer(NEW_SERVER_ID));
459 LOG.info("testAddServerWithPriorSnapshotInProgress ending");
463 public void testAddServerWithPriorSnapshotCompleteTimeout() throws Exception {
464 LOG.info("testAddServerWithPriorSnapshotCompleteTimeout starting");
467 RaftActorContext initialActorContext = new MockRaftActorContext();
469 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
470 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
471 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
472 actorFactory.generateActorId(LEADER_ID));
474 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
475 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
477 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
479 // Drop commit message so the snapshot doesn't complete.
480 leaderRaftActor.setDropMessageOfType(COMMIT_MESSAGE_CLASS);
482 leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
484 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
486 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
487 assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
489 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
491 LOG.info("testAddServerWithPriorSnapshotCompleteTimeout ending");
495 public void testAddServerWithLeaderChangeBeforePriorSnapshotComplete() throws Exception {
496 LOG.info("testAddServerWithLeaderChangeBeforePriorSnapshotComplete starting");
499 RaftActorContext initialActorContext = new MockRaftActorContext();
501 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
502 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
503 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
504 actorFactory.generateActorId(LEADER_ID));
506 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
507 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
508 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
510 final TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
512 // Drop the commit message so the snapshot doesn't complete yet.
513 leaderRaftActor.setDropMessageOfType(COMMIT_MESSAGE_CLASS);
515 leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
517 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
519 Object commitMsg = expectFirstMatching(leaderCollectorActor, COMMIT_MESSAGE_CLASS);
521 // Change the leader behavior to follower
522 leaderActor.tell(new Follower(leaderActorContext), leaderActor);
524 // Drop CaptureSnapshotReply in case install snapshot is incorrectly initiated after the prior
525 // snapshot completes. This will prevent the invalid snapshot from completing and fail the
526 // isCapturing assertion below.
527 leaderRaftActor.setDropMessageOfType(CaptureSnapshotReply.class);
529 // Complete the prior snapshot - this should be a no-op b/c it's no longer the leader
530 leaderActor.tell(commitMsg, leaderActor);
532 leaderActor.tell(new RaftActorServerConfigurationSupport.ServerOperationTimeout(NEW_SERVER_ID), leaderActor);
534 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
535 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
537 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
538 assertEquals("isCapturing", false, leaderActorContext.getSnapshotManager().isCapturing());
540 LOG.info("testAddServerWithLeaderChangeBeforePriorSnapshotComplete ending");
544 public void testAddServerWithLeaderChangeDuringInstallSnapshot() throws Exception {
545 LOG.info("testAddServerWithLeaderChangeDuringInstallSnapshot starting");
548 RaftActorContext initialActorContext = new MockRaftActorContext();
550 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
551 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
552 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
553 actorFactory.generateActorId(LEADER_ID));
555 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
556 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
558 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(8);
560 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
562 // Drop the UnInitializedFollowerSnapshotReply to delay it.
563 leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
565 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
567 final UnInitializedFollowerSnapshotReply snapshotReply = expectFirstMatching(leaderCollectorActor,
568 UnInitializedFollowerSnapshotReply.class);
570 // Prevent election timeout when the leader switches to follower
571 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
573 // Change the leader behavior to follower
574 leaderActor.tell(new Follower(leaderActorContext), leaderActor);
576 // Send the captured UnInitializedFollowerSnapshotReply - should be a no-op
577 leaderRaftActor.setDropMessageOfType(null);
578 leaderActor.tell(snapshotReply, leaderActor);
580 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
581 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
583 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
585 LOG.info("testAddServerWithLeaderChangeDuringInstallSnapshot ending");
589 public void testAddServerWithInstallSnapshotTimeout() throws Exception {
590 LOG.info("testAddServerWithInstallSnapshotTimeout starting");
593 RaftActorContext initialActorContext = new MockRaftActorContext();
595 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
596 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
597 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
598 actorFactory.generateActorId(LEADER_ID));
600 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
601 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
602 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
604 // Drop the InstallSnapshot message so it times out
605 newFollowerRaftActor.underlyingActor().setDropMessageOfType(InstallSnapshot.class);
607 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
609 leaderActor.tell(new UnInitializedFollowerSnapshotReply("bogus"), leaderActor);
611 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
612 assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
614 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
615 assertEquals("Leader followers size", 0,
616 ((AbstractLeader)leaderRaftActor.getCurrentBehavior()).getFollowerIds().size());
618 LOG.info("testAddServerWithInstallSnapshotTimeout ending");
622 public void testAddServerWithNoLeader() {
623 LOG.info("testAddServerWithNoLeader starting");
626 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
627 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
629 TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
630 MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
631 followerActor.path().toString())).config(configParams).persistent(Optional.of(false))
632 .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
633 actorFactory.generateActorId(LEADER_ID));
634 noLeaderActor.underlyingActor().waitForInitializeBehaviorComplete();
636 noLeaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true),
638 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
639 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
641 LOG.info("testAddServerWithNoLeader ending");
645 public void testAddServerWithNoConsensusReached() {
646 LOG.info("testAddServerWithNoConsensusReached starting");
649 RaftActorContext initialActorContext = new MockRaftActorContext();
651 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
652 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
653 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
654 actorFactory.generateActorId(LEADER_ID));
656 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
657 final RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
659 final TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
661 // Drop UnInitializedFollowerSnapshotReply initially
662 leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
664 MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
665 newFollowerCollectorActor = newCollectorActor(newFollowerRaftActorInstance, NEW_SERVER_ID);
667 // Drop AppendEntries to the new follower so consensus isn't reached
668 newFollowerRaftActorInstance.setDropMessageOfType(AppendEntries.class);
670 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
672 // Capture the UnInitializedFollowerSnapshotReply
673 Object snapshotReply = expectFirstMatching(leaderCollectorActor, UnInitializedFollowerSnapshotReply.class);
675 // Send the UnInitializedFollowerSnapshotReply to resume the first request
676 leaderRaftActor.setDropMessageOfType(null);
677 leaderActor.tell(snapshotReply, leaderActor);
679 expectFirstMatching(newFollowerCollectorActor, AppendEntries.class);
681 // Send a second AddServer
682 leaderActor.tell(new AddServer(NEW_SERVER_ID2, "", false), testKit.getRef());
684 // The first AddServer should succeed with OK even though consensus wasn't reached
685 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
686 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
687 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
689 // Verify ServerConfigurationPayload entry in leader's log
690 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
691 votingServer(NEW_SERVER_ID));
693 // The second AddServer should fail since consensus wasn't reached for the first
694 addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
695 assertEquals("getStatus", ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT, addServerReply.getStatus());
697 // Re-send the second AddServer - should also fail
698 leaderActor.tell(new AddServer(NEW_SERVER_ID2, "", false), testKit.getRef());
699 addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
700 assertEquals("getStatus", ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT, addServerReply.getStatus());
702 LOG.info("testAddServerWithNoConsensusReached ending");
706 public void testAddServerWithExistingServer() {
707 LOG.info("testAddServerWithExistingServer starting");
709 RaftActorContext initialActorContext = new MockRaftActorContext();
711 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
712 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
713 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
714 actorFactory.generateActorId(LEADER_ID));
716 leaderActor.tell(new AddServer(FOLLOWER_ID, followerActor.path().toString(), true), testKit.getRef());
718 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
719 assertEquals("getStatus", ServerChangeStatus.ALREADY_EXISTS, addServerReply.getStatus());
721 LOG.info("testAddServerWithExistingServer ending");
725 public void testAddServerForwardedToLeader() {
726 LOG.info("testAddServerForwardedToLeader starting");
729 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
730 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
732 TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
733 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
734 actorFactory.generateActorId(LEADER_ID));
736 TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
737 MockRaftActor.builder().id(FOLLOWER_ID).peerAddresses(ImmutableMap.of(LEADER_ID,
738 leaderActor.path().toString())).config(configParams).persistent(Optional.of(false))
739 .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
740 actorFactory.generateActorId(FOLLOWER_ID));
741 followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
743 followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
744 -1, -1, (short)0), leaderActor);
746 followerRaftActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true),
748 expectFirstMatching(leaderActor, AddServer.class);
750 LOG.info("testAddServerForwardedToLeader ending");
754 public void testOnApplyState() {
755 LOG.info("testOnApplyState starting");
757 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
758 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
759 TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
760 MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
761 followerActor.path().toString())).config(configParams).persistent(Optional.of(false))
762 .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
763 actorFactory.generateActorId(LEADER_ID));
765 RaftActorServerConfigurationSupport support = new RaftActorServerConfigurationSupport(
766 noLeaderActor.underlyingActor());
768 ReplicatedLogEntry serverConfigEntry = new SimpleReplicatedLogEntry(1, 1,
769 new ServerConfigurationPayload(Collections.<ServerInfo>emptyList()));
770 boolean handled = support.handleMessage(new ApplyState(null, null, serverConfigEntry), ActorRef.noSender());
771 assertEquals("Message handled", true, handled);
773 ReplicatedLogEntry nonServerConfigEntry = new SimpleReplicatedLogEntry(1, 1,
774 new MockRaftActorContext.MockPayload("1"));
775 handled = support.handleMessage(new ApplyState(null, null, nonServerConfigEntry), ActorRef.noSender());
776 assertEquals("Message handled", false, handled);
778 LOG.info("testOnApplyState ending");
782 public void testRemoveServerWithNoLeader() {
783 LOG.info("testRemoveServerWithNoLeader starting");
785 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
786 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
788 TestActorRef<MockRaftActor> leaderActor = actorFactory.createTestActor(
789 MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
790 followerActor.path().toString())).config(configParams).persistent(Optional.of(false))
791 .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
792 actorFactory.generateActorId(LEADER_ID));
793 leaderActor.underlyingActor().waitForInitializeBehaviorComplete();
795 leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
796 RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"),
797 RemoveServerReply.class);
798 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, removeServerReply.getStatus());
800 LOG.info("testRemoveServerWithNoLeader ending");
804 public void testRemoveServerNonExistentServer() {
805 LOG.info("testRemoveServerNonExistentServer starting");
807 RaftActorContext initialActorContext = new MockRaftActorContext();
809 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
810 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
811 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
812 actorFactory.generateActorId(LEADER_ID));
814 leaderActor.tell(new RemoveServer(NEW_SERVER_ID), testKit.getRef());
815 RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"),
816 RemoveServerReply.class);
817 assertEquals("getStatus", ServerChangeStatus.DOES_NOT_EXIST, removeServerReply.getStatus());
819 LOG.info("testRemoveServerNonExistentServer ending");
823 public void testRemoveServerForwardToLeader() {
824 LOG.info("testRemoveServerForwardToLeader starting");
826 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
827 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
829 TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
830 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
831 actorFactory.generateActorId(LEADER_ID));
833 TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
834 MockRaftActor.builder().id(FOLLOWER_ID).peerAddresses(ImmutableMap.of(LEADER_ID,
835 leaderActor.path().toString())).config(configParams).persistent(Optional.of(false))
836 .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
837 actorFactory.generateActorId(FOLLOWER_ID));
838 followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
840 followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
841 -1, -1, (short)0), leaderActor);
843 followerRaftActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
844 expectFirstMatching(leaderActor, RemoveServer.class);
846 LOG.info("testRemoveServerForwardToLeader ending");
850 public void testRemoveServer() {
851 LOG.info("testRemoveServer starting");
853 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
854 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
855 configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
857 final String followerActorId = actorFactory.generateActorId(FOLLOWER_ID);
858 final String followerActorPath = actorFactory.createTestActorPath(followerActorId);
859 RaftActorContext initialActorContext = new MockRaftActorContext();
861 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
862 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActorPath),
863 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
864 actorFactory.generateActorId(LEADER_ID));
866 final TestActorRef<MessageCollectorActor> leaderCollector =
867 newLeaderCollectorActor(leaderActor.underlyingActor());
869 TestActorRef<MessageCollectorActor> collector = actorFactory.createTestActor(MessageCollectorActor.props()
870 .withDispatcher(Dispatchers.DefaultDispatcherId()),
871 actorFactory.generateActorId("collector"));
872 actorFactory.createTestActor(
873 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
874 configParams, NO_PERSISTENCE, collector).withDispatcher(Dispatchers.DefaultDispatcherId()),
877 leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
878 RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"),
879 RemoveServerReply.class);
880 assertEquals("getStatus", ServerChangeStatus.OK, removeServerReply.getStatus());
882 final ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
883 assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
884 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
885 votingServer(LEADER_ID));
887 RaftActorBehavior currentBehavior = leaderActor.underlyingActor().getCurrentBehavior();
888 assertTrue("Expected Leader", currentBehavior instanceof Leader);
889 assertEquals("Follower ids size", 0, ((Leader)currentBehavior).getFollowerIds().size());
891 MessageCollectorActor.expectFirstMatching(collector, ServerRemoved.class);
893 LOG.info("testRemoveServer ending");
897 public void testRemoveServerLeader() {
898 LOG.info("testRemoveServerLeader starting");
900 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
901 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
902 configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
904 final String followerActorId = actorFactory.generateActorId(FOLLOWER_ID);
905 final String followerActorPath = actorFactory.createTestActorPath(followerActorId);
906 RaftActorContext initialActorContext = new MockRaftActorContext();
908 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
909 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActorPath),
910 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
911 actorFactory.generateActorId(LEADER_ID));
913 final TestActorRef<MessageCollectorActor> leaderCollector =
914 newLeaderCollectorActor(leaderActor.underlyingActor());
916 final TestActorRef<MessageCollectorActor> followerCollector =
917 actorFactory.createTestActor(MessageCollectorActor.props()
918 .withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId("collector"));
919 actorFactory.createTestActor(
920 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
921 configParams, NO_PERSISTENCE, followerCollector)
922 .withDispatcher(Dispatchers.DefaultDispatcherId()),
925 leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
926 RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"),
927 RemoveServerReply.class);
928 assertEquals("getStatus", ServerChangeStatus.OK, removeServerReply.getStatus());
930 final ApplyState applyState = MessageCollectorActor.expectFirstMatching(followerCollector, ApplyState.class);
931 assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
932 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
933 votingServer(FOLLOWER_ID));
935 MessageCollectorActor.expectFirstMatching(leaderCollector, ServerRemoved.class);
937 LOG.info("testRemoveServerLeader ending");
941 public void testRemoveServerLeaderWithNoFollowers() {
942 LOG.info("testRemoveServerLeaderWithNoFollowers starting");
944 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
945 MockLeaderRaftActor.props(Collections.<String, String>emptyMap(),
946 new MockRaftActorContext()).withDispatcher(Dispatchers.DefaultDispatcherId()),
947 actorFactory.generateActorId(LEADER_ID));
949 leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
950 RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"),
951 RemoveServerReply.class);
952 assertEquals("getStatus", ServerChangeStatus.NOT_SUPPORTED, removeServerReply.getStatus());
954 LOG.info("testRemoveServerLeaderWithNoFollowers ending");
958 public void testChangeServersVotingStatus() {
959 LOG.info("testChangeServersVotingStatus starting");
961 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
962 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
963 configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
965 final String follower1ActorId = actorFactory.generateActorId(FOLLOWER_ID);
966 final String follower1ActorPath = actorFactory.createTestActorPath(follower1ActorId);
967 final String follower2ActorId = actorFactory.generateActorId(FOLLOWER_ID2);
968 final String follower2ActorPath = actorFactory.createTestActorPath(follower2ActorId);
970 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
971 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, follower1ActorPath,
972 FOLLOWER_ID2, follower2ActorPath), new MockRaftActorContext())
973 .withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
974 TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
976 TestActorRef<MessageCollectorActor> follower1Collector = actorFactory.createTestActor(
977 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
978 actorFactory.generateActorId("collector"));
979 final TestActorRef<CollectingMockRaftActor> follower1RaftActor = actorFactory.createTestActor(
980 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
981 FOLLOWER_ID2, follower2ActorPath), configParams, NO_PERSISTENCE, follower1Collector)
982 .withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId);
984 TestActorRef<MessageCollectorActor> follower2Collector = actorFactory.createTestActor(
985 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
986 actorFactory.generateActorId("collector"));
987 final TestActorRef<CollectingMockRaftActor> follower2RaftActor = actorFactory.createTestActor(
988 CollectingMockRaftActor.props(FOLLOWER_ID2, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
989 FOLLOWER_ID, follower1ActorPath), configParams, NO_PERSISTENCE, follower2Collector)
990 .withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId);
992 // Send first ChangeServersVotingStatus message
994 leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(FOLLOWER_ID, false, FOLLOWER_ID2, false)),
996 ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
997 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
999 final ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
1000 assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
1001 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1002 votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1004 MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
1005 verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext()
1006 .getReplicatedLog(), votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID),
1007 nonVotingServer(FOLLOWER_ID2));
1009 MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
1010 verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext()
1011 .getReplicatedLog(), votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID),
1012 nonVotingServer(FOLLOWER_ID2));
1014 MessageCollectorActor.clearMessages(leaderCollector);
1015 MessageCollectorActor.clearMessages(follower1Collector);
1016 MessageCollectorActor.clearMessages(follower2Collector);
1018 // Send second ChangeServersVotingStatus message
1020 leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(FOLLOWER_ID, true)), testKit.getRef());
1021 reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1022 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1024 MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
1025 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1026 votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1028 MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
1029 verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext()
1030 .getReplicatedLog(), votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1032 MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
1033 verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext()
1034 .getReplicatedLog(), votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1036 LOG.info("testChangeServersVotingStatus ending");
1040 public void testChangeLeaderToNonVoting() {
1041 LOG.info("testChangeLeaderToNonVoting starting");
1043 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1044 configParams.setHeartBeatInterval(new FiniteDuration(500, TimeUnit.MILLISECONDS));
1046 final String follower1ActorId = actorFactory.generateActorId(FOLLOWER_ID);
1047 final String follower1ActorPath = actorFactory.createTestActorPath(follower1ActorId);
1048 final String follower2ActorId = actorFactory.generateActorId(FOLLOWER_ID2);
1049 final String follower2ActorPath = actorFactory.createTestActorPath(follower2ActorId);
1051 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
1052 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, follower1ActorPath,
1053 FOLLOWER_ID2, follower2ActorPath), new MockRaftActorContext())
1054 .withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
1055 TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
1057 TestActorRef<MessageCollectorActor> follower1Collector = actorFactory.createTestActor(
1058 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1059 actorFactory.generateActorId("collector"));
1060 final TestActorRef<CollectingMockRaftActor> follower1RaftActor = actorFactory.createTestActor(
1061 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
1062 FOLLOWER_ID2, follower2ActorPath), configParams, NO_PERSISTENCE, follower1Collector)
1063 .withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId);
1065 TestActorRef<MessageCollectorActor> follower2Collector = actorFactory.createTestActor(
1066 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1067 actorFactory.generateActorId("collector"));
1068 final TestActorRef<CollectingMockRaftActor> follower2RaftActor = actorFactory.createTestActor(
1069 CollectingMockRaftActor.props(FOLLOWER_ID2, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
1070 FOLLOWER_ID, follower1ActorPath), configParams, NO_PERSISTENCE, follower2Collector)
1071 .withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId);
1073 // Send ChangeServersVotingStatus message
1075 leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(LEADER_ID, false)), testKit.getRef());
1076 ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1077 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1079 MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
1080 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1081 nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
1083 MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
1084 verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext()
1085 .getReplicatedLog(), nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
1087 MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
1088 verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext()
1089 .getReplicatedLog(), nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
1091 verifyRaftState(RaftState.Leader, follower1RaftActor.underlyingActor(), follower2RaftActor.underlyingActor());
1092 verifyRaftState(RaftState.Follower, leaderActor.underlyingActor());
1094 MessageCollectorActor.expectMatching(leaderCollector, AppendEntries.class, 2);
1096 LOG.info("testChangeLeaderToNonVoting ending");
1100 public void testChangeLeaderToNonVotingInSingleNode() {
1101 LOG.info("testChangeLeaderToNonVotingInSingleNode starting");
1103 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
1104 MockLeaderRaftActor.props(ImmutableMap.of(), new MockRaftActorContext())
1105 .withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
1107 leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(LEADER_ID, false)), testKit.getRef());
1108 ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1109 assertEquals("getStatus", ServerChangeStatus.INVALID_REQUEST, reply.getStatus());
1111 LOG.info("testChangeLeaderToNonVotingInSingleNode ending");
1115 public void testChangeToVotingWithNoLeader() {
1116 LOG.info("testChangeToVotingWithNoLeader starting");
1118 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1119 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1120 configParams.setElectionTimeoutFactor(5);
1122 final String node1ID = "node1";
1123 final String node2ID = "node2";
1125 // Set up a persisted ServerConfigurationPayload. Initially node1 and node2 will come up as non-voting.
1126 // via the server config. The server config will also contain 2 voting peers that are down (ie no
1129 ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1130 new ServerInfo(node1ID, false), new ServerInfo(node2ID, false),
1131 new ServerInfo("downNode1", true), new ServerInfo("downNode2", true)));
1132 SimpleReplicatedLogEntry persistedServerConfigEntry = new SimpleReplicatedLogEntry(0, 1, persistedServerConfig);
1134 InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "downNode1"));
1135 InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1136 InMemoryJournal.addEntry(node1ID, 3, new ApplyJournalEntries(0));
1137 InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "downNode2"));
1138 InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1139 InMemoryJournal.addEntry(node2ID, 3, new ApplyJournalEntries(0));
1141 TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
1142 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1143 actorFactory.generateActorId("collector"));
1144 TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1145 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
1146 PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1147 CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1149 TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
1150 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1151 actorFactory.generateActorId("collector"));
1152 TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1153 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
1154 PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1155 CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1157 node1RaftActor.waitForInitializeBehaviorComplete();
1158 node2RaftActor.waitForInitializeBehaviorComplete();
1160 // Verify the intended server config was loaded and applied.
1161 verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1162 nonVotingServer(node1ID), nonVotingServer(node2ID), votingServer("downNode1"),
1163 votingServer("downNode2"));
1164 assertEquals("isVotingMember", false, node1RaftActor.getRaftActorContext().isVotingMember());
1165 assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1166 assertEquals("getLeaderId", null, node1RaftActor.getLeaderId());
1168 verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1169 nonVotingServer(node1ID), nonVotingServer(node2ID), votingServer("downNode1"),
1170 votingServer("downNode2"));
1171 assertEquals("isVotingMember", false, node2RaftActor.getRaftActorContext().isVotingMember());
1173 // For the test, we send a ChangeServersVotingStatus message to node1 to flip the voting states for
1174 // each server, ie node1 and node2 to voting and the 2 down nodes to non-voting. This should cause
1175 // node1 to try to elect itself as leader in order to apply the new server config. Since the 2
1176 // down nodes are switched to non-voting, node1 should only need a vote from node2.
1178 // First send the message such that node1 has no peer address for node2 - should fail.
1180 ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true,
1181 node2ID, true, "downNode1", false, "downNode2", false));
1182 node1RaftActorRef.tell(changeServers, testKit.getRef());
1183 ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1184 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, reply.getStatus());
1186 // Send an AppendEntries so node1 has a leaderId
1188 MessageCollectorActor.clearMessages(node1Collector);
1190 long term = node1RaftActor.getRaftActorContext().getTermInformation().getCurrentTerm();
1191 node1RaftActorRef.tell(new AppendEntries(term, "downNode1", -1L, -1L,
1192 Collections.<ReplicatedLogEntry>emptyList(), 0, -1, (short)1), ActorRef.noSender());
1194 // Wait for the ElectionTimeout to clear the leaderId. he leaderId must be null so on the
1195 // ChangeServersVotingStatus message, it will try to elect a leader.
1197 MessageCollectorActor.expectFirstMatching(node1Collector, ElectionTimeout.class);
1199 // Update node2's peer address and send the message again
1201 node1RaftActor.setPeerAddress(node2ID, node2RaftActorRef.path().toString());
1203 node1RaftActorRef.tell(changeServers, testKit.getRef());
1204 reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1205 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1207 ApplyJournalEntries apply = MessageCollectorActor.expectFirstMatching(node1Collector,
1208 ApplyJournalEntries.class);
1209 assertEquals("getToIndex", 1, apply.getToIndex());
1210 verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1211 votingServer(node1ID), votingServer(node2ID), nonVotingServer("downNode1"),
1212 nonVotingServer("downNode2"));
1213 assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
1214 assertEquals("getRaftState", RaftState.Leader, node1RaftActor.getRaftState());
1216 apply = MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
1217 assertEquals("getToIndex", 1, apply.getToIndex());
1218 verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1219 votingServer(node1ID), votingServer(node2ID), nonVotingServer("downNode1"),
1220 nonVotingServer("downNode2"));
1221 assertEquals("isVotingMember", true, node2RaftActor.getRaftActorContext().isVotingMember());
1222 assertEquals("getRaftState", RaftState.Follower, node2RaftActor.getRaftState());
1224 LOG.info("testChangeToVotingWithNoLeader ending");
1228 public void testChangeToVotingWithNoLeaderAndElectionTimeout() {
1229 LOG.info("testChangeToVotingWithNoLeaderAndElectionTimeout starting");
1231 final String node1ID = "node1";
1232 final String node2ID = "node2";
1234 final PeerAddressResolver peerAddressResolver = peerId -> peerId.equals(node1ID)
1235 ? actorFactory.createTestActorPath(node1ID) : peerId.equals(node2ID)
1236 ? actorFactory.createTestActorPath(node2ID) : null;
1238 ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1239 new ServerInfo(node1ID, false), new ServerInfo(node2ID, true)));
1240 SimpleReplicatedLogEntry persistedServerConfigEntry = new SimpleReplicatedLogEntry(0, 1, persistedServerConfig);
1242 InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
1243 InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1244 InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
1245 InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1247 DefaultConfigParamsImpl configParams1 = new DefaultConfigParamsImpl();
1248 configParams1.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1249 configParams1.setElectionTimeoutFactor(1);
1250 configParams1.setPeerAddressResolver(peerAddressResolver);
1251 TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
1252 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1253 actorFactory.generateActorId("collector"));
1254 TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1255 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams1,
1256 PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1257 final CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1259 DefaultConfigParamsImpl configParams2 = new DefaultConfigParamsImpl();
1260 configParams2.setElectionTimeoutFactor(1000000);
1261 configParams2.setPeerAddressResolver(peerAddressResolver);
1262 TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
1263 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1264 actorFactory.generateActorId("collector"));
1265 TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1266 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams2,
1267 PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1268 CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1270 // Send a ChangeServersVotingStatus message to node1 to change mode1 to voting. This should cause
1271 // node1 to try to elect itself as leader in order to apply the new server config. But we'll drop
1272 // RequestVote messages in node2 which should cause node1 to time out and revert back to the previous
1273 // server config and fail with NO_LEADER. Note that node1 shouldn't forward the request to node2 b/c
1274 // node2 was previously voting.
1276 node2RaftActor.setDropMessageOfType(RequestVote.class);
1278 ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true));
1279 node1RaftActorRef.tell(changeServers, testKit.getRef());
1280 ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1281 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, reply.getStatus());
1283 assertEquals("Server config", Sets.newHashSet(nonVotingServer(node1ID), votingServer(node2ID)),
1284 Sets.newHashSet(node1RaftActor.getRaftActorContext().getPeerServerInfo(true).getServerConfig()));
1285 assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1287 LOG.info("testChangeToVotingWithNoLeaderAndElectionTimeout ending");
1291 public void testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout() {
1292 LOG.info("testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout starting");
1294 final String node1ID = "node1";
1295 final String node2ID = "node2";
1297 final PeerAddressResolver peerAddressResolver = peerId -> peerId.equals(node1ID)
1298 ? actorFactory.createTestActorPath(node1ID) : peerId.equals(node2ID)
1299 ? actorFactory.createTestActorPath(node2ID) : null;
1301 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1302 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1303 configParams.setElectionTimeoutFactor(3);
1304 configParams.setPeerAddressResolver(peerAddressResolver);
1306 ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1307 new ServerInfo(node1ID, false), new ServerInfo(node2ID, false)));
1308 SimpleReplicatedLogEntry persistedServerConfigEntry = new SimpleReplicatedLogEntry(0, 1, persistedServerConfig);
1310 InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
1311 InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1312 InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
1313 InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1314 InMemoryJournal.addEntry(node2ID, 3, new SimpleReplicatedLogEntry(1, 1,
1315 new MockRaftActorContext.MockPayload("2")));
1316 InMemoryJournal.addEntry(node2ID, 4, new ApplyJournalEntries(1));
1318 TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
1319 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1320 actorFactory.generateActorId("collector"));
1321 TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1322 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
1323 PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1324 final CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1326 TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
1327 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1328 actorFactory.generateActorId("collector"));
1329 TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1330 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
1331 PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1332 final CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1334 // Send a ChangeServersVotingStatus message to node1 to change mode1 to voting. This should cause
1335 // node1 to try to elect itself as leader in order to apply the new server config. However node1's log
1336 // is behind node2's so node2 should not grant node1's vote. This should cause node1 to time out and
1337 // forward the request to node2.
1339 ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(
1340 ImmutableMap.of(node1ID, true, node2ID, true));
1341 node1RaftActorRef.tell(changeServers, testKit.getRef());
1342 ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1343 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1345 MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
1346 verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1347 votingServer(node1ID), votingServer(node2ID));
1348 assertEquals("getRaftState", RaftState.Leader, node2RaftActor.getRaftState());
1350 MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class);
1351 verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1352 votingServer(node1ID), votingServer(node2ID));
1353 assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
1354 assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1356 LOG.info("testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout ending");
1360 public void testChangeToVotingWithNoLeaderAndOtherLeaderElected() {
1361 LOG.info("testChangeToVotingWithNoLeaderAndOtherLeaderElected starting");
1363 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1364 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1365 configParams.setElectionTimeoutFactor(100000);
1367 final String node1ID = "node1";
1368 final String node2ID = "node2";
1370 configParams.setPeerAddressResolver(peerId -> peerId.equals(node1ID)
1371 ? actorFactory.createTestActorPath(node1ID) : peerId.equals(node2ID)
1372 ? actorFactory.createTestActorPath(node2ID) : null);
1374 ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1375 new ServerInfo(node1ID, false), new ServerInfo(node2ID, true)));
1376 SimpleReplicatedLogEntry persistedServerConfigEntry = new SimpleReplicatedLogEntry(0, 1, persistedServerConfig);
1378 InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
1379 InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1380 InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
1381 InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1383 TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
1384 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1385 actorFactory.generateActorId("collector"));
1386 TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1387 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
1388 PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1389 final CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1391 TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
1392 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1393 actorFactory.generateActorId("collector"));
1394 TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1395 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
1396 PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1397 CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1399 // Send a ChangeServersVotingStatus message to node1 to change node1 to voting. This should cause
1400 // node1 to try to elect itself as leader in order to apply the new server config. But we'll drop
1401 // RequestVote messages in node2 and make it the leader so node1 should forward the server change
1402 // request to node2 when node2 is elected.
1404 node2RaftActor.setDropMessageOfType(RequestVote.class);
1406 ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true,
1408 node1RaftActorRef.tell(changeServers, testKit.getRef());
1410 MessageCollectorActor.expectFirstMatching(node2Collector, RequestVote.class);
1412 node2RaftActorRef.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
1414 ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1415 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1417 MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class);
1418 verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1419 votingServer(node1ID), votingServer(node2ID));
1420 assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
1421 assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1423 MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
1424 verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1425 votingServer(node1ID), votingServer(node2ID));
1426 assertEquals("getRaftState", RaftState.Leader, node2RaftActor.getRaftState());
1428 LOG.info("testChangeToVotingWithNoLeaderAndOtherLeaderElected ending");
1431 private static void verifyRaftState(RaftState expState, RaftActor... raftActors) {
1432 Stopwatch sw = Stopwatch.createStarted();
1433 while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
1434 for (RaftActor raftActor : raftActors) {
1435 if (raftActor.getRaftState() == expState) {
1441 fail("None of the RaftActors have state " + expState);
1444 private static ServerInfo votingServer(String id) {
1445 return new ServerInfo(id, true);
1448 private static ServerInfo nonVotingServer(String id) {
1449 return new ServerInfo(id, false);
1452 private TestActorRef<MessageCollectorActor> newLeaderCollectorActor(MockLeaderRaftActor leaderRaftActor) {
1453 return newCollectorActor(leaderRaftActor, LEADER_ID);
1456 private TestActorRef<MessageCollectorActor> newCollectorActor(AbstractMockRaftActor raftActor, String id) {
1457 TestActorRef<MessageCollectorActor> collectorActor = actorFactory.createTestActor(
1458 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1459 actorFactory.generateActorId(id + "Collector"));
1460 raftActor.setCollectorActor(collectorActor);
1461 return collectorActor;
1464 private static void verifyServerConfigurationPayloadEntry(ReplicatedLog log, ServerInfo... expected) {
1465 ReplicatedLogEntry logEntry = log.get(log.lastIndex());
1466 assertEquals("Last log entry payload class", ServerConfigurationPayload.class, logEntry.getData().getClass());
1467 ServerConfigurationPayload payload = (ServerConfigurationPayload)logEntry.getData();
1468 assertEquals("Server config", Sets.newHashSet(expected), Sets.newHashSet(payload.getServerConfig()));
1471 private static RaftActorContextImpl newFollowerContext(String id, TestActorRef<? extends UntypedActor> actor) {
1472 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1473 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1474 configParams.setElectionTimeoutFactor(100000);
1475 NonPersistentDataProvider noPersistence = new NonPersistentDataProvider();
1476 ElectionTermImpl termInfo = new ElectionTermImpl(noPersistence, id, LOG);
1477 termInfo.update(1, LEADER_ID);
1478 return new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
1479 id, termInfo, -1, -1, ImmutableMap.of(LEADER_ID, ""), configParams,
1480 noPersistence, applyState -> actor.tell(applyState, actor), LOG);
1483 abstract static class AbstractMockRaftActor extends MockRaftActor {
1484 private volatile TestActorRef<MessageCollectorActor> collectorActor;
1485 private volatile Class<?> dropMessageOfType;
1487 AbstractMockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
1488 boolean persistent, TestActorRef<MessageCollectorActor> collectorActor) {
1489 super(builder().id(id).peerAddresses(peerAddresses).config(config.get())
1490 .persistent(Optional.of(persistent)));
1491 this.collectorActor = collectorActor;
1494 void setDropMessageOfType(Class<?> dropMessageOfType) {
1495 this.dropMessageOfType = dropMessageOfType;
1498 void setCollectorActor(TestActorRef<MessageCollectorActor> collectorActor) {
1499 this.collectorActor = collectorActor;
1503 public void handleCommand(Object message) {
1504 if (dropMessageOfType == null || !dropMessageOfType.equals(message.getClass())) {
1505 super.handleCommand(message);
1508 if (collectorActor != null) {
1509 collectorActor.tell(message, getSender());
1514 public static class CollectingMockRaftActor extends AbstractMockRaftActor {
1516 CollectingMockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
1517 boolean persistent, TestActorRef<MessageCollectorActor> collectorActor) {
1518 super(id, peerAddresses, config, persistent, collectorActor);
1519 snapshotCohortDelegate = new RaftActorSnapshotCohort() {
1521 public void createSnapshot(ActorRef actorRef) {
1522 actorRef.tell(new CaptureSnapshotReply(new byte[0]), actorRef);
1526 public void applySnapshot(byte[] snapshotBytes) {
1531 public static Props props(final String id, final Map<String, String> peerAddresses,
1532 ConfigParams config, boolean persistent, TestActorRef<MessageCollectorActor> collectorActor) {
1534 return Props.create(CollectingMockRaftActor.class, id, peerAddresses, Optional.of(config),
1535 persistent, collectorActor);
1540 public static class MockLeaderRaftActor extends AbstractMockRaftActor {
1541 public MockLeaderRaftActor(Map<String, String> peerAddresses, ConfigParams config,
1542 RaftActorContext fromContext) {
1543 super(LEADER_ID, peerAddresses, Optional.of(config), NO_PERSISTENCE, null);
1544 setPersistence(false);
1546 RaftActorContext context = getRaftActorContext();
1547 for (int i = 0; i < fromContext.getReplicatedLog().size(); i++) {
1548 ReplicatedLogEntry entry = fromContext.getReplicatedLog().get(i);
1549 getState().add(entry.getData());
1550 context.getReplicatedLog().append(entry);
1553 context.setCommitIndex(fromContext.getCommitIndex());
1554 context.setLastApplied(fromContext.getLastApplied());
1555 context.getTermInformation().update(fromContext.getTermInformation().getCurrentTerm(),
1556 fromContext.getTermInformation().getVotedFor());
1560 protected void initializeBehavior() {
1561 changeCurrentBehavior(new Leader(getRaftActorContext()));
1562 initializeBehaviorComplete.countDown();
1566 @SuppressWarnings("checkstyle:IllegalCatch")
1567 public void createSnapshot(ActorRef actorRef) {
1569 actorRef.tell(new CaptureSnapshotReply(RaftActorTest.fromObject(getState()).toByteArray()), actorRef);
1570 } catch (Exception e) {
1571 LOG.error("createSnapshot failed", e);
1575 static Props props(Map<String, String> peerAddresses, RaftActorContext fromContext) {
1576 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1577 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1578 configParams.setElectionTimeoutFactor(10);
1579 return Props.create(MockLeaderRaftActor.class, peerAddresses, configParams, fromContext);
1583 public static class MockNewFollowerRaftActor extends AbstractMockRaftActor {
1584 public MockNewFollowerRaftActor(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
1585 super(NEW_SERVER_ID, Maps.<String, String>newHashMap(), Optional.of(config), NO_PERSISTENCE,
1587 setPersistence(false);
1590 static Props props(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
1591 return Props.create(MockNewFollowerRaftActor.class, config, collectorActor);