2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertTrue;
12 import static org.junit.Assert.fail;
13 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.assertNoneMatching;
14 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
15 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
16 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
18 import akka.actor.AbstractActor;
19 import akka.actor.ActorRef;
20 import akka.actor.Props;
21 import akka.dispatch.Dispatchers;
22 import akka.testkit.TestActorRef;
23 import akka.testkit.javadsl.TestKit;
24 import com.google.common.base.Stopwatch;
25 import com.google.common.collect.ImmutableMap;
26 import com.google.common.collect.ImmutableSet;
27 import com.google.common.io.ByteSource;
28 import com.google.common.util.concurrent.MoreExecutors;
29 import java.io.OutputStream;
30 import java.time.Duration;
31 import java.util.ArrayList;
32 import java.util.Arrays;
33 import java.util.Collections;
34 import java.util.HashMap;
35 import java.util.HashSet;
36 import java.util.List;
38 import java.util.Optional;
39 import java.util.concurrent.TimeUnit;
40 import org.apache.commons.lang3.SerializationUtils;
41 import org.junit.After;
42 import org.junit.Before;
43 import org.junit.Test;
44 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
45 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
46 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
47 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
48 import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
49 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
50 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
51 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
52 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
53 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
54 import org.opendaylight.controller.cluster.raft.messages.AddServer;
55 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
56 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
57 import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
58 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
59 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
60 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
61 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
62 import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
63 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
64 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
65 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
66 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
67 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
68 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
69 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
70 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
71 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
72 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
73 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
74 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
75 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
76 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
77 import org.slf4j.Logger;
78 import org.slf4j.LoggerFactory;
79 import scala.concurrent.duration.FiniteDuration;
82 * Unit tests for RaftActorServerConfigurationSupport.
84 * @author Thomas Pantelis
86 public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
87 static final String LEADER_ID = "leader";
88 static final String FOLLOWER_ID = "follower";
89 static final String FOLLOWER_ID2 = "follower2";
90 static final String NEW_SERVER_ID = "new-server";
91 static final String NEW_SERVER_ID2 = "new-server2";
92 private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupportTest.class);
93 private static final Class<?> COMMIT_MESSAGE_CLASS = RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT.getClass();
94 private static final boolean NO_PERSISTENCE = false;
95 private static final boolean PERSISTENT = true;
97 private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
99 private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
100 Props.create(ForwardMessageToBehaviorActor.class).withDispatcher(Dispatchers.DefaultDispatcherId()),
101 actorFactory.generateActorId(FOLLOWER_ID));
103 private TestActorRef<MockNewFollowerRaftActor> newFollowerRaftActor;
104 private ActorRef newFollowerCollectorActor;
105 private RaftActorContext newFollowerActorContext;
107 private final TestKit testKit = new TestKit(getSystem());
110 public void setup() {
111 InMemoryJournal.clear();
112 InMemorySnapshotStore.clear();
115 @SuppressWarnings("checkstyle:IllegalCatch")
116 private void setupNewFollower() {
117 DefaultConfigParamsImpl configParams = newFollowerConfigParams();
119 newFollowerCollectorActor = actorFactory.createActor(MessageCollectorActor.props(),
120 actorFactory.generateActorId(NEW_SERVER_ID + "Collector"));
121 newFollowerRaftActor = actorFactory.createTestActor(MockNewFollowerRaftActor.props(
122 configParams, newFollowerCollectorActor).withDispatcher(Dispatchers.DefaultDispatcherId()),
123 actorFactory.generateActorId(NEW_SERVER_ID));
126 newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
127 } catch (Exception e) {
128 newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
132 private static DefaultConfigParamsImpl newFollowerConfigParams() {
133 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
134 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
135 configParams.setElectionTimeoutFactor(100000);
136 configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
141 public void tearDown() {
142 actorFactory.close();
146 public void testAddServerWithExistingFollower() {
147 LOG.info("testAddServerWithExistingFollower starting");
149 RaftActorContextImpl followerActorContext = newFollowerContext(FOLLOWER_ID, followerActor);
150 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
152 followerActorContext.setCommitIndex(2);
153 followerActorContext.setLastApplied(2);
155 Follower follower = new Follower(followerActorContext);
156 followerActor.underlyingActor().setBehavior(follower);
157 followerActorContext.setCurrentBehavior(follower);
159 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
160 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
161 followerActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
162 actorFactory.generateActorId(LEADER_ID));
164 // Expect initial heartbeat from the leader.
165 expectFirstMatching(followerActor, AppendEntries.class);
166 clearMessages(followerActor);
168 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
169 final ActorRef leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
171 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
173 // Leader should install snapshot - capture and verify ApplySnapshot contents
175 ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
176 List<Object> snapshotState = MockRaftActor.fromState(applySnapshot.getSnapshot().getState());
177 assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
179 AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
180 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
181 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
183 // Verify ServerConfigurationPayload entry in leader's log
185 expectFirstMatching(leaderCollectorActor, ApplyState.class);
186 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
187 assertEquals("Leader journal last index", 3, leaderActorContext.getReplicatedLog().lastIndex());
188 assertEquals("Leader commit index", 3, leaderActorContext.getCommitIndex());
189 assertEquals("Leader last applied index", 3, leaderActorContext.getLastApplied());
190 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
191 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
193 // Verify ServerConfigurationPayload entry in both followers
195 expectFirstMatching(followerActor, ApplyState.class);
196 assertEquals("Follower journal last index", 3, followerActorContext.getReplicatedLog().lastIndex());
197 verifyServerConfigurationPayloadEntry(followerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
198 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
200 expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
201 assertEquals("New follower journal last index", 3, newFollowerActorContext.getReplicatedLog().lastIndex());
202 verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
203 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
205 // Verify new server config was applied in both followers
207 assertEquals("Follower peers", ImmutableSet.of(LEADER_ID, NEW_SERVER_ID), followerActorContext.getPeerIds());
209 assertEquals("New follower peers", ImmutableSet.of(LEADER_ID, FOLLOWER_ID),
210 newFollowerActorContext.getPeerIds());
212 assertEquals("Follower commit index", 3, followerActorContext.getCommitIndex());
213 assertEquals("Follower last applied index", 3, followerActorContext.getLastApplied());
214 assertEquals("New follower commit index", 3, newFollowerActorContext.getCommitIndex());
215 assertEquals("New follower last applied index", 3, newFollowerActorContext.getLastApplied());
217 assertEquals("Leader persisted ReplicatedLogImplEntry entries", 0,
218 InMemoryJournal.get(LEADER_ID, SimpleReplicatedLogEntry.class).size());
219 assertEquals("Leader persisted ServerConfigurationPayload entries", 1,
220 InMemoryJournal.get(LEADER_ID, ServerConfigurationPayload.class).size());
222 assertEquals("New follower persisted ReplicatedLogImplEntry entries", 0,
223 InMemoryJournal.get(NEW_SERVER_ID, SimpleReplicatedLogEntry.class).size());
224 assertEquals("New follower persisted ServerConfigurationPayload entries", 1,
225 InMemoryJournal.get(NEW_SERVER_ID, ServerConfigurationPayload.class).size());
227 LOG.info("testAddServerWithExistingFollower ending");
231 public void testAddServerWithNoExistingFollower() {
232 LOG.info("testAddServerWithNoExistingFollower starting");
235 RaftActorContext initialActorContext = new MockRaftActorContext();
236 initialActorContext.setCommitIndex(1);
237 initialActorContext.setLastApplied(1);
238 initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
241 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
242 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
243 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
244 actorFactory.generateActorId(LEADER_ID));
246 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
247 final RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
249 final ActorRef leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
251 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
253 // Leader should install snapshot - capture and verify ApplySnapshot contents
255 ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
256 List<Object> snapshotState = MockRaftActor.fromState(applySnapshot.getSnapshot().getState());
257 assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
259 AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
260 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
261 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
263 // Verify ServerConfigurationPayload entry in leader's log
265 expectFirstMatching(leaderCollectorActor, ApplyState.class);
266 assertEquals("Leader journal last index", 2, leaderActorContext.getReplicatedLog().lastIndex());
267 assertEquals("Leader commit index", 2, leaderActorContext.getCommitIndex());
268 assertEquals("Leader last applied index", 2, leaderActorContext.getLastApplied());
269 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
270 votingServer(NEW_SERVER_ID));
272 // Verify ServerConfigurationPayload entry in the new follower
274 expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
275 assertEquals("New follower journal last index", 2, newFollowerActorContext.getReplicatedLog().lastIndex());
276 verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
277 votingServer(NEW_SERVER_ID));
279 // Verify new server config was applied in the new follower
281 assertEquals("New follower peers", ImmutableSet.of(LEADER_ID), newFollowerActorContext.getPeerIds());
283 LOG.info("testAddServerWithNoExistingFollower ending");
287 public void testAddServersAsNonVoting() {
288 LOG.info("testAddServersAsNonVoting starting");
291 RaftActorContext initialActorContext = new MockRaftActorContext();
293 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
294 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
295 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
296 actorFactory.generateActorId(LEADER_ID));
298 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
299 final RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
301 final ActorRef leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
303 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), false), testKit.getRef());
305 AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
306 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
307 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
309 // Verify ServerConfigurationPayload entry in leader's log
311 expectFirstMatching(leaderCollectorActor, ApplyState.class);
313 assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
314 assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
315 assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
316 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
317 nonVotingServer(NEW_SERVER_ID));
319 // Verify ServerConfigurationPayload entry in the new follower
321 expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
322 assertEquals("New follower journal last index", 0, newFollowerActorContext.getReplicatedLog().lastIndex());
323 verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
324 nonVotingServer(NEW_SERVER_ID));
326 // Verify new server config was applied in the new follower
328 assertEquals("New follower peers", ImmutableSet.of(LEADER_ID), newFollowerActorContext.getPeerIds());
330 assertNoneMatching(newFollowerCollectorActor, InstallSnapshot.class, 500);
332 // Add another non-voting server.
334 clearMessages(leaderCollectorActor);
336 RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
337 Follower newFollower2 = new Follower(follower2ActorContext);
338 followerActor.underlyingActor().setBehavior(newFollower2);
340 leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit.getRef());
342 addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
343 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
344 assertEquals("getLeaderHint", Optional.of(LEADER_ID), addServerReply.getLeaderHint());
346 expectFirstMatching(leaderCollectorActor, ApplyState.class);
347 assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
348 assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
349 assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
350 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
351 votingServer(LEADER_ID), nonVotingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
353 LOG.info("testAddServersAsNonVoting ending");
357 public void testAddServerWithOperationInProgress() {
358 LOG.info("testAddServerWithOperationInProgress starting");
361 RaftActorContext initialActorContext = new MockRaftActorContext();
363 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
364 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
365 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
366 actorFactory.generateActorId(LEADER_ID));
368 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
369 final RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
371 final ActorRef leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
373 RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
374 Follower newFollower2 = new Follower(follower2ActorContext);
375 followerActor.underlyingActor().setBehavior(newFollower2);
377 MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
378 newFollowerRaftActorInstance.setDropMessageOfType(InstallSnapshot.class);
380 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
382 // Wait for leader's install snapshot and capture it
384 InstallSnapshot installSnapshot = expectFirstMatching(newFollowerCollectorActor, InstallSnapshot.class);
386 // Send a second AddServer - should get queued
387 TestKit testKit2 = new TestKit(getSystem());
388 leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit2.getRef());
390 // Continue the first AddServer
391 newFollowerRaftActorInstance.setDropMessageOfType(null);
392 newFollowerRaftActor.tell(installSnapshot, leaderActor);
394 // Verify both complete successfully
395 AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
396 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
398 addServerReply = testKit2.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
399 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
401 // Verify ServerConfigurationPayload entries in leader's log
403 expectMatching(leaderCollectorActor, ApplyState.class, 2);
404 assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
405 assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
406 assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
407 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
408 votingServer(LEADER_ID), votingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
410 // Verify ServerConfigurationPayload entry in the new follower
412 expectMatching(newFollowerCollectorActor, ApplyState.class, 2);
413 assertEquals("New follower peers", ImmutableSet.of(LEADER_ID, NEW_SERVER_ID2),
414 newFollowerActorContext.getPeerIds());
416 LOG.info("testAddServerWithOperationInProgress ending");
420 public void testAddServerWithPriorSnapshotInProgress() {
421 LOG.info("testAddServerWithPriorSnapshotInProgress starting");
424 RaftActorContext initialActorContext = new MockRaftActorContext();
426 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
427 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
428 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
429 actorFactory.generateActorId(LEADER_ID));
431 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
432 final RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
434 ActorRef leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
436 // Drop commit message for now to delay snapshot completion
437 leaderRaftActor.setDropMessageOfType(String.class);
439 leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
441 Object commitMsg = expectFirstMatching(leaderCollectorActor, COMMIT_MESSAGE_CLASS);
443 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
445 leaderRaftActor.setDropMessageOfType(null);
446 leaderActor.tell(commitMsg, leaderActor);
448 AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
449 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
450 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
452 expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
454 // Verify ServerConfigurationPayload entry in leader's log
456 expectFirstMatching(leaderCollectorActor, ApplyState.class);
457 assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
458 assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
459 assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
460 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
461 votingServer(NEW_SERVER_ID));
463 LOG.info("testAddServerWithPriorSnapshotInProgress ending");
467 public void testAddServerWithPriorSnapshotCompleteTimeout() {
468 LOG.info("testAddServerWithPriorSnapshotCompleteTimeout starting");
471 RaftActorContext initialActorContext = new MockRaftActorContext();
473 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
474 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
475 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
476 actorFactory.generateActorId(LEADER_ID));
478 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
479 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
481 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
483 // Drop commit message so the snapshot doesn't complete.
484 leaderRaftActor.setDropMessageOfType(COMMIT_MESSAGE_CLASS);
486 leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
488 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
490 AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
491 assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
493 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
495 LOG.info("testAddServerWithPriorSnapshotCompleteTimeout ending");
499 public void testAddServerWithLeaderChangeBeforePriorSnapshotComplete() {
500 LOG.info("testAddServerWithLeaderChangeBeforePriorSnapshotComplete starting");
503 RaftActorContext initialActorContext = new MockRaftActorContext();
505 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
506 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
507 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
508 actorFactory.generateActorId(LEADER_ID));
510 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
511 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
512 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
514 final ActorRef leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
516 // Drop the commit message so the snapshot doesn't complete yet.
517 leaderRaftActor.setDropMessageOfType(COMMIT_MESSAGE_CLASS);
519 leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
521 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
523 Object commitMsg = expectFirstMatching(leaderCollectorActor, COMMIT_MESSAGE_CLASS);
525 // Change the leader behavior to follower
526 leaderActor.tell(new Follower(leaderActorContext), leaderActor);
528 // Drop CaptureSnapshotReply in case install snapshot is incorrectly initiated after the prior
529 // snapshot completes. This will prevent the invalid snapshot from completing and fail the
530 // isCapturing assertion below.
531 leaderRaftActor.setDropMessageOfType(CaptureSnapshotReply.class);
533 // Complete the prior snapshot - this should be a no-op b/c it's no longer the leader
534 leaderActor.tell(commitMsg, leaderActor);
536 leaderActor.tell(new RaftActorServerConfigurationSupport.ServerOperationTimeout(NEW_SERVER_ID), leaderActor);
538 AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
539 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
541 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
542 assertEquals("isCapturing", false, leaderActorContext.getSnapshotManager().isCapturing());
544 LOG.info("testAddServerWithLeaderChangeBeforePriorSnapshotComplete ending");
548 public void testAddServerWithLeaderChangeDuringInstallSnapshot() {
549 LOG.info("testAddServerWithLeaderChangeDuringInstallSnapshot starting");
552 RaftActorContext initialActorContext = new MockRaftActorContext();
554 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
555 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
556 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
557 actorFactory.generateActorId(LEADER_ID));
559 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
560 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
562 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(8);
564 ActorRef leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
566 // Drop the UnInitializedFollowerSnapshotReply to delay it.
567 leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
569 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
571 final UnInitializedFollowerSnapshotReply snapshotReply = expectFirstMatching(leaderCollectorActor,
572 UnInitializedFollowerSnapshotReply.class);
574 // Prevent election timeout when the leader switches to follower
575 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
577 // Change the leader behavior to follower
578 leaderActor.tell(new Follower(leaderActorContext), leaderActor);
580 // Send the captured UnInitializedFollowerSnapshotReply - should be a no-op
581 leaderRaftActor.setDropMessageOfType(null);
582 leaderActor.tell(snapshotReply, leaderActor);
584 AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
585 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
587 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
589 LOG.info("testAddServerWithLeaderChangeDuringInstallSnapshot ending");
593 public void testAddServerWithInstallSnapshotTimeout() {
594 LOG.info("testAddServerWithInstallSnapshotTimeout starting");
597 RaftActorContext initialActorContext = new MockRaftActorContext();
599 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
600 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
601 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
602 actorFactory.generateActorId(LEADER_ID));
604 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
605 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
606 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
608 // Drop the InstallSnapshot message so it times out
609 newFollowerRaftActor.underlyingActor().setDropMessageOfType(InstallSnapshot.class);
611 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
613 leaderActor.tell(new UnInitializedFollowerSnapshotReply("bogus"), leaderActor);
615 AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
616 assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
618 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
619 assertEquals("Leader followers size", 0,
620 ((AbstractLeader)leaderRaftActor.getCurrentBehavior()).getFollowerIds().size());
622 LOG.info("testAddServerWithInstallSnapshotTimeout ending");
626 public void testAddServerWithNoLeader() {
627 LOG.info("testAddServerWithNoLeader starting");
630 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
631 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
633 TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
634 MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
635 followerActor.path().toString())).config(configParams).persistent(Optional.of(false))
636 .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
637 actorFactory.generateActorId(LEADER_ID));
638 noLeaderActor.underlyingActor().waitForInitializeBehaviorComplete();
640 noLeaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true),
642 AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
643 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
645 LOG.info("testAddServerWithNoLeader ending");
649 public void testAddServerWithNoConsensusReached() {
650 LOG.info("testAddServerWithNoConsensusReached starting");
653 RaftActorContext initialActorContext = new MockRaftActorContext();
655 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
656 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
657 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
658 actorFactory.generateActorId(LEADER_ID));
660 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
661 final RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
663 final ActorRef leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
665 // Drop UnInitializedFollowerSnapshotReply initially
666 leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
668 MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
669 newFollowerCollectorActor = newCollectorActor(newFollowerRaftActorInstance, NEW_SERVER_ID);
671 // Drop AppendEntries to the new follower so consensus isn't reached
672 newFollowerRaftActorInstance.setDropMessageOfType(AppendEntries.class);
674 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
676 // Capture the UnInitializedFollowerSnapshotReply
677 Object snapshotReply = expectFirstMatching(leaderCollectorActor, UnInitializedFollowerSnapshotReply.class);
679 // Send the UnInitializedFollowerSnapshotReply to resume the first request
680 leaderRaftActor.setDropMessageOfType(null);
681 leaderActor.tell(snapshotReply, leaderActor);
683 expectFirstMatching(newFollowerCollectorActor, AppendEntries.class);
685 // Send a second AddServer
686 leaderActor.tell(new AddServer(NEW_SERVER_ID2, "", false), testKit.getRef());
688 // The first AddServer should succeed with OK even though consensus wasn't reached
689 AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
690 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
691 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
693 // Verify ServerConfigurationPayload entry in leader's log
694 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
695 votingServer(NEW_SERVER_ID));
697 // The second AddServer should fail since consensus wasn't reached for the first
698 addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
699 assertEquals("getStatus", ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT, addServerReply.getStatus());
701 // Re-send the second AddServer - should also fail
702 leaderActor.tell(new AddServer(NEW_SERVER_ID2, "", false), testKit.getRef());
703 addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
704 assertEquals("getStatus", ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT, addServerReply.getStatus());
706 LOG.info("testAddServerWithNoConsensusReached ending");
710 public void testAddServerWithExistingServer() {
711 LOG.info("testAddServerWithExistingServer starting");
713 RaftActorContext initialActorContext = new MockRaftActorContext();
715 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
716 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
717 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
718 actorFactory.generateActorId(LEADER_ID));
720 leaderActor.tell(new AddServer(FOLLOWER_ID, followerActor.path().toString(), true), testKit.getRef());
722 AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
723 assertEquals("getStatus", ServerChangeStatus.ALREADY_EXISTS, addServerReply.getStatus());
725 LOG.info("testAddServerWithExistingServer ending");
729 public void testAddServerForwardedToLeader() {
730 LOG.info("testAddServerForwardedToLeader starting");
733 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
734 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
736 ActorRef leaderActor = actorFactory.createActor(
737 MessageCollectorActor.props(), actorFactory.generateActorId(LEADER_ID));
739 TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
740 MockRaftActor.builder().id(FOLLOWER_ID).peerAddresses(ImmutableMap.of(LEADER_ID,
741 leaderActor.path().toString())).config(configParams).persistent(Optional.of(false))
742 .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
743 actorFactory.generateActorId(FOLLOWER_ID));
744 followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
746 followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
747 -1, -1, (short)0), leaderActor);
749 followerRaftActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true),
751 expectFirstMatching(leaderActor, AddServer.class);
753 LOG.info("testAddServerForwardedToLeader ending");
757 public void testOnApplyState() {
758 LOG.info("testOnApplyState starting");
760 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
761 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
762 TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
763 MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
764 followerActor.path().toString())).config(configParams).persistent(Optional.of(false))
765 .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
766 actorFactory.generateActorId(LEADER_ID));
768 RaftActorServerConfigurationSupport support = new RaftActorServerConfigurationSupport(
769 noLeaderActor.underlyingActor());
771 ReplicatedLogEntry serverConfigEntry = new SimpleReplicatedLogEntry(1, 1,
772 new ServerConfigurationPayload(Collections.<ServerInfo>emptyList()));
773 boolean handled = support.handleMessage(new ApplyState(null, null, serverConfigEntry), ActorRef.noSender());
774 assertEquals("Message handled", true, handled);
776 ReplicatedLogEntry nonServerConfigEntry = new SimpleReplicatedLogEntry(1, 1,
777 new MockRaftActorContext.MockPayload("1"));
778 handled = support.handleMessage(new ApplyState(null, null, nonServerConfigEntry), ActorRef.noSender());
779 assertEquals("Message handled", false, handled);
781 LOG.info("testOnApplyState ending");
785 public void testRemoveServerWithNoLeader() {
786 LOG.info("testRemoveServerWithNoLeader starting");
788 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
789 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
791 TestActorRef<MockRaftActor> leaderActor = actorFactory.createTestActor(
792 MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
793 followerActor.path().toString())).config(configParams).persistent(Optional.of(false))
794 .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
795 actorFactory.generateActorId(LEADER_ID));
796 leaderActor.underlyingActor().waitForInitializeBehaviorComplete();
798 leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
799 RemoveServerReply removeServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), RemoveServerReply.class);
800 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, removeServerReply.getStatus());
802 LOG.info("testRemoveServerWithNoLeader ending");
806 public void testRemoveServerNonExistentServer() {
807 LOG.info("testRemoveServerNonExistentServer starting");
809 RaftActorContext initialActorContext = new MockRaftActorContext();
811 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
812 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
813 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
814 actorFactory.generateActorId(LEADER_ID));
816 leaderActor.tell(new RemoveServer(NEW_SERVER_ID), testKit.getRef());
817 RemoveServerReply removeServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), RemoveServerReply.class);
818 assertEquals("getStatus", ServerChangeStatus.DOES_NOT_EXIST, removeServerReply.getStatus());
820 LOG.info("testRemoveServerNonExistentServer ending");
824 public void testRemoveServerForwardToLeader() {
825 LOG.info("testRemoveServerForwardToLeader starting");
827 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
828 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
830 ActorRef leaderActor = actorFactory.createTestActor(
831 MessageCollectorActor.props(), actorFactory.generateActorId(LEADER_ID));
833 TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
834 MockRaftActor.builder().id(FOLLOWER_ID).peerAddresses(ImmutableMap.of(LEADER_ID,
835 leaderActor.path().toString())).config(configParams).persistent(Optional.of(false))
836 .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
837 actorFactory.generateActorId(FOLLOWER_ID));
838 followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
840 followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
841 -1, -1, (short)0), leaderActor);
843 followerRaftActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
844 expectFirstMatching(leaderActor, RemoveServer.class);
846 LOG.info("testRemoveServerForwardToLeader ending");
850 public void testRemoveServer() {
851 LOG.info("testRemoveServer starting");
853 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
854 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
855 configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
857 final String follower1ActorId = actorFactory.generateActorId(FOLLOWER_ID);
858 final String follower1ActorPath = actorFactory.createTestActorPath(follower1ActorId);
859 final String follower2ActorId = actorFactory.generateActorId(FOLLOWER_ID2);
860 final String follower2ActorPath = actorFactory.createTestActorPath(follower2ActorId);
861 RaftActorContext initialActorContext = new MockRaftActorContext();
863 final String downNodeId = "downNode";
864 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(MockLeaderRaftActor.props(
865 ImmutableMap.of(FOLLOWER_ID, follower1ActorPath, FOLLOWER_ID2, follower2ActorPath, downNodeId, ""),
866 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
867 actorFactory.generateActorId(LEADER_ID));
869 final ActorRef leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
871 ActorRef follower1Collector = actorFactory.createActor(
872 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
873 final TestActorRef<CollectingMockRaftActor> follower1Actor = actorFactory.createTestActor(
874 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
875 FOLLOWER_ID2, follower2ActorPath, downNodeId, ""), configParams, NO_PERSISTENCE,
876 follower1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId);
878 ActorRef follower2Collector = actorFactory.createActor(
879 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
880 final TestActorRef<CollectingMockRaftActor> follower2Actor = actorFactory.createTestActor(
881 CollectingMockRaftActor.props(FOLLOWER_ID2, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
882 FOLLOWER_ID, follower1ActorPath, downNodeId, ""), configParams, NO_PERSISTENCE,
883 follower2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId);
885 leaderActor.underlyingActor().waitForInitializeBehaviorComplete();
886 follower1Actor.underlyingActor().waitForInitializeBehaviorComplete();
887 follower2Actor.underlyingActor().waitForInitializeBehaviorComplete();
889 leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
890 RemoveServerReply removeServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), RemoveServerReply.class);
891 assertEquals("getStatus", ServerChangeStatus.OK, removeServerReply.getStatus());
893 ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
894 assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
895 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
896 votingServer(LEADER_ID), votingServer(FOLLOWER_ID2), votingServer(downNodeId));
898 applyState = MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
899 assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
900 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
901 votingServer(LEADER_ID), votingServer(FOLLOWER_ID2), votingServer(downNodeId));
903 RaftActorBehavior currentBehavior = leaderActor.underlyingActor().getCurrentBehavior();
904 assertTrue("Expected Leader", currentBehavior instanceof Leader);
905 assertEquals("Follower ids size", 2, ((Leader)currentBehavior).getFollowerIds().size());
907 MessageCollectorActor.expectFirstMatching(follower1Collector, ServerRemoved.class);
909 LOG.info("testRemoveServer ending");
913 public void testRemoveServerLeader() {
914 LOG.info("testRemoveServerLeader starting");
916 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
917 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
918 configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
920 final String followerActorId = actorFactory.generateActorId(FOLLOWER_ID);
921 final String followerActorPath = actorFactory.createTestActorPath(followerActorId);
922 RaftActorContext initialActorContext = new MockRaftActorContext();
924 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
925 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActorPath),
926 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
927 actorFactory.generateActorId(LEADER_ID));
929 final ActorRef leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
931 final ActorRef followerCollector =
932 actorFactory.createActor(MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
933 actorFactory.createTestActor(
934 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
935 configParams, NO_PERSISTENCE, followerCollector)
936 .withDispatcher(Dispatchers.DefaultDispatcherId()),
939 leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
940 RemoveServerReply removeServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), RemoveServerReply.class);
941 assertEquals("getStatus", ServerChangeStatus.OK, removeServerReply.getStatus());
943 final ApplyState applyState = MessageCollectorActor.expectFirstMatching(followerCollector, ApplyState.class);
944 assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
945 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
946 votingServer(FOLLOWER_ID));
948 MessageCollectorActor.expectFirstMatching(leaderCollector, ServerRemoved.class);
950 LOG.info("testRemoveServerLeader ending");
954 public void testRemoveServerLeaderWithNoFollowers() {
955 LOG.info("testRemoveServerLeaderWithNoFollowers starting");
957 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
958 MockLeaderRaftActor.props(Collections.<String, String>emptyMap(),
959 new MockRaftActorContext()).withDispatcher(Dispatchers.DefaultDispatcherId()),
960 actorFactory.generateActorId(LEADER_ID));
962 leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
963 RemoveServerReply removeServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), RemoveServerReply.class);
964 assertEquals("getStatus", ServerChangeStatus.NOT_SUPPORTED, removeServerReply.getStatus());
966 LOG.info("testRemoveServerLeaderWithNoFollowers ending");
970 public void testChangeServersVotingStatus() {
971 LOG.info("testChangeServersVotingStatus starting");
973 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
974 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
975 configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
977 final String follower1ActorId = actorFactory.generateActorId(FOLLOWER_ID);
978 final String follower1ActorPath = actorFactory.createTestActorPath(follower1ActorId);
979 final String follower2ActorId = actorFactory.generateActorId(FOLLOWER_ID2);
980 final String follower2ActorPath = actorFactory.createTestActorPath(follower2ActorId);
982 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
983 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, follower1ActorPath,
984 FOLLOWER_ID2, follower2ActorPath), new MockRaftActorContext())
985 .withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
986 ActorRef leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
988 ActorRef follower1Collector = actorFactory.createActor(
989 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
990 final TestActorRef<CollectingMockRaftActor> follower1RaftActor = actorFactory.createTestActor(
991 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
992 FOLLOWER_ID2, follower2ActorPath), configParams, NO_PERSISTENCE, follower1Collector)
993 .withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId);
995 ActorRef follower2Collector = actorFactory.createActor(
996 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
997 final TestActorRef<CollectingMockRaftActor> follower2RaftActor = actorFactory.createTestActor(
998 CollectingMockRaftActor.props(FOLLOWER_ID2, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
999 FOLLOWER_ID, follower1ActorPath), configParams, NO_PERSISTENCE, follower2Collector)
1000 .withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId);
1002 // Send first ChangeServersVotingStatus message
1004 leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(FOLLOWER_ID, false, FOLLOWER_ID2, false)),
1006 ServerChangeReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), ServerChangeReply.class);
1007 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1009 final ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
1010 assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
1011 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1012 votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1014 MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
1015 verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext()
1016 .getReplicatedLog(), votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID),
1017 nonVotingServer(FOLLOWER_ID2));
1019 MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
1020 verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext()
1021 .getReplicatedLog(), votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID),
1022 nonVotingServer(FOLLOWER_ID2));
1024 MessageCollectorActor.clearMessages(leaderCollector);
1025 MessageCollectorActor.clearMessages(follower1Collector);
1026 MessageCollectorActor.clearMessages(follower2Collector);
1028 // Send second ChangeServersVotingStatus message
1030 leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(FOLLOWER_ID, true)), testKit.getRef());
1031 reply = testKit.expectMsgClass(Duration.ofSeconds(5), ServerChangeReply.class);
1032 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1034 MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
1035 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1036 votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1038 MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
1039 verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext()
1040 .getReplicatedLog(), votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1042 MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
1043 verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext()
1044 .getReplicatedLog(), votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1046 LOG.info("testChangeServersVotingStatus ending");
1050 public void testChangeLeaderToNonVoting() {
1051 LOG.info("testChangeLeaderToNonVoting starting");
1053 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1054 configParams.setHeartBeatInterval(new FiniteDuration(500, TimeUnit.MILLISECONDS));
1056 final String follower1ActorId = actorFactory.generateActorId(FOLLOWER_ID);
1057 final String follower1ActorPath = actorFactory.createTestActorPath(follower1ActorId);
1058 final String follower2ActorId = actorFactory.generateActorId(FOLLOWER_ID2);
1059 final String follower2ActorPath = actorFactory.createTestActorPath(follower2ActorId);
1061 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
1062 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, follower1ActorPath,
1063 FOLLOWER_ID2, follower2ActorPath), new MockRaftActorContext())
1064 .withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
1065 ActorRef leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
1067 ActorRef follower1Collector = actorFactory.createActor(
1068 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1069 final TestActorRef<CollectingMockRaftActor> follower1RaftActor = actorFactory.createTestActor(
1070 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
1071 FOLLOWER_ID2, follower2ActorPath), configParams, NO_PERSISTENCE, follower1Collector)
1072 .withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId);
1074 ActorRef follower2Collector = actorFactory.createActor(
1075 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1076 final TestActorRef<CollectingMockRaftActor> follower2RaftActor = actorFactory.createTestActor(
1077 CollectingMockRaftActor.props(FOLLOWER_ID2, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
1078 FOLLOWER_ID, follower1ActorPath), configParams, NO_PERSISTENCE, follower2Collector)
1079 .withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId);
1081 // Send ChangeServersVotingStatus message
1083 leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(LEADER_ID, false)), testKit.getRef());
1084 ServerChangeReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), ServerChangeReply.class);
1085 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1087 MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
1088 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1089 nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
1091 MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
1092 verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext()
1093 .getReplicatedLog(), nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
1095 MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
1096 verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext()
1097 .getReplicatedLog(), nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
1099 verifyRaftState(RaftState.Leader, follower1RaftActor.underlyingActor(), follower2RaftActor.underlyingActor());
1100 verifyRaftState(RaftState.Follower, leaderActor.underlyingActor());
1102 MessageCollectorActor.expectMatching(leaderCollector, AppendEntries.class, 2);
1104 LOG.info("testChangeLeaderToNonVoting ending");
1108 public void testChangeLeaderToNonVotingInSingleNode() {
1109 LOG.info("testChangeLeaderToNonVotingInSingleNode starting");
1111 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
1112 MockLeaderRaftActor.props(ImmutableMap.of(), new MockRaftActorContext())
1113 .withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
1115 leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(LEADER_ID, false)), testKit.getRef());
1116 ServerChangeReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), ServerChangeReply.class);
1117 assertEquals("getStatus", ServerChangeStatus.INVALID_REQUEST, reply.getStatus());
1119 LOG.info("testChangeLeaderToNonVotingInSingleNode ending");
1123 public void testChangeToVotingWithNoLeader() {
1124 LOG.info("testChangeToVotingWithNoLeader starting");
1126 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1127 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1128 configParams.setElectionTimeoutFactor(5);
1130 final String node1ID = "node1";
1131 final String node2ID = "node2";
1133 // Set up a persisted ServerConfigurationPayload. Initially node1 and node2 will come up as non-voting.
1134 // via the server config. The server config will also contain 2 voting peers that are down (ie no
1137 ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1138 new ServerInfo(node1ID, false), new ServerInfo(node2ID, false),
1139 new ServerInfo("downNode1", true), new ServerInfo("downNode2", true)));
1140 SimpleReplicatedLogEntry persistedServerConfigEntry = new SimpleReplicatedLogEntry(0, 1, persistedServerConfig);
1142 InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "downNode1"));
1143 InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1144 InMemoryJournal.addEntry(node1ID, 3, new ApplyJournalEntries(0));
1145 InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "downNode2"));
1146 InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1147 InMemoryJournal.addEntry(node2ID, 3, new ApplyJournalEntries(0));
1149 ActorRef node1Collector = actorFactory.createActor(
1150 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1151 TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1152 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
1153 PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1154 CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1156 ActorRef node2Collector = actorFactory.createActor(
1157 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1158 TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1159 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
1160 PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1161 CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1163 node1RaftActor.waitForInitializeBehaviorComplete();
1164 node2RaftActor.waitForInitializeBehaviorComplete();
1166 // Verify the intended server config was loaded and applied.
1167 verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1168 nonVotingServer(node1ID), nonVotingServer(node2ID), votingServer("downNode1"),
1169 votingServer("downNode2"));
1170 assertEquals("isVotingMember", false, node1RaftActor.getRaftActorContext().isVotingMember());
1171 assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1172 assertEquals("getLeaderId", null, node1RaftActor.getLeaderId());
1174 verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1175 nonVotingServer(node1ID), nonVotingServer(node2ID), votingServer("downNode1"),
1176 votingServer("downNode2"));
1177 assertEquals("isVotingMember", false, node2RaftActor.getRaftActorContext().isVotingMember());
1179 // For the test, we send a ChangeServersVotingStatus message to node1 to flip the voting states for
1180 // each server, ie node1 and node2 to voting and the 2 down nodes to non-voting. This should cause
1181 // node1 to try to elect itself as leader in order to apply the new server config. Since the 2
1182 // down nodes are switched to non-voting, node1 should only need a vote from node2.
1184 // First send the message such that node1 has no peer address for node2 - should fail.
1186 ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true,
1187 node2ID, true, "downNode1", false, "downNode2", false));
1188 node1RaftActorRef.tell(changeServers, testKit.getRef());
1189 ServerChangeReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), ServerChangeReply.class);
1190 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, reply.getStatus());
1191 assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1193 // Send an AppendEntries so node1 has a leaderId
1195 long term = node1RaftActor.getRaftActorContext().getTermInformation().getCurrentTerm();
1196 node1RaftActorRef.tell(new AppendEntries(term, "downNode1", -1L, -1L,
1197 Collections.<ReplicatedLogEntry>emptyList(), 0, -1, (short)1), ActorRef.noSender());
1199 // Wait for the ElectionTimeout to clear the leaderId. The leaderId must be null so on the next
1200 // ChangeServersVotingStatus message, it will try to elect a leader.
1202 AbstractRaftActorIntegrationTest.verifyRaftState(node1RaftActorRef,
1203 rs -> assertEquals("getLeader", null, rs.getLeader()));
1205 // Update node2's peer address and send the message again
1207 node1RaftActor.setPeerAddress(node2ID, node2RaftActorRef.path().toString());
1209 node1RaftActorRef.tell(changeServers, testKit.getRef());
1210 reply = testKit.expectMsgClass(Duration.ofSeconds(5), ServerChangeReply.class);
1211 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1213 ApplyJournalEntries apply = MessageCollectorActor.expectFirstMatching(node1Collector,
1214 ApplyJournalEntries.class);
1215 assertEquals("getToIndex", 1, apply.getToIndex());
1216 verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1217 votingServer(node1ID), votingServer(node2ID), nonVotingServer("downNode1"),
1218 nonVotingServer("downNode2"));
1219 assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
1220 assertEquals("getRaftState", RaftState.Leader, node1RaftActor.getRaftState());
1222 apply = MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
1223 assertEquals("getToIndex", 1, apply.getToIndex());
1224 verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1225 votingServer(node1ID), votingServer(node2ID), nonVotingServer("downNode1"),
1226 nonVotingServer("downNode2"));
1227 assertEquals("isVotingMember", true, node2RaftActor.getRaftActorContext().isVotingMember());
1228 assertEquals("getRaftState", RaftState.Follower, node2RaftActor.getRaftState());
1230 LOG.info("testChangeToVotingWithNoLeader ending");
1234 public void testChangeToVotingWithNoLeaderAndElectionTimeout() {
1235 LOG.info("testChangeToVotingWithNoLeaderAndElectionTimeout starting");
1237 final String node1ID = "node1";
1238 final String node2ID = "node2";
1240 final PeerAddressResolver peerAddressResolver = peerId -> peerId.equals(node1ID)
1241 ? actorFactory.createTestActorPath(node1ID) : peerId.equals(node2ID)
1242 ? actorFactory.createTestActorPath(node2ID) : null;
1244 ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1245 new ServerInfo(node1ID, false), new ServerInfo(node2ID, true)));
1246 SimpleReplicatedLogEntry persistedServerConfigEntry = new SimpleReplicatedLogEntry(0, 1, persistedServerConfig);
1248 InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
1249 InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1250 InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
1251 InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1253 DefaultConfigParamsImpl configParams1 = new DefaultConfigParamsImpl();
1254 configParams1.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1255 configParams1.setElectionTimeoutFactor(1);
1256 configParams1.setPeerAddressResolver(peerAddressResolver);
1257 ActorRef node1Collector = actorFactory.createActor(
1258 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1259 TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1260 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams1,
1261 PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1262 final CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1264 DefaultConfigParamsImpl configParams2 = new DefaultConfigParamsImpl();
1265 configParams2.setElectionTimeoutFactor(1000000);
1266 configParams2.setPeerAddressResolver(peerAddressResolver);
1267 ActorRef node2Collector = actorFactory.createActor(
1268 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1269 TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1270 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams2,
1271 PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1272 CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1274 // Send a ChangeServersVotingStatus message to node1 to change mode1 to voting. This should cause
1275 // node1 to try to elect itself as leader in order to apply the new server config. But we'll drop
1276 // RequestVote messages in node2 which should cause node1 to time out and revert back to the previous
1277 // server config and fail with NO_LEADER. Note that node1 shouldn't forward the request to node2 b/c
1278 // node2 was previously voting.
1280 node2RaftActor.setDropMessageOfType(RequestVote.class);
1282 ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true));
1283 node1RaftActorRef.tell(changeServers, testKit.getRef());
1284 ServerChangeReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), ServerChangeReply.class);
1285 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, reply.getStatus());
1287 assertEquals("Server config", ImmutableSet.of(nonVotingServer(node1ID), votingServer(node2ID)),
1288 new HashSet<>(node1RaftActor.getRaftActorContext().getPeerServerInfo(true).getServerConfig()));
1289 assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1291 LOG.info("testChangeToVotingWithNoLeaderAndElectionTimeout ending");
1295 public void testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout() {
1296 LOG.info("testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout starting");
1298 final String node1ID = "node1";
1299 final String node2ID = "node2";
1301 final PeerAddressResolver peerAddressResolver = peerId -> peerId.equals(node1ID)
1302 ? actorFactory.createTestActorPath(node1ID) : peerId.equals(node2ID)
1303 ? actorFactory.createTestActorPath(node2ID) : null;
1305 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1306 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1307 configParams.setElectionTimeoutFactor(3);
1308 configParams.setPeerAddressResolver(peerAddressResolver);
1310 ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1311 new ServerInfo(node1ID, false), new ServerInfo(node2ID, false)));
1312 SimpleReplicatedLogEntry persistedServerConfigEntry = new SimpleReplicatedLogEntry(0, 1, persistedServerConfig);
1314 InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
1315 InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1316 InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
1317 InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1318 InMemoryJournal.addEntry(node2ID, 3, new SimpleReplicatedLogEntry(1, 1,
1319 new MockRaftActorContext.MockPayload("2")));
1320 InMemoryJournal.addEntry(node2ID, 4, new ApplyJournalEntries(1));
1322 ActorRef node1Collector = actorFactory.createActor(
1323 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1324 TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1325 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
1326 PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1327 final CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1329 ActorRef node2Collector = actorFactory.createActor(
1330 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1331 TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1332 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
1333 PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1334 final CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1336 // Send a ChangeServersVotingStatus message to node1 to change mode1 to voting. This should cause
1337 // node1 to try to elect itself as leader in order to apply the new server config. However node1's log
1338 // is behind node2's so node2 should not grant node1's vote. This should cause node1 to time out and
1339 // forward the request to node2.
1341 ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(
1342 ImmutableMap.of(node1ID, true, node2ID, true));
1343 node1RaftActorRef.tell(changeServers, testKit.getRef());
1344 ServerChangeReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), ServerChangeReply.class);
1345 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1347 MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
1348 verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1349 votingServer(node1ID), votingServer(node2ID));
1350 assertEquals("getRaftState", RaftState.Leader, node2RaftActor.getRaftState());
1352 MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class);
1353 verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1354 votingServer(node1ID), votingServer(node2ID));
1355 assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
1356 assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1358 LOG.info("testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout ending");
1362 public void testChangeToVotingWithNoLeaderAndOtherLeaderElected() {
1363 LOG.info("testChangeToVotingWithNoLeaderAndOtherLeaderElected starting");
1365 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1366 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1367 configParams.setElectionTimeoutFactor(100000);
1369 final String node1ID = "node1";
1370 final String node2ID = "node2";
1372 configParams.setPeerAddressResolver(peerId -> peerId.equals(node1ID)
1373 ? actorFactory.createTestActorPath(node1ID) : peerId.equals(node2ID)
1374 ? actorFactory.createTestActorPath(node2ID) : null);
1376 ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1377 new ServerInfo(node1ID, false), new ServerInfo(node2ID, true)));
1378 SimpleReplicatedLogEntry persistedServerConfigEntry = new SimpleReplicatedLogEntry(0, 1, persistedServerConfig);
1380 InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
1381 InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1382 InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
1383 InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1385 ActorRef node1Collector = actorFactory.createActor(
1386 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1387 TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1388 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
1389 PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1390 final CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1392 ActorRef node2Collector = actorFactory.createActor(
1393 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1394 TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1395 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
1396 PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1397 CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1399 // Send a ChangeServersVotingStatus message to node1 to change node1 to voting. This should cause
1400 // node1 to try to elect itself as leader in order to apply the new server config. But we'll drop
1401 // RequestVote messages in node2 and make it the leader so node1 should forward the server change
1402 // request to node2 when node2 is elected.
1404 node2RaftActor.setDropMessageOfType(RequestVote.class);
1406 ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true,
1408 node1RaftActorRef.tell(changeServers, testKit.getRef());
1410 MessageCollectorActor.expectFirstMatching(node2Collector, RequestVote.class);
1412 node2RaftActorRef.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
1414 ServerChangeReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), ServerChangeReply.class);
1415 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1417 MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class);
1418 verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1419 votingServer(node1ID), votingServer(node2ID));
1420 assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
1421 assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1423 MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
1424 verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1425 votingServer(node1ID), votingServer(node2ID));
1426 assertEquals("getRaftState", RaftState.Leader, node2RaftActor.getRaftState());
1428 LOG.info("testChangeToVotingWithNoLeaderAndOtherLeaderElected ending");
1431 private static void verifyRaftState(final RaftState expState, final RaftActor... raftActors) {
1432 Stopwatch sw = Stopwatch.createStarted();
1433 while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
1434 for (RaftActor raftActor : raftActors) {
1435 if (raftActor.getRaftState() == expState) {
1441 fail("None of the RaftActors have state " + expState);
1444 private static ServerInfo votingServer(final String id) {
1445 return new ServerInfo(id, true);
1448 private static ServerInfo nonVotingServer(final String id) {
1449 return new ServerInfo(id, false);
1452 private ActorRef newLeaderCollectorActor(final MockLeaderRaftActor leaderRaftActor) {
1453 return newCollectorActor(leaderRaftActor, LEADER_ID);
1456 private ActorRef newCollectorActor(final AbstractMockRaftActor raftActor, final String id) {
1457 ActorRef collectorActor = actorFactory.createTestActor(
1458 MessageCollectorActor.props(), actorFactory.generateActorId(id + "Collector"));
1459 raftActor.setCollectorActor(collectorActor);
1460 return collectorActor;
1463 private static void verifyServerConfigurationPayloadEntry(final ReplicatedLog log, final ServerInfo... expected) {
1464 ReplicatedLogEntry logEntry = log.get(log.lastIndex());
1465 assertEquals("Last log entry payload class", ServerConfigurationPayload.class, logEntry.getData().getClass());
1466 ServerConfigurationPayload payload = (ServerConfigurationPayload)logEntry.getData();
1467 assertEquals("Server config", ImmutableSet.copyOf(expected), new HashSet<>(payload.getServerConfig()));
1470 private static RaftActorContextImpl newFollowerContext(final String id,
1471 final TestActorRef<? extends AbstractActor> actor) {
1472 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1473 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1474 configParams.setElectionTimeoutFactor(100000);
1475 NonPersistentDataProvider noPersistence = new NonPersistentDataProvider(Runnable::run);
1476 ElectionTermImpl termInfo = new ElectionTermImpl(noPersistence, id, LOG);
1477 termInfo.update(1, LEADER_ID);
1478 return new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
1479 id, termInfo, -1, -1, ImmutableMap.of(LEADER_ID, ""), configParams,
1480 noPersistence, applyState -> actor.tell(applyState, actor), LOG, MoreExecutors.directExecutor());
1483 abstract static class AbstractMockRaftActor extends MockRaftActor {
1484 private volatile ActorRef collectorActor;
1485 private volatile Class<?> dropMessageOfType;
1487 AbstractMockRaftActor(final String id, final Map<String, String> peerAddresses,
1488 final Optional<ConfigParams> config, final boolean persistent, final ActorRef collectorActor) {
1489 super(builder().id(id).peerAddresses(peerAddresses).config(config.get())
1490 .persistent(Optional.of(persistent)));
1491 this.collectorActor = collectorActor;
1494 void setDropMessageOfType(final Class<?> dropMessageOfType) {
1495 this.dropMessageOfType = dropMessageOfType;
1498 void setCollectorActor(final ActorRef collectorActor) {
1499 this.collectorActor = collectorActor;
1503 public void handleCommand(final Object message) {
1504 if (dropMessageOfType == null || !dropMessageOfType.equals(message.getClass())) {
1505 super.handleCommand(message);
1508 if (collectorActor != null) {
1509 collectorActor.tell(message, getSender());
1514 public static class CollectingMockRaftActor extends AbstractMockRaftActor {
1516 CollectingMockRaftActor(final String id, final Map<String, String> peerAddresses,
1517 final Optional<ConfigParams> config, final boolean persistent, final ActorRef collectorActor) {
1518 super(id, peerAddresses, config, persistent, collectorActor);
1519 snapshotCohortDelegate = new RaftActorSnapshotCohort() {
1521 public void createSnapshot(final ActorRef actorRef,
1522 final Optional<OutputStream> installSnapshotStream) {
1523 actorRef.tell(new CaptureSnapshotReply(ByteState.empty(), installSnapshotStream), actorRef);
1527 public void applySnapshot(
1528 final org.opendaylight.controller.cluster.raft.persisted.Snapshot.State snapshotState) {
1532 public org.opendaylight.controller.cluster.raft.persisted.Snapshot.State deserializeSnapshot(
1533 final ByteSource snapshotBytes) {
1534 throw new UnsupportedOperationException();
1539 public static Props props(final String id, final Map<String, String> peerAddresses,
1540 final ConfigParams config, final boolean persistent, final ActorRef collectorActor) {
1542 return Props.create(CollectingMockRaftActor.class, id, peerAddresses, Optional.of(config),
1543 persistent, collectorActor);
1548 public static class MockLeaderRaftActor extends AbstractMockRaftActor {
1549 public MockLeaderRaftActor(final Map<String, String> peerAddresses, final ConfigParams config,
1550 final RaftActorContext fromContext) {
1551 super(LEADER_ID, peerAddresses, Optional.of(config), NO_PERSISTENCE, null);
1552 setPersistence(false);
1554 RaftActorContext context = getRaftActorContext();
1555 for (int i = 0; i < fromContext.getReplicatedLog().size(); i++) {
1556 ReplicatedLogEntry entry = fromContext.getReplicatedLog().get(i);
1557 getState().add(entry.getData());
1558 context.getReplicatedLog().append(entry);
1561 context.setCommitIndex(fromContext.getCommitIndex());
1562 context.setLastApplied(fromContext.getLastApplied());
1563 context.getTermInformation().update(fromContext.getTermInformation().getCurrentTerm(),
1564 fromContext.getTermInformation().getVotedFor());
1568 protected void initializeBehavior() {
1569 changeCurrentBehavior(new Leader(getRaftActorContext()));
1570 initializeBehaviorComplete.countDown();
1574 @SuppressWarnings("checkstyle:IllegalCatch")
1575 public void createSnapshot(final ActorRef actorRef, final Optional<OutputStream> installSnapshotStream) {
1576 MockSnapshotState snapshotState = new MockSnapshotState(new ArrayList<>(getState()));
1577 if (installSnapshotStream.isPresent()) {
1578 SerializationUtils.serialize(snapshotState, installSnapshotStream.get());
1581 actorRef.tell(new CaptureSnapshotReply(snapshotState, installSnapshotStream), actorRef);
1584 static Props props(final Map<String, String> peerAddresses, final RaftActorContext fromContext) {
1585 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1586 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1587 configParams.setElectionTimeoutFactor(10);
1588 return Props.create(MockLeaderRaftActor.class, peerAddresses, configParams, fromContext);
1592 public static class MockNewFollowerRaftActor extends AbstractMockRaftActor {
1593 public MockNewFollowerRaftActor(final ConfigParams config, final ActorRef collectorActor) {
1594 super(NEW_SERVER_ID, new HashMap<>(), Optional.of(config), NO_PERSISTENCE, collectorActor);
1595 setPersistence(false);
1598 static Props props(final ConfigParams config, final ActorRef collectorActor) {
1599 return Props.create(MockNewFollowerRaftActor.class, config, collectorActor);