2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertTrue;
12 import static org.junit.Assert.fail;
13 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.assertNoneMatching;
14 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
15 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
16 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
18 import akka.actor.AbstractActor;
19 import akka.actor.ActorRef;
20 import akka.actor.Props;
21 import akka.dispatch.Dispatchers;
22 import akka.testkit.TestActorRef;
23 import akka.testkit.javadsl.TestKit;
24 import com.google.common.base.Stopwatch;
25 import com.google.common.collect.ImmutableMap;
26 import com.google.common.collect.Maps;
27 import com.google.common.collect.Sets;
28 import com.google.common.io.ByteSource;
29 import com.google.common.util.concurrent.MoreExecutors;
30 import java.io.OutputStream;
31 import java.time.Duration;
32 import java.util.ArrayList;
33 import java.util.Arrays;
34 import java.util.Collections;
35 import java.util.List;
37 import java.util.Optional;
38 import java.util.concurrent.TimeUnit;
39 import org.apache.commons.lang3.SerializationUtils;
40 import org.junit.After;
41 import org.junit.Before;
42 import org.junit.Test;
43 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
44 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
45 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
46 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
47 import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
48 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
49 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
50 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
51 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
52 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
53 import org.opendaylight.controller.cluster.raft.messages.AddServer;
54 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
55 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
56 import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
57 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
58 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
59 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
60 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
61 import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
62 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
63 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
64 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
65 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
66 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
67 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
68 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
69 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
70 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
71 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
72 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
73 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
74 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
75 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
76 import org.slf4j.Logger;
77 import org.slf4j.LoggerFactory;
78 import scala.concurrent.duration.FiniteDuration;
81 * Unit tests for RaftActorServerConfigurationSupport.
83 * @author Thomas Pantelis
85 public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
86 static final String LEADER_ID = "leader";
87 static final String FOLLOWER_ID = "follower";
88 static final String FOLLOWER_ID2 = "follower2";
89 static final String NEW_SERVER_ID = "new-server";
90 static final String NEW_SERVER_ID2 = "new-server2";
91 private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupportTest.class);
92 private static final Class<?> COMMIT_MESSAGE_CLASS = RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT.getClass();
93 private static final boolean NO_PERSISTENCE = false;
94 private static final boolean PERSISTENT = true;
96 private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
98 private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
99 Props.create(ForwardMessageToBehaviorActor.class).withDispatcher(Dispatchers.DefaultDispatcherId()),
100 actorFactory.generateActorId(FOLLOWER_ID));
102 private TestActorRef<MockNewFollowerRaftActor> newFollowerRaftActor;
103 private ActorRef newFollowerCollectorActor;
104 private RaftActorContext newFollowerActorContext;
106 private final TestKit testKit = new TestKit(getSystem());
109 public void setup() {
110 InMemoryJournal.clear();
111 InMemorySnapshotStore.clear();
114 @SuppressWarnings("checkstyle:IllegalCatch")
115 private void setupNewFollower() {
116 DefaultConfigParamsImpl configParams = newFollowerConfigParams();
118 newFollowerCollectorActor = actorFactory.createActor(MessageCollectorActor.props(),
119 actorFactory.generateActorId(NEW_SERVER_ID + "Collector"));
120 newFollowerRaftActor = actorFactory.createTestActor(MockNewFollowerRaftActor.props(
121 configParams, newFollowerCollectorActor).withDispatcher(Dispatchers.DefaultDispatcherId()),
122 actorFactory.generateActorId(NEW_SERVER_ID));
125 newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
126 } catch (Exception e) {
127 newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
131 private static DefaultConfigParamsImpl newFollowerConfigParams() {
132 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
133 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
134 configParams.setElectionTimeoutFactor(100000);
135 configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
140 public void tearDown() {
141 actorFactory.close();
145 public void testAddServerWithExistingFollower() {
146 LOG.info("testAddServerWithExistingFollower starting");
148 RaftActorContextImpl followerActorContext = newFollowerContext(FOLLOWER_ID, followerActor);
149 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
151 followerActorContext.setCommitIndex(2);
152 followerActorContext.setLastApplied(2);
154 Follower follower = new Follower(followerActorContext);
155 followerActor.underlyingActor().setBehavior(follower);
156 followerActorContext.setCurrentBehavior(follower);
158 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
159 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
160 followerActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
161 actorFactory.generateActorId(LEADER_ID));
163 // Expect initial heartbeat from the leader.
164 expectFirstMatching(followerActor, AppendEntries.class);
165 clearMessages(followerActor);
167 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
168 final ActorRef leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
170 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
172 // Leader should install snapshot - capture and verify ApplySnapshot contents
174 ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
175 List<Object> snapshotState = MockRaftActor.fromState(applySnapshot.getSnapshot().getState());
176 assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
178 AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
179 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
180 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
182 // Verify ServerConfigurationPayload entry in leader's log
184 expectFirstMatching(leaderCollectorActor, ApplyState.class);
185 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
186 assertEquals("Leader journal last index", 3, leaderActorContext.getReplicatedLog().lastIndex());
187 assertEquals("Leader commit index", 3, leaderActorContext.getCommitIndex());
188 assertEquals("Leader last applied index", 3, leaderActorContext.getLastApplied());
189 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
190 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
192 // Verify ServerConfigurationPayload entry in both followers
194 expectFirstMatching(followerActor, ApplyState.class);
195 assertEquals("Follower journal last index", 3, followerActorContext.getReplicatedLog().lastIndex());
196 verifyServerConfigurationPayloadEntry(followerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
197 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
199 expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
200 assertEquals("New follower journal last index", 3, newFollowerActorContext.getReplicatedLog().lastIndex());
201 verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
202 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
204 // Verify new server config was applied in both followers
206 assertEquals("Follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID), followerActorContext.getPeerIds());
208 assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, FOLLOWER_ID),
209 newFollowerActorContext.getPeerIds());
211 assertEquals("Follower commit index", 3, followerActorContext.getCommitIndex());
212 assertEquals("Follower last applied index", 3, followerActorContext.getLastApplied());
213 assertEquals("New follower commit index", 3, newFollowerActorContext.getCommitIndex());
214 assertEquals("New follower last applied index", 3, newFollowerActorContext.getLastApplied());
216 assertEquals("Leader persisted ReplicatedLogImplEntry entries", 0,
217 InMemoryJournal.get(LEADER_ID, SimpleReplicatedLogEntry.class).size());
218 assertEquals("Leader persisted ServerConfigurationPayload entries", 1,
219 InMemoryJournal.get(LEADER_ID, ServerConfigurationPayload.class).size());
221 assertEquals("New follower persisted ReplicatedLogImplEntry entries", 0,
222 InMemoryJournal.get(NEW_SERVER_ID, SimpleReplicatedLogEntry.class).size());
223 assertEquals("New follower persisted ServerConfigurationPayload entries", 1,
224 InMemoryJournal.get(NEW_SERVER_ID, ServerConfigurationPayload.class).size());
226 LOG.info("testAddServerWithExistingFollower ending");
230 public void testAddServerWithNoExistingFollower() {
231 LOG.info("testAddServerWithNoExistingFollower starting");
234 RaftActorContext initialActorContext = new MockRaftActorContext();
235 initialActorContext.setCommitIndex(1);
236 initialActorContext.setLastApplied(1);
237 initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
240 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
241 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
242 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
243 actorFactory.generateActorId(LEADER_ID));
245 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
246 final RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
248 final ActorRef leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
250 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
252 // Leader should install snapshot - capture and verify ApplySnapshot contents
254 ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
255 List<Object> snapshotState = MockRaftActor.fromState(applySnapshot.getSnapshot().getState());
256 assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
258 AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
259 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
260 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
262 // Verify ServerConfigurationPayload entry in leader's log
264 expectFirstMatching(leaderCollectorActor, ApplyState.class);
265 assertEquals("Leader journal last index", 2, leaderActorContext.getReplicatedLog().lastIndex());
266 assertEquals("Leader commit index", 2, leaderActorContext.getCommitIndex());
267 assertEquals("Leader last applied index", 2, leaderActorContext.getLastApplied());
268 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
269 votingServer(NEW_SERVER_ID));
271 // Verify ServerConfigurationPayload entry in the new follower
273 expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
274 assertEquals("New follower journal last index", 2, newFollowerActorContext.getReplicatedLog().lastIndex());
275 verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
276 votingServer(NEW_SERVER_ID));
278 // Verify new server config was applied in the new follower
280 assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
282 LOG.info("testAddServerWithNoExistingFollower ending");
286 public void testAddServersAsNonVoting() {
287 LOG.info("testAddServersAsNonVoting starting");
290 RaftActorContext initialActorContext = new MockRaftActorContext();
292 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
293 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
294 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
295 actorFactory.generateActorId(LEADER_ID));
297 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
298 final RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
300 final ActorRef leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
302 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), false), testKit.getRef());
304 AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
305 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
306 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
308 // Verify ServerConfigurationPayload entry in leader's log
310 expectFirstMatching(leaderCollectorActor, ApplyState.class);
312 assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
313 assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
314 assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
315 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
316 nonVotingServer(NEW_SERVER_ID));
318 // Verify ServerConfigurationPayload entry in the new follower
320 expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
321 assertEquals("New follower journal last index", 0, newFollowerActorContext.getReplicatedLog().lastIndex());
322 verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
323 nonVotingServer(NEW_SERVER_ID));
325 // Verify new server config was applied in the new follower
327 assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
329 assertNoneMatching(newFollowerCollectorActor, InstallSnapshot.class, 500);
331 // Add another non-voting server.
333 clearMessages(leaderCollectorActor);
335 RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
336 Follower newFollower2 = new Follower(follower2ActorContext);
337 followerActor.underlyingActor().setBehavior(newFollower2);
339 leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit.getRef());
341 addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
342 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
343 assertEquals("getLeaderHint", Optional.of(LEADER_ID), addServerReply.getLeaderHint());
345 expectFirstMatching(leaderCollectorActor, ApplyState.class);
346 assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
347 assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
348 assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
349 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
350 votingServer(LEADER_ID), nonVotingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
352 LOG.info("testAddServersAsNonVoting ending");
356 public void testAddServerWithOperationInProgress() {
357 LOG.info("testAddServerWithOperationInProgress starting");
360 RaftActorContext initialActorContext = new MockRaftActorContext();
362 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
363 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
364 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
365 actorFactory.generateActorId(LEADER_ID));
367 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
368 final RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
370 final ActorRef leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
372 RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
373 Follower newFollower2 = new Follower(follower2ActorContext);
374 followerActor.underlyingActor().setBehavior(newFollower2);
376 MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
377 newFollowerRaftActorInstance.setDropMessageOfType(InstallSnapshot.class);
379 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
381 // Wait for leader's install snapshot and capture it
383 InstallSnapshot installSnapshot = expectFirstMatching(newFollowerCollectorActor, InstallSnapshot.class);
385 // Send a second AddServer - should get queued
386 TestKit testKit2 = new TestKit(getSystem());
387 leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit2.getRef());
389 // Continue the first AddServer
390 newFollowerRaftActorInstance.setDropMessageOfType(null);
391 newFollowerRaftActor.tell(installSnapshot, leaderActor);
393 // Verify both complete successfully
394 AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
395 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
397 addServerReply = testKit2.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
398 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
400 // Verify ServerConfigurationPayload entries in leader's log
402 expectMatching(leaderCollectorActor, ApplyState.class, 2);
403 assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
404 assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
405 assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
406 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
407 votingServer(LEADER_ID), votingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
409 // Verify ServerConfigurationPayload entry in the new follower
411 expectMatching(newFollowerCollectorActor, ApplyState.class, 2);
412 assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID2),
413 newFollowerActorContext.getPeerIds());
415 LOG.info("testAddServerWithOperationInProgress ending");
419 public void testAddServerWithPriorSnapshotInProgress() {
420 LOG.info("testAddServerWithPriorSnapshotInProgress starting");
423 RaftActorContext initialActorContext = new MockRaftActorContext();
425 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
426 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
427 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
428 actorFactory.generateActorId(LEADER_ID));
430 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
431 final RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
433 ActorRef leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
435 // Drop commit message for now to delay snapshot completion
436 leaderRaftActor.setDropMessageOfType(String.class);
438 leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
440 Object commitMsg = expectFirstMatching(leaderCollectorActor, COMMIT_MESSAGE_CLASS);
442 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
444 leaderRaftActor.setDropMessageOfType(null);
445 leaderActor.tell(commitMsg, leaderActor);
447 AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
448 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
449 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
451 expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
453 // Verify ServerConfigurationPayload entry in leader's log
455 expectFirstMatching(leaderCollectorActor, ApplyState.class);
456 assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
457 assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
458 assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
459 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
460 votingServer(NEW_SERVER_ID));
462 LOG.info("testAddServerWithPriorSnapshotInProgress ending");
466 public void testAddServerWithPriorSnapshotCompleteTimeout() {
467 LOG.info("testAddServerWithPriorSnapshotCompleteTimeout starting");
470 RaftActorContext initialActorContext = new MockRaftActorContext();
472 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
473 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
474 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
475 actorFactory.generateActorId(LEADER_ID));
477 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
478 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
480 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
482 // Drop commit message so the snapshot doesn't complete.
483 leaderRaftActor.setDropMessageOfType(COMMIT_MESSAGE_CLASS);
485 leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
487 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
489 AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
490 assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
492 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
494 LOG.info("testAddServerWithPriorSnapshotCompleteTimeout ending");
498 public void testAddServerWithLeaderChangeBeforePriorSnapshotComplete() {
499 LOG.info("testAddServerWithLeaderChangeBeforePriorSnapshotComplete starting");
502 RaftActorContext initialActorContext = new MockRaftActorContext();
504 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
505 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
506 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
507 actorFactory.generateActorId(LEADER_ID));
509 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
510 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
511 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
513 final ActorRef leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
515 // Drop the commit message so the snapshot doesn't complete yet.
516 leaderRaftActor.setDropMessageOfType(COMMIT_MESSAGE_CLASS);
518 leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
520 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
522 Object commitMsg = expectFirstMatching(leaderCollectorActor, COMMIT_MESSAGE_CLASS);
524 // Change the leader behavior to follower
525 leaderActor.tell(new Follower(leaderActorContext), leaderActor);
527 // Drop CaptureSnapshotReply in case install snapshot is incorrectly initiated after the prior
528 // snapshot completes. This will prevent the invalid snapshot from completing and fail the
529 // isCapturing assertion below.
530 leaderRaftActor.setDropMessageOfType(CaptureSnapshotReply.class);
532 // Complete the prior snapshot - this should be a no-op b/c it's no longer the leader
533 leaderActor.tell(commitMsg, leaderActor);
535 leaderActor.tell(new RaftActorServerConfigurationSupport.ServerOperationTimeout(NEW_SERVER_ID), leaderActor);
537 AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
538 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
540 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
541 assertEquals("isCapturing", false, leaderActorContext.getSnapshotManager().isCapturing());
543 LOG.info("testAddServerWithLeaderChangeBeforePriorSnapshotComplete ending");
547 public void testAddServerWithLeaderChangeDuringInstallSnapshot() {
548 LOG.info("testAddServerWithLeaderChangeDuringInstallSnapshot starting");
551 RaftActorContext initialActorContext = new MockRaftActorContext();
553 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
554 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
555 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
556 actorFactory.generateActorId(LEADER_ID));
558 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
559 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
561 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(8);
563 ActorRef leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
565 // Drop the UnInitializedFollowerSnapshotReply to delay it.
566 leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
568 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
570 final UnInitializedFollowerSnapshotReply snapshotReply = expectFirstMatching(leaderCollectorActor,
571 UnInitializedFollowerSnapshotReply.class);
573 // Prevent election timeout when the leader switches to follower
574 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
576 // Change the leader behavior to follower
577 leaderActor.tell(new Follower(leaderActorContext), leaderActor);
579 // Send the captured UnInitializedFollowerSnapshotReply - should be a no-op
580 leaderRaftActor.setDropMessageOfType(null);
581 leaderActor.tell(snapshotReply, leaderActor);
583 AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
584 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
586 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
588 LOG.info("testAddServerWithLeaderChangeDuringInstallSnapshot ending");
592 public void testAddServerWithInstallSnapshotTimeout() {
593 LOG.info("testAddServerWithInstallSnapshotTimeout starting");
596 RaftActorContext initialActorContext = new MockRaftActorContext();
598 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
599 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
600 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
601 actorFactory.generateActorId(LEADER_ID));
603 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
604 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
605 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
607 // Drop the InstallSnapshot message so it times out
608 newFollowerRaftActor.underlyingActor().setDropMessageOfType(InstallSnapshot.class);
610 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
612 leaderActor.tell(new UnInitializedFollowerSnapshotReply("bogus"), leaderActor);
614 AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
615 assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
617 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
618 assertEquals("Leader followers size", 0,
619 ((AbstractLeader)leaderRaftActor.getCurrentBehavior()).getFollowerIds().size());
621 LOG.info("testAddServerWithInstallSnapshotTimeout ending");
625 public void testAddServerWithNoLeader() {
626 LOG.info("testAddServerWithNoLeader starting");
629 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
630 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
632 TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
633 MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
634 followerActor.path().toString())).config(configParams).persistent(Optional.of(false))
635 .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
636 actorFactory.generateActorId(LEADER_ID));
637 noLeaderActor.underlyingActor().waitForInitializeBehaviorComplete();
639 noLeaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true),
641 AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
642 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
644 LOG.info("testAddServerWithNoLeader ending");
648 public void testAddServerWithNoConsensusReached() {
649 LOG.info("testAddServerWithNoConsensusReached starting");
652 RaftActorContext initialActorContext = new MockRaftActorContext();
654 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
655 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
656 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
657 actorFactory.generateActorId(LEADER_ID));
659 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
660 final RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
662 final ActorRef leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
664 // Drop UnInitializedFollowerSnapshotReply initially
665 leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
667 MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
668 newFollowerCollectorActor = newCollectorActor(newFollowerRaftActorInstance, NEW_SERVER_ID);
670 // Drop AppendEntries to the new follower so consensus isn't reached
671 newFollowerRaftActorInstance.setDropMessageOfType(AppendEntries.class);
673 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
675 // Capture the UnInitializedFollowerSnapshotReply
676 Object snapshotReply = expectFirstMatching(leaderCollectorActor, UnInitializedFollowerSnapshotReply.class);
678 // Send the UnInitializedFollowerSnapshotReply to resume the first request
679 leaderRaftActor.setDropMessageOfType(null);
680 leaderActor.tell(snapshotReply, leaderActor);
682 expectFirstMatching(newFollowerCollectorActor, AppendEntries.class);
684 // Send a second AddServer
685 leaderActor.tell(new AddServer(NEW_SERVER_ID2, "", false), testKit.getRef());
687 // The first AddServer should succeed with OK even though consensus wasn't reached
688 AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
689 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
690 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
692 // Verify ServerConfigurationPayload entry in leader's log
693 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
694 votingServer(NEW_SERVER_ID));
696 // The second AddServer should fail since consensus wasn't reached for the first
697 addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
698 assertEquals("getStatus", ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT, addServerReply.getStatus());
700 // Re-send the second AddServer - should also fail
701 leaderActor.tell(new AddServer(NEW_SERVER_ID2, "", false), testKit.getRef());
702 addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
703 assertEquals("getStatus", ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT, addServerReply.getStatus());
705 LOG.info("testAddServerWithNoConsensusReached ending");
709 public void testAddServerWithExistingServer() {
710 LOG.info("testAddServerWithExistingServer starting");
712 RaftActorContext initialActorContext = new MockRaftActorContext();
714 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
715 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
716 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
717 actorFactory.generateActorId(LEADER_ID));
719 leaderActor.tell(new AddServer(FOLLOWER_ID, followerActor.path().toString(), true), testKit.getRef());
721 AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
722 assertEquals("getStatus", ServerChangeStatus.ALREADY_EXISTS, addServerReply.getStatus());
724 LOG.info("testAddServerWithExistingServer ending");
728 public void testAddServerForwardedToLeader() {
729 LOG.info("testAddServerForwardedToLeader starting");
732 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
733 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
735 ActorRef leaderActor = actorFactory.createActor(
736 MessageCollectorActor.props(), actorFactory.generateActorId(LEADER_ID));
738 TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
739 MockRaftActor.builder().id(FOLLOWER_ID).peerAddresses(ImmutableMap.of(LEADER_ID,
740 leaderActor.path().toString())).config(configParams).persistent(Optional.of(false))
741 .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
742 actorFactory.generateActorId(FOLLOWER_ID));
743 followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
745 followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
746 -1, -1, (short)0), leaderActor);
748 followerRaftActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true),
750 expectFirstMatching(leaderActor, AddServer.class);
752 LOG.info("testAddServerForwardedToLeader ending");
756 public void testOnApplyState() {
757 LOG.info("testOnApplyState starting");
759 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
760 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
761 TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
762 MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
763 followerActor.path().toString())).config(configParams).persistent(Optional.of(false))
764 .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
765 actorFactory.generateActorId(LEADER_ID));
767 RaftActorServerConfigurationSupport support = new RaftActorServerConfigurationSupport(
768 noLeaderActor.underlyingActor());
770 ReplicatedLogEntry serverConfigEntry = new SimpleReplicatedLogEntry(1, 1,
771 new ServerConfigurationPayload(Collections.<ServerInfo>emptyList()));
772 boolean handled = support.handleMessage(new ApplyState(null, null, serverConfigEntry), ActorRef.noSender());
773 assertEquals("Message handled", true, handled);
775 ReplicatedLogEntry nonServerConfigEntry = new SimpleReplicatedLogEntry(1, 1,
776 new MockRaftActorContext.MockPayload("1"));
777 handled = support.handleMessage(new ApplyState(null, null, nonServerConfigEntry), ActorRef.noSender());
778 assertEquals("Message handled", false, handled);
780 LOG.info("testOnApplyState ending");
784 public void testRemoveServerWithNoLeader() {
785 LOG.info("testRemoveServerWithNoLeader starting");
787 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
788 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
790 TestActorRef<MockRaftActor> leaderActor = actorFactory.createTestActor(
791 MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
792 followerActor.path().toString())).config(configParams).persistent(Optional.of(false))
793 .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
794 actorFactory.generateActorId(LEADER_ID));
795 leaderActor.underlyingActor().waitForInitializeBehaviorComplete();
797 leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
798 RemoveServerReply removeServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), RemoveServerReply.class);
799 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, removeServerReply.getStatus());
801 LOG.info("testRemoveServerWithNoLeader ending");
805 public void testRemoveServerNonExistentServer() {
806 LOG.info("testRemoveServerNonExistentServer starting");
808 RaftActorContext initialActorContext = new MockRaftActorContext();
810 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
811 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
812 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
813 actorFactory.generateActorId(LEADER_ID));
815 leaderActor.tell(new RemoveServer(NEW_SERVER_ID), testKit.getRef());
816 RemoveServerReply removeServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), RemoveServerReply.class);
817 assertEquals("getStatus", ServerChangeStatus.DOES_NOT_EXIST, removeServerReply.getStatus());
819 LOG.info("testRemoveServerNonExistentServer ending");
823 public void testRemoveServerForwardToLeader() {
824 LOG.info("testRemoveServerForwardToLeader starting");
826 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
827 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
829 ActorRef leaderActor = actorFactory.createTestActor(
830 MessageCollectorActor.props(), actorFactory.generateActorId(LEADER_ID));
832 TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
833 MockRaftActor.builder().id(FOLLOWER_ID).peerAddresses(ImmutableMap.of(LEADER_ID,
834 leaderActor.path().toString())).config(configParams).persistent(Optional.of(false))
835 .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
836 actorFactory.generateActorId(FOLLOWER_ID));
837 followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
839 followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
840 -1, -1, (short)0), leaderActor);
842 followerRaftActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
843 expectFirstMatching(leaderActor, RemoveServer.class);
845 LOG.info("testRemoveServerForwardToLeader ending");
849 public void testRemoveServer() {
850 LOG.info("testRemoveServer starting");
852 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
853 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
854 configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
856 final String follower1ActorId = actorFactory.generateActorId(FOLLOWER_ID);
857 final String follower1ActorPath = actorFactory.createTestActorPath(follower1ActorId);
858 final String follower2ActorId = actorFactory.generateActorId(FOLLOWER_ID2);
859 final String follower2ActorPath = actorFactory.createTestActorPath(follower2ActorId);
860 RaftActorContext initialActorContext = new MockRaftActorContext();
862 final String downNodeId = "downNode";
863 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(MockLeaderRaftActor.props(
864 ImmutableMap.of(FOLLOWER_ID, follower1ActorPath, FOLLOWER_ID2, follower2ActorPath, downNodeId, ""),
865 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
866 actorFactory.generateActorId(LEADER_ID));
868 final ActorRef leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
870 ActorRef follower1Collector = actorFactory.createActor(
871 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
872 final TestActorRef<CollectingMockRaftActor> follower1Actor = actorFactory.createTestActor(
873 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
874 FOLLOWER_ID2, follower2ActorPath, downNodeId, ""), configParams, NO_PERSISTENCE,
875 follower1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId);
877 ActorRef follower2Collector = actorFactory.createActor(
878 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
879 final TestActorRef<CollectingMockRaftActor> follower2Actor = actorFactory.createTestActor(
880 CollectingMockRaftActor.props(FOLLOWER_ID2, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
881 FOLLOWER_ID, follower1ActorPath, downNodeId, ""), configParams, NO_PERSISTENCE,
882 follower2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId);
884 leaderActor.underlyingActor().waitForInitializeBehaviorComplete();
885 follower1Actor.underlyingActor().waitForInitializeBehaviorComplete();
886 follower2Actor.underlyingActor().waitForInitializeBehaviorComplete();
888 leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
889 RemoveServerReply removeServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), RemoveServerReply.class);
890 assertEquals("getStatus", ServerChangeStatus.OK, removeServerReply.getStatus());
892 ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
893 assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
894 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
895 votingServer(LEADER_ID), votingServer(FOLLOWER_ID2), votingServer(downNodeId));
897 applyState = MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
898 assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
899 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
900 votingServer(LEADER_ID), votingServer(FOLLOWER_ID2), votingServer(downNodeId));
902 RaftActorBehavior currentBehavior = leaderActor.underlyingActor().getCurrentBehavior();
903 assertTrue("Expected Leader", currentBehavior instanceof Leader);
904 assertEquals("Follower ids size", 2, ((Leader)currentBehavior).getFollowerIds().size());
906 MessageCollectorActor.expectFirstMatching(follower1Collector, ServerRemoved.class);
908 LOG.info("testRemoveServer ending");
912 public void testRemoveServerLeader() {
913 LOG.info("testRemoveServerLeader starting");
915 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
916 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
917 configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
919 final String followerActorId = actorFactory.generateActorId(FOLLOWER_ID);
920 final String followerActorPath = actorFactory.createTestActorPath(followerActorId);
921 RaftActorContext initialActorContext = new MockRaftActorContext();
923 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
924 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActorPath),
925 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
926 actorFactory.generateActorId(LEADER_ID));
928 final ActorRef leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
930 final ActorRef followerCollector =
931 actorFactory.createActor(MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
932 actorFactory.createTestActor(
933 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
934 configParams, NO_PERSISTENCE, followerCollector)
935 .withDispatcher(Dispatchers.DefaultDispatcherId()),
938 leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
939 RemoveServerReply removeServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), RemoveServerReply.class);
940 assertEquals("getStatus", ServerChangeStatus.OK, removeServerReply.getStatus());
942 final ApplyState applyState = MessageCollectorActor.expectFirstMatching(followerCollector, ApplyState.class);
943 assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
944 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
945 votingServer(FOLLOWER_ID));
947 MessageCollectorActor.expectFirstMatching(leaderCollector, ServerRemoved.class);
949 LOG.info("testRemoveServerLeader ending");
953 public void testRemoveServerLeaderWithNoFollowers() {
954 LOG.info("testRemoveServerLeaderWithNoFollowers starting");
956 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
957 MockLeaderRaftActor.props(Collections.<String, String>emptyMap(),
958 new MockRaftActorContext()).withDispatcher(Dispatchers.DefaultDispatcherId()),
959 actorFactory.generateActorId(LEADER_ID));
961 leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
962 RemoveServerReply removeServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), RemoveServerReply.class);
963 assertEquals("getStatus", ServerChangeStatus.NOT_SUPPORTED, removeServerReply.getStatus());
965 LOG.info("testRemoveServerLeaderWithNoFollowers ending");
969 public void testChangeServersVotingStatus() {
970 LOG.info("testChangeServersVotingStatus starting");
972 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
973 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
974 configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
976 final String follower1ActorId = actorFactory.generateActorId(FOLLOWER_ID);
977 final String follower1ActorPath = actorFactory.createTestActorPath(follower1ActorId);
978 final String follower2ActorId = actorFactory.generateActorId(FOLLOWER_ID2);
979 final String follower2ActorPath = actorFactory.createTestActorPath(follower2ActorId);
981 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
982 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, follower1ActorPath,
983 FOLLOWER_ID2, follower2ActorPath), new MockRaftActorContext())
984 .withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
985 ActorRef leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
987 ActorRef follower1Collector = actorFactory.createActor(
988 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
989 final TestActorRef<CollectingMockRaftActor> follower1RaftActor = actorFactory.createTestActor(
990 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
991 FOLLOWER_ID2, follower2ActorPath), configParams, NO_PERSISTENCE, follower1Collector)
992 .withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId);
994 ActorRef follower2Collector = actorFactory.createActor(
995 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
996 final TestActorRef<CollectingMockRaftActor> follower2RaftActor = actorFactory.createTestActor(
997 CollectingMockRaftActor.props(FOLLOWER_ID2, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
998 FOLLOWER_ID, follower1ActorPath), configParams, NO_PERSISTENCE, follower2Collector)
999 .withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId);
1001 // Send first ChangeServersVotingStatus message
1003 leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(FOLLOWER_ID, false, FOLLOWER_ID2, false)),
1005 ServerChangeReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), ServerChangeReply.class);
1006 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1008 final ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
1009 assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
1010 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1011 votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1013 MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
1014 verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext()
1015 .getReplicatedLog(), votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID),
1016 nonVotingServer(FOLLOWER_ID2));
1018 MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
1019 verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext()
1020 .getReplicatedLog(), votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID),
1021 nonVotingServer(FOLLOWER_ID2));
1023 MessageCollectorActor.clearMessages(leaderCollector);
1024 MessageCollectorActor.clearMessages(follower1Collector);
1025 MessageCollectorActor.clearMessages(follower2Collector);
1027 // Send second ChangeServersVotingStatus message
1029 leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(FOLLOWER_ID, true)), testKit.getRef());
1030 reply = testKit.expectMsgClass(Duration.ofSeconds(5), ServerChangeReply.class);
1031 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1033 MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
1034 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1035 votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1037 MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
1038 verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext()
1039 .getReplicatedLog(), votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1041 MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
1042 verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext()
1043 .getReplicatedLog(), votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1045 LOG.info("testChangeServersVotingStatus ending");
1049 public void testChangeLeaderToNonVoting() {
1050 LOG.info("testChangeLeaderToNonVoting starting");
1052 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1053 configParams.setHeartBeatInterval(new FiniteDuration(500, TimeUnit.MILLISECONDS));
1055 final String follower1ActorId = actorFactory.generateActorId(FOLLOWER_ID);
1056 final String follower1ActorPath = actorFactory.createTestActorPath(follower1ActorId);
1057 final String follower2ActorId = actorFactory.generateActorId(FOLLOWER_ID2);
1058 final String follower2ActorPath = actorFactory.createTestActorPath(follower2ActorId);
1060 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
1061 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, follower1ActorPath,
1062 FOLLOWER_ID2, follower2ActorPath), new MockRaftActorContext())
1063 .withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
1064 ActorRef leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
1066 ActorRef follower1Collector = actorFactory.createActor(
1067 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1068 final TestActorRef<CollectingMockRaftActor> follower1RaftActor = actorFactory.createTestActor(
1069 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
1070 FOLLOWER_ID2, follower2ActorPath), configParams, NO_PERSISTENCE, follower1Collector)
1071 .withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId);
1073 ActorRef follower2Collector = actorFactory.createActor(
1074 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1075 final TestActorRef<CollectingMockRaftActor> follower2RaftActor = actorFactory.createTestActor(
1076 CollectingMockRaftActor.props(FOLLOWER_ID2, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
1077 FOLLOWER_ID, follower1ActorPath), configParams, NO_PERSISTENCE, follower2Collector)
1078 .withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId);
1080 // Send ChangeServersVotingStatus message
1082 leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(LEADER_ID, false)), testKit.getRef());
1083 ServerChangeReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), ServerChangeReply.class);
1084 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1086 MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
1087 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1088 nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
1090 MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
1091 verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext()
1092 .getReplicatedLog(), nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
1094 MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
1095 verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext()
1096 .getReplicatedLog(), nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
1098 verifyRaftState(RaftState.Leader, follower1RaftActor.underlyingActor(), follower2RaftActor.underlyingActor());
1099 verifyRaftState(RaftState.Follower, leaderActor.underlyingActor());
1101 MessageCollectorActor.expectMatching(leaderCollector, AppendEntries.class, 2);
1103 LOG.info("testChangeLeaderToNonVoting ending");
1107 public void testChangeLeaderToNonVotingInSingleNode() {
1108 LOG.info("testChangeLeaderToNonVotingInSingleNode starting");
1110 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
1111 MockLeaderRaftActor.props(ImmutableMap.of(), new MockRaftActorContext())
1112 .withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
1114 leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(LEADER_ID, false)), testKit.getRef());
1115 ServerChangeReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), ServerChangeReply.class);
1116 assertEquals("getStatus", ServerChangeStatus.INVALID_REQUEST, reply.getStatus());
1118 LOG.info("testChangeLeaderToNonVotingInSingleNode ending");
1122 public void testChangeToVotingWithNoLeader() {
1123 LOG.info("testChangeToVotingWithNoLeader starting");
1125 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1126 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1127 configParams.setElectionTimeoutFactor(5);
1129 final String node1ID = "node1";
1130 final String node2ID = "node2";
1132 // Set up a persisted ServerConfigurationPayload. Initially node1 and node2 will come up as non-voting.
1133 // via the server config. The server config will also contain 2 voting peers that are down (ie no
1136 ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1137 new ServerInfo(node1ID, false), new ServerInfo(node2ID, false),
1138 new ServerInfo("downNode1", true), new ServerInfo("downNode2", true)));
1139 SimpleReplicatedLogEntry persistedServerConfigEntry = new SimpleReplicatedLogEntry(0, 1, persistedServerConfig);
1141 InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "downNode1"));
1142 InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1143 InMemoryJournal.addEntry(node1ID, 3, new ApplyJournalEntries(0));
1144 InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "downNode2"));
1145 InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1146 InMemoryJournal.addEntry(node2ID, 3, new ApplyJournalEntries(0));
1148 ActorRef node1Collector = actorFactory.createActor(
1149 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1150 TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1151 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
1152 PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1153 CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1155 ActorRef node2Collector = actorFactory.createActor(
1156 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1157 TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1158 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
1159 PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1160 CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1162 node1RaftActor.waitForInitializeBehaviorComplete();
1163 node2RaftActor.waitForInitializeBehaviorComplete();
1165 // Verify the intended server config was loaded and applied.
1166 verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1167 nonVotingServer(node1ID), nonVotingServer(node2ID), votingServer("downNode1"),
1168 votingServer("downNode2"));
1169 assertEquals("isVotingMember", false, node1RaftActor.getRaftActorContext().isVotingMember());
1170 assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1171 assertEquals("getLeaderId", null, node1RaftActor.getLeaderId());
1173 verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1174 nonVotingServer(node1ID), nonVotingServer(node2ID), votingServer("downNode1"),
1175 votingServer("downNode2"));
1176 assertEquals("isVotingMember", false, node2RaftActor.getRaftActorContext().isVotingMember());
1178 // For the test, we send a ChangeServersVotingStatus message to node1 to flip the voting states for
1179 // each server, ie node1 and node2 to voting and the 2 down nodes to non-voting. This should cause
1180 // node1 to try to elect itself as leader in order to apply the new server config. Since the 2
1181 // down nodes are switched to non-voting, node1 should only need a vote from node2.
1183 // First send the message such that node1 has no peer address for node2 - should fail.
1185 ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true,
1186 node2ID, true, "downNode1", false, "downNode2", false));
1187 node1RaftActorRef.tell(changeServers, testKit.getRef());
1188 ServerChangeReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), ServerChangeReply.class);
1189 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, reply.getStatus());
1190 assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1192 // Send an AppendEntries so node1 has a leaderId
1194 long term = node1RaftActor.getRaftActorContext().getTermInformation().getCurrentTerm();
1195 node1RaftActorRef.tell(new AppendEntries(term, "downNode1", -1L, -1L,
1196 Collections.<ReplicatedLogEntry>emptyList(), 0, -1, (short)1), ActorRef.noSender());
1198 // Wait for the ElectionTimeout to clear the leaderId. The leaderId must be null so on the next
1199 // ChangeServersVotingStatus message, it will try to elect a leader.
1201 AbstractRaftActorIntegrationTest.verifyRaftState(node1RaftActorRef,
1202 rs -> assertEquals("getLeader", null, rs.getLeader()));
1204 // Update node2's peer address and send the message again
1206 node1RaftActor.setPeerAddress(node2ID, node2RaftActorRef.path().toString());
1208 node1RaftActorRef.tell(changeServers, testKit.getRef());
1209 reply = testKit.expectMsgClass(Duration.ofSeconds(5), ServerChangeReply.class);
1210 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1212 ApplyJournalEntries apply = MessageCollectorActor.expectFirstMatching(node1Collector,
1213 ApplyJournalEntries.class);
1214 assertEquals("getToIndex", 1, apply.getToIndex());
1215 verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1216 votingServer(node1ID), votingServer(node2ID), nonVotingServer("downNode1"),
1217 nonVotingServer("downNode2"));
1218 assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
1219 assertEquals("getRaftState", RaftState.Leader, node1RaftActor.getRaftState());
1221 apply = MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
1222 assertEquals("getToIndex", 1, apply.getToIndex());
1223 verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1224 votingServer(node1ID), votingServer(node2ID), nonVotingServer("downNode1"),
1225 nonVotingServer("downNode2"));
1226 assertEquals("isVotingMember", true, node2RaftActor.getRaftActorContext().isVotingMember());
1227 assertEquals("getRaftState", RaftState.Follower, node2RaftActor.getRaftState());
1229 LOG.info("testChangeToVotingWithNoLeader ending");
1233 public void testChangeToVotingWithNoLeaderAndElectionTimeout() {
1234 LOG.info("testChangeToVotingWithNoLeaderAndElectionTimeout starting");
1236 final String node1ID = "node1";
1237 final String node2ID = "node2";
1239 final PeerAddressResolver peerAddressResolver = peerId -> peerId.equals(node1ID)
1240 ? actorFactory.createTestActorPath(node1ID) : peerId.equals(node2ID)
1241 ? actorFactory.createTestActorPath(node2ID) : null;
1243 ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1244 new ServerInfo(node1ID, false), new ServerInfo(node2ID, true)));
1245 SimpleReplicatedLogEntry persistedServerConfigEntry = new SimpleReplicatedLogEntry(0, 1, persistedServerConfig);
1247 InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
1248 InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1249 InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
1250 InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1252 DefaultConfigParamsImpl configParams1 = new DefaultConfigParamsImpl();
1253 configParams1.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1254 configParams1.setElectionTimeoutFactor(1);
1255 configParams1.setPeerAddressResolver(peerAddressResolver);
1256 ActorRef node1Collector = actorFactory.createActor(
1257 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1258 TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1259 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams1,
1260 PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1261 final CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1263 DefaultConfigParamsImpl configParams2 = new DefaultConfigParamsImpl();
1264 configParams2.setElectionTimeoutFactor(1000000);
1265 configParams2.setPeerAddressResolver(peerAddressResolver);
1266 ActorRef node2Collector = actorFactory.createActor(
1267 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1268 TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1269 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams2,
1270 PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1271 CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1273 // Send a ChangeServersVotingStatus message to node1 to change mode1 to voting. This should cause
1274 // node1 to try to elect itself as leader in order to apply the new server config. But we'll drop
1275 // RequestVote messages in node2 which should cause node1 to time out and revert back to the previous
1276 // server config and fail with NO_LEADER. Note that node1 shouldn't forward the request to node2 b/c
1277 // node2 was previously voting.
1279 node2RaftActor.setDropMessageOfType(RequestVote.class);
1281 ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true));
1282 node1RaftActorRef.tell(changeServers, testKit.getRef());
1283 ServerChangeReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), ServerChangeReply.class);
1284 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, reply.getStatus());
1286 assertEquals("Server config", Sets.newHashSet(nonVotingServer(node1ID), votingServer(node2ID)),
1287 Sets.newHashSet(node1RaftActor.getRaftActorContext().getPeerServerInfo(true).getServerConfig()));
1288 assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1290 LOG.info("testChangeToVotingWithNoLeaderAndElectionTimeout ending");
1294 public void testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout() {
1295 LOG.info("testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout starting");
1297 final String node1ID = "node1";
1298 final String node2ID = "node2";
1300 final PeerAddressResolver peerAddressResolver = peerId -> peerId.equals(node1ID)
1301 ? actorFactory.createTestActorPath(node1ID) : peerId.equals(node2ID)
1302 ? actorFactory.createTestActorPath(node2ID) : null;
1304 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1305 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1306 configParams.setElectionTimeoutFactor(3);
1307 configParams.setPeerAddressResolver(peerAddressResolver);
1309 ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1310 new ServerInfo(node1ID, false), new ServerInfo(node2ID, false)));
1311 SimpleReplicatedLogEntry persistedServerConfigEntry = new SimpleReplicatedLogEntry(0, 1, persistedServerConfig);
1313 InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
1314 InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1315 InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
1316 InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1317 InMemoryJournal.addEntry(node2ID, 3, new SimpleReplicatedLogEntry(1, 1,
1318 new MockRaftActorContext.MockPayload("2")));
1319 InMemoryJournal.addEntry(node2ID, 4, new ApplyJournalEntries(1));
1321 ActorRef node1Collector = actorFactory.createActor(
1322 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1323 TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1324 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
1325 PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1326 final CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1328 ActorRef node2Collector = actorFactory.createActor(
1329 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1330 TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1331 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
1332 PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1333 final CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1335 // Send a ChangeServersVotingStatus message to node1 to change mode1 to voting. This should cause
1336 // node1 to try to elect itself as leader in order to apply the new server config. However node1's log
1337 // is behind node2's so node2 should not grant node1's vote. This should cause node1 to time out and
1338 // forward the request to node2.
1340 ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(
1341 ImmutableMap.of(node1ID, true, node2ID, true));
1342 node1RaftActorRef.tell(changeServers, testKit.getRef());
1343 ServerChangeReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), ServerChangeReply.class);
1344 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1346 MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
1347 verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1348 votingServer(node1ID), votingServer(node2ID));
1349 assertEquals("getRaftState", RaftState.Leader, node2RaftActor.getRaftState());
1351 MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class);
1352 verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1353 votingServer(node1ID), votingServer(node2ID));
1354 assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
1355 assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1357 LOG.info("testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout ending");
1361 public void testChangeToVotingWithNoLeaderAndOtherLeaderElected() {
1362 LOG.info("testChangeToVotingWithNoLeaderAndOtherLeaderElected starting");
1364 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1365 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1366 configParams.setElectionTimeoutFactor(100000);
1368 final String node1ID = "node1";
1369 final String node2ID = "node2";
1371 configParams.setPeerAddressResolver(peerId -> peerId.equals(node1ID)
1372 ? actorFactory.createTestActorPath(node1ID) : peerId.equals(node2ID)
1373 ? actorFactory.createTestActorPath(node2ID) : null);
1375 ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1376 new ServerInfo(node1ID, false), new ServerInfo(node2ID, true)));
1377 SimpleReplicatedLogEntry persistedServerConfigEntry = new SimpleReplicatedLogEntry(0, 1, persistedServerConfig);
1379 InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
1380 InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1381 InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
1382 InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1384 ActorRef node1Collector = actorFactory.createActor(
1385 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1386 TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1387 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
1388 PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1389 final CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1391 ActorRef node2Collector = actorFactory.createActor(
1392 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1393 TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1394 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
1395 PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1396 CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1398 // Send a ChangeServersVotingStatus message to node1 to change node1 to voting. This should cause
1399 // node1 to try to elect itself as leader in order to apply the new server config. But we'll drop
1400 // RequestVote messages in node2 and make it the leader so node1 should forward the server change
1401 // request to node2 when node2 is elected.
1403 node2RaftActor.setDropMessageOfType(RequestVote.class);
1405 ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true,
1407 node1RaftActorRef.tell(changeServers, testKit.getRef());
1409 MessageCollectorActor.expectFirstMatching(node2Collector, RequestVote.class);
1411 node2RaftActorRef.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
1413 ServerChangeReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), ServerChangeReply.class);
1414 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1416 MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class);
1417 verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1418 votingServer(node1ID), votingServer(node2ID));
1419 assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
1420 assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1422 MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
1423 verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1424 votingServer(node1ID), votingServer(node2ID));
1425 assertEquals("getRaftState", RaftState.Leader, node2RaftActor.getRaftState());
1427 LOG.info("testChangeToVotingWithNoLeaderAndOtherLeaderElected ending");
1430 private static void verifyRaftState(final RaftState expState, final RaftActor... raftActors) {
1431 Stopwatch sw = Stopwatch.createStarted();
1432 while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
1433 for (RaftActor raftActor : raftActors) {
1434 if (raftActor.getRaftState() == expState) {
1440 fail("None of the RaftActors have state " + expState);
1443 private static ServerInfo votingServer(final String id) {
1444 return new ServerInfo(id, true);
1447 private static ServerInfo nonVotingServer(final String id) {
1448 return new ServerInfo(id, false);
1451 private ActorRef newLeaderCollectorActor(final MockLeaderRaftActor leaderRaftActor) {
1452 return newCollectorActor(leaderRaftActor, LEADER_ID);
1455 private ActorRef newCollectorActor(final AbstractMockRaftActor raftActor, final String id) {
1456 ActorRef collectorActor = actorFactory.createTestActor(
1457 MessageCollectorActor.props(), actorFactory.generateActorId(id + "Collector"));
1458 raftActor.setCollectorActor(collectorActor);
1459 return collectorActor;
1462 private static void verifyServerConfigurationPayloadEntry(final ReplicatedLog log, final ServerInfo... expected) {
1463 ReplicatedLogEntry logEntry = log.get(log.lastIndex());
1464 assertEquals("Last log entry payload class", ServerConfigurationPayload.class, logEntry.getData().getClass());
1465 ServerConfigurationPayload payload = (ServerConfigurationPayload)logEntry.getData();
1466 assertEquals("Server config", Sets.newHashSet(expected), Sets.newHashSet(payload.getServerConfig()));
1469 private static RaftActorContextImpl newFollowerContext(final String id,
1470 final TestActorRef<? extends AbstractActor> actor) {
1471 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1472 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1473 configParams.setElectionTimeoutFactor(100000);
1474 NonPersistentDataProvider noPersistence = new NonPersistentDataProvider(Runnable::run);
1475 ElectionTermImpl termInfo = new ElectionTermImpl(noPersistence, id, LOG);
1476 termInfo.update(1, LEADER_ID);
1477 return new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
1478 id, termInfo, -1, -1, ImmutableMap.of(LEADER_ID, ""), configParams,
1479 noPersistence, applyState -> actor.tell(applyState, actor), LOG, MoreExecutors.directExecutor());
1482 abstract static class AbstractMockRaftActor extends MockRaftActor {
1483 private volatile ActorRef collectorActor;
1484 private volatile Class<?> dropMessageOfType;
1486 AbstractMockRaftActor(final String id, final Map<String, String> peerAddresses,
1487 final Optional<ConfigParams> config, final boolean persistent, final ActorRef collectorActor) {
1488 super(builder().id(id).peerAddresses(peerAddresses).config(config.get())
1489 .persistent(Optional.of(persistent)));
1490 this.collectorActor = collectorActor;
1493 void setDropMessageOfType(final Class<?> dropMessageOfType) {
1494 this.dropMessageOfType = dropMessageOfType;
1497 void setCollectorActor(final ActorRef collectorActor) {
1498 this.collectorActor = collectorActor;
1502 public void handleCommand(final Object message) {
1503 if (dropMessageOfType == null || !dropMessageOfType.equals(message.getClass())) {
1504 super.handleCommand(message);
1507 if (collectorActor != null) {
1508 collectorActor.tell(message, getSender());
1513 public static class CollectingMockRaftActor extends AbstractMockRaftActor {
1515 CollectingMockRaftActor(final String id, final Map<String, String> peerAddresses,
1516 final Optional<ConfigParams> config, final boolean persistent, final ActorRef collectorActor) {
1517 super(id, peerAddresses, config, persistent, collectorActor);
1518 snapshotCohortDelegate = new RaftActorSnapshotCohort() {
1520 public void createSnapshot(final ActorRef actorRef,
1521 final Optional<OutputStream> installSnapshotStream) {
1522 actorRef.tell(new CaptureSnapshotReply(ByteState.empty(), installSnapshotStream), actorRef);
1526 public void applySnapshot(
1527 final org.opendaylight.controller.cluster.raft.persisted.Snapshot.State snapshotState) {
1531 public org.opendaylight.controller.cluster.raft.persisted.Snapshot.State deserializeSnapshot(
1532 final ByteSource snapshotBytes) {
1533 throw new UnsupportedOperationException();
1538 public static Props props(final String id, final Map<String, String> peerAddresses,
1539 final ConfigParams config, final boolean persistent, final ActorRef collectorActor) {
1541 return Props.create(CollectingMockRaftActor.class, id, peerAddresses, Optional.of(config),
1542 persistent, collectorActor);
1547 public static class MockLeaderRaftActor extends AbstractMockRaftActor {
1548 public MockLeaderRaftActor(final Map<String, String> peerAddresses, final ConfigParams config,
1549 final RaftActorContext fromContext) {
1550 super(LEADER_ID, peerAddresses, Optional.of(config), NO_PERSISTENCE, null);
1551 setPersistence(false);
1553 RaftActorContext context = getRaftActorContext();
1554 for (int i = 0; i < fromContext.getReplicatedLog().size(); i++) {
1555 ReplicatedLogEntry entry = fromContext.getReplicatedLog().get(i);
1556 getState().add(entry.getData());
1557 context.getReplicatedLog().append(entry);
1560 context.setCommitIndex(fromContext.getCommitIndex());
1561 context.setLastApplied(fromContext.getLastApplied());
1562 context.getTermInformation().update(fromContext.getTermInformation().getCurrentTerm(),
1563 fromContext.getTermInformation().getVotedFor());
1567 protected void initializeBehavior() {
1568 changeCurrentBehavior(new Leader(getRaftActorContext()));
1569 initializeBehaviorComplete.countDown();
1573 @SuppressWarnings("checkstyle:IllegalCatch")
1574 public void createSnapshot(final ActorRef actorRef, final Optional<OutputStream> installSnapshotStream) {
1575 MockSnapshotState snapshotState = new MockSnapshotState(new ArrayList<>(getState()));
1576 if (installSnapshotStream.isPresent()) {
1577 SerializationUtils.serialize(snapshotState, installSnapshotStream.get());
1580 actorRef.tell(new CaptureSnapshotReply(snapshotState, installSnapshotStream), actorRef);
1583 static Props props(final Map<String, String> peerAddresses, final RaftActorContext fromContext) {
1584 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1585 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1586 configParams.setElectionTimeoutFactor(10);
1587 return Props.create(MockLeaderRaftActor.class, peerAddresses, configParams, fromContext);
1591 public static class MockNewFollowerRaftActor extends AbstractMockRaftActor {
1592 public MockNewFollowerRaftActor(final ConfigParams config, final ActorRef collectorActor) {
1593 super(NEW_SERVER_ID, Maps.<String, String>newHashMap(), Optional.of(config), NO_PERSISTENCE,
1595 setPersistence(false);
1598 static Props props(final ConfigParams config, final ActorRef collectorActor) {
1599 return Props.create(MockNewFollowerRaftActor.class, config, collectorActor);