2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertTrue;
12 import static org.junit.Assert.fail;
13 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.assertNoneMatching;
14 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
15 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
16 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
18 import akka.actor.AbstractActor;
19 import akka.actor.ActorRef;
20 import akka.actor.Props;
21 import akka.dispatch.Dispatchers;
22 import akka.testkit.TestActorRef;
23 import akka.testkit.javadsl.TestKit;
24 import com.google.common.base.Stopwatch;
25 import com.google.common.io.ByteSource;
26 import com.google.common.util.concurrent.MoreExecutors;
27 import java.io.OutputStream;
28 import java.time.Duration;
29 import java.util.List;
31 import java.util.Optional;
33 import java.util.concurrent.TimeUnit;
34 import org.apache.commons.lang3.SerializationUtils;
35 import org.junit.After;
36 import org.junit.Before;
37 import org.junit.Test;
38 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
39 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
40 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
41 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
42 import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
43 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
44 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
45 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
46 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
47 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
48 import org.opendaylight.controller.cluster.raft.messages.AddServer;
49 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
50 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
51 import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
52 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
53 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
54 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
55 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
56 import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
57 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
58 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
59 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
60 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
61 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
62 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
63 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
64 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
65 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
66 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
67 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
68 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
69 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
70 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
71 import org.slf4j.Logger;
72 import org.slf4j.LoggerFactory;
73 import scala.concurrent.duration.FiniteDuration;
76 * Unit tests for RaftActorServerConfigurationSupport.
78 * @author Thomas Pantelis
80 public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
81 static final String LEADER_ID = "leader";
82 static final String FOLLOWER_ID = "follower";
83 static final String FOLLOWER_ID2 = "follower2";
84 static final String NEW_SERVER_ID = "new-server";
85 static final String NEW_SERVER_ID2 = "new-server2";
86 private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupportTest.class);
87 private static final Class<?> COMMIT_MESSAGE_CLASS = RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT.getClass();
88 private static final boolean NO_PERSISTENCE = false;
89 private static final boolean PERSISTENT = true;
91 private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
93 private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
94 Props.create(ForwardMessageToBehaviorActor.class).withDispatcher(Dispatchers.DefaultDispatcherId()),
95 actorFactory.generateActorId(FOLLOWER_ID));
97 private TestActorRef<MockNewFollowerRaftActor> newFollowerRaftActor;
98 private ActorRef newFollowerCollectorActor;
99 private RaftActorContext newFollowerActorContext;
101 private final TestKit testKit = new TestKit(getSystem());
104 public void setup() {
105 InMemoryJournal.clear();
106 InMemorySnapshotStore.clear();
109 @SuppressWarnings("checkstyle:IllegalCatch")
110 private void setupNewFollower() {
111 DefaultConfigParamsImpl configParams = newFollowerConfigParams();
113 newFollowerCollectorActor = actorFactory.createActor(MessageCollectorActor.props(),
114 actorFactory.generateActorId(NEW_SERVER_ID + "Collector"));
115 newFollowerRaftActor = actorFactory.createTestActor(MockNewFollowerRaftActor.props(
116 configParams, newFollowerCollectorActor).withDispatcher(Dispatchers.DefaultDispatcherId()),
117 actorFactory.generateActorId(NEW_SERVER_ID));
120 newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
121 } catch (Exception e) {
122 newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
126 private static DefaultConfigParamsImpl newFollowerConfigParams() {
127 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
128 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
129 configParams.setElectionTimeoutFactor(100000);
130 configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
135 public void tearDown() {
136 actorFactory.close();
140 public void testAddServerWithExistingFollower() {
141 LOG.info("testAddServerWithExistingFollower starting");
143 RaftActorContextImpl followerActorContext = newFollowerContext(FOLLOWER_ID, followerActor);
144 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
146 followerActorContext.setCommitIndex(2);
147 followerActorContext.setLastApplied(2);
149 Follower follower = new Follower(followerActorContext);
150 followerActor.underlyingActor().setBehavior(follower);
151 followerActorContext.setCurrentBehavior(follower);
153 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
154 MockLeaderRaftActor.props(Map.of(FOLLOWER_ID, followerActor.path().toString()),
155 followerActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
156 actorFactory.generateActorId(LEADER_ID));
158 // Expect initial heartbeat from the leader.
159 expectFirstMatching(followerActor, AppendEntries.class);
160 clearMessages(followerActor);
162 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
163 final ActorRef leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
165 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
167 // Leader should install snapshot - capture and verify ApplySnapshot contents
169 ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
170 List<Object> snapshotState = MockRaftActor.fromState(applySnapshot.getSnapshot().getState());
171 assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
173 AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
174 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
175 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().orElseThrow());
177 // Verify ServerConfigurationPayload entry in leader's log
179 expectFirstMatching(leaderCollectorActor, ApplyState.class);
180 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
181 assertEquals("Leader journal last index", 3, leaderActorContext.getReplicatedLog().lastIndex());
182 assertEquals("Leader commit index", 3, leaderActorContext.getCommitIndex());
183 assertEquals("Leader last applied index", 3, leaderActorContext.getLastApplied());
184 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
185 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
187 // Verify ServerConfigurationPayload entry in both followers
189 expectFirstMatching(followerActor, ApplyState.class);
190 assertEquals("Follower journal last index", 3, followerActorContext.getReplicatedLog().lastIndex());
191 verifyServerConfigurationPayloadEntry(followerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
192 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
194 expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
195 assertEquals("New follower journal last index", 3, newFollowerActorContext.getReplicatedLog().lastIndex());
196 verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
197 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
199 // Verify new server config was applied in both followers
201 assertEquals("Follower peers", Set.of(LEADER_ID, NEW_SERVER_ID), followerActorContext.getPeerIds());
203 assertEquals("New follower peers", Set.of(LEADER_ID, FOLLOWER_ID), newFollowerActorContext.getPeerIds());
205 assertEquals("Follower commit index", 3, followerActorContext.getCommitIndex());
206 assertEquals("Follower last applied index", 3, followerActorContext.getLastApplied());
207 assertEquals("New follower commit index", 3, newFollowerActorContext.getCommitIndex());
208 assertEquals("New follower last applied index", 3, newFollowerActorContext.getLastApplied());
210 assertEquals("Leader persisted ReplicatedLogImplEntry entries", 0,
211 InMemoryJournal.get(LEADER_ID, SimpleReplicatedLogEntry.class).size());
212 assertEquals("Leader persisted ServerConfigurationPayload entries", 1,
213 InMemoryJournal.get(LEADER_ID, ServerConfigurationPayload.class).size());
215 assertEquals("New follower persisted ReplicatedLogImplEntry entries", 0,
216 InMemoryJournal.get(NEW_SERVER_ID, SimpleReplicatedLogEntry.class).size());
217 assertEquals("New follower persisted ServerConfigurationPayload entries", 1,
218 InMemoryJournal.get(NEW_SERVER_ID, ServerConfigurationPayload.class).size());
220 LOG.info("testAddServerWithExistingFollower ending");
224 public void testAddServerWithNoExistingFollower() {
225 LOG.info("testAddServerWithNoExistingFollower starting");
228 RaftActorContext initialActorContext = new MockRaftActorContext();
229 initialActorContext.setCommitIndex(1);
230 initialActorContext.setLastApplied(1);
231 initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
234 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
235 MockLeaderRaftActor.props(Map.of(), initialActorContext)
236 .withDispatcher(Dispatchers.DefaultDispatcherId()),
237 actorFactory.generateActorId(LEADER_ID));
239 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
240 final RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
242 final ActorRef leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
244 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
246 // Leader should install snapshot - capture and verify ApplySnapshot contents
248 ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
249 List<Object> snapshotState = MockRaftActor.fromState(applySnapshot.getSnapshot().getState());
250 assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
252 AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
253 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
254 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().orElseThrow());
256 // Verify ServerConfigurationPayload entry in leader's log
258 expectFirstMatching(leaderCollectorActor, ApplyState.class);
259 assertEquals("Leader journal last index", 2, leaderActorContext.getReplicatedLog().lastIndex());
260 assertEquals("Leader commit index", 2, leaderActorContext.getCommitIndex());
261 assertEquals("Leader last applied index", 2, leaderActorContext.getLastApplied());
262 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
263 votingServer(NEW_SERVER_ID));
265 // Verify ServerConfigurationPayload entry in the new follower
267 expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
268 assertEquals("New follower journal last index", 2, newFollowerActorContext.getReplicatedLog().lastIndex());
269 verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
270 votingServer(NEW_SERVER_ID));
272 // Verify new server config was applied in the new follower
274 assertEquals("New follower peers", Set.of(LEADER_ID), newFollowerActorContext.getPeerIds());
276 LOG.info("testAddServerWithNoExistingFollower ending");
280 public void testAddServersAsNonVoting() {
281 LOG.info("testAddServersAsNonVoting starting");
284 RaftActorContext initialActorContext = new MockRaftActorContext();
286 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
287 MockLeaderRaftActor.props(Map.of(), initialActorContext)
288 .withDispatcher(Dispatchers.DefaultDispatcherId()),
289 actorFactory.generateActorId(LEADER_ID));
291 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
292 final RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
294 final ActorRef leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
296 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), false), testKit.getRef());
298 AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
299 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
300 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().orElseThrow());
302 // Verify ServerConfigurationPayload entry in leader's log
304 expectFirstMatching(leaderCollectorActor, ApplyState.class);
306 assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
307 assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
308 assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
309 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
310 nonVotingServer(NEW_SERVER_ID));
312 // Verify ServerConfigurationPayload entry in the new follower
314 expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
315 assertEquals("New follower journal last index", 0, newFollowerActorContext.getReplicatedLog().lastIndex());
316 verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
317 nonVotingServer(NEW_SERVER_ID));
319 // Verify new server config was applied in the new follower
321 assertEquals("New follower peers", Set.of(LEADER_ID), newFollowerActorContext.getPeerIds());
323 assertNoneMatching(newFollowerCollectorActor, InstallSnapshot.class, 500);
325 // Add another non-voting server.
327 clearMessages(leaderCollectorActor);
329 RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
330 Follower newFollower2 = new Follower(follower2ActorContext);
331 followerActor.underlyingActor().setBehavior(newFollower2);
333 leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit.getRef());
335 addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
336 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
337 assertEquals("getLeaderHint", Optional.of(LEADER_ID), addServerReply.getLeaderHint());
339 expectFirstMatching(leaderCollectorActor, ApplyState.class);
340 assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
341 assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
342 assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
343 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
344 votingServer(LEADER_ID), nonVotingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
346 LOG.info("testAddServersAsNonVoting ending");
350 public void testAddServerWithOperationInProgress() {
351 LOG.info("testAddServerWithOperationInProgress starting");
354 RaftActorContext initialActorContext = new MockRaftActorContext();
356 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
357 MockLeaderRaftActor.props(Map.of(), initialActorContext)
358 .withDispatcher(Dispatchers.DefaultDispatcherId()),
359 actorFactory.generateActorId(LEADER_ID));
361 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
362 final RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
364 final ActorRef leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
366 RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
367 Follower newFollower2 = new Follower(follower2ActorContext);
368 followerActor.underlyingActor().setBehavior(newFollower2);
370 MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
371 newFollowerRaftActorInstance.setDropMessageOfType(InstallSnapshot.class);
373 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
375 // Wait for leader's install snapshot and capture it
377 InstallSnapshot installSnapshot = expectFirstMatching(newFollowerCollectorActor, InstallSnapshot.class);
379 // Send a second AddServer - should get queued
380 TestKit testKit2 = new TestKit(getSystem());
381 leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit2.getRef());
383 // Continue the first AddServer
384 newFollowerRaftActorInstance.setDropMessageOfType(null);
385 newFollowerRaftActor.tell(installSnapshot, leaderActor);
387 // Verify both complete successfully
388 AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
389 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
391 addServerReply = testKit2.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
392 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
394 // Verify ServerConfigurationPayload entries in leader's log
396 expectMatching(leaderCollectorActor, ApplyState.class, 2);
397 assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
398 assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
399 assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
400 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
401 votingServer(LEADER_ID), votingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
403 // Verify ServerConfigurationPayload entry in the new follower
405 expectMatching(newFollowerCollectorActor, ApplyState.class, 2);
406 assertEquals("New follower peers", Set.of(LEADER_ID, NEW_SERVER_ID2), newFollowerActorContext.getPeerIds());
408 LOG.info("testAddServerWithOperationInProgress ending");
412 public void testAddServerWithPriorSnapshotInProgress() {
413 LOG.info("testAddServerWithPriorSnapshotInProgress starting");
416 RaftActorContext initialActorContext = new MockRaftActorContext();
418 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
419 MockLeaderRaftActor.props(Map.of(), initialActorContext)
420 .withDispatcher(Dispatchers.DefaultDispatcherId()),
421 actorFactory.generateActorId(LEADER_ID));
423 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
424 final RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
426 ActorRef leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
428 // Drop commit message for now to delay snapshot completion
429 leaderRaftActor.setDropMessageOfType(String.class);
431 leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
433 Object commitMsg = expectFirstMatching(leaderCollectorActor, COMMIT_MESSAGE_CLASS);
435 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
437 leaderRaftActor.setDropMessageOfType(null);
438 leaderActor.tell(commitMsg, leaderActor);
440 AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
441 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
442 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().orElseThrow());
444 expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
446 // Verify ServerConfigurationPayload entry in leader's log
448 expectFirstMatching(leaderCollectorActor, ApplyState.class);
449 assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
450 assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
451 assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
452 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
453 votingServer(NEW_SERVER_ID));
455 LOG.info("testAddServerWithPriorSnapshotInProgress ending");
459 public void testAddServerWithPriorSnapshotCompleteTimeout() {
460 LOG.info("testAddServerWithPriorSnapshotCompleteTimeout starting");
463 RaftActorContext initialActorContext = new MockRaftActorContext();
465 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
466 MockLeaderRaftActor.props(Map.of(), initialActorContext)
467 .withDispatcher(Dispatchers.DefaultDispatcherId()),
468 actorFactory.generateActorId(LEADER_ID));
470 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
471 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
473 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
475 // Drop commit message so the snapshot doesn't complete.
476 leaderRaftActor.setDropMessageOfType(COMMIT_MESSAGE_CLASS);
478 leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
480 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
482 AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
483 assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
485 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
487 LOG.info("testAddServerWithPriorSnapshotCompleteTimeout ending");
491 public void testAddServerWithLeaderChangeBeforePriorSnapshotComplete() {
492 LOG.info("testAddServerWithLeaderChangeBeforePriorSnapshotComplete starting");
495 RaftActorContext initialActorContext = new MockRaftActorContext();
497 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
498 MockLeaderRaftActor.props(Map.of(), initialActorContext)
499 .withDispatcher(Dispatchers.DefaultDispatcherId()),
500 actorFactory.generateActorId(LEADER_ID));
502 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
503 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
504 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
506 final ActorRef leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
508 // Drop the commit message so the snapshot doesn't complete yet.
509 leaderRaftActor.setDropMessageOfType(COMMIT_MESSAGE_CLASS);
511 leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
513 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
515 Object commitMsg = expectFirstMatching(leaderCollectorActor, COMMIT_MESSAGE_CLASS);
517 // Change the leader behavior to follower
518 leaderActor.tell(new Follower(leaderActorContext), leaderActor);
520 // Drop CaptureSnapshotReply in case install snapshot is incorrectly initiated after the prior
521 // snapshot completes. This will prevent the invalid snapshot from completing and fail the
522 // isCapturing assertion below.
523 leaderRaftActor.setDropMessageOfType(CaptureSnapshotReply.class);
525 // Complete the prior snapshot - this should be a no-op b/c it's no longer the leader
526 leaderActor.tell(commitMsg, leaderActor);
528 leaderActor.tell(new RaftActorServerConfigurationSupport.ServerOperationTimeout(NEW_SERVER_ID), leaderActor);
530 AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
531 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
533 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
534 assertEquals("isCapturing", false, leaderActorContext.getSnapshotManager().isCapturing());
536 LOG.info("testAddServerWithLeaderChangeBeforePriorSnapshotComplete ending");
540 public void testAddServerWithLeaderChangeDuringInstallSnapshot() {
541 LOG.info("testAddServerWithLeaderChangeDuringInstallSnapshot starting");
544 RaftActorContext initialActorContext = new MockRaftActorContext();
546 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
547 MockLeaderRaftActor.props(Map.of(), initialActorContext)
548 .withDispatcher(Dispatchers.DefaultDispatcherId()),
549 actorFactory.generateActorId(LEADER_ID));
551 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
552 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
554 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(8);
556 ActorRef leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
558 // Drop the UnInitializedFollowerSnapshotReply to delay it.
559 leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
561 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
563 final UnInitializedFollowerSnapshotReply snapshotReply = expectFirstMatching(leaderCollectorActor,
564 UnInitializedFollowerSnapshotReply.class);
566 // Prevent election timeout when the leader switches to follower
567 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
569 // Change the leader behavior to follower
570 leaderActor.tell(new Follower(leaderActorContext), leaderActor);
572 // Send the captured UnInitializedFollowerSnapshotReply - should be a no-op
573 leaderRaftActor.setDropMessageOfType(null);
574 leaderActor.tell(snapshotReply, leaderActor);
576 AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
577 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
579 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
581 LOG.info("testAddServerWithLeaderChangeDuringInstallSnapshot ending");
585 public void testAddServerWithInstallSnapshotTimeout() {
586 LOG.info("testAddServerWithInstallSnapshotTimeout starting");
589 RaftActorContext initialActorContext = new MockRaftActorContext();
591 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
592 MockLeaderRaftActor.props(Map.of(), initialActorContext)
593 .withDispatcher(Dispatchers.DefaultDispatcherId()),
594 actorFactory.generateActorId(LEADER_ID));
596 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
597 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
598 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
600 // Drop the InstallSnapshot message so it times out
601 newFollowerRaftActor.underlyingActor().setDropMessageOfType(InstallSnapshot.class);
603 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
605 leaderActor.tell(new UnInitializedFollowerSnapshotReply("bogus"), leaderActor);
607 AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
608 assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
610 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
611 assertEquals("Leader followers size", 0,
612 ((AbstractLeader)leaderRaftActor.getCurrentBehavior()).getFollowerIds().size());
614 LOG.info("testAddServerWithInstallSnapshotTimeout ending");
618 public void testAddServerWithNoLeader() {
619 LOG.info("testAddServerWithNoLeader starting");
622 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
623 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
625 TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
626 MockRaftActor.builder().id(LEADER_ID).peerAddresses(Map.of(FOLLOWER_ID,
627 followerActor.path().toString())).config(configParams).persistent(Optional.of(false))
628 .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
629 actorFactory.generateActorId(LEADER_ID));
630 noLeaderActor.underlyingActor().waitForInitializeBehaviorComplete();
632 noLeaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true),
634 AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
635 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
637 LOG.info("testAddServerWithNoLeader ending");
641 public void testAddServerWithNoConsensusReached() {
642 LOG.info("testAddServerWithNoConsensusReached starting");
645 RaftActorContext initialActorContext = new MockRaftActorContext();
647 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
648 MockLeaderRaftActor.props(Map.of(), initialActorContext)
649 .withDispatcher(Dispatchers.DefaultDispatcherId()),
650 actorFactory.generateActorId(LEADER_ID));
652 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
653 final RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
655 final ActorRef leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
657 // Drop UnInitializedFollowerSnapshotReply initially
658 leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
660 MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
661 newFollowerCollectorActor = newCollectorActor(newFollowerRaftActorInstance, NEW_SERVER_ID);
663 // Drop AppendEntries to the new follower so consensus isn't reached
664 newFollowerRaftActorInstance.setDropMessageOfType(AppendEntries.class);
666 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
668 // Capture the UnInitializedFollowerSnapshotReply
669 Object snapshotReply = expectFirstMatching(leaderCollectorActor, UnInitializedFollowerSnapshotReply.class);
671 // Send the UnInitializedFollowerSnapshotReply to resume the first request
672 leaderRaftActor.setDropMessageOfType(null);
673 leaderActor.tell(snapshotReply, leaderActor);
675 expectFirstMatching(newFollowerCollectorActor, AppendEntries.class);
677 // Send a second AddServer
678 leaderActor.tell(new AddServer(NEW_SERVER_ID2, "", false), testKit.getRef());
680 // The first AddServer should succeed with OK even though consensus wasn't reached
681 AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
682 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
683 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().orElseThrow());
685 // Verify ServerConfigurationPayload entry in leader's log
686 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
687 votingServer(NEW_SERVER_ID));
689 // The second AddServer should fail since consensus wasn't reached for the first
690 addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
691 assertEquals("getStatus", ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT, addServerReply.getStatus());
693 // Re-send the second AddServer - should also fail
694 leaderActor.tell(new AddServer(NEW_SERVER_ID2, "", false), testKit.getRef());
695 addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
696 assertEquals("getStatus", ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT, addServerReply.getStatus());
698 LOG.info("testAddServerWithNoConsensusReached ending");
702 public void testAddServerWithExistingServer() {
703 LOG.info("testAddServerWithExistingServer starting");
705 RaftActorContext initialActorContext = new MockRaftActorContext();
707 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
708 MockLeaderRaftActor.props(Map.of(FOLLOWER_ID, followerActor.path().toString()),
709 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
710 actorFactory.generateActorId(LEADER_ID));
712 leaderActor.tell(new AddServer(FOLLOWER_ID, followerActor.path().toString(), true), testKit.getRef());
714 AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
715 assertEquals("getStatus", ServerChangeStatus.ALREADY_EXISTS, addServerReply.getStatus());
717 LOG.info("testAddServerWithExistingServer ending");
721 public void testAddServerForwardedToLeader() {
722 LOG.info("testAddServerForwardedToLeader starting");
725 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
726 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
728 ActorRef leaderActor = actorFactory.createActor(
729 MessageCollectorActor.props(), actorFactory.generateActorId(LEADER_ID));
731 TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
732 MockRaftActor.builder().id(FOLLOWER_ID).peerAddresses(Map.of(LEADER_ID,
733 leaderActor.path().toString())).config(configParams).persistent(Optional.of(false))
734 .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
735 actorFactory.generateActorId(FOLLOWER_ID));
736 followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
738 followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, List.of(), -1, -1, (short)0), leaderActor);
740 followerRaftActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true),
742 expectFirstMatching(leaderActor, AddServer.class);
744 LOG.info("testAddServerForwardedToLeader ending");
748 public void testOnApplyState() {
749 LOG.info("testOnApplyState starting");
751 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
752 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
753 TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
754 MockRaftActor.builder().id(LEADER_ID).peerAddresses(Map.of(FOLLOWER_ID,
755 followerActor.path().toString())).config(configParams).persistent(Optional.of(false))
756 .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
757 actorFactory.generateActorId(LEADER_ID));
759 RaftActorServerConfigurationSupport support = new RaftActorServerConfigurationSupport(
760 noLeaderActor.underlyingActor());
762 ReplicatedLogEntry serverConfigEntry = new SimpleReplicatedLogEntry(1, 1,
763 new ServerConfigurationPayload(List.of()));
764 boolean handled = support.handleMessage(new ApplyState(null, null, serverConfigEntry), ActorRef.noSender());
765 assertEquals("Message handled", true, handled);
767 ReplicatedLogEntry nonServerConfigEntry = new SimpleReplicatedLogEntry(1, 1,
768 new MockRaftActorContext.MockPayload("1"));
769 handled = support.handleMessage(new ApplyState(null, null, nonServerConfigEntry), ActorRef.noSender());
770 assertEquals("Message handled", false, handled);
772 LOG.info("testOnApplyState ending");
776 public void testRemoveServerWithNoLeader() {
777 LOG.info("testRemoveServerWithNoLeader starting");
779 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
780 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
782 TestActorRef<MockRaftActor> leaderActor = actorFactory.createTestActor(
783 MockRaftActor.builder().id(LEADER_ID).peerAddresses(Map.of(FOLLOWER_ID,
784 followerActor.path().toString())).config(configParams).persistent(Optional.of(false))
785 .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
786 actorFactory.generateActorId(LEADER_ID));
787 leaderActor.underlyingActor().waitForInitializeBehaviorComplete();
789 leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
790 RemoveServerReply removeServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), RemoveServerReply.class);
791 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, removeServerReply.getStatus());
793 LOG.info("testRemoveServerWithNoLeader ending");
797 public void testRemoveServerNonExistentServer() {
798 LOG.info("testRemoveServerNonExistentServer starting");
800 RaftActorContext initialActorContext = new MockRaftActorContext();
802 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
803 MockLeaderRaftActor.props(Map.of(FOLLOWER_ID, followerActor.path().toString()),
804 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
805 actorFactory.generateActorId(LEADER_ID));
807 leaderActor.tell(new RemoveServer(NEW_SERVER_ID), testKit.getRef());
808 RemoveServerReply removeServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), RemoveServerReply.class);
809 assertEquals("getStatus", ServerChangeStatus.DOES_NOT_EXIST, removeServerReply.getStatus());
811 LOG.info("testRemoveServerNonExistentServer ending");
815 public void testRemoveServerForwardToLeader() {
816 LOG.info("testRemoveServerForwardToLeader starting");
818 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
819 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
821 ActorRef leaderActor = actorFactory.createTestActor(
822 MessageCollectorActor.props(), actorFactory.generateActorId(LEADER_ID));
824 TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
825 MockRaftActor.builder().id(FOLLOWER_ID).peerAddresses(Map.of(LEADER_ID,
826 leaderActor.path().toString())).config(configParams).persistent(Optional.of(false))
827 .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
828 actorFactory.generateActorId(FOLLOWER_ID));
829 followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
831 followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, List.of(), -1, -1, (short)0), leaderActor);
833 followerRaftActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
834 expectFirstMatching(leaderActor, RemoveServer.class);
836 LOG.info("testRemoveServerForwardToLeader ending");
840 public void testRemoveServer() {
841 LOG.info("testRemoveServer starting");
843 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
844 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
845 configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
847 final String follower1ActorId = actorFactory.generateActorId(FOLLOWER_ID);
848 final String follower1ActorPath = actorFactory.createTestActorPath(follower1ActorId);
849 final String follower2ActorId = actorFactory.generateActorId(FOLLOWER_ID2);
850 final String follower2ActorPath = actorFactory.createTestActorPath(follower2ActorId);
851 RaftActorContext initialActorContext = new MockRaftActorContext();
853 final String downNodeId = "downNode";
854 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(MockLeaderRaftActor.props(
855 Map.of(FOLLOWER_ID, follower1ActorPath, FOLLOWER_ID2, follower2ActorPath, downNodeId, ""),
856 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
857 actorFactory.generateActorId(LEADER_ID));
859 final ActorRef leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
861 ActorRef follower1Collector = actorFactory.createActor(
862 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
863 final TestActorRef<CollectingMockRaftActor> follower1Actor = actorFactory.createTestActor(
864 CollectingMockRaftActor.props(FOLLOWER_ID, Map.of(LEADER_ID, leaderActor.path().toString(),
865 FOLLOWER_ID2, follower2ActorPath, downNodeId, ""), configParams, NO_PERSISTENCE,
866 follower1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId);
868 ActorRef follower2Collector = actorFactory.createActor(
869 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
870 final TestActorRef<CollectingMockRaftActor> follower2Actor = actorFactory.createTestActor(
871 CollectingMockRaftActor.props(FOLLOWER_ID2, Map.of(LEADER_ID, leaderActor.path().toString(),
872 FOLLOWER_ID, follower1ActorPath, downNodeId, ""), configParams, NO_PERSISTENCE,
873 follower2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId);
875 leaderActor.underlyingActor().waitForInitializeBehaviorComplete();
876 follower1Actor.underlyingActor().waitForInitializeBehaviorComplete();
877 follower2Actor.underlyingActor().waitForInitializeBehaviorComplete();
879 leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
880 RemoveServerReply removeServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), RemoveServerReply.class);
881 assertEquals("getStatus", ServerChangeStatus.OK, removeServerReply.getStatus());
883 ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
884 assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
885 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
886 votingServer(LEADER_ID), votingServer(FOLLOWER_ID2), votingServer(downNodeId));
888 applyState = MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
889 assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
890 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
891 votingServer(LEADER_ID), votingServer(FOLLOWER_ID2), votingServer(downNodeId));
893 RaftActorBehavior currentBehavior = leaderActor.underlyingActor().getCurrentBehavior();
894 assertTrue("Expected Leader", currentBehavior instanceof Leader);
895 assertEquals("Follower ids size", 2, ((Leader)currentBehavior).getFollowerIds().size());
897 MessageCollectorActor.expectFirstMatching(follower1Collector, ServerRemoved.class);
899 LOG.info("testRemoveServer ending");
903 public void testRemoveServerLeader() {
904 LOG.info("testRemoveServerLeader starting");
906 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
907 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
908 configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
910 final String followerActorId = actorFactory.generateActorId(FOLLOWER_ID);
911 final String followerActorPath = actorFactory.createTestActorPath(followerActorId);
912 RaftActorContext initialActorContext = new MockRaftActorContext();
914 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
915 MockLeaderRaftActor.props(Map.of(FOLLOWER_ID, followerActorPath),
916 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
917 actorFactory.generateActorId(LEADER_ID));
919 final ActorRef leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
921 final ActorRef followerCollector =
922 actorFactory.createActor(MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
923 actorFactory.createTestActor(
924 CollectingMockRaftActor.props(FOLLOWER_ID, Map.of(LEADER_ID, leaderActor.path().toString()),
925 configParams, NO_PERSISTENCE, followerCollector)
926 .withDispatcher(Dispatchers.DefaultDispatcherId()),
929 leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
930 RemoveServerReply removeServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), RemoveServerReply.class);
931 assertEquals("getStatus", ServerChangeStatus.OK, removeServerReply.getStatus());
933 final ApplyState applyState = MessageCollectorActor.expectFirstMatching(followerCollector, ApplyState.class);
934 assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
935 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
936 votingServer(FOLLOWER_ID));
938 MessageCollectorActor.expectFirstMatching(leaderCollector, ServerRemoved.class);
940 LOG.info("testRemoveServerLeader ending");
944 public void testRemoveServerLeaderWithNoFollowers() {
945 LOG.info("testRemoveServerLeaderWithNoFollowers starting");
947 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
948 MockLeaderRaftActor.props(Map.of(),
949 new MockRaftActorContext()).withDispatcher(Dispatchers.DefaultDispatcherId()),
950 actorFactory.generateActorId(LEADER_ID));
952 leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
953 RemoveServerReply removeServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), RemoveServerReply.class);
954 assertEquals("getStatus", ServerChangeStatus.NOT_SUPPORTED, removeServerReply.getStatus());
956 LOG.info("testRemoveServerLeaderWithNoFollowers ending");
960 public void testChangeServersVotingStatus() {
961 LOG.info("testChangeServersVotingStatus starting");
963 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
964 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
965 configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
967 final String follower1ActorId = actorFactory.generateActorId(FOLLOWER_ID);
968 final String follower1ActorPath = actorFactory.createTestActorPath(follower1ActorId);
969 final String follower2ActorId = actorFactory.generateActorId(FOLLOWER_ID2);
970 final String follower2ActorPath = actorFactory.createTestActorPath(follower2ActorId);
972 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
973 MockLeaderRaftActor.props(Map.of(FOLLOWER_ID, follower1ActorPath,
974 FOLLOWER_ID2, follower2ActorPath), new MockRaftActorContext())
975 .withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
976 ActorRef leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
978 ActorRef follower1Collector = actorFactory.createActor(
979 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
980 final TestActorRef<CollectingMockRaftActor> follower1RaftActor = actorFactory.createTestActor(
981 CollectingMockRaftActor.props(FOLLOWER_ID, Map.of(LEADER_ID, leaderActor.path().toString(),
982 FOLLOWER_ID2, follower2ActorPath), configParams, NO_PERSISTENCE, follower1Collector)
983 .withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId);
985 ActorRef follower2Collector = actorFactory.createActor(
986 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
987 final TestActorRef<CollectingMockRaftActor> follower2RaftActor = actorFactory.createTestActor(
988 CollectingMockRaftActor.props(FOLLOWER_ID2, Map.of(LEADER_ID, leaderActor.path().toString(),
989 FOLLOWER_ID, follower1ActorPath), configParams, NO_PERSISTENCE, follower2Collector)
990 .withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId);
992 // Send first ChangeServersVotingStatus message
994 leaderActor.tell(new ChangeServersVotingStatus(Map.of(FOLLOWER_ID, false, FOLLOWER_ID2, false)),
996 ServerChangeReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), ServerChangeReply.class);
997 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
999 final ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
1000 assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
1001 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1002 votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1004 MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
1005 verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext()
1006 .getReplicatedLog(), votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID),
1007 nonVotingServer(FOLLOWER_ID2));
1009 MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
1010 verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext()
1011 .getReplicatedLog(), votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID),
1012 nonVotingServer(FOLLOWER_ID2));
1014 MessageCollectorActor.clearMessages(leaderCollector);
1015 MessageCollectorActor.clearMessages(follower1Collector);
1016 MessageCollectorActor.clearMessages(follower2Collector);
1018 // Send second ChangeServersVotingStatus message
1020 leaderActor.tell(new ChangeServersVotingStatus(Map.of(FOLLOWER_ID, true)), testKit.getRef());
1021 reply = testKit.expectMsgClass(Duration.ofSeconds(5), ServerChangeReply.class);
1022 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1024 MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
1025 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1026 votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1028 MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
1029 verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext()
1030 .getReplicatedLog(), votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1032 MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
1033 verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext()
1034 .getReplicatedLog(), votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1036 LOG.info("testChangeServersVotingStatus ending");
1040 public void testChangeLeaderToNonVoting() {
1041 LOG.info("testChangeLeaderToNonVoting starting");
1043 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1044 configParams.setHeartBeatInterval(new FiniteDuration(500, TimeUnit.MILLISECONDS));
1046 final String follower1ActorId = actorFactory.generateActorId(FOLLOWER_ID);
1047 final String follower1ActorPath = actorFactory.createTestActorPath(follower1ActorId);
1048 final String follower2ActorId = actorFactory.generateActorId(FOLLOWER_ID2);
1049 final String follower2ActorPath = actorFactory.createTestActorPath(follower2ActorId);
1051 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
1052 MockLeaderRaftActor.props(Map.of(FOLLOWER_ID, follower1ActorPath,
1053 FOLLOWER_ID2, follower2ActorPath), new MockRaftActorContext())
1054 .withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
1055 ActorRef leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
1057 ActorRef follower1Collector = actorFactory.createActor(
1058 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1059 final TestActorRef<CollectingMockRaftActor> follower1RaftActor = actorFactory.createTestActor(
1060 CollectingMockRaftActor.props(FOLLOWER_ID, Map.of(LEADER_ID, leaderActor.path().toString(),
1061 FOLLOWER_ID2, follower2ActorPath), configParams, NO_PERSISTENCE, follower1Collector)
1062 .withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId);
1064 ActorRef follower2Collector = actorFactory.createActor(
1065 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1066 final TestActorRef<CollectingMockRaftActor> follower2RaftActor = actorFactory.createTestActor(
1067 CollectingMockRaftActor.props(FOLLOWER_ID2, Map.of(LEADER_ID, leaderActor.path().toString(),
1068 FOLLOWER_ID, follower1ActorPath), configParams, NO_PERSISTENCE, follower2Collector)
1069 .withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId);
1071 // Send ChangeServersVotingStatus message
1073 leaderActor.tell(new ChangeServersVotingStatus(Map.of(LEADER_ID, false)), testKit.getRef());
1074 ServerChangeReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), ServerChangeReply.class);
1075 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1077 MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
1078 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1079 nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
1081 MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
1082 verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext()
1083 .getReplicatedLog(), nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
1085 MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
1086 verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext()
1087 .getReplicatedLog(), nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
1089 verifyRaftState(RaftState.Leader, follower1RaftActor.underlyingActor(), follower2RaftActor.underlyingActor());
1090 verifyRaftState(RaftState.Follower, leaderActor.underlyingActor());
1092 MessageCollectorActor.expectMatching(leaderCollector, AppendEntries.class, 2);
1094 LOG.info("testChangeLeaderToNonVoting ending");
1098 public void testChangeLeaderToNonVotingInSingleNode() {
1099 LOG.info("testChangeLeaderToNonVotingInSingleNode starting");
1101 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
1102 MockLeaderRaftActor.props(Map.of(), new MockRaftActorContext())
1103 .withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
1105 leaderActor.tell(new ChangeServersVotingStatus(Map.of(LEADER_ID, false)), testKit.getRef());
1106 ServerChangeReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), ServerChangeReply.class);
1107 assertEquals("getStatus", ServerChangeStatus.INVALID_REQUEST, reply.getStatus());
1109 LOG.info("testChangeLeaderToNonVotingInSingleNode ending");
1113 public void testChangeToVotingWithNoLeader() {
1114 LOG.info("testChangeToVotingWithNoLeader starting");
1116 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1117 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1118 configParams.setElectionTimeoutFactor(5);
1120 final String node1ID = "node1";
1121 final String node2ID = "node2";
1123 // Set up a persisted ServerConfigurationPayload. Initially node1 and node2 will come up as non-voting.
1124 // via the server config. The server config will also contain 2 voting peers that are down (ie no
1127 ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(List.of(
1128 new ServerInfo(node1ID, false), new ServerInfo(node2ID, false),
1129 new ServerInfo("downNode1", true), new ServerInfo("downNode2", true)));
1130 SimpleReplicatedLogEntry persistedServerConfigEntry = new SimpleReplicatedLogEntry(0, 1, persistedServerConfig);
1132 InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "downNode1"));
1133 InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1134 InMemoryJournal.addEntry(node1ID, 3, new ApplyJournalEntries(0));
1135 InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "downNode2"));
1136 InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1137 InMemoryJournal.addEntry(node2ID, 3, new ApplyJournalEntries(0));
1139 ActorRef node1Collector = actorFactory.createActor(
1140 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1141 TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1142 CollectingMockRaftActor.props(node1ID, Map.of(), configParams,
1143 PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1144 CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1146 ActorRef node2Collector = actorFactory.createActor(
1147 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1148 TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1149 CollectingMockRaftActor.props(node2ID, Map.of(), configParams,
1150 PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1151 CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1153 node1RaftActor.waitForInitializeBehaviorComplete();
1154 node2RaftActor.waitForInitializeBehaviorComplete();
1156 // Verify the intended server config was loaded and applied.
1157 verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1158 nonVotingServer(node1ID), nonVotingServer(node2ID), votingServer("downNode1"),
1159 votingServer("downNode2"));
1160 assertEquals("isVotingMember", false, node1RaftActor.getRaftActorContext().isVotingMember());
1161 assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1162 assertEquals("getLeaderId", null, node1RaftActor.getLeaderId());
1164 verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1165 nonVotingServer(node1ID), nonVotingServer(node2ID), votingServer("downNode1"),
1166 votingServer("downNode2"));
1167 assertEquals("isVotingMember", false, node2RaftActor.getRaftActorContext().isVotingMember());
1169 // For the test, we send a ChangeServersVotingStatus message to node1 to flip the voting states for
1170 // each server, ie node1 and node2 to voting and the 2 down nodes to non-voting. This should cause
1171 // node1 to try to elect itself as leader in order to apply the new server config. Since the 2
1172 // down nodes are switched to non-voting, node1 should only need a vote from node2.
1174 // First send the message such that node1 has no peer address for node2 - should fail.
1176 ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(Map.of(node1ID, true,
1177 node2ID, true, "downNode1", false, "downNode2", false));
1178 node1RaftActorRef.tell(changeServers, testKit.getRef());
1179 ServerChangeReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), ServerChangeReply.class);
1180 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, reply.getStatus());
1181 assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1183 // Send an AppendEntries so node1 has a leaderId
1185 long term = node1RaftActor.getRaftActorContext().getTermInformation().getCurrentTerm();
1186 node1RaftActorRef.tell(new AppendEntries(term, "downNode1", -1L, -1L,
1187 List.of(), 0, -1, (short)1), ActorRef.noSender());
1189 // Wait for the ElectionTimeout to clear the leaderId. The leaderId must be null so on the next
1190 // ChangeServersVotingStatus message, it will try to elect a leader.
1192 AbstractRaftActorIntegrationTest.verifyRaftState(node1RaftActorRef,
1193 rs -> assertEquals("getLeader", null, rs.getLeader()));
1195 // Update node2's peer address and send the message again
1197 node1RaftActor.setPeerAddress(node2ID, node2RaftActorRef.path().toString());
1199 node1RaftActorRef.tell(changeServers, testKit.getRef());
1200 reply = testKit.expectMsgClass(Duration.ofSeconds(5), ServerChangeReply.class);
1201 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1203 ApplyJournalEntries apply = MessageCollectorActor.expectFirstMatching(node1Collector,
1204 ApplyJournalEntries.class);
1205 assertEquals("getToIndex", 1, apply.getToIndex());
1206 verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1207 votingServer(node1ID), votingServer(node2ID), nonVotingServer("downNode1"),
1208 nonVotingServer("downNode2"));
1209 assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
1210 assertEquals("getRaftState", RaftState.Leader, node1RaftActor.getRaftState());
1212 apply = MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
1213 assertEquals("getToIndex", 1, apply.getToIndex());
1214 verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1215 votingServer(node1ID), votingServer(node2ID), nonVotingServer("downNode1"),
1216 nonVotingServer("downNode2"));
1217 assertEquals("isVotingMember", true, node2RaftActor.getRaftActorContext().isVotingMember());
1218 assertEquals("getRaftState", RaftState.Follower, node2RaftActor.getRaftState());
1220 LOG.info("testChangeToVotingWithNoLeader ending");
1224 public void testChangeToVotingWithNoLeaderAndElectionTimeout() {
1225 LOG.info("testChangeToVotingWithNoLeaderAndElectionTimeout starting");
1227 final String node1ID = "node1";
1228 final String node2ID = "node2";
1230 final PeerAddressResolver peerAddressResolver = peerId -> peerId.equals(node1ID)
1231 ? actorFactory.createTestActorPath(node1ID) : peerId.equals(node2ID)
1232 ? actorFactory.createTestActorPath(node2ID) : null;
1234 ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(List.of(
1235 new ServerInfo(node1ID, false), new ServerInfo(node2ID, true)));
1236 SimpleReplicatedLogEntry persistedServerConfigEntry = new SimpleReplicatedLogEntry(0, 1, persistedServerConfig);
1238 InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
1239 InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1240 InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
1241 InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1243 DefaultConfigParamsImpl configParams1 = new DefaultConfigParamsImpl();
1244 configParams1.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1245 configParams1.setElectionTimeoutFactor(1);
1246 configParams1.setPeerAddressResolver(peerAddressResolver);
1247 ActorRef node1Collector = actorFactory.createActor(
1248 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1249 TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1250 CollectingMockRaftActor.props(node1ID, Map.of(), configParams1,
1251 PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1252 final CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1254 DefaultConfigParamsImpl configParams2 = new DefaultConfigParamsImpl();
1255 configParams2.setElectionTimeoutFactor(1000000);
1256 configParams2.setPeerAddressResolver(peerAddressResolver);
1257 ActorRef node2Collector = actorFactory.createActor(
1258 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1259 TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1260 CollectingMockRaftActor.props(node2ID, Map.of(), configParams2,
1261 PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1262 CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1264 // Send a ChangeServersVotingStatus message to node1 to change mode1 to voting. This should cause
1265 // node1 to try to elect itself as leader in order to apply the new server config. But we'll drop
1266 // RequestVote messages in node2 which should cause node1 to time out and revert back to the previous
1267 // server config and fail with NO_LEADER. Note that node1 shouldn't forward the request to node2 b/c
1268 // node2 was previously voting.
1270 node2RaftActor.setDropMessageOfType(RequestVote.class);
1272 ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(Map.of(node1ID, true));
1273 node1RaftActorRef.tell(changeServers, testKit.getRef());
1274 ServerChangeReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), ServerChangeReply.class);
1275 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, reply.getStatus());
1277 assertEquals("Server config", Set.of(nonVotingServer(node1ID), votingServer(node2ID)),
1278 Set.copyOf(node1RaftActor.getRaftActorContext().getPeerServerInfo(true).getServerConfig()));
1279 assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1281 LOG.info("testChangeToVotingWithNoLeaderAndElectionTimeout ending");
1285 public void testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout() {
1286 LOG.info("testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout starting");
1288 final String node1ID = "node1";
1289 final String node2ID = "node2";
1291 final PeerAddressResolver peerAddressResolver = peerId -> peerId.equals(node1ID)
1292 ? actorFactory.createTestActorPath(node1ID) : peerId.equals(node2ID)
1293 ? actorFactory.createTestActorPath(node2ID) : null;
1295 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1296 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1297 configParams.setElectionTimeoutFactor(3);
1298 configParams.setPeerAddressResolver(peerAddressResolver);
1300 ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(List.of(
1301 new ServerInfo(node1ID, false), new ServerInfo(node2ID, false)));
1302 SimpleReplicatedLogEntry persistedServerConfigEntry = new SimpleReplicatedLogEntry(0, 1, persistedServerConfig);
1304 InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
1305 InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1306 InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
1307 InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1308 InMemoryJournal.addEntry(node2ID, 3, new SimpleReplicatedLogEntry(1, 1,
1309 new MockRaftActorContext.MockPayload("2")));
1310 InMemoryJournal.addEntry(node2ID, 4, new ApplyJournalEntries(1));
1312 ActorRef node1Collector = actorFactory.createActor(
1313 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1314 TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1315 CollectingMockRaftActor.props(node1ID, Map.of(), configParams,
1316 PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1317 final CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1319 ActorRef node2Collector = actorFactory.createActor(
1320 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1321 TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1322 CollectingMockRaftActor.props(node2ID, Map.of(), configParams,
1323 PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1324 final CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1326 // Send a ChangeServersVotingStatus message to node1 to change mode1 to voting. This should cause
1327 // node1 to try to elect itself as leader in order to apply the new server config. However node1's log
1328 // is behind node2's so node2 should not grant node1's vote. This should cause node1 to time out and
1329 // forward the request to node2.
1331 ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(
1332 Map.of(node1ID, true, node2ID, true));
1333 node1RaftActorRef.tell(changeServers, testKit.getRef());
1334 ServerChangeReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), ServerChangeReply.class);
1335 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1337 MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
1338 verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1339 votingServer(node1ID), votingServer(node2ID));
1340 assertEquals("getRaftState", RaftState.Leader, node2RaftActor.getRaftState());
1342 MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class);
1343 verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1344 votingServer(node1ID), votingServer(node2ID));
1345 assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
1346 assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1348 LOG.info("testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout ending");
1352 public void testChangeToVotingWithNoLeaderAndOtherLeaderElected() {
1353 LOG.info("testChangeToVotingWithNoLeaderAndOtherLeaderElected starting");
1355 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1356 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1357 configParams.setElectionTimeoutFactor(100000);
1359 final String node1ID = "node1";
1360 final String node2ID = "node2";
1362 configParams.setPeerAddressResolver(peerId -> peerId.equals(node1ID)
1363 ? actorFactory.createTestActorPath(node1ID) : peerId.equals(node2ID)
1364 ? actorFactory.createTestActorPath(node2ID) : null);
1366 ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(List.of(
1367 new ServerInfo(node1ID, false), new ServerInfo(node2ID, true)));
1368 SimpleReplicatedLogEntry persistedServerConfigEntry = new SimpleReplicatedLogEntry(0, 1, persistedServerConfig);
1370 InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
1371 InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1372 InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
1373 InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1375 ActorRef node1Collector = actorFactory.createActor(
1376 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1377 TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1378 CollectingMockRaftActor.props(node1ID, Map.of(), configParams,
1379 PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1380 final CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1382 ActorRef node2Collector = actorFactory.createActor(
1383 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1384 TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1385 CollectingMockRaftActor.props(node2ID, Map.of(), configParams,
1386 PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1387 CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1389 // Send a ChangeServersVotingStatus message to node1 to change node1 to voting. This should cause
1390 // node1 to try to elect itself as leader in order to apply the new server config. But we'll drop
1391 // RequestVote messages in node2 and make it the leader so node1 should forward the server change
1392 // request to node2 when node2 is elected.
1394 node2RaftActor.setDropMessageOfType(RequestVote.class);
1396 ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(Map.of(node1ID, true,
1398 node1RaftActorRef.tell(changeServers, testKit.getRef());
1400 MessageCollectorActor.expectFirstMatching(node2Collector, RequestVote.class);
1402 node2RaftActorRef.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
1404 ServerChangeReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), ServerChangeReply.class);
1405 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1407 MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class);
1408 verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1409 votingServer(node1ID), votingServer(node2ID));
1410 assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
1411 assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1413 MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
1414 verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1415 votingServer(node1ID), votingServer(node2ID));
1416 assertEquals("getRaftState", RaftState.Leader, node2RaftActor.getRaftState());
1418 LOG.info("testChangeToVotingWithNoLeaderAndOtherLeaderElected ending");
1421 private static void verifyRaftState(final RaftState expState, final RaftActor... raftActors) {
1422 Stopwatch sw = Stopwatch.createStarted();
1423 while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
1424 for (RaftActor raftActor : raftActors) {
1425 if (raftActor.getRaftState() == expState) {
1431 fail("None of the RaftActors have state " + expState);
1434 private static ServerInfo votingServer(final String id) {
1435 return new ServerInfo(id, true);
1438 private static ServerInfo nonVotingServer(final String id) {
1439 return new ServerInfo(id, false);
1442 private ActorRef newLeaderCollectorActor(final MockLeaderRaftActor leaderRaftActor) {
1443 return newCollectorActor(leaderRaftActor, LEADER_ID);
1446 private ActorRef newCollectorActor(final AbstractMockRaftActor raftActor, final String id) {
1447 ActorRef collectorActor = actorFactory.createTestActor(
1448 MessageCollectorActor.props(), actorFactory.generateActorId(id + "Collector"));
1449 raftActor.setCollectorActor(collectorActor);
1450 return collectorActor;
1453 private static void verifyServerConfigurationPayloadEntry(final ReplicatedLog log, final ServerInfo... expected) {
1454 ReplicatedLogEntry logEntry = log.get(log.lastIndex());
1455 assertEquals("Last log entry payload class", ServerConfigurationPayload.class, logEntry.getData().getClass());
1456 ServerConfigurationPayload payload = (ServerConfigurationPayload)logEntry.getData();
1457 assertEquals("Server config", Set.of(expected), Set.copyOf(payload.getServerConfig()));
1460 private static RaftActorContextImpl newFollowerContext(final String id,
1461 final TestActorRef<? extends AbstractActor> actor) {
1462 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1463 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1464 configParams.setElectionTimeoutFactor(100000);
1465 NonPersistentDataProvider noPersistence = new NonPersistentDataProvider(Runnable::run);
1466 ElectionTermImpl termInfo = new ElectionTermImpl(noPersistence, id, LOG);
1467 termInfo.update(1, LEADER_ID);
1468 return new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
1469 id, termInfo, -1, -1, Map.of(LEADER_ID, ""), configParams,
1470 noPersistence, applyState -> actor.tell(applyState, actor), LOG, MoreExecutors.directExecutor());
1473 abstract static class AbstractMockRaftActor extends MockRaftActor {
1474 private volatile ActorRef collectorActor;
1475 private volatile Class<?> dropMessageOfType;
1477 AbstractMockRaftActor(final String id, final Map<String, String> peerAddresses,
1478 final Optional<ConfigParams> config, final boolean persistent, final ActorRef collectorActor) {
1479 super(builder().id(id).peerAddresses(peerAddresses).config(config.orElseThrow())
1480 .persistent(Optional.of(persistent)));
1481 this.collectorActor = collectorActor;
1484 void setDropMessageOfType(final Class<?> dropMessageOfType) {
1485 this.dropMessageOfType = dropMessageOfType;
1488 void setCollectorActor(final ActorRef collectorActor) {
1489 this.collectorActor = collectorActor;
1493 public void handleCommand(final Object message) {
1494 if (dropMessageOfType == null || !dropMessageOfType.equals(message.getClass())) {
1495 super.handleCommand(message);
1498 if (collectorActor != null) {
1499 collectorActor.tell(message, getSender());
1504 public static class CollectingMockRaftActor extends AbstractMockRaftActor {
1506 CollectingMockRaftActor(final String id, final Map<String, String> peerAddresses,
1507 final Optional<ConfigParams> config, final boolean persistent, final ActorRef collectorActor) {
1508 super(id, peerAddresses, config, persistent, collectorActor);
1509 snapshotCohortDelegate = new RaftActorSnapshotCohort() {
1511 public void createSnapshot(final ActorRef actorRef,
1512 final Optional<OutputStream> installSnapshotStream) {
1513 actorRef.tell(new CaptureSnapshotReply(ByteState.empty(), installSnapshotStream), actorRef);
1517 public void applySnapshot(
1518 final org.opendaylight.controller.cluster.raft.persisted.Snapshot.State snapshotState) {
1522 public org.opendaylight.controller.cluster.raft.persisted.Snapshot.State deserializeSnapshot(
1523 final ByteSource snapshotBytes) {
1524 throw new UnsupportedOperationException();
1529 public static Props props(final String id, final Map<String, String> peerAddresses,
1530 final ConfigParams config, final boolean persistent, final ActorRef collectorActor) {
1532 return Props.create(CollectingMockRaftActor.class, id, peerAddresses, Optional.of(config),
1533 persistent, collectorActor);
1538 public static class MockLeaderRaftActor extends AbstractMockRaftActor {
1539 public MockLeaderRaftActor(final Map<String, String> peerAddresses, final ConfigParams config,
1540 final RaftActorContext fromContext) {
1541 super(LEADER_ID, peerAddresses, Optional.of(config), NO_PERSISTENCE, null);
1542 setPersistence(false);
1544 RaftActorContext context = getRaftActorContext();
1545 for (int i = 0; i < fromContext.getReplicatedLog().size(); i++) {
1546 ReplicatedLogEntry entry = fromContext.getReplicatedLog().get(i);
1547 getState().add(entry.getData());
1548 context.getReplicatedLog().append(entry);
1551 context.setCommitIndex(fromContext.getCommitIndex());
1552 context.setLastApplied(fromContext.getLastApplied());
1553 context.getTermInformation().update(fromContext.getTermInformation().getCurrentTerm(),
1554 fromContext.getTermInformation().getVotedFor());
1558 protected void initializeBehavior() {
1559 changeCurrentBehavior(new Leader(getRaftActorContext()));
1560 initializeBehaviorComplete.countDown();
1564 @SuppressWarnings("checkstyle:IllegalCatch")
1565 public void createSnapshot(final ActorRef actorRef, final Optional<OutputStream> installSnapshotStream) {
1566 MockSnapshotState snapshotState = new MockSnapshotState(List.copyOf(getState()));
1567 if (installSnapshotStream.isPresent()) {
1568 SerializationUtils.serialize(snapshotState, installSnapshotStream.orElseThrow());
1571 actorRef.tell(new CaptureSnapshotReply(snapshotState, installSnapshotStream), actorRef);
1574 static Props props(final Map<String, String> peerAddresses, final RaftActorContext fromContext) {
1575 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1576 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1577 configParams.setElectionTimeoutFactor(10);
1578 return Props.create(MockLeaderRaftActor.class, peerAddresses, configParams, fromContext);
1582 public static class MockNewFollowerRaftActor extends AbstractMockRaftActor {
1583 public MockNewFollowerRaftActor(final ConfigParams config, final ActorRef collectorActor) {
1584 super(NEW_SERVER_ID, Map.of(), Optional.of(config), NO_PERSISTENCE, collectorActor);
1585 setPersistence(false);
1588 static Props props(final ConfigParams config, final ActorRef collectorActor) {
1589 return Props.create(MockNewFollowerRaftActor.class, config, collectorActor);