2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertTrue;
12 import static org.junit.Assert.fail;
13 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.assertNoneMatching;
14 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
15 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
16 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
18 import akka.actor.AbstractActor;
19 import akka.actor.ActorRef;
20 import akka.actor.Props;
21 import akka.dispatch.Dispatchers;
22 import akka.testkit.TestActorRef;
23 import akka.testkit.javadsl.TestKit;
24 import com.google.common.base.Optional;
25 import com.google.common.base.Stopwatch;
26 import com.google.common.collect.ImmutableMap;
27 import com.google.common.collect.Maps;
28 import com.google.common.collect.Sets;
29 import com.google.common.io.ByteSource;
30 import java.io.OutputStream;
31 import java.util.ArrayList;
32 import java.util.Arrays;
33 import java.util.Collections;
34 import java.util.List;
36 import java.util.concurrent.TimeUnit;
37 import org.apache.commons.lang3.SerializationUtils;
38 import org.junit.After;
39 import org.junit.Before;
40 import org.junit.Test;
41 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
42 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
43 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
44 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
45 import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
46 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
47 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
48 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
49 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
50 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
51 import org.opendaylight.controller.cluster.raft.messages.AddServer;
52 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
53 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
54 import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
55 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
56 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
57 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
58 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
59 import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
60 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
61 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
62 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
63 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
64 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
65 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
66 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
67 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
68 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
69 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
70 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
71 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
72 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
73 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
74 import org.slf4j.Logger;
75 import org.slf4j.LoggerFactory;
76 import scala.concurrent.duration.FiniteDuration;
79 * Unit tests for RaftActorServerConfigurationSupport.
81 * @author Thomas Pantelis
83 public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
84 static final String LEADER_ID = "leader";
85 static final String FOLLOWER_ID = "follower";
86 static final String FOLLOWER_ID2 = "follower2";
87 static final String NEW_SERVER_ID = "new-server";
88 static final String NEW_SERVER_ID2 = "new-server2";
89 private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupportTest.class);
90 private static final Class<?> COMMIT_MESSAGE_CLASS = RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT.getClass();
91 private static final boolean NO_PERSISTENCE = false;
92 private static final boolean PERSISTENT = true;
94 private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
96 private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
97 Props.create(ForwardMessageToBehaviorActor.class).withDispatcher(Dispatchers.DefaultDispatcherId()),
98 actorFactory.generateActorId(FOLLOWER_ID));
100 private TestActorRef<MockNewFollowerRaftActor> newFollowerRaftActor;
101 private ActorRef newFollowerCollectorActor;
102 private RaftActorContext newFollowerActorContext;
104 private final TestKit testKit = new TestKit(getSystem());
107 public void setup() {
108 InMemoryJournal.clear();
109 InMemorySnapshotStore.clear();
112 @SuppressWarnings("checkstyle:IllegalCatch")
113 private void setupNewFollower() {
114 DefaultConfigParamsImpl configParams = newFollowerConfigParams();
116 newFollowerCollectorActor = actorFactory.createActor(MessageCollectorActor.props(),
117 actorFactory.generateActorId(NEW_SERVER_ID + "Collector"));
118 newFollowerRaftActor = actorFactory.createTestActor(MockNewFollowerRaftActor.props(
119 configParams, newFollowerCollectorActor).withDispatcher(Dispatchers.DefaultDispatcherId()),
120 actorFactory.generateActorId(NEW_SERVER_ID));
123 newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
124 } catch (Exception e) {
125 newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
129 private static DefaultConfigParamsImpl newFollowerConfigParams() {
130 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
131 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
132 configParams.setElectionTimeoutFactor(100000);
133 configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
138 public void tearDown() {
139 actorFactory.close();
143 public void testAddServerWithExistingFollower() {
144 LOG.info("testAddServerWithExistingFollower starting");
146 RaftActorContextImpl followerActorContext = newFollowerContext(FOLLOWER_ID, followerActor);
147 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
149 followerActorContext.setCommitIndex(2);
150 followerActorContext.setLastApplied(2);
152 Follower follower = new Follower(followerActorContext);
153 followerActor.underlyingActor().setBehavior(follower);
154 followerActorContext.setCurrentBehavior(follower);
156 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
157 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
158 followerActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
159 actorFactory.generateActorId(LEADER_ID));
161 // Expect initial heartbeat from the leader.
162 expectFirstMatching(followerActor, AppendEntries.class);
163 clearMessages(followerActor);
165 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
166 final ActorRef leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
168 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
170 // Leader should install snapshot - capture and verify ApplySnapshot contents
172 ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
173 List<Object> snapshotState = MockRaftActor.fromState(applySnapshot.getSnapshot().getState());
174 assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
176 AddServerReply addServerReply = testKit.expectMsgClass(testKit.duration("5 seconds"), AddServerReply.class);
177 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
178 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
180 // Verify ServerConfigurationPayload entry in leader's log
182 expectFirstMatching(leaderCollectorActor, ApplyState.class);
183 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
184 assertEquals("Leader journal last index", 3, leaderActorContext.getReplicatedLog().lastIndex());
185 assertEquals("Leader commit index", 3, leaderActorContext.getCommitIndex());
186 assertEquals("Leader last applied index", 3, leaderActorContext.getLastApplied());
187 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
188 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
190 // Verify ServerConfigurationPayload entry in both followers
192 expectFirstMatching(followerActor, ApplyState.class);
193 assertEquals("Follower journal last index", 3, followerActorContext.getReplicatedLog().lastIndex());
194 verifyServerConfigurationPayloadEntry(followerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
195 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
197 expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
198 assertEquals("New follower journal last index", 3, newFollowerActorContext.getReplicatedLog().lastIndex());
199 verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
200 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
202 // Verify new server config was applied in both followers
204 assertEquals("Follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID), followerActorContext.getPeerIds());
206 assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, FOLLOWER_ID),
207 newFollowerActorContext.getPeerIds());
209 assertEquals("Follower commit index", 3, followerActorContext.getCommitIndex());
210 assertEquals("Follower last applied index", 3, followerActorContext.getLastApplied());
211 assertEquals("New follower commit index", 3, newFollowerActorContext.getCommitIndex());
212 assertEquals("New follower last applied index", 3, newFollowerActorContext.getLastApplied());
214 assertEquals("Leader persisted ReplicatedLogImplEntry entries", 0,
215 InMemoryJournal.get(LEADER_ID, SimpleReplicatedLogEntry.class).size());
216 assertEquals("Leader persisted ServerConfigurationPayload entries", 1,
217 InMemoryJournal.get(LEADER_ID, ServerConfigurationPayload.class).size());
219 assertEquals("New follower persisted ReplicatedLogImplEntry entries", 0,
220 InMemoryJournal.get(NEW_SERVER_ID, SimpleReplicatedLogEntry.class).size());
221 assertEquals("New follower persisted ServerConfigurationPayload entries", 1,
222 InMemoryJournal.get(NEW_SERVER_ID, ServerConfigurationPayload.class).size());
224 LOG.info("testAddServerWithExistingFollower ending");
228 public void testAddServerWithNoExistingFollower() {
229 LOG.info("testAddServerWithNoExistingFollower starting");
232 RaftActorContext initialActorContext = new MockRaftActorContext();
233 initialActorContext.setCommitIndex(1);
234 initialActorContext.setLastApplied(1);
235 initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
238 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
239 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
240 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
241 actorFactory.generateActorId(LEADER_ID));
243 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
244 final RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
246 final ActorRef leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
248 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
250 // Leader should install snapshot - capture and verify ApplySnapshot contents
252 ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
253 List<Object> snapshotState = MockRaftActor.fromState(applySnapshot.getSnapshot().getState());
254 assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
256 AddServerReply addServerReply = testKit.expectMsgClass(testKit.duration("5 seconds"), AddServerReply.class);
257 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
258 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
260 // Verify ServerConfigurationPayload entry in leader's log
262 expectFirstMatching(leaderCollectorActor, ApplyState.class);
263 assertEquals("Leader journal last index", 2, leaderActorContext.getReplicatedLog().lastIndex());
264 assertEquals("Leader commit index", 2, leaderActorContext.getCommitIndex());
265 assertEquals("Leader last applied index", 2, leaderActorContext.getLastApplied());
266 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
267 votingServer(NEW_SERVER_ID));
269 // Verify ServerConfigurationPayload entry in the new follower
271 expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
272 assertEquals("New follower journal last index", 2, newFollowerActorContext.getReplicatedLog().lastIndex());
273 verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
274 votingServer(NEW_SERVER_ID));
276 // Verify new server config was applied in the new follower
278 assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
280 LOG.info("testAddServerWithNoExistingFollower ending");
284 public void testAddServersAsNonVoting() {
285 LOG.info("testAddServersAsNonVoting starting");
288 RaftActorContext initialActorContext = new MockRaftActorContext();
290 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
291 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
292 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
293 actorFactory.generateActorId(LEADER_ID));
295 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
296 final RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
298 final ActorRef leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
300 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), false), testKit.getRef());
302 AddServerReply addServerReply = testKit.expectMsgClass(testKit.duration("5 seconds"), AddServerReply.class);
303 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
304 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
306 // Verify ServerConfigurationPayload entry in leader's log
308 expectFirstMatching(leaderCollectorActor, ApplyState.class);
310 assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
311 assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
312 assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
313 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
314 nonVotingServer(NEW_SERVER_ID));
316 // Verify ServerConfigurationPayload entry in the new follower
318 expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
319 assertEquals("New follower journal last index", 0, newFollowerActorContext.getReplicatedLog().lastIndex());
320 verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
321 nonVotingServer(NEW_SERVER_ID));
323 // Verify new server config was applied in the new follower
325 assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
327 assertNoneMatching(newFollowerCollectorActor, InstallSnapshot.class, 500);
329 // Add another non-voting server.
331 clearMessages(leaderCollectorActor);
333 RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
334 Follower newFollower2 = new Follower(follower2ActorContext);
335 followerActor.underlyingActor().setBehavior(newFollower2);
337 leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit.getRef());
339 addServerReply = testKit.expectMsgClass(testKit.duration("5 seconds"), AddServerReply.class);
340 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
341 assertEquals("getLeaderHint", java.util.Optional.of(LEADER_ID), addServerReply.getLeaderHint());
343 expectFirstMatching(leaderCollectorActor, ApplyState.class);
344 assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
345 assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
346 assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
347 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
348 votingServer(LEADER_ID), nonVotingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
350 LOG.info("testAddServersAsNonVoting ending");
354 public void testAddServerWithOperationInProgress() {
355 LOG.info("testAddServerWithOperationInProgress starting");
358 RaftActorContext initialActorContext = new MockRaftActorContext();
360 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
361 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
362 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
363 actorFactory.generateActorId(LEADER_ID));
365 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
366 final RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
368 final ActorRef leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
370 RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
371 Follower newFollower2 = new Follower(follower2ActorContext);
372 followerActor.underlyingActor().setBehavior(newFollower2);
374 MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
375 newFollowerRaftActorInstance.setDropMessageOfType(InstallSnapshot.class);
377 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
379 // Wait for leader's install snapshot and capture it
381 InstallSnapshot installSnapshot = expectFirstMatching(newFollowerCollectorActor, InstallSnapshot.class);
383 // Send a second AddServer - should get queued
384 TestKit testKit2 = new TestKit(getSystem());
385 leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit2.getRef());
387 // Continue the first AddServer
388 newFollowerRaftActorInstance.setDropMessageOfType(null);
389 newFollowerRaftActor.tell(installSnapshot, leaderActor);
391 // Verify both complete successfully
392 AddServerReply addServerReply = testKit.expectMsgClass(testKit.duration("5 seconds"), AddServerReply.class);
393 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
395 addServerReply = testKit2.expectMsgClass(testKit2.duration("5 seconds"), AddServerReply.class);
396 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
398 // Verify ServerConfigurationPayload entries in leader's log
400 expectMatching(leaderCollectorActor, ApplyState.class, 2);
401 assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
402 assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
403 assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
404 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
405 votingServer(LEADER_ID), votingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
407 // Verify ServerConfigurationPayload entry in the new follower
409 expectMatching(newFollowerCollectorActor, ApplyState.class, 2);
410 assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID2),
411 newFollowerActorContext.getPeerIds());
413 LOG.info("testAddServerWithOperationInProgress ending");
417 public void testAddServerWithPriorSnapshotInProgress() {
418 LOG.info("testAddServerWithPriorSnapshotInProgress starting");
421 RaftActorContext initialActorContext = new MockRaftActorContext();
423 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
424 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
425 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
426 actorFactory.generateActorId(LEADER_ID));
428 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
429 final RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
431 ActorRef leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
433 // Drop commit message for now to delay snapshot completion
434 leaderRaftActor.setDropMessageOfType(String.class);
436 leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
438 Object commitMsg = expectFirstMatching(leaderCollectorActor, COMMIT_MESSAGE_CLASS);
440 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
442 leaderRaftActor.setDropMessageOfType(null);
443 leaderActor.tell(commitMsg, leaderActor);
445 AddServerReply addServerReply = testKit.expectMsgClass(testKit.duration("5 seconds"), AddServerReply.class);
446 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
447 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
449 expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
451 // Verify ServerConfigurationPayload entry in leader's log
453 expectFirstMatching(leaderCollectorActor, ApplyState.class);
454 assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
455 assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
456 assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
457 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
458 votingServer(NEW_SERVER_ID));
460 LOG.info("testAddServerWithPriorSnapshotInProgress ending");
464 public void testAddServerWithPriorSnapshotCompleteTimeout() {
465 LOG.info("testAddServerWithPriorSnapshotCompleteTimeout starting");
468 RaftActorContext initialActorContext = new MockRaftActorContext();
470 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
471 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
472 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
473 actorFactory.generateActorId(LEADER_ID));
475 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
476 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
478 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
480 // Drop commit message so the snapshot doesn't complete.
481 leaderRaftActor.setDropMessageOfType(COMMIT_MESSAGE_CLASS);
483 leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
485 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
487 AddServerReply addServerReply = testKit.expectMsgClass(testKit.duration("5 seconds"), AddServerReply.class);
488 assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
490 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
492 LOG.info("testAddServerWithPriorSnapshotCompleteTimeout ending");
496 public void testAddServerWithLeaderChangeBeforePriorSnapshotComplete() {
497 LOG.info("testAddServerWithLeaderChangeBeforePriorSnapshotComplete starting");
500 RaftActorContext initialActorContext = new MockRaftActorContext();
502 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
503 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
504 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
505 actorFactory.generateActorId(LEADER_ID));
507 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
508 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
509 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
511 final ActorRef leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
513 // Drop the commit message so the snapshot doesn't complete yet.
514 leaderRaftActor.setDropMessageOfType(COMMIT_MESSAGE_CLASS);
516 leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
518 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
520 Object commitMsg = expectFirstMatching(leaderCollectorActor, COMMIT_MESSAGE_CLASS);
522 // Change the leader behavior to follower
523 leaderActor.tell(new Follower(leaderActorContext), leaderActor);
525 // Drop CaptureSnapshotReply in case install snapshot is incorrectly initiated after the prior
526 // snapshot completes. This will prevent the invalid snapshot from completing and fail the
527 // isCapturing assertion below.
528 leaderRaftActor.setDropMessageOfType(CaptureSnapshotReply.class);
530 // Complete the prior snapshot - this should be a no-op b/c it's no longer the leader
531 leaderActor.tell(commitMsg, leaderActor);
533 leaderActor.tell(new RaftActorServerConfigurationSupport.ServerOperationTimeout(NEW_SERVER_ID), leaderActor);
535 AddServerReply addServerReply = testKit.expectMsgClass(testKit.duration("5 seconds"), AddServerReply.class);
536 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
538 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
539 assertEquals("isCapturing", false, leaderActorContext.getSnapshotManager().isCapturing());
541 LOG.info("testAddServerWithLeaderChangeBeforePriorSnapshotComplete ending");
545 public void testAddServerWithLeaderChangeDuringInstallSnapshot() {
546 LOG.info("testAddServerWithLeaderChangeDuringInstallSnapshot starting");
549 RaftActorContext initialActorContext = new MockRaftActorContext();
551 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
552 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
553 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
554 actorFactory.generateActorId(LEADER_ID));
556 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
557 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
559 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(8);
561 ActorRef leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
563 // Drop the UnInitializedFollowerSnapshotReply to delay it.
564 leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
566 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
568 final UnInitializedFollowerSnapshotReply snapshotReply = expectFirstMatching(leaderCollectorActor,
569 UnInitializedFollowerSnapshotReply.class);
571 // Prevent election timeout when the leader switches to follower
572 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
574 // Change the leader behavior to follower
575 leaderActor.tell(new Follower(leaderActorContext), leaderActor);
577 // Send the captured UnInitializedFollowerSnapshotReply - should be a no-op
578 leaderRaftActor.setDropMessageOfType(null);
579 leaderActor.tell(snapshotReply, leaderActor);
581 AddServerReply addServerReply = testKit.expectMsgClass(testKit.duration("5 seconds"), AddServerReply.class);
582 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
584 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
586 LOG.info("testAddServerWithLeaderChangeDuringInstallSnapshot ending");
590 public void testAddServerWithInstallSnapshotTimeout() {
591 LOG.info("testAddServerWithInstallSnapshotTimeout starting");
594 RaftActorContext initialActorContext = new MockRaftActorContext();
596 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
597 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
598 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
599 actorFactory.generateActorId(LEADER_ID));
601 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
602 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
603 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
605 // Drop the InstallSnapshot message so it times out
606 newFollowerRaftActor.underlyingActor().setDropMessageOfType(InstallSnapshot.class);
608 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
610 leaderActor.tell(new UnInitializedFollowerSnapshotReply("bogus"), leaderActor);
612 AddServerReply addServerReply = testKit.expectMsgClass(testKit.duration("5 seconds"), AddServerReply.class);
613 assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
615 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
616 assertEquals("Leader followers size", 0,
617 ((AbstractLeader)leaderRaftActor.getCurrentBehavior()).getFollowerIds().size());
619 LOG.info("testAddServerWithInstallSnapshotTimeout ending");
623 public void testAddServerWithNoLeader() {
624 LOG.info("testAddServerWithNoLeader starting");
627 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
628 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
630 TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
631 MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
632 followerActor.path().toString())).config(configParams).persistent(Optional.of(false))
633 .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
634 actorFactory.generateActorId(LEADER_ID));
635 noLeaderActor.underlyingActor().waitForInitializeBehaviorComplete();
637 noLeaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true),
639 AddServerReply addServerReply = testKit.expectMsgClass(testKit.duration("5 seconds"), AddServerReply.class);
640 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
642 LOG.info("testAddServerWithNoLeader ending");
646 public void testAddServerWithNoConsensusReached() {
647 LOG.info("testAddServerWithNoConsensusReached starting");
650 RaftActorContext initialActorContext = new MockRaftActorContext();
652 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
653 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
654 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
655 actorFactory.generateActorId(LEADER_ID));
657 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
658 final RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
660 final ActorRef leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
662 // Drop UnInitializedFollowerSnapshotReply initially
663 leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
665 MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
666 newFollowerCollectorActor = newCollectorActor(newFollowerRaftActorInstance, NEW_SERVER_ID);
668 // Drop AppendEntries to the new follower so consensus isn't reached
669 newFollowerRaftActorInstance.setDropMessageOfType(AppendEntries.class);
671 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
673 // Capture the UnInitializedFollowerSnapshotReply
674 Object snapshotReply = expectFirstMatching(leaderCollectorActor, UnInitializedFollowerSnapshotReply.class);
676 // Send the UnInitializedFollowerSnapshotReply to resume the first request
677 leaderRaftActor.setDropMessageOfType(null);
678 leaderActor.tell(snapshotReply, leaderActor);
680 expectFirstMatching(newFollowerCollectorActor, AppendEntries.class);
682 // Send a second AddServer
683 leaderActor.tell(new AddServer(NEW_SERVER_ID2, "", false), testKit.getRef());
685 // The first AddServer should succeed with OK even though consensus wasn't reached
686 AddServerReply addServerReply = testKit.expectMsgClass(testKit.duration("5 seconds"), AddServerReply.class);
687 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
688 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
690 // Verify ServerConfigurationPayload entry in leader's log
691 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
692 votingServer(NEW_SERVER_ID));
694 // The second AddServer should fail since consensus wasn't reached for the first
695 addServerReply = testKit.expectMsgClass(testKit.duration("5 seconds"), AddServerReply.class);
696 assertEquals("getStatus", ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT, addServerReply.getStatus());
698 // Re-send the second AddServer - should also fail
699 leaderActor.tell(new AddServer(NEW_SERVER_ID2, "", false), testKit.getRef());
700 addServerReply = testKit.expectMsgClass(testKit.duration("5 seconds"), AddServerReply.class);
701 assertEquals("getStatus", ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT, addServerReply.getStatus());
703 LOG.info("testAddServerWithNoConsensusReached ending");
707 public void testAddServerWithExistingServer() {
708 LOG.info("testAddServerWithExistingServer starting");
710 RaftActorContext initialActorContext = new MockRaftActorContext();
712 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
713 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
714 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
715 actorFactory.generateActorId(LEADER_ID));
717 leaderActor.tell(new AddServer(FOLLOWER_ID, followerActor.path().toString(), true), testKit.getRef());
719 AddServerReply addServerReply = testKit.expectMsgClass(testKit.duration("5 seconds"), AddServerReply.class);
720 assertEquals("getStatus", ServerChangeStatus.ALREADY_EXISTS, addServerReply.getStatus());
722 LOG.info("testAddServerWithExistingServer ending");
726 public void testAddServerForwardedToLeader() {
727 LOG.info("testAddServerForwardedToLeader starting");
730 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
731 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
733 ActorRef leaderActor = actorFactory.createActor(
734 MessageCollectorActor.props(), actorFactory.generateActorId(LEADER_ID));
736 TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
737 MockRaftActor.builder().id(FOLLOWER_ID).peerAddresses(ImmutableMap.of(LEADER_ID,
738 leaderActor.path().toString())).config(configParams).persistent(Optional.of(false))
739 .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
740 actorFactory.generateActorId(FOLLOWER_ID));
741 followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
743 followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
744 -1, -1, (short)0), leaderActor);
746 followerRaftActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true),
748 expectFirstMatching(leaderActor, AddServer.class);
750 LOG.info("testAddServerForwardedToLeader ending");
754 public void testOnApplyState() {
755 LOG.info("testOnApplyState starting");
757 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
758 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
759 TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
760 MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
761 followerActor.path().toString())).config(configParams).persistent(Optional.of(false))
762 .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
763 actorFactory.generateActorId(LEADER_ID));
765 RaftActorServerConfigurationSupport support = new RaftActorServerConfigurationSupport(
766 noLeaderActor.underlyingActor());
768 ReplicatedLogEntry serverConfigEntry = new SimpleReplicatedLogEntry(1, 1,
769 new ServerConfigurationPayload(Collections.<ServerInfo>emptyList()));
770 boolean handled = support.handleMessage(new ApplyState(null, null, serverConfigEntry), ActorRef.noSender());
771 assertEquals("Message handled", true, handled);
773 ReplicatedLogEntry nonServerConfigEntry = new SimpleReplicatedLogEntry(1, 1,
774 new MockRaftActorContext.MockPayload("1"));
775 handled = support.handleMessage(new ApplyState(null, null, nonServerConfigEntry), ActorRef.noSender());
776 assertEquals("Message handled", false, handled);
778 LOG.info("testOnApplyState ending");
782 public void testRemoveServerWithNoLeader() {
783 LOG.info("testRemoveServerWithNoLeader starting");
785 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
786 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
788 TestActorRef<MockRaftActor> leaderActor = actorFactory.createTestActor(
789 MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
790 followerActor.path().toString())).config(configParams).persistent(Optional.of(false))
791 .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
792 actorFactory.generateActorId(LEADER_ID));
793 leaderActor.underlyingActor().waitForInitializeBehaviorComplete();
795 leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
796 RemoveServerReply removeServerReply = testKit.expectMsgClass(testKit.duration("5 seconds"),
797 RemoveServerReply.class);
798 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, removeServerReply.getStatus());
800 LOG.info("testRemoveServerWithNoLeader ending");
804 public void testRemoveServerNonExistentServer() {
805 LOG.info("testRemoveServerNonExistentServer starting");
807 RaftActorContext initialActorContext = new MockRaftActorContext();
809 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
810 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
811 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
812 actorFactory.generateActorId(LEADER_ID));
814 leaderActor.tell(new RemoveServer(NEW_SERVER_ID), testKit.getRef());
815 RemoveServerReply removeServerReply = testKit.expectMsgClass(testKit.duration("5 seconds"),
816 RemoveServerReply.class);
817 assertEquals("getStatus", ServerChangeStatus.DOES_NOT_EXIST, removeServerReply.getStatus());
819 LOG.info("testRemoveServerNonExistentServer ending");
823 public void testRemoveServerForwardToLeader() {
824 LOG.info("testRemoveServerForwardToLeader starting");
826 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
827 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
829 ActorRef leaderActor = actorFactory.createTestActor(
830 MessageCollectorActor.props(), actorFactory.generateActorId(LEADER_ID));
832 TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
833 MockRaftActor.builder().id(FOLLOWER_ID).peerAddresses(ImmutableMap.of(LEADER_ID,
834 leaderActor.path().toString())).config(configParams).persistent(Optional.of(false))
835 .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
836 actorFactory.generateActorId(FOLLOWER_ID));
837 followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
839 followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
840 -1, -1, (short)0), leaderActor);
842 followerRaftActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
843 expectFirstMatching(leaderActor, RemoveServer.class);
845 LOG.info("testRemoveServerForwardToLeader ending");
849 public void testRemoveServer() {
850 LOG.info("testRemoveServer starting");
852 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
853 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
854 configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
856 final String follower1ActorId = actorFactory.generateActorId(FOLLOWER_ID);
857 final String follower1ActorPath = actorFactory.createTestActorPath(follower1ActorId);
858 final String follower2ActorId = actorFactory.generateActorId(FOLLOWER_ID2);
859 final String follower2ActorPath = actorFactory.createTestActorPath(follower2ActorId);
860 RaftActorContext initialActorContext = new MockRaftActorContext();
862 final String downNodeId = "downNode";
863 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(MockLeaderRaftActor.props(
864 ImmutableMap.of(FOLLOWER_ID, follower1ActorPath, FOLLOWER_ID2, follower2ActorPath, downNodeId, ""),
865 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
866 actorFactory.generateActorId(LEADER_ID));
868 final ActorRef leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
870 ActorRef follower1Collector = actorFactory.createActor(
871 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
872 final TestActorRef<CollectingMockRaftActor> follower1Actor = actorFactory.createTestActor(
873 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
874 FOLLOWER_ID2, follower2ActorPath, downNodeId, ""), configParams, NO_PERSISTENCE,
875 follower1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId);
877 ActorRef follower2Collector = actorFactory.createActor(
878 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
879 final TestActorRef<CollectingMockRaftActor> follower2Actor = actorFactory.createTestActor(
880 CollectingMockRaftActor.props(FOLLOWER_ID2, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
881 FOLLOWER_ID, follower1ActorPath, downNodeId, ""), configParams, NO_PERSISTENCE,
882 follower2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId);
884 leaderActor.underlyingActor().waitForInitializeBehaviorComplete();
885 follower1Actor.underlyingActor().waitForInitializeBehaviorComplete();
886 follower2Actor.underlyingActor().waitForInitializeBehaviorComplete();
888 leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
889 RemoveServerReply removeServerReply = testKit.expectMsgClass(testKit.duration("5 seconds"),
890 RemoveServerReply.class);
891 assertEquals("getStatus", ServerChangeStatus.OK, removeServerReply.getStatus());
893 ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
894 assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
895 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
896 votingServer(LEADER_ID), votingServer(FOLLOWER_ID2), votingServer(downNodeId));
898 applyState = MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
899 assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
900 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
901 votingServer(LEADER_ID), votingServer(FOLLOWER_ID2), votingServer(downNodeId));
903 RaftActorBehavior currentBehavior = leaderActor.underlyingActor().getCurrentBehavior();
904 assertTrue("Expected Leader", currentBehavior instanceof Leader);
905 assertEquals("Follower ids size", 2, ((Leader)currentBehavior).getFollowerIds().size());
907 MessageCollectorActor.expectFirstMatching(follower1Collector, ServerRemoved.class);
909 LOG.info("testRemoveServer ending");
913 public void testRemoveServerLeader() {
914 LOG.info("testRemoveServerLeader starting");
916 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
917 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
918 configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
920 final String followerActorId = actorFactory.generateActorId(FOLLOWER_ID);
921 final String followerActorPath = actorFactory.createTestActorPath(followerActorId);
922 RaftActorContext initialActorContext = new MockRaftActorContext();
924 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
925 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActorPath),
926 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
927 actorFactory.generateActorId(LEADER_ID));
929 final ActorRef leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
931 final ActorRef followerCollector =
932 actorFactory.createActor(MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
933 actorFactory.createTestActor(
934 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
935 configParams, NO_PERSISTENCE, followerCollector)
936 .withDispatcher(Dispatchers.DefaultDispatcherId()),
939 leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
940 RemoveServerReply removeServerReply = testKit.expectMsgClass(testKit.duration("5 seconds"),
941 RemoveServerReply.class);
942 assertEquals("getStatus", ServerChangeStatus.OK, removeServerReply.getStatus());
944 final ApplyState applyState = MessageCollectorActor.expectFirstMatching(followerCollector, ApplyState.class);
945 assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
946 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
947 votingServer(FOLLOWER_ID));
949 MessageCollectorActor.expectFirstMatching(leaderCollector, ServerRemoved.class);
951 LOG.info("testRemoveServerLeader ending");
955 public void testRemoveServerLeaderWithNoFollowers() {
956 LOG.info("testRemoveServerLeaderWithNoFollowers starting");
958 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
959 MockLeaderRaftActor.props(Collections.<String, String>emptyMap(),
960 new MockRaftActorContext()).withDispatcher(Dispatchers.DefaultDispatcherId()),
961 actorFactory.generateActorId(LEADER_ID));
963 leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
964 RemoveServerReply removeServerReply = testKit.expectMsgClass(testKit.duration("5 seconds"),
965 RemoveServerReply.class);
966 assertEquals("getStatus", ServerChangeStatus.NOT_SUPPORTED, removeServerReply.getStatus());
968 LOG.info("testRemoveServerLeaderWithNoFollowers ending");
972 public void testChangeServersVotingStatus() {
973 LOG.info("testChangeServersVotingStatus starting");
975 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
976 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
977 configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
979 final String follower1ActorId = actorFactory.generateActorId(FOLLOWER_ID);
980 final String follower1ActorPath = actorFactory.createTestActorPath(follower1ActorId);
981 final String follower2ActorId = actorFactory.generateActorId(FOLLOWER_ID2);
982 final String follower2ActorPath = actorFactory.createTestActorPath(follower2ActorId);
984 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
985 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, follower1ActorPath,
986 FOLLOWER_ID2, follower2ActorPath), new MockRaftActorContext())
987 .withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
988 ActorRef leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
990 ActorRef follower1Collector = actorFactory.createActor(
991 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
992 final TestActorRef<CollectingMockRaftActor> follower1RaftActor = actorFactory.createTestActor(
993 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
994 FOLLOWER_ID2, follower2ActorPath), configParams, NO_PERSISTENCE, follower1Collector)
995 .withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId);
997 ActorRef follower2Collector = actorFactory.createActor(
998 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
999 final TestActorRef<CollectingMockRaftActor> follower2RaftActor = actorFactory.createTestActor(
1000 CollectingMockRaftActor.props(FOLLOWER_ID2, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
1001 FOLLOWER_ID, follower1ActorPath), configParams, NO_PERSISTENCE, follower2Collector)
1002 .withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId);
1004 // Send first ChangeServersVotingStatus message
1006 leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(FOLLOWER_ID, false, FOLLOWER_ID2, false)),
1008 ServerChangeReply reply = testKit.expectMsgClass(testKit.duration("5 seconds"), ServerChangeReply.class);
1009 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1011 final ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
1012 assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
1013 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1014 votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1016 MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
1017 verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext()
1018 .getReplicatedLog(), votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID),
1019 nonVotingServer(FOLLOWER_ID2));
1021 MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
1022 verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext()
1023 .getReplicatedLog(), votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID),
1024 nonVotingServer(FOLLOWER_ID2));
1026 MessageCollectorActor.clearMessages(leaderCollector);
1027 MessageCollectorActor.clearMessages(follower1Collector);
1028 MessageCollectorActor.clearMessages(follower2Collector);
1030 // Send second ChangeServersVotingStatus message
1032 leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(FOLLOWER_ID, true)), testKit.getRef());
1033 reply = testKit.expectMsgClass(testKit.duration("5 seconds"), ServerChangeReply.class);
1034 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1036 MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
1037 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1038 votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1040 MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
1041 verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext()
1042 .getReplicatedLog(), votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1044 MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
1045 verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext()
1046 .getReplicatedLog(), votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1048 LOG.info("testChangeServersVotingStatus ending");
1052 public void testChangeLeaderToNonVoting() {
1053 LOG.info("testChangeLeaderToNonVoting starting");
1055 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1056 configParams.setHeartBeatInterval(new FiniteDuration(500, TimeUnit.MILLISECONDS));
1058 final String follower1ActorId = actorFactory.generateActorId(FOLLOWER_ID);
1059 final String follower1ActorPath = actorFactory.createTestActorPath(follower1ActorId);
1060 final String follower2ActorId = actorFactory.generateActorId(FOLLOWER_ID2);
1061 final String follower2ActorPath = actorFactory.createTestActorPath(follower2ActorId);
1063 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
1064 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, follower1ActorPath,
1065 FOLLOWER_ID2, follower2ActorPath), new MockRaftActorContext())
1066 .withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
1067 ActorRef leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
1069 ActorRef follower1Collector = actorFactory.createActor(
1070 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1071 final TestActorRef<CollectingMockRaftActor> follower1RaftActor = actorFactory.createTestActor(
1072 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
1073 FOLLOWER_ID2, follower2ActorPath), configParams, NO_PERSISTENCE, follower1Collector)
1074 .withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId);
1076 ActorRef follower2Collector = actorFactory.createActor(
1077 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1078 final TestActorRef<CollectingMockRaftActor> follower2RaftActor = actorFactory.createTestActor(
1079 CollectingMockRaftActor.props(FOLLOWER_ID2, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
1080 FOLLOWER_ID, follower1ActorPath), configParams, NO_PERSISTENCE, follower2Collector)
1081 .withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId);
1083 // Send ChangeServersVotingStatus message
1085 leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(LEADER_ID, false)), testKit.getRef());
1086 ServerChangeReply reply = testKit.expectMsgClass(testKit.duration("5 seconds"), ServerChangeReply.class);
1087 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1089 MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
1090 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1091 nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
1093 MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
1094 verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext()
1095 .getReplicatedLog(), nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
1097 MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
1098 verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext()
1099 .getReplicatedLog(), nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
1101 verifyRaftState(RaftState.Leader, follower1RaftActor.underlyingActor(), follower2RaftActor.underlyingActor());
1102 verifyRaftState(RaftState.Follower, leaderActor.underlyingActor());
1104 MessageCollectorActor.expectMatching(leaderCollector, AppendEntries.class, 2);
1106 LOG.info("testChangeLeaderToNonVoting ending");
1110 public void testChangeLeaderToNonVotingInSingleNode() {
1111 LOG.info("testChangeLeaderToNonVotingInSingleNode starting");
1113 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
1114 MockLeaderRaftActor.props(ImmutableMap.of(), new MockRaftActorContext())
1115 .withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
1117 leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(LEADER_ID, false)), testKit.getRef());
1118 ServerChangeReply reply = testKit.expectMsgClass(testKit.duration("5 seconds"), ServerChangeReply.class);
1119 assertEquals("getStatus", ServerChangeStatus.INVALID_REQUEST, reply.getStatus());
1121 LOG.info("testChangeLeaderToNonVotingInSingleNode ending");
1125 public void testChangeToVotingWithNoLeader() {
1126 LOG.info("testChangeToVotingWithNoLeader starting");
1128 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1129 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1130 configParams.setElectionTimeoutFactor(5);
1132 final String node1ID = "node1";
1133 final String node2ID = "node2";
1135 // Set up a persisted ServerConfigurationPayload. Initially node1 and node2 will come up as non-voting.
1136 // via the server config. The server config will also contain 2 voting peers that are down (ie no
1139 ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1140 new ServerInfo(node1ID, false), new ServerInfo(node2ID, false),
1141 new ServerInfo("downNode1", true), new ServerInfo("downNode2", true)));
1142 SimpleReplicatedLogEntry persistedServerConfigEntry = new SimpleReplicatedLogEntry(0, 1, persistedServerConfig);
1144 InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "downNode1"));
1145 InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1146 InMemoryJournal.addEntry(node1ID, 3, new ApplyJournalEntries(0));
1147 InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "downNode2"));
1148 InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1149 InMemoryJournal.addEntry(node2ID, 3, new ApplyJournalEntries(0));
1151 ActorRef node1Collector = actorFactory.createActor(
1152 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1153 TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1154 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
1155 PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1156 CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1158 ActorRef node2Collector = actorFactory.createActor(
1159 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1160 TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1161 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
1162 PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1163 CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1165 node1RaftActor.waitForInitializeBehaviorComplete();
1166 node2RaftActor.waitForInitializeBehaviorComplete();
1168 // Verify the intended server config was loaded and applied.
1169 verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1170 nonVotingServer(node1ID), nonVotingServer(node2ID), votingServer("downNode1"),
1171 votingServer("downNode2"));
1172 assertEquals("isVotingMember", false, node1RaftActor.getRaftActorContext().isVotingMember());
1173 assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1174 assertEquals("getLeaderId", null, node1RaftActor.getLeaderId());
1176 verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1177 nonVotingServer(node1ID), nonVotingServer(node2ID), votingServer("downNode1"),
1178 votingServer("downNode2"));
1179 assertEquals("isVotingMember", false, node2RaftActor.getRaftActorContext().isVotingMember());
1181 // For the test, we send a ChangeServersVotingStatus message to node1 to flip the voting states for
1182 // each server, ie node1 and node2 to voting and the 2 down nodes to non-voting. This should cause
1183 // node1 to try to elect itself as leader in order to apply the new server config. Since the 2
1184 // down nodes are switched to non-voting, node1 should only need a vote from node2.
1186 // First send the message such that node1 has no peer address for node2 - should fail.
1188 ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true,
1189 node2ID, true, "downNode1", false, "downNode2", false));
1190 node1RaftActorRef.tell(changeServers, testKit.getRef());
1191 ServerChangeReply reply = testKit.expectMsgClass(testKit.duration("5 seconds"), ServerChangeReply.class);
1192 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, reply.getStatus());
1193 assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1195 // Send an AppendEntries so node1 has a leaderId
1197 long term = node1RaftActor.getRaftActorContext().getTermInformation().getCurrentTerm();
1198 node1RaftActorRef.tell(new AppendEntries(term, "downNode1", -1L, -1L,
1199 Collections.<ReplicatedLogEntry>emptyList(), 0, -1, (short)1), ActorRef.noSender());
1201 // Wait for the ElectionTimeout to clear the leaderId. The leaderId must be null so on the next
1202 // ChangeServersVotingStatus message, it will try to elect a leader.
1204 AbstractRaftActorIntegrationTest.verifyRaftState(node1RaftActorRef,
1205 rs -> assertEquals("getLeader", null, rs.getLeader()));
1207 // Update node2's peer address and send the message again
1209 node1RaftActor.setPeerAddress(node2ID, node2RaftActorRef.path().toString());
1211 node1RaftActorRef.tell(changeServers, testKit.getRef());
1212 reply = testKit.expectMsgClass(testKit.duration("5 seconds"), ServerChangeReply.class);
1213 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1215 ApplyJournalEntries apply = MessageCollectorActor.expectFirstMatching(node1Collector,
1216 ApplyJournalEntries.class);
1217 assertEquals("getToIndex", 1, apply.getToIndex());
1218 verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1219 votingServer(node1ID), votingServer(node2ID), nonVotingServer("downNode1"),
1220 nonVotingServer("downNode2"));
1221 assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
1222 assertEquals("getRaftState", RaftState.Leader, node1RaftActor.getRaftState());
1224 apply = MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
1225 assertEquals("getToIndex", 1, apply.getToIndex());
1226 verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1227 votingServer(node1ID), votingServer(node2ID), nonVotingServer("downNode1"),
1228 nonVotingServer("downNode2"));
1229 assertEquals("isVotingMember", true, node2RaftActor.getRaftActorContext().isVotingMember());
1230 assertEquals("getRaftState", RaftState.Follower, node2RaftActor.getRaftState());
1232 LOG.info("testChangeToVotingWithNoLeader ending");
1236 public void testChangeToVotingWithNoLeaderAndElectionTimeout() {
1237 LOG.info("testChangeToVotingWithNoLeaderAndElectionTimeout starting");
1239 final String node1ID = "node1";
1240 final String node2ID = "node2";
1242 final PeerAddressResolver peerAddressResolver = peerId -> peerId.equals(node1ID)
1243 ? actorFactory.createTestActorPath(node1ID) : peerId.equals(node2ID)
1244 ? actorFactory.createTestActorPath(node2ID) : null;
1246 ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1247 new ServerInfo(node1ID, false), new ServerInfo(node2ID, true)));
1248 SimpleReplicatedLogEntry persistedServerConfigEntry = new SimpleReplicatedLogEntry(0, 1, persistedServerConfig);
1250 InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
1251 InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1252 InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
1253 InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1255 DefaultConfigParamsImpl configParams1 = new DefaultConfigParamsImpl();
1256 configParams1.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1257 configParams1.setElectionTimeoutFactor(1);
1258 configParams1.setPeerAddressResolver(peerAddressResolver);
1259 ActorRef node1Collector = actorFactory.createActor(
1260 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1261 TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1262 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams1,
1263 PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1264 final CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1266 DefaultConfigParamsImpl configParams2 = new DefaultConfigParamsImpl();
1267 configParams2.setElectionTimeoutFactor(1000000);
1268 configParams2.setPeerAddressResolver(peerAddressResolver);
1269 ActorRef node2Collector = actorFactory.createActor(
1270 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1271 TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1272 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams2,
1273 PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1274 CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1276 // Send a ChangeServersVotingStatus message to node1 to change mode1 to voting. This should cause
1277 // node1 to try to elect itself as leader in order to apply the new server config. But we'll drop
1278 // RequestVote messages in node2 which should cause node1 to time out and revert back to the previous
1279 // server config and fail with NO_LEADER. Note that node1 shouldn't forward the request to node2 b/c
1280 // node2 was previously voting.
1282 node2RaftActor.setDropMessageOfType(RequestVote.class);
1284 ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true));
1285 node1RaftActorRef.tell(changeServers, testKit.getRef());
1286 ServerChangeReply reply = testKit.expectMsgClass(testKit.duration("5 seconds"), ServerChangeReply.class);
1287 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, reply.getStatus());
1289 assertEquals("Server config", Sets.newHashSet(nonVotingServer(node1ID), votingServer(node2ID)),
1290 Sets.newHashSet(node1RaftActor.getRaftActorContext().getPeerServerInfo(true).getServerConfig()));
1291 assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1293 LOG.info("testChangeToVotingWithNoLeaderAndElectionTimeout ending");
1297 public void testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout() {
1298 LOG.info("testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout starting");
1300 final String node1ID = "node1";
1301 final String node2ID = "node2";
1303 final PeerAddressResolver peerAddressResolver = peerId -> peerId.equals(node1ID)
1304 ? actorFactory.createTestActorPath(node1ID) : peerId.equals(node2ID)
1305 ? actorFactory.createTestActorPath(node2ID) : null;
1307 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1308 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1309 configParams.setElectionTimeoutFactor(3);
1310 configParams.setPeerAddressResolver(peerAddressResolver);
1312 ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1313 new ServerInfo(node1ID, false), new ServerInfo(node2ID, false)));
1314 SimpleReplicatedLogEntry persistedServerConfigEntry = new SimpleReplicatedLogEntry(0, 1, persistedServerConfig);
1316 InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
1317 InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1318 InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
1319 InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1320 InMemoryJournal.addEntry(node2ID, 3, new SimpleReplicatedLogEntry(1, 1,
1321 new MockRaftActorContext.MockPayload("2")));
1322 InMemoryJournal.addEntry(node2ID, 4, new ApplyJournalEntries(1));
1324 ActorRef node1Collector = actorFactory.createActor(
1325 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1326 TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1327 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
1328 PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1329 final CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1331 ActorRef node2Collector = actorFactory.createActor(
1332 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1333 TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1334 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
1335 PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1336 final CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1338 // Send a ChangeServersVotingStatus message to node1 to change mode1 to voting. This should cause
1339 // node1 to try to elect itself as leader in order to apply the new server config. However node1's log
1340 // is behind node2's so node2 should not grant node1's vote. This should cause node1 to time out and
1341 // forward the request to node2.
1343 ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(
1344 ImmutableMap.of(node1ID, true, node2ID, true));
1345 node1RaftActorRef.tell(changeServers, testKit.getRef());
1346 ServerChangeReply reply = testKit.expectMsgClass(testKit.duration("5 seconds"), ServerChangeReply.class);
1347 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1349 MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
1350 verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1351 votingServer(node1ID), votingServer(node2ID));
1352 assertEquals("getRaftState", RaftState.Leader, node2RaftActor.getRaftState());
1354 MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class);
1355 verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1356 votingServer(node1ID), votingServer(node2ID));
1357 assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
1358 assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1360 LOG.info("testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout ending");
1364 public void testChangeToVotingWithNoLeaderAndOtherLeaderElected() {
1365 LOG.info("testChangeToVotingWithNoLeaderAndOtherLeaderElected starting");
1367 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1368 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1369 configParams.setElectionTimeoutFactor(100000);
1371 final String node1ID = "node1";
1372 final String node2ID = "node2";
1374 configParams.setPeerAddressResolver(peerId -> peerId.equals(node1ID)
1375 ? actorFactory.createTestActorPath(node1ID) : peerId.equals(node2ID)
1376 ? actorFactory.createTestActorPath(node2ID) : null);
1378 ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1379 new ServerInfo(node1ID, false), new ServerInfo(node2ID, true)));
1380 SimpleReplicatedLogEntry persistedServerConfigEntry = new SimpleReplicatedLogEntry(0, 1, persistedServerConfig);
1382 InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
1383 InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1384 InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
1385 InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1387 ActorRef node1Collector = actorFactory.createActor(
1388 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1389 TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1390 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
1391 PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1392 final CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1394 ActorRef node2Collector = actorFactory.createActor(
1395 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1396 TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1397 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
1398 PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1399 CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1401 // Send a ChangeServersVotingStatus message to node1 to change node1 to voting. This should cause
1402 // node1 to try to elect itself as leader in order to apply the new server config. But we'll drop
1403 // RequestVote messages in node2 and make it the leader so node1 should forward the server change
1404 // request to node2 when node2 is elected.
1406 node2RaftActor.setDropMessageOfType(RequestVote.class);
1408 ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true,
1410 node1RaftActorRef.tell(changeServers, testKit.getRef());
1412 MessageCollectorActor.expectFirstMatching(node2Collector, RequestVote.class);
1414 node2RaftActorRef.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
1416 ServerChangeReply reply = testKit.expectMsgClass(testKit.duration("5 seconds"), ServerChangeReply.class);
1417 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1419 MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class);
1420 verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1421 votingServer(node1ID), votingServer(node2ID));
1422 assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
1423 assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1425 MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
1426 verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1427 votingServer(node1ID), votingServer(node2ID));
1428 assertEquals("getRaftState", RaftState.Leader, node2RaftActor.getRaftState());
1430 LOG.info("testChangeToVotingWithNoLeaderAndOtherLeaderElected ending");
1433 private static void verifyRaftState(RaftState expState, RaftActor... raftActors) {
1434 Stopwatch sw = Stopwatch.createStarted();
1435 while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
1436 for (RaftActor raftActor : raftActors) {
1437 if (raftActor.getRaftState() == expState) {
1443 fail("None of the RaftActors have state " + expState);
1446 private static ServerInfo votingServer(String id) {
1447 return new ServerInfo(id, true);
1450 private static ServerInfo nonVotingServer(String id) {
1451 return new ServerInfo(id, false);
1454 private ActorRef newLeaderCollectorActor(MockLeaderRaftActor leaderRaftActor) {
1455 return newCollectorActor(leaderRaftActor, LEADER_ID);
1458 private ActorRef newCollectorActor(AbstractMockRaftActor raftActor, String id) {
1459 ActorRef collectorActor = actorFactory.createTestActor(
1460 MessageCollectorActor.props(), actorFactory.generateActorId(id + "Collector"));
1461 raftActor.setCollectorActor(collectorActor);
1462 return collectorActor;
1465 private static void verifyServerConfigurationPayloadEntry(ReplicatedLog log, ServerInfo... expected) {
1466 ReplicatedLogEntry logEntry = log.get(log.lastIndex());
1467 assertEquals("Last log entry payload class", ServerConfigurationPayload.class, logEntry.getData().getClass());
1468 ServerConfigurationPayload payload = (ServerConfigurationPayload)logEntry.getData();
1469 assertEquals("Server config", Sets.newHashSet(expected), Sets.newHashSet(payload.getServerConfig()));
1472 private static RaftActorContextImpl newFollowerContext(String id, TestActorRef<? extends AbstractActor> actor) {
1473 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1474 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1475 configParams.setElectionTimeoutFactor(100000);
1476 NonPersistentDataProvider noPersistence = new NonPersistentDataProvider(Runnable::run);
1477 ElectionTermImpl termInfo = new ElectionTermImpl(noPersistence, id, LOG);
1478 termInfo.update(1, LEADER_ID);
1479 return new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
1480 id, termInfo, -1, -1, ImmutableMap.of(LEADER_ID, ""), configParams,
1481 noPersistence, applyState -> actor.tell(applyState, actor), LOG);
1484 abstract static class AbstractMockRaftActor extends MockRaftActor {
1485 private volatile ActorRef collectorActor;
1486 private volatile Class<?> dropMessageOfType;
1488 AbstractMockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
1489 boolean persistent, ActorRef collectorActor) {
1490 super(builder().id(id).peerAddresses(peerAddresses).config(config.get())
1491 .persistent(Optional.of(persistent)));
1492 this.collectorActor = collectorActor;
1495 void setDropMessageOfType(Class<?> dropMessageOfType) {
1496 this.dropMessageOfType = dropMessageOfType;
1499 void setCollectorActor(ActorRef collectorActor) {
1500 this.collectorActor = collectorActor;
1504 public void handleCommand(Object message) {
1505 if (dropMessageOfType == null || !dropMessageOfType.equals(message.getClass())) {
1506 super.handleCommand(message);
1509 if (collectorActor != null) {
1510 collectorActor.tell(message, getSender());
1515 public static class CollectingMockRaftActor extends AbstractMockRaftActor {
1517 CollectingMockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
1518 boolean persistent, ActorRef collectorActor) {
1519 super(id, peerAddresses, config, persistent, collectorActor);
1520 snapshotCohortDelegate = new RaftActorSnapshotCohort() {
1522 public void createSnapshot(ActorRef actorRef, java.util.Optional<OutputStream> installSnapshotStream) {
1523 actorRef.tell(new CaptureSnapshotReply(ByteState.empty(), installSnapshotStream), actorRef);
1527 public void applySnapshot(
1528 org.opendaylight.controller.cluster.raft.persisted.Snapshot.State snapshotState) {
1532 public org.opendaylight.controller.cluster.raft.persisted.Snapshot.State deserializeSnapshot(
1533 ByteSource snapshotBytes) {
1534 throw new UnsupportedOperationException();
1539 public static Props props(final String id, final Map<String, String> peerAddresses,
1540 ConfigParams config, boolean persistent, ActorRef collectorActor) {
1542 return Props.create(CollectingMockRaftActor.class, id, peerAddresses, Optional.of(config),
1543 persistent, collectorActor);
1548 public static class MockLeaderRaftActor extends AbstractMockRaftActor {
1549 public MockLeaderRaftActor(Map<String, String> peerAddresses, ConfigParams config,
1550 RaftActorContext fromContext) {
1551 super(LEADER_ID, peerAddresses, Optional.of(config), NO_PERSISTENCE, null);
1552 setPersistence(false);
1554 RaftActorContext context = getRaftActorContext();
1555 for (int i = 0; i < fromContext.getReplicatedLog().size(); i++) {
1556 ReplicatedLogEntry entry = fromContext.getReplicatedLog().get(i);
1557 getState().add(entry.getData());
1558 context.getReplicatedLog().append(entry);
1561 context.setCommitIndex(fromContext.getCommitIndex());
1562 context.setLastApplied(fromContext.getLastApplied());
1563 context.getTermInformation().update(fromContext.getTermInformation().getCurrentTerm(),
1564 fromContext.getTermInformation().getVotedFor());
1568 protected void initializeBehavior() {
1569 changeCurrentBehavior(new Leader(getRaftActorContext()));
1570 initializeBehaviorComplete.countDown();
1574 @SuppressWarnings("checkstyle:IllegalCatch")
1575 public void createSnapshot(ActorRef actorRef, java.util.Optional<OutputStream> installSnapshotStream) {
1576 MockSnapshotState snapshotState = new MockSnapshotState(new ArrayList<>(getState()));
1577 if (installSnapshotStream.isPresent()) {
1578 SerializationUtils.serialize(snapshotState, installSnapshotStream.get());
1581 actorRef.tell(new CaptureSnapshotReply(snapshotState, installSnapshotStream), actorRef);
1584 static Props props(Map<String, String> peerAddresses, RaftActorContext fromContext) {
1585 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1586 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1587 configParams.setElectionTimeoutFactor(10);
1588 return Props.create(MockLeaderRaftActor.class, peerAddresses, configParams, fromContext);
1592 public static class MockNewFollowerRaftActor extends AbstractMockRaftActor {
1593 public MockNewFollowerRaftActor(ConfigParams config, ActorRef collectorActor) {
1594 super(NEW_SERVER_ID, Maps.<String, String>newHashMap(), Optional.of(config), NO_PERSISTENCE,
1596 setPersistence(false);
1599 static Props props(ConfigParams config, ActorRef collectorActor) {
1600 return Props.create(MockNewFollowerRaftActor.class, config, collectorActor);