2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertTrue;
12 import static org.junit.Assert.fail;
13 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.assertNoneMatching;
14 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
15 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
16 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
18 import akka.actor.AbstractActor;
19 import akka.actor.ActorRef;
20 import akka.actor.Props;
21 import akka.dispatch.Dispatchers;
22 import akka.testkit.TestActorRef;
23 import akka.testkit.javadsl.TestKit;
24 import com.google.common.base.Optional;
25 import com.google.common.base.Stopwatch;
26 import com.google.common.collect.ImmutableMap;
27 import com.google.common.collect.Maps;
28 import com.google.common.collect.Sets;
29 import com.google.common.io.ByteSource;
30 import java.io.OutputStream;
31 import java.time.Duration;
32 import java.util.ArrayList;
33 import java.util.Arrays;
34 import java.util.Collections;
35 import java.util.List;
37 import java.util.concurrent.TimeUnit;
38 import org.apache.commons.lang3.SerializationUtils;
39 import org.junit.After;
40 import org.junit.Before;
41 import org.junit.Test;
42 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
43 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
44 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
45 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
46 import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
47 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
48 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
49 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
50 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
51 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
52 import org.opendaylight.controller.cluster.raft.messages.AddServer;
53 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
54 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
55 import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
56 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
57 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
58 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
59 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
60 import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
61 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
62 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
63 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
64 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
65 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
66 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
67 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
68 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
69 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
70 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
71 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
72 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
73 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
74 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
75 import org.slf4j.Logger;
76 import org.slf4j.LoggerFactory;
77 import scala.concurrent.duration.FiniteDuration;
80 * Unit tests for RaftActorServerConfigurationSupport.
82 * @author Thomas Pantelis
84 public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
85 static final String LEADER_ID = "leader";
86 static final String FOLLOWER_ID = "follower";
87 static final String FOLLOWER_ID2 = "follower2";
88 static final String NEW_SERVER_ID = "new-server";
89 static final String NEW_SERVER_ID2 = "new-server2";
90 private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupportTest.class);
91 private static final Class<?> COMMIT_MESSAGE_CLASS = RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT.getClass();
92 private static final boolean NO_PERSISTENCE = false;
93 private static final boolean PERSISTENT = true;
95 private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
97 private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
98 Props.create(ForwardMessageToBehaviorActor.class).withDispatcher(Dispatchers.DefaultDispatcherId()),
99 actorFactory.generateActorId(FOLLOWER_ID));
101 private TestActorRef<MockNewFollowerRaftActor> newFollowerRaftActor;
102 private ActorRef newFollowerCollectorActor;
103 private RaftActorContext newFollowerActorContext;
105 private final TestKit testKit = new TestKit(getSystem());
108 public void setup() {
109 InMemoryJournal.clear();
110 InMemorySnapshotStore.clear();
113 @SuppressWarnings("checkstyle:IllegalCatch")
114 private void setupNewFollower() {
115 DefaultConfigParamsImpl configParams = newFollowerConfigParams();
117 newFollowerCollectorActor = actorFactory.createActor(MessageCollectorActor.props(),
118 actorFactory.generateActorId(NEW_SERVER_ID + "Collector"));
119 newFollowerRaftActor = actorFactory.createTestActor(MockNewFollowerRaftActor.props(
120 configParams, newFollowerCollectorActor).withDispatcher(Dispatchers.DefaultDispatcherId()),
121 actorFactory.generateActorId(NEW_SERVER_ID));
124 newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
125 } catch (Exception e) {
126 newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
130 private static DefaultConfigParamsImpl newFollowerConfigParams() {
131 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
132 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
133 configParams.setElectionTimeoutFactor(100000);
134 configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
139 public void tearDown() {
140 actorFactory.close();
144 public void testAddServerWithExistingFollower() {
145 LOG.info("testAddServerWithExistingFollower starting");
147 RaftActorContextImpl followerActorContext = newFollowerContext(FOLLOWER_ID, followerActor);
148 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
150 followerActorContext.setCommitIndex(2);
151 followerActorContext.setLastApplied(2);
153 Follower follower = new Follower(followerActorContext);
154 followerActor.underlyingActor().setBehavior(follower);
155 followerActorContext.setCurrentBehavior(follower);
157 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
158 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
159 followerActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
160 actorFactory.generateActorId(LEADER_ID));
162 // Expect initial heartbeat from the leader.
163 expectFirstMatching(followerActor, AppendEntries.class);
164 clearMessages(followerActor);
166 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
167 final ActorRef leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
169 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
171 // Leader should install snapshot - capture and verify ApplySnapshot contents
173 ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
174 List<Object> snapshotState = MockRaftActor.fromState(applySnapshot.getSnapshot().getState());
175 assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
177 AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
178 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
179 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
181 // Verify ServerConfigurationPayload entry in leader's log
183 expectFirstMatching(leaderCollectorActor, ApplyState.class);
184 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
185 assertEquals("Leader journal last index", 3, leaderActorContext.getReplicatedLog().lastIndex());
186 assertEquals("Leader commit index", 3, leaderActorContext.getCommitIndex());
187 assertEquals("Leader last applied index", 3, leaderActorContext.getLastApplied());
188 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
189 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
191 // Verify ServerConfigurationPayload entry in both followers
193 expectFirstMatching(followerActor, ApplyState.class);
194 assertEquals("Follower journal last index", 3, followerActorContext.getReplicatedLog().lastIndex());
195 verifyServerConfigurationPayloadEntry(followerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
196 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
198 expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
199 assertEquals("New follower journal last index", 3, newFollowerActorContext.getReplicatedLog().lastIndex());
200 verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
201 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
203 // Verify new server config was applied in both followers
205 assertEquals("Follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID), followerActorContext.getPeerIds());
207 assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, FOLLOWER_ID),
208 newFollowerActorContext.getPeerIds());
210 assertEquals("Follower commit index", 3, followerActorContext.getCommitIndex());
211 assertEquals("Follower last applied index", 3, followerActorContext.getLastApplied());
212 assertEquals("New follower commit index", 3, newFollowerActorContext.getCommitIndex());
213 assertEquals("New follower last applied index", 3, newFollowerActorContext.getLastApplied());
215 assertEquals("Leader persisted ReplicatedLogImplEntry entries", 0,
216 InMemoryJournal.get(LEADER_ID, SimpleReplicatedLogEntry.class).size());
217 assertEquals("Leader persisted ServerConfigurationPayload entries", 1,
218 InMemoryJournal.get(LEADER_ID, ServerConfigurationPayload.class).size());
220 assertEquals("New follower persisted ReplicatedLogImplEntry entries", 0,
221 InMemoryJournal.get(NEW_SERVER_ID, SimpleReplicatedLogEntry.class).size());
222 assertEquals("New follower persisted ServerConfigurationPayload entries", 1,
223 InMemoryJournal.get(NEW_SERVER_ID, ServerConfigurationPayload.class).size());
225 LOG.info("testAddServerWithExistingFollower ending");
229 public void testAddServerWithNoExistingFollower() {
230 LOG.info("testAddServerWithNoExistingFollower starting");
233 RaftActorContext initialActorContext = new MockRaftActorContext();
234 initialActorContext.setCommitIndex(1);
235 initialActorContext.setLastApplied(1);
236 initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
239 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
240 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
241 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
242 actorFactory.generateActorId(LEADER_ID));
244 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
245 final RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
247 final ActorRef leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
249 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
251 // Leader should install snapshot - capture and verify ApplySnapshot contents
253 ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
254 List<Object> snapshotState = MockRaftActor.fromState(applySnapshot.getSnapshot().getState());
255 assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
257 AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
258 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
259 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
261 // Verify ServerConfigurationPayload entry in leader's log
263 expectFirstMatching(leaderCollectorActor, ApplyState.class);
264 assertEquals("Leader journal last index", 2, leaderActorContext.getReplicatedLog().lastIndex());
265 assertEquals("Leader commit index", 2, leaderActorContext.getCommitIndex());
266 assertEquals("Leader last applied index", 2, leaderActorContext.getLastApplied());
267 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
268 votingServer(NEW_SERVER_ID));
270 // Verify ServerConfigurationPayload entry in the new follower
272 expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
273 assertEquals("New follower journal last index", 2, newFollowerActorContext.getReplicatedLog().lastIndex());
274 verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
275 votingServer(NEW_SERVER_ID));
277 // Verify new server config was applied in the new follower
279 assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
281 LOG.info("testAddServerWithNoExistingFollower ending");
285 public void testAddServersAsNonVoting() {
286 LOG.info("testAddServersAsNonVoting starting");
289 RaftActorContext initialActorContext = new MockRaftActorContext();
291 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
292 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
293 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
294 actorFactory.generateActorId(LEADER_ID));
296 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
297 final RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
299 final ActorRef leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
301 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), false), testKit.getRef());
303 AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
304 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
305 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
307 // Verify ServerConfigurationPayload entry in leader's log
309 expectFirstMatching(leaderCollectorActor, ApplyState.class);
311 assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
312 assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
313 assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
314 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
315 nonVotingServer(NEW_SERVER_ID));
317 // Verify ServerConfigurationPayload entry in the new follower
319 expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
320 assertEquals("New follower journal last index", 0, newFollowerActorContext.getReplicatedLog().lastIndex());
321 verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
322 nonVotingServer(NEW_SERVER_ID));
324 // Verify new server config was applied in the new follower
326 assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
328 assertNoneMatching(newFollowerCollectorActor, InstallSnapshot.class, 500);
330 // Add another non-voting server.
332 clearMessages(leaderCollectorActor);
334 RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
335 Follower newFollower2 = new Follower(follower2ActorContext);
336 followerActor.underlyingActor().setBehavior(newFollower2);
338 leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit.getRef());
340 addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
341 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
342 assertEquals("getLeaderHint", java.util.Optional.of(LEADER_ID), addServerReply.getLeaderHint());
344 expectFirstMatching(leaderCollectorActor, ApplyState.class);
345 assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
346 assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
347 assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
348 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
349 votingServer(LEADER_ID), nonVotingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
351 LOG.info("testAddServersAsNonVoting ending");
355 public void testAddServerWithOperationInProgress() {
356 LOG.info("testAddServerWithOperationInProgress starting");
359 RaftActorContext initialActorContext = new MockRaftActorContext();
361 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
362 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
363 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
364 actorFactory.generateActorId(LEADER_ID));
366 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
367 final RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
369 final ActorRef leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
371 RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
372 Follower newFollower2 = new Follower(follower2ActorContext);
373 followerActor.underlyingActor().setBehavior(newFollower2);
375 MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
376 newFollowerRaftActorInstance.setDropMessageOfType(InstallSnapshot.class);
378 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
380 // Wait for leader's install snapshot and capture it
382 InstallSnapshot installSnapshot = expectFirstMatching(newFollowerCollectorActor, InstallSnapshot.class);
384 // Send a second AddServer - should get queued
385 TestKit testKit2 = new TestKit(getSystem());
386 leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit2.getRef());
388 // Continue the first AddServer
389 newFollowerRaftActorInstance.setDropMessageOfType(null);
390 newFollowerRaftActor.tell(installSnapshot, leaderActor);
392 // Verify both complete successfully
393 AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
394 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
396 addServerReply = testKit2.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
397 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
399 // Verify ServerConfigurationPayload entries in leader's log
401 expectMatching(leaderCollectorActor, ApplyState.class, 2);
402 assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
403 assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
404 assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
405 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
406 votingServer(LEADER_ID), votingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
408 // Verify ServerConfigurationPayload entry in the new follower
410 expectMatching(newFollowerCollectorActor, ApplyState.class, 2);
411 assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID2),
412 newFollowerActorContext.getPeerIds());
414 LOG.info("testAddServerWithOperationInProgress ending");
418 public void testAddServerWithPriorSnapshotInProgress() {
419 LOG.info("testAddServerWithPriorSnapshotInProgress starting");
422 RaftActorContext initialActorContext = new MockRaftActorContext();
424 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
425 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
426 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
427 actorFactory.generateActorId(LEADER_ID));
429 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
430 final RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
432 ActorRef leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
434 // Drop commit message for now to delay snapshot completion
435 leaderRaftActor.setDropMessageOfType(String.class);
437 leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
439 Object commitMsg = expectFirstMatching(leaderCollectorActor, COMMIT_MESSAGE_CLASS);
441 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
443 leaderRaftActor.setDropMessageOfType(null);
444 leaderActor.tell(commitMsg, leaderActor);
446 AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
447 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
448 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
450 expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
452 // Verify ServerConfigurationPayload entry in leader's log
454 expectFirstMatching(leaderCollectorActor, ApplyState.class);
455 assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
456 assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
457 assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
458 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
459 votingServer(NEW_SERVER_ID));
461 LOG.info("testAddServerWithPriorSnapshotInProgress ending");
465 public void testAddServerWithPriorSnapshotCompleteTimeout() {
466 LOG.info("testAddServerWithPriorSnapshotCompleteTimeout starting");
469 RaftActorContext initialActorContext = new MockRaftActorContext();
471 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
472 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
473 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
474 actorFactory.generateActorId(LEADER_ID));
476 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
477 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
479 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
481 // Drop commit message so the snapshot doesn't complete.
482 leaderRaftActor.setDropMessageOfType(COMMIT_MESSAGE_CLASS);
484 leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
486 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
488 AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
489 assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
491 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
493 LOG.info("testAddServerWithPriorSnapshotCompleteTimeout ending");
497 public void testAddServerWithLeaderChangeBeforePriorSnapshotComplete() {
498 LOG.info("testAddServerWithLeaderChangeBeforePriorSnapshotComplete starting");
501 RaftActorContext initialActorContext = new MockRaftActorContext();
503 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
504 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
505 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
506 actorFactory.generateActorId(LEADER_ID));
508 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
509 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
510 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
512 final ActorRef leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
514 // Drop the commit message so the snapshot doesn't complete yet.
515 leaderRaftActor.setDropMessageOfType(COMMIT_MESSAGE_CLASS);
517 leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
519 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
521 Object commitMsg = expectFirstMatching(leaderCollectorActor, COMMIT_MESSAGE_CLASS);
523 // Change the leader behavior to follower
524 leaderActor.tell(new Follower(leaderActorContext), leaderActor);
526 // Drop CaptureSnapshotReply in case install snapshot is incorrectly initiated after the prior
527 // snapshot completes. This will prevent the invalid snapshot from completing and fail the
528 // isCapturing assertion below.
529 leaderRaftActor.setDropMessageOfType(CaptureSnapshotReply.class);
531 // Complete the prior snapshot - this should be a no-op b/c it's no longer the leader
532 leaderActor.tell(commitMsg, leaderActor);
534 leaderActor.tell(new RaftActorServerConfigurationSupport.ServerOperationTimeout(NEW_SERVER_ID), leaderActor);
536 AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
537 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
539 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
540 assertEquals("isCapturing", false, leaderActorContext.getSnapshotManager().isCapturing());
542 LOG.info("testAddServerWithLeaderChangeBeforePriorSnapshotComplete ending");
546 public void testAddServerWithLeaderChangeDuringInstallSnapshot() {
547 LOG.info("testAddServerWithLeaderChangeDuringInstallSnapshot starting");
550 RaftActorContext initialActorContext = new MockRaftActorContext();
552 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
553 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
554 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
555 actorFactory.generateActorId(LEADER_ID));
557 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
558 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
560 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(8);
562 ActorRef leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
564 // Drop the UnInitializedFollowerSnapshotReply to delay it.
565 leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
567 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
569 final UnInitializedFollowerSnapshotReply snapshotReply = expectFirstMatching(leaderCollectorActor,
570 UnInitializedFollowerSnapshotReply.class);
572 // Prevent election timeout when the leader switches to follower
573 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
575 // Change the leader behavior to follower
576 leaderActor.tell(new Follower(leaderActorContext), leaderActor);
578 // Send the captured UnInitializedFollowerSnapshotReply - should be a no-op
579 leaderRaftActor.setDropMessageOfType(null);
580 leaderActor.tell(snapshotReply, leaderActor);
582 AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
583 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
585 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
587 LOG.info("testAddServerWithLeaderChangeDuringInstallSnapshot ending");
591 public void testAddServerWithInstallSnapshotTimeout() {
592 LOG.info("testAddServerWithInstallSnapshotTimeout starting");
595 RaftActorContext initialActorContext = new MockRaftActorContext();
597 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
598 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
599 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
600 actorFactory.generateActorId(LEADER_ID));
602 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
603 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
604 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
606 // Drop the InstallSnapshot message so it times out
607 newFollowerRaftActor.underlyingActor().setDropMessageOfType(InstallSnapshot.class);
609 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
611 leaderActor.tell(new UnInitializedFollowerSnapshotReply("bogus"), leaderActor);
613 AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
614 assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
616 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
617 assertEquals("Leader followers size", 0,
618 ((AbstractLeader)leaderRaftActor.getCurrentBehavior()).getFollowerIds().size());
620 LOG.info("testAddServerWithInstallSnapshotTimeout ending");
624 public void testAddServerWithNoLeader() {
625 LOG.info("testAddServerWithNoLeader starting");
628 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
629 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
631 TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
632 MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
633 followerActor.path().toString())).config(configParams).persistent(Optional.of(false))
634 .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
635 actorFactory.generateActorId(LEADER_ID));
636 noLeaderActor.underlyingActor().waitForInitializeBehaviorComplete();
638 noLeaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true),
640 AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
641 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
643 LOG.info("testAddServerWithNoLeader ending");
647 public void testAddServerWithNoConsensusReached() {
648 LOG.info("testAddServerWithNoConsensusReached starting");
651 RaftActorContext initialActorContext = new MockRaftActorContext();
653 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
654 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
655 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
656 actorFactory.generateActorId(LEADER_ID));
658 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
659 final RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
661 final ActorRef leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
663 // Drop UnInitializedFollowerSnapshotReply initially
664 leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
666 MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
667 newFollowerCollectorActor = newCollectorActor(newFollowerRaftActorInstance, NEW_SERVER_ID);
669 // Drop AppendEntries to the new follower so consensus isn't reached
670 newFollowerRaftActorInstance.setDropMessageOfType(AppendEntries.class);
672 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
674 // Capture the UnInitializedFollowerSnapshotReply
675 Object snapshotReply = expectFirstMatching(leaderCollectorActor, UnInitializedFollowerSnapshotReply.class);
677 // Send the UnInitializedFollowerSnapshotReply to resume the first request
678 leaderRaftActor.setDropMessageOfType(null);
679 leaderActor.tell(snapshotReply, leaderActor);
681 expectFirstMatching(newFollowerCollectorActor, AppendEntries.class);
683 // Send a second AddServer
684 leaderActor.tell(new AddServer(NEW_SERVER_ID2, "", false), testKit.getRef());
686 // The first AddServer should succeed with OK even though consensus wasn't reached
687 AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
688 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
689 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
691 // Verify ServerConfigurationPayload entry in leader's log
692 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
693 votingServer(NEW_SERVER_ID));
695 // The second AddServer should fail since consensus wasn't reached for the first
696 addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
697 assertEquals("getStatus", ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT, addServerReply.getStatus());
699 // Re-send the second AddServer - should also fail
700 leaderActor.tell(new AddServer(NEW_SERVER_ID2, "", false), testKit.getRef());
701 addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
702 assertEquals("getStatus", ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT, addServerReply.getStatus());
704 LOG.info("testAddServerWithNoConsensusReached ending");
708 public void testAddServerWithExistingServer() {
709 LOG.info("testAddServerWithExistingServer starting");
711 RaftActorContext initialActorContext = new MockRaftActorContext();
713 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
714 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
715 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
716 actorFactory.generateActorId(LEADER_ID));
718 leaderActor.tell(new AddServer(FOLLOWER_ID, followerActor.path().toString(), true), testKit.getRef());
720 AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
721 assertEquals("getStatus", ServerChangeStatus.ALREADY_EXISTS, addServerReply.getStatus());
723 LOG.info("testAddServerWithExistingServer ending");
727 public void testAddServerForwardedToLeader() {
728 LOG.info("testAddServerForwardedToLeader starting");
731 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
732 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
734 ActorRef leaderActor = actorFactory.createActor(
735 MessageCollectorActor.props(), actorFactory.generateActorId(LEADER_ID));
737 TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
738 MockRaftActor.builder().id(FOLLOWER_ID).peerAddresses(ImmutableMap.of(LEADER_ID,
739 leaderActor.path().toString())).config(configParams).persistent(Optional.of(false))
740 .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
741 actorFactory.generateActorId(FOLLOWER_ID));
742 followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
744 followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
745 -1, -1, (short)0), leaderActor);
747 followerRaftActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true),
749 expectFirstMatching(leaderActor, AddServer.class);
751 LOG.info("testAddServerForwardedToLeader ending");
755 public void testOnApplyState() {
756 LOG.info("testOnApplyState starting");
758 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
759 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
760 TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
761 MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
762 followerActor.path().toString())).config(configParams).persistent(Optional.of(false))
763 .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
764 actorFactory.generateActorId(LEADER_ID));
766 RaftActorServerConfigurationSupport support = new RaftActorServerConfigurationSupport(
767 noLeaderActor.underlyingActor());
769 ReplicatedLogEntry serverConfigEntry = new SimpleReplicatedLogEntry(1, 1,
770 new ServerConfigurationPayload(Collections.<ServerInfo>emptyList()));
771 boolean handled = support.handleMessage(new ApplyState(null, null, serverConfigEntry), ActorRef.noSender());
772 assertEquals("Message handled", true, handled);
774 ReplicatedLogEntry nonServerConfigEntry = new SimpleReplicatedLogEntry(1, 1,
775 new MockRaftActorContext.MockPayload("1"));
776 handled = support.handleMessage(new ApplyState(null, null, nonServerConfigEntry), ActorRef.noSender());
777 assertEquals("Message handled", false, handled);
779 LOG.info("testOnApplyState ending");
783 public void testRemoveServerWithNoLeader() {
784 LOG.info("testRemoveServerWithNoLeader starting");
786 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
787 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
789 TestActorRef<MockRaftActor> leaderActor = actorFactory.createTestActor(
790 MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
791 followerActor.path().toString())).config(configParams).persistent(Optional.of(false))
792 .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
793 actorFactory.generateActorId(LEADER_ID));
794 leaderActor.underlyingActor().waitForInitializeBehaviorComplete();
796 leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
797 RemoveServerReply removeServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), RemoveServerReply.class);
798 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, removeServerReply.getStatus());
800 LOG.info("testRemoveServerWithNoLeader ending");
804 public void testRemoveServerNonExistentServer() {
805 LOG.info("testRemoveServerNonExistentServer starting");
807 RaftActorContext initialActorContext = new MockRaftActorContext();
809 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
810 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
811 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
812 actorFactory.generateActorId(LEADER_ID));
814 leaderActor.tell(new RemoveServer(NEW_SERVER_ID), testKit.getRef());
815 RemoveServerReply removeServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), RemoveServerReply.class);
816 assertEquals("getStatus", ServerChangeStatus.DOES_NOT_EXIST, removeServerReply.getStatus());
818 LOG.info("testRemoveServerNonExistentServer ending");
822 public void testRemoveServerForwardToLeader() {
823 LOG.info("testRemoveServerForwardToLeader starting");
825 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
826 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
828 ActorRef leaderActor = actorFactory.createTestActor(
829 MessageCollectorActor.props(), actorFactory.generateActorId(LEADER_ID));
831 TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
832 MockRaftActor.builder().id(FOLLOWER_ID).peerAddresses(ImmutableMap.of(LEADER_ID,
833 leaderActor.path().toString())).config(configParams).persistent(Optional.of(false))
834 .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
835 actorFactory.generateActorId(FOLLOWER_ID));
836 followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
838 followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
839 -1, -1, (short)0), leaderActor);
841 followerRaftActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
842 expectFirstMatching(leaderActor, RemoveServer.class);
844 LOG.info("testRemoveServerForwardToLeader ending");
848 public void testRemoveServer() {
849 LOG.info("testRemoveServer starting");
851 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
852 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
853 configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
855 final String follower1ActorId = actorFactory.generateActorId(FOLLOWER_ID);
856 final String follower1ActorPath = actorFactory.createTestActorPath(follower1ActorId);
857 final String follower2ActorId = actorFactory.generateActorId(FOLLOWER_ID2);
858 final String follower2ActorPath = actorFactory.createTestActorPath(follower2ActorId);
859 RaftActorContext initialActorContext = new MockRaftActorContext();
861 final String downNodeId = "downNode";
862 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(MockLeaderRaftActor.props(
863 ImmutableMap.of(FOLLOWER_ID, follower1ActorPath, FOLLOWER_ID2, follower2ActorPath, downNodeId, ""),
864 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
865 actorFactory.generateActorId(LEADER_ID));
867 final ActorRef leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
869 ActorRef follower1Collector = actorFactory.createActor(
870 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
871 final TestActorRef<CollectingMockRaftActor> follower1Actor = actorFactory.createTestActor(
872 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
873 FOLLOWER_ID2, follower2ActorPath, downNodeId, ""), configParams, NO_PERSISTENCE,
874 follower1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId);
876 ActorRef follower2Collector = actorFactory.createActor(
877 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
878 final TestActorRef<CollectingMockRaftActor> follower2Actor = actorFactory.createTestActor(
879 CollectingMockRaftActor.props(FOLLOWER_ID2, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
880 FOLLOWER_ID, follower1ActorPath, downNodeId, ""), configParams, NO_PERSISTENCE,
881 follower2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId);
883 leaderActor.underlyingActor().waitForInitializeBehaviorComplete();
884 follower1Actor.underlyingActor().waitForInitializeBehaviorComplete();
885 follower2Actor.underlyingActor().waitForInitializeBehaviorComplete();
887 leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
888 RemoveServerReply removeServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), RemoveServerReply.class);
889 assertEquals("getStatus", ServerChangeStatus.OK, removeServerReply.getStatus());
891 ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
892 assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
893 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
894 votingServer(LEADER_ID), votingServer(FOLLOWER_ID2), votingServer(downNodeId));
896 applyState = MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
897 assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
898 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
899 votingServer(LEADER_ID), votingServer(FOLLOWER_ID2), votingServer(downNodeId));
901 RaftActorBehavior currentBehavior = leaderActor.underlyingActor().getCurrentBehavior();
902 assertTrue("Expected Leader", currentBehavior instanceof Leader);
903 assertEquals("Follower ids size", 2, ((Leader)currentBehavior).getFollowerIds().size());
905 MessageCollectorActor.expectFirstMatching(follower1Collector, ServerRemoved.class);
907 LOG.info("testRemoveServer ending");
911 public void testRemoveServerLeader() {
912 LOG.info("testRemoveServerLeader starting");
914 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
915 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
916 configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
918 final String followerActorId = actorFactory.generateActorId(FOLLOWER_ID);
919 final String followerActorPath = actorFactory.createTestActorPath(followerActorId);
920 RaftActorContext initialActorContext = new MockRaftActorContext();
922 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
923 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActorPath),
924 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
925 actorFactory.generateActorId(LEADER_ID));
927 final ActorRef leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
929 final ActorRef followerCollector =
930 actorFactory.createActor(MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
931 actorFactory.createTestActor(
932 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
933 configParams, NO_PERSISTENCE, followerCollector)
934 .withDispatcher(Dispatchers.DefaultDispatcherId()),
937 leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
938 RemoveServerReply removeServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), RemoveServerReply.class);
939 assertEquals("getStatus", ServerChangeStatus.OK, removeServerReply.getStatus());
941 final ApplyState applyState = MessageCollectorActor.expectFirstMatching(followerCollector, ApplyState.class);
942 assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
943 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
944 votingServer(FOLLOWER_ID));
946 MessageCollectorActor.expectFirstMatching(leaderCollector, ServerRemoved.class);
948 LOG.info("testRemoveServerLeader ending");
952 public void testRemoveServerLeaderWithNoFollowers() {
953 LOG.info("testRemoveServerLeaderWithNoFollowers starting");
955 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
956 MockLeaderRaftActor.props(Collections.<String, String>emptyMap(),
957 new MockRaftActorContext()).withDispatcher(Dispatchers.DefaultDispatcherId()),
958 actorFactory.generateActorId(LEADER_ID));
960 leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
961 RemoveServerReply removeServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), RemoveServerReply.class);
962 assertEquals("getStatus", ServerChangeStatus.NOT_SUPPORTED, removeServerReply.getStatus());
964 LOG.info("testRemoveServerLeaderWithNoFollowers ending");
968 public void testChangeServersVotingStatus() {
969 LOG.info("testChangeServersVotingStatus starting");
971 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
972 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
973 configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
975 final String follower1ActorId = actorFactory.generateActorId(FOLLOWER_ID);
976 final String follower1ActorPath = actorFactory.createTestActorPath(follower1ActorId);
977 final String follower2ActorId = actorFactory.generateActorId(FOLLOWER_ID2);
978 final String follower2ActorPath = actorFactory.createTestActorPath(follower2ActorId);
980 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
981 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, follower1ActorPath,
982 FOLLOWER_ID2, follower2ActorPath), new MockRaftActorContext())
983 .withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
984 ActorRef leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
986 ActorRef follower1Collector = actorFactory.createActor(
987 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
988 final TestActorRef<CollectingMockRaftActor> follower1RaftActor = actorFactory.createTestActor(
989 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
990 FOLLOWER_ID2, follower2ActorPath), configParams, NO_PERSISTENCE, follower1Collector)
991 .withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId);
993 ActorRef follower2Collector = actorFactory.createActor(
994 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
995 final TestActorRef<CollectingMockRaftActor> follower2RaftActor = actorFactory.createTestActor(
996 CollectingMockRaftActor.props(FOLLOWER_ID2, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
997 FOLLOWER_ID, follower1ActorPath), configParams, NO_PERSISTENCE, follower2Collector)
998 .withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId);
1000 // Send first ChangeServersVotingStatus message
1002 leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(FOLLOWER_ID, false, FOLLOWER_ID2, false)),
1004 ServerChangeReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), ServerChangeReply.class);
1005 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1007 final ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
1008 assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
1009 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1010 votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1012 MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
1013 verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext()
1014 .getReplicatedLog(), votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID),
1015 nonVotingServer(FOLLOWER_ID2));
1017 MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
1018 verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext()
1019 .getReplicatedLog(), votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID),
1020 nonVotingServer(FOLLOWER_ID2));
1022 MessageCollectorActor.clearMessages(leaderCollector);
1023 MessageCollectorActor.clearMessages(follower1Collector);
1024 MessageCollectorActor.clearMessages(follower2Collector);
1026 // Send second ChangeServersVotingStatus message
1028 leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(FOLLOWER_ID, true)), testKit.getRef());
1029 reply = testKit.expectMsgClass(Duration.ofSeconds(5), ServerChangeReply.class);
1030 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1032 MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
1033 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1034 votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1036 MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
1037 verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext()
1038 .getReplicatedLog(), votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1040 MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
1041 verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext()
1042 .getReplicatedLog(), votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1044 LOG.info("testChangeServersVotingStatus ending");
1048 public void testChangeLeaderToNonVoting() {
1049 LOG.info("testChangeLeaderToNonVoting starting");
1051 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1052 configParams.setHeartBeatInterval(new FiniteDuration(500, TimeUnit.MILLISECONDS));
1054 final String follower1ActorId = actorFactory.generateActorId(FOLLOWER_ID);
1055 final String follower1ActorPath = actorFactory.createTestActorPath(follower1ActorId);
1056 final String follower2ActorId = actorFactory.generateActorId(FOLLOWER_ID2);
1057 final String follower2ActorPath = actorFactory.createTestActorPath(follower2ActorId);
1059 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
1060 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, follower1ActorPath,
1061 FOLLOWER_ID2, follower2ActorPath), new MockRaftActorContext())
1062 .withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
1063 ActorRef leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
1065 ActorRef follower1Collector = actorFactory.createActor(
1066 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1067 final TestActorRef<CollectingMockRaftActor> follower1RaftActor = actorFactory.createTestActor(
1068 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
1069 FOLLOWER_ID2, follower2ActorPath), configParams, NO_PERSISTENCE, follower1Collector)
1070 .withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId);
1072 ActorRef follower2Collector = actorFactory.createActor(
1073 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1074 final TestActorRef<CollectingMockRaftActor> follower2RaftActor = actorFactory.createTestActor(
1075 CollectingMockRaftActor.props(FOLLOWER_ID2, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
1076 FOLLOWER_ID, follower1ActorPath), configParams, NO_PERSISTENCE, follower2Collector)
1077 .withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId);
1079 // Send ChangeServersVotingStatus message
1081 leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(LEADER_ID, false)), testKit.getRef());
1082 ServerChangeReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), ServerChangeReply.class);
1083 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1085 MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
1086 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1087 nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
1089 MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
1090 verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext()
1091 .getReplicatedLog(), nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
1093 MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
1094 verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext()
1095 .getReplicatedLog(), nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
1097 verifyRaftState(RaftState.Leader, follower1RaftActor.underlyingActor(), follower2RaftActor.underlyingActor());
1098 verifyRaftState(RaftState.Follower, leaderActor.underlyingActor());
1100 MessageCollectorActor.expectMatching(leaderCollector, AppendEntries.class, 2);
1102 LOG.info("testChangeLeaderToNonVoting ending");
1106 public void testChangeLeaderToNonVotingInSingleNode() {
1107 LOG.info("testChangeLeaderToNonVotingInSingleNode starting");
1109 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
1110 MockLeaderRaftActor.props(ImmutableMap.of(), new MockRaftActorContext())
1111 .withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
1113 leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(LEADER_ID, false)), testKit.getRef());
1114 ServerChangeReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), ServerChangeReply.class);
1115 assertEquals("getStatus", ServerChangeStatus.INVALID_REQUEST, reply.getStatus());
1117 LOG.info("testChangeLeaderToNonVotingInSingleNode ending");
1121 public void testChangeToVotingWithNoLeader() {
1122 LOG.info("testChangeToVotingWithNoLeader starting");
1124 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1125 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1126 configParams.setElectionTimeoutFactor(5);
1128 final String node1ID = "node1";
1129 final String node2ID = "node2";
1131 // Set up a persisted ServerConfigurationPayload. Initially node1 and node2 will come up as non-voting.
1132 // via the server config. The server config will also contain 2 voting peers that are down (ie no
1135 ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1136 new ServerInfo(node1ID, false), new ServerInfo(node2ID, false),
1137 new ServerInfo("downNode1", true), new ServerInfo("downNode2", true)));
1138 SimpleReplicatedLogEntry persistedServerConfigEntry = new SimpleReplicatedLogEntry(0, 1, persistedServerConfig);
1140 InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "downNode1"));
1141 InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1142 InMemoryJournal.addEntry(node1ID, 3, new ApplyJournalEntries(0));
1143 InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "downNode2"));
1144 InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1145 InMemoryJournal.addEntry(node2ID, 3, new ApplyJournalEntries(0));
1147 ActorRef node1Collector = actorFactory.createActor(
1148 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1149 TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1150 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
1151 PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1152 CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1154 ActorRef node2Collector = actorFactory.createActor(
1155 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1156 TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1157 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
1158 PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1159 CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1161 node1RaftActor.waitForInitializeBehaviorComplete();
1162 node2RaftActor.waitForInitializeBehaviorComplete();
1164 // Verify the intended server config was loaded and applied.
1165 verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1166 nonVotingServer(node1ID), nonVotingServer(node2ID), votingServer("downNode1"),
1167 votingServer("downNode2"));
1168 assertEquals("isVotingMember", false, node1RaftActor.getRaftActorContext().isVotingMember());
1169 assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1170 assertEquals("getLeaderId", null, node1RaftActor.getLeaderId());
1172 verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1173 nonVotingServer(node1ID), nonVotingServer(node2ID), votingServer("downNode1"),
1174 votingServer("downNode2"));
1175 assertEquals("isVotingMember", false, node2RaftActor.getRaftActorContext().isVotingMember());
1177 // For the test, we send a ChangeServersVotingStatus message to node1 to flip the voting states for
1178 // each server, ie node1 and node2 to voting and the 2 down nodes to non-voting. This should cause
1179 // node1 to try to elect itself as leader in order to apply the new server config. Since the 2
1180 // down nodes are switched to non-voting, node1 should only need a vote from node2.
1182 // First send the message such that node1 has no peer address for node2 - should fail.
1184 ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true,
1185 node2ID, true, "downNode1", false, "downNode2", false));
1186 node1RaftActorRef.tell(changeServers, testKit.getRef());
1187 ServerChangeReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), ServerChangeReply.class);
1188 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, reply.getStatus());
1189 assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1191 // Send an AppendEntries so node1 has a leaderId
1193 long term = node1RaftActor.getRaftActorContext().getTermInformation().getCurrentTerm();
1194 node1RaftActorRef.tell(new AppendEntries(term, "downNode1", -1L, -1L,
1195 Collections.<ReplicatedLogEntry>emptyList(), 0, -1, (short)1), ActorRef.noSender());
1197 // Wait for the ElectionTimeout to clear the leaderId. The leaderId must be null so on the next
1198 // ChangeServersVotingStatus message, it will try to elect a leader.
1200 AbstractRaftActorIntegrationTest.verifyRaftState(node1RaftActorRef,
1201 rs -> assertEquals("getLeader", null, rs.getLeader()));
1203 // Update node2's peer address and send the message again
1205 node1RaftActor.setPeerAddress(node2ID, node2RaftActorRef.path().toString());
1207 node1RaftActorRef.tell(changeServers, testKit.getRef());
1208 reply = testKit.expectMsgClass(Duration.ofSeconds(5), ServerChangeReply.class);
1209 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1211 ApplyJournalEntries apply = MessageCollectorActor.expectFirstMatching(node1Collector,
1212 ApplyJournalEntries.class);
1213 assertEquals("getToIndex", 1, apply.getToIndex());
1214 verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1215 votingServer(node1ID), votingServer(node2ID), nonVotingServer("downNode1"),
1216 nonVotingServer("downNode2"));
1217 assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
1218 assertEquals("getRaftState", RaftState.Leader, node1RaftActor.getRaftState());
1220 apply = MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
1221 assertEquals("getToIndex", 1, apply.getToIndex());
1222 verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1223 votingServer(node1ID), votingServer(node2ID), nonVotingServer("downNode1"),
1224 nonVotingServer("downNode2"));
1225 assertEquals("isVotingMember", true, node2RaftActor.getRaftActorContext().isVotingMember());
1226 assertEquals("getRaftState", RaftState.Follower, node2RaftActor.getRaftState());
1228 LOG.info("testChangeToVotingWithNoLeader ending");
1232 public void testChangeToVotingWithNoLeaderAndElectionTimeout() {
1233 LOG.info("testChangeToVotingWithNoLeaderAndElectionTimeout starting");
1235 final String node1ID = "node1";
1236 final String node2ID = "node2";
1238 final PeerAddressResolver peerAddressResolver = peerId -> peerId.equals(node1ID)
1239 ? actorFactory.createTestActorPath(node1ID) : peerId.equals(node2ID)
1240 ? actorFactory.createTestActorPath(node2ID) : null;
1242 ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1243 new ServerInfo(node1ID, false), new ServerInfo(node2ID, true)));
1244 SimpleReplicatedLogEntry persistedServerConfigEntry = new SimpleReplicatedLogEntry(0, 1, persistedServerConfig);
1246 InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
1247 InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1248 InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
1249 InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1251 DefaultConfigParamsImpl configParams1 = new DefaultConfigParamsImpl();
1252 configParams1.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1253 configParams1.setElectionTimeoutFactor(1);
1254 configParams1.setPeerAddressResolver(peerAddressResolver);
1255 ActorRef node1Collector = actorFactory.createActor(
1256 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1257 TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1258 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams1,
1259 PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1260 final CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1262 DefaultConfigParamsImpl configParams2 = new DefaultConfigParamsImpl();
1263 configParams2.setElectionTimeoutFactor(1000000);
1264 configParams2.setPeerAddressResolver(peerAddressResolver);
1265 ActorRef node2Collector = actorFactory.createActor(
1266 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1267 TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1268 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams2,
1269 PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1270 CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1272 // Send a ChangeServersVotingStatus message to node1 to change mode1 to voting. This should cause
1273 // node1 to try to elect itself as leader in order to apply the new server config. But we'll drop
1274 // RequestVote messages in node2 which should cause node1 to time out and revert back to the previous
1275 // server config and fail with NO_LEADER. Note that node1 shouldn't forward the request to node2 b/c
1276 // node2 was previously voting.
1278 node2RaftActor.setDropMessageOfType(RequestVote.class);
1280 ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true));
1281 node1RaftActorRef.tell(changeServers, testKit.getRef());
1282 ServerChangeReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), ServerChangeReply.class);
1283 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, reply.getStatus());
1285 assertEquals("Server config", Sets.newHashSet(nonVotingServer(node1ID), votingServer(node2ID)),
1286 Sets.newHashSet(node1RaftActor.getRaftActorContext().getPeerServerInfo(true).getServerConfig()));
1287 assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1289 LOG.info("testChangeToVotingWithNoLeaderAndElectionTimeout ending");
1293 public void testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout() {
1294 LOG.info("testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout starting");
1296 final String node1ID = "node1";
1297 final String node2ID = "node2";
1299 final PeerAddressResolver peerAddressResolver = peerId -> peerId.equals(node1ID)
1300 ? actorFactory.createTestActorPath(node1ID) : peerId.equals(node2ID)
1301 ? actorFactory.createTestActorPath(node2ID) : null;
1303 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1304 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1305 configParams.setElectionTimeoutFactor(3);
1306 configParams.setPeerAddressResolver(peerAddressResolver);
1308 ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1309 new ServerInfo(node1ID, false), new ServerInfo(node2ID, false)));
1310 SimpleReplicatedLogEntry persistedServerConfigEntry = new SimpleReplicatedLogEntry(0, 1, persistedServerConfig);
1312 InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
1313 InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1314 InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
1315 InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1316 InMemoryJournal.addEntry(node2ID, 3, new SimpleReplicatedLogEntry(1, 1,
1317 new MockRaftActorContext.MockPayload("2")));
1318 InMemoryJournal.addEntry(node2ID, 4, new ApplyJournalEntries(1));
1320 ActorRef node1Collector = actorFactory.createActor(
1321 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1322 TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1323 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
1324 PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1325 final CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1327 ActorRef node2Collector = actorFactory.createActor(
1328 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1329 TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1330 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
1331 PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1332 final CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1334 // Send a ChangeServersVotingStatus message to node1 to change mode1 to voting. This should cause
1335 // node1 to try to elect itself as leader in order to apply the new server config. However node1's log
1336 // is behind node2's so node2 should not grant node1's vote. This should cause node1 to time out and
1337 // forward the request to node2.
1339 ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(
1340 ImmutableMap.of(node1ID, true, node2ID, true));
1341 node1RaftActorRef.tell(changeServers, testKit.getRef());
1342 ServerChangeReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), ServerChangeReply.class);
1343 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1345 MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
1346 verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1347 votingServer(node1ID), votingServer(node2ID));
1348 assertEquals("getRaftState", RaftState.Leader, node2RaftActor.getRaftState());
1350 MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class);
1351 verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1352 votingServer(node1ID), votingServer(node2ID));
1353 assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
1354 assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1356 LOG.info("testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout ending");
1360 public void testChangeToVotingWithNoLeaderAndOtherLeaderElected() {
1361 LOG.info("testChangeToVotingWithNoLeaderAndOtherLeaderElected starting");
1363 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1364 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1365 configParams.setElectionTimeoutFactor(100000);
1367 final String node1ID = "node1";
1368 final String node2ID = "node2";
1370 configParams.setPeerAddressResolver(peerId -> peerId.equals(node1ID)
1371 ? actorFactory.createTestActorPath(node1ID) : peerId.equals(node2ID)
1372 ? actorFactory.createTestActorPath(node2ID) : null);
1374 ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1375 new ServerInfo(node1ID, false), new ServerInfo(node2ID, true)));
1376 SimpleReplicatedLogEntry persistedServerConfigEntry = new SimpleReplicatedLogEntry(0, 1, persistedServerConfig);
1378 InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
1379 InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1380 InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
1381 InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1383 ActorRef node1Collector = actorFactory.createActor(
1384 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1385 TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1386 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
1387 PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1388 final CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1390 ActorRef node2Collector = actorFactory.createActor(
1391 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1392 TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1393 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
1394 PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1395 CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1397 // Send a ChangeServersVotingStatus message to node1 to change node1 to voting. This should cause
1398 // node1 to try to elect itself as leader in order to apply the new server config. But we'll drop
1399 // RequestVote messages in node2 and make it the leader so node1 should forward the server change
1400 // request to node2 when node2 is elected.
1402 node2RaftActor.setDropMessageOfType(RequestVote.class);
1404 ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true,
1406 node1RaftActorRef.tell(changeServers, testKit.getRef());
1408 MessageCollectorActor.expectFirstMatching(node2Collector, RequestVote.class);
1410 node2RaftActorRef.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
1412 ServerChangeReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), ServerChangeReply.class);
1413 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1415 MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class);
1416 verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1417 votingServer(node1ID), votingServer(node2ID));
1418 assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
1419 assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1421 MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
1422 verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1423 votingServer(node1ID), votingServer(node2ID));
1424 assertEquals("getRaftState", RaftState.Leader, node2RaftActor.getRaftState());
1426 LOG.info("testChangeToVotingWithNoLeaderAndOtherLeaderElected ending");
1429 private static void verifyRaftState(RaftState expState, RaftActor... raftActors) {
1430 Stopwatch sw = Stopwatch.createStarted();
1431 while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
1432 for (RaftActor raftActor : raftActors) {
1433 if (raftActor.getRaftState() == expState) {
1439 fail("None of the RaftActors have state " + expState);
1442 private static ServerInfo votingServer(String id) {
1443 return new ServerInfo(id, true);
1446 private static ServerInfo nonVotingServer(String id) {
1447 return new ServerInfo(id, false);
1450 private ActorRef newLeaderCollectorActor(MockLeaderRaftActor leaderRaftActor) {
1451 return newCollectorActor(leaderRaftActor, LEADER_ID);
1454 private ActorRef newCollectorActor(AbstractMockRaftActor raftActor, String id) {
1455 ActorRef collectorActor = actorFactory.createTestActor(
1456 MessageCollectorActor.props(), actorFactory.generateActorId(id + "Collector"));
1457 raftActor.setCollectorActor(collectorActor);
1458 return collectorActor;
1461 private static void verifyServerConfigurationPayloadEntry(ReplicatedLog log, ServerInfo... expected) {
1462 ReplicatedLogEntry logEntry = log.get(log.lastIndex());
1463 assertEquals("Last log entry payload class", ServerConfigurationPayload.class, logEntry.getData().getClass());
1464 ServerConfigurationPayload payload = (ServerConfigurationPayload)logEntry.getData();
1465 assertEquals("Server config", Sets.newHashSet(expected), Sets.newHashSet(payload.getServerConfig()));
1468 private static RaftActorContextImpl newFollowerContext(String id, TestActorRef<? extends AbstractActor> actor) {
1469 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1470 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1471 configParams.setElectionTimeoutFactor(100000);
1472 NonPersistentDataProvider noPersistence = new NonPersistentDataProvider(Runnable::run);
1473 ElectionTermImpl termInfo = new ElectionTermImpl(noPersistence, id, LOG);
1474 termInfo.update(1, LEADER_ID);
1475 return new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
1476 id, termInfo, -1, -1, ImmutableMap.of(LEADER_ID, ""), configParams,
1477 noPersistence, applyState -> actor.tell(applyState, actor), LOG);
1480 abstract static class AbstractMockRaftActor extends MockRaftActor {
1481 private volatile ActorRef collectorActor;
1482 private volatile Class<?> dropMessageOfType;
1484 AbstractMockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
1485 boolean persistent, ActorRef collectorActor) {
1486 super(builder().id(id).peerAddresses(peerAddresses).config(config.get())
1487 .persistent(Optional.of(persistent)));
1488 this.collectorActor = collectorActor;
1491 void setDropMessageOfType(Class<?> dropMessageOfType) {
1492 this.dropMessageOfType = dropMessageOfType;
1495 void setCollectorActor(ActorRef collectorActor) {
1496 this.collectorActor = collectorActor;
1500 public void handleCommand(Object message) {
1501 if (dropMessageOfType == null || !dropMessageOfType.equals(message.getClass())) {
1502 super.handleCommand(message);
1505 if (collectorActor != null) {
1506 collectorActor.tell(message, getSender());
1511 public static class CollectingMockRaftActor extends AbstractMockRaftActor {
1513 CollectingMockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
1514 boolean persistent, ActorRef collectorActor) {
1515 super(id, peerAddresses, config, persistent, collectorActor);
1516 snapshotCohortDelegate = new RaftActorSnapshotCohort() {
1518 public void createSnapshot(ActorRef actorRef, java.util.Optional<OutputStream> installSnapshotStream) {
1519 actorRef.tell(new CaptureSnapshotReply(ByteState.empty(), installSnapshotStream), actorRef);
1523 public void applySnapshot(
1524 org.opendaylight.controller.cluster.raft.persisted.Snapshot.State snapshotState) {
1528 public org.opendaylight.controller.cluster.raft.persisted.Snapshot.State deserializeSnapshot(
1529 ByteSource snapshotBytes) {
1530 throw new UnsupportedOperationException();
1535 public static Props props(final String id, final Map<String, String> peerAddresses,
1536 ConfigParams config, boolean persistent, ActorRef collectorActor) {
1538 return Props.create(CollectingMockRaftActor.class, id, peerAddresses, Optional.of(config),
1539 persistent, collectorActor);
1544 public static class MockLeaderRaftActor extends AbstractMockRaftActor {
1545 public MockLeaderRaftActor(Map<String, String> peerAddresses, ConfigParams config,
1546 RaftActorContext fromContext) {
1547 super(LEADER_ID, peerAddresses, Optional.of(config), NO_PERSISTENCE, null);
1548 setPersistence(false);
1550 RaftActorContext context = getRaftActorContext();
1551 for (int i = 0; i < fromContext.getReplicatedLog().size(); i++) {
1552 ReplicatedLogEntry entry = fromContext.getReplicatedLog().get(i);
1553 getState().add(entry.getData());
1554 context.getReplicatedLog().append(entry);
1557 context.setCommitIndex(fromContext.getCommitIndex());
1558 context.setLastApplied(fromContext.getLastApplied());
1559 context.getTermInformation().update(fromContext.getTermInformation().getCurrentTerm(),
1560 fromContext.getTermInformation().getVotedFor());
1564 protected void initializeBehavior() {
1565 changeCurrentBehavior(new Leader(getRaftActorContext()));
1566 initializeBehaviorComplete.countDown();
1570 @SuppressWarnings("checkstyle:IllegalCatch")
1571 public void createSnapshot(ActorRef actorRef, java.util.Optional<OutputStream> installSnapshotStream) {
1572 MockSnapshotState snapshotState = new MockSnapshotState(new ArrayList<>(getState()));
1573 if (installSnapshotStream.isPresent()) {
1574 SerializationUtils.serialize(snapshotState, installSnapshotStream.get());
1577 actorRef.tell(new CaptureSnapshotReply(snapshotState, installSnapshotStream), actorRef);
1580 static Props props(Map<String, String> peerAddresses, RaftActorContext fromContext) {
1581 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1582 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1583 configParams.setElectionTimeoutFactor(10);
1584 return Props.create(MockLeaderRaftActor.class, peerAddresses, configParams, fromContext);
1588 public static class MockNewFollowerRaftActor extends AbstractMockRaftActor {
1589 public MockNewFollowerRaftActor(ConfigParams config, ActorRef collectorActor) {
1590 super(NEW_SERVER_ID, Maps.<String, String>newHashMap(), Optional.of(config), NO_PERSISTENCE,
1592 setPersistence(false);
1595 static Props props(ConfigParams config, ActorRef collectorActor) {
1596 return Props.create(MockNewFollowerRaftActor.class, config, collectorActor);