2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertTrue;
12 import static org.junit.Assert.fail;
13 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.assertNoneMatching;
14 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
15 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
16 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
17 import akka.actor.ActorRef;
18 import akka.actor.Props;
19 import akka.actor.UntypedActor;
20 import akka.dispatch.Dispatchers;
21 import akka.testkit.JavaTestKit;
22 import akka.testkit.TestActorRef;
23 import com.google.common.base.Optional;
24 import com.google.common.base.Stopwatch;
25 import com.google.common.collect.ImmutableMap;
26 import com.google.common.collect.Maps;
27 import com.google.common.collect.Sets;
28 import java.util.Arrays;
29 import java.util.Collections;
30 import java.util.List;
32 import java.util.concurrent.TimeUnit;
33 import org.junit.After;
34 import org.junit.Before;
35 import org.junit.Test;
36 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
37 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
38 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
39 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
40 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
41 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
42 import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
43 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
44 import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
45 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
46 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
47 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
48 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
49 import org.opendaylight.controller.cluster.raft.messages.AddServer;
50 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
51 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
52 import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
53 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
54 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
55 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
56 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
57 import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
58 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
59 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
60 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
61 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
62 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
63 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
64 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
65 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
66 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
67 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
68 import org.slf4j.Logger;
69 import org.slf4j.LoggerFactory;
70 import scala.concurrent.duration.FiniteDuration;
73 * Unit tests for RaftActorServerConfigurationSupport.
75 * @author Thomas Pantelis
77 public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
78 static final String LEADER_ID = "leader";
79 static final String FOLLOWER_ID = "follower";
80 static final String FOLLOWER_ID2 = "follower2";
81 static final String NEW_SERVER_ID = "new-server";
82 static final String NEW_SERVER_ID2 = "new-server2";
83 private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupportTest.class);
84 private static final Class<?> COMMIT_MESSAGE_CLASS = RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT.getClass();
85 private static final boolean NO_PERSISTENCE = false;
86 private static final boolean PERSISTENT = true;
88 private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
90 private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
91 Props.create(ForwardMessageToBehaviorActor.class).withDispatcher(Dispatchers.DefaultDispatcherId()),
92 actorFactory.generateActorId(FOLLOWER_ID));
94 private TestActorRef<MockNewFollowerRaftActor> newFollowerRaftActor;
95 private TestActorRef<MessageCollectorActor> newFollowerCollectorActor;
96 private RaftActorContext newFollowerActorContext;
98 private final JavaTestKit testKit = new JavaTestKit(getSystem());
101 public void setup() {
102 InMemoryJournal.clear();
103 InMemorySnapshotStore.clear();
106 private void setupNewFollower() {
107 DefaultConfigParamsImpl configParams = newFollowerConfigParams();
109 newFollowerCollectorActor = actorFactory.createTestActor(
110 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
111 actorFactory.generateActorId(NEW_SERVER_ID + "Collector"));
112 newFollowerRaftActor = actorFactory.createTestActor(MockNewFollowerRaftActor.props(
113 configParams, newFollowerCollectorActor).withDispatcher(Dispatchers.DefaultDispatcherId()),
114 actorFactory.generateActorId(NEW_SERVER_ID));
117 newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
118 } catch (Exception e) {
119 newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
123 private static DefaultConfigParamsImpl newFollowerConfigParams() {
124 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
125 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
126 configParams.setElectionTimeoutFactor(100000);
127 configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
132 public void tearDown() throws Exception {
133 actorFactory.close();
137 public void testAddServerWithExistingFollower() throws Exception {
138 LOG.info("testAddServerWithExistingFollower starting");
140 RaftActorContextImpl followerActorContext = newFollowerContext(FOLLOWER_ID, followerActor);
141 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
143 followerActorContext.setCommitIndex(2);
144 followerActorContext.setLastApplied(2);
146 Follower follower = new Follower(followerActorContext);
147 followerActor.underlyingActor().setBehavior(follower);
148 followerActorContext.setCurrentBehavior(follower);
150 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
151 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
152 followerActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
153 actorFactory.generateActorId(LEADER_ID));
155 // Expect initial heartbeat from the leader.
156 expectFirstMatching(followerActor, AppendEntries.class);
157 clearMessages(followerActor);
159 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
160 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
162 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
164 // Leader should install snapshot - capture and verify ApplySnapshot contents
166 ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
167 @SuppressWarnings("unchecked")
168 List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
169 assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
171 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
172 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
173 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
175 // Verify ServerConfigurationPayload entry in leader's log
177 expectFirstMatching(leaderCollectorActor, ApplyState.class);
178 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
179 assertEquals("Leader journal last index", 3, leaderActorContext.getReplicatedLog().lastIndex());
180 assertEquals("Leader commit index", 3, leaderActorContext.getCommitIndex());
181 assertEquals("Leader last applied index", 3, leaderActorContext.getLastApplied());
182 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
183 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
185 // Verify ServerConfigurationPayload entry in both followers
187 expectFirstMatching(followerActor, ApplyState.class);
188 assertEquals("Follower journal last index", 3, followerActorContext.getReplicatedLog().lastIndex());
189 verifyServerConfigurationPayloadEntry(followerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
190 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
192 expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
193 assertEquals("New follower journal last index", 3, newFollowerActorContext.getReplicatedLog().lastIndex());
194 verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
195 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
197 // Verify new server config was applied in both followers
199 assertEquals("Follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID), followerActorContext.getPeerIds());
201 assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, FOLLOWER_ID), newFollowerActorContext.getPeerIds());
203 expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
204 expectFirstMatching(followerActor, ApplyState.class);
206 assertEquals("Follower commit index", 3, followerActorContext.getCommitIndex());
207 assertEquals("Follower last applied index", 3, followerActorContext.getLastApplied());
208 assertEquals("New follower commit index", 3, newFollowerActorContext.getCommitIndex());
209 assertEquals("New follower last applied index", 3, newFollowerActorContext.getLastApplied());
211 List<ReplicatedLogImplEntry> persistedLogEntries = InMemoryJournal.get(LEADER_ID, ReplicatedLogImplEntry.class);
212 assertEquals("Leader ReplicatedLogImplEntry entries", 1, persistedLogEntries.size());
213 ReplicatedLogImplEntry logEntry = persistedLogEntries.get(0);
214 assertEquals("Leader ReplicatedLogImplEntry getTerm", 1, logEntry.getTerm());
215 assertEquals("Leader ReplicatedLogImplEntry getIndex", 3, logEntry.getIndex());
216 assertEquals("Leader ReplicatedLogImplEntry getData", ServerConfigurationPayload.class, logEntry.getData().getClass());
218 persistedLogEntries = InMemoryJournal.get(NEW_SERVER_ID, ReplicatedLogImplEntry.class);
219 assertEquals("New follower ReplicatedLogImplEntry entries", 1, persistedLogEntries.size());
220 logEntry = persistedLogEntries.get(0);
221 assertEquals("New follower ReplicatedLogImplEntry getTerm", 1, logEntry.getTerm());
222 assertEquals("New follower ReplicatedLogImplEntry getIndex", 3, logEntry.getIndex());
223 assertEquals("New follower ReplicatedLogImplEntry getData", ServerConfigurationPayload.class,
224 logEntry.getData().getClass());
226 LOG.info("testAddServerWithExistingFollower ending");
230 public void testAddServerWithNoExistingFollower() throws Exception {
231 LOG.info("testAddServerWithNoExistingFollower starting");
234 RaftActorContext initialActorContext = new MockRaftActorContext();
235 initialActorContext.setCommitIndex(1);
236 initialActorContext.setLastApplied(1);
237 initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
240 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
241 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
242 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
243 actorFactory.generateActorId(LEADER_ID));
245 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
246 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
248 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
250 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
252 // Leader should install snapshot - capture and verify ApplySnapshot contents
254 ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
255 @SuppressWarnings("unchecked")
256 List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
257 assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
259 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
260 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
261 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
263 // Verify ServerConfigurationPayload entry in leader's log
265 expectFirstMatching(leaderCollectorActor, ApplyState.class);
266 assertEquals("Leader journal last index", 2, leaderActorContext.getReplicatedLog().lastIndex());
267 assertEquals("Leader commit index", 2, leaderActorContext.getCommitIndex());
268 assertEquals("Leader last applied index", 2, leaderActorContext.getLastApplied());
269 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
270 votingServer(NEW_SERVER_ID));
272 // Verify ServerConfigurationPayload entry in the new follower
274 expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
275 assertEquals("New follower journal last index", 2, newFollowerActorContext.getReplicatedLog().lastIndex());
276 verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
277 votingServer(NEW_SERVER_ID));
279 // Verify new server config was applied in the new follower
281 assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
283 LOG.info("testAddServerWithNoExistingFollower ending");
287 public void testAddServersAsNonVoting() throws Exception {
288 LOG.info("testAddServersAsNonVoting starting");
291 RaftActorContext initialActorContext = new MockRaftActorContext();
293 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
294 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
295 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
296 actorFactory.generateActorId(LEADER_ID));
298 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
299 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
301 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
303 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), false), testKit.getRef());
305 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
306 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
307 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
309 // Verify ServerConfigurationPayload entry in leader's log
311 expectFirstMatching(leaderCollectorActor, ApplyState.class);
313 assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
314 assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
315 assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
316 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
317 nonVotingServer(NEW_SERVER_ID));
319 // Verify ServerConfigurationPayload entry in the new follower
321 expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
322 assertEquals("New follower journal last index", 0, newFollowerActorContext.getReplicatedLog().lastIndex());
323 verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
324 nonVotingServer(NEW_SERVER_ID));
326 // Verify new server config was applied in the new follower
328 assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
330 assertNoneMatching(newFollowerCollectorActor, InstallSnapshot.class, 500);
332 // Add another non-voting server.
334 clearMessages(leaderCollectorActor);
336 RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
337 Follower newFollower2 = new Follower(follower2ActorContext);
338 followerActor.underlyingActor().setBehavior(newFollower2);
340 leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit.getRef());
342 addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
343 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
344 assertEquals("getLeaderHint", java.util.Optional.of(LEADER_ID), addServerReply.getLeaderHint());
346 expectFirstMatching(leaderCollectorActor, ApplyState.class);
347 assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
348 assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
349 assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
350 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
351 votingServer(LEADER_ID), nonVotingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
353 LOG.info("testAddServersAsNonVoting ending");
357 public void testAddServerWithOperationInProgress() throws Exception {
358 LOG.info("testAddServerWithOperationInProgress starting");
361 RaftActorContext initialActorContext = new MockRaftActorContext();
363 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
364 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
365 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
366 actorFactory.generateActorId(LEADER_ID));
368 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
369 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
371 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
373 RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
374 Follower newFollower2 = new Follower(follower2ActorContext);
375 followerActor.underlyingActor().setBehavior(newFollower2);
377 MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
378 newFollowerRaftActorInstance.setDropMessageOfType(InstallSnapshot.class);
380 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
382 // Wait for leader's install snapshot and capture it
384 InstallSnapshot installSnapshot = expectFirstMatching(newFollowerCollectorActor, InstallSnapshot.class);
386 // Send a second AddServer - should get queued
387 JavaTestKit testKit2 = new JavaTestKit(getSystem());
388 leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit2.getRef());
390 // Continue the first AddServer
391 newFollowerRaftActorInstance.setDropMessageOfType(null);
392 newFollowerRaftActor.tell(installSnapshot, leaderActor);
394 // Verify both complete successfully
395 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
396 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
398 addServerReply = testKit2.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
399 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
401 // Verify ServerConfigurationPayload entries in leader's log
403 expectMatching(leaderCollectorActor, ApplyState.class, 2);
404 assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
405 assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
406 assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
407 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
408 votingServer(LEADER_ID), votingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
410 // Verify ServerConfigurationPayload entry in the new follower
412 expectMatching(newFollowerCollectorActor, ApplyState.class, 2);
413 assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID2),
414 newFollowerActorContext.getPeerIds());
416 LOG.info("testAddServerWithOperationInProgress ending");
420 public void testAddServerWithPriorSnapshotInProgress() throws Exception {
421 LOG.info("testAddServerWithPriorSnapshotInProgress starting");
424 RaftActorContext initialActorContext = new MockRaftActorContext();
426 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
427 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
428 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
429 actorFactory.generateActorId(LEADER_ID));
431 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
432 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
434 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
436 // Drop commit message for now to delay snapshot completion
437 leaderRaftActor.setDropMessageOfType(String.class);
439 leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
441 Object commitMsg = expectFirstMatching(leaderCollectorActor, COMMIT_MESSAGE_CLASS);
443 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
445 leaderRaftActor.setDropMessageOfType(null);
446 leaderActor.tell(commitMsg, leaderActor);
448 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
449 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
450 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
452 expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
454 // Verify ServerConfigurationPayload entry in leader's log
456 expectFirstMatching(leaderCollectorActor, ApplyState.class);
457 assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
458 assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
459 assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
460 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
461 votingServer(NEW_SERVER_ID));
463 LOG.info("testAddServerWithPriorSnapshotInProgress ending");
467 public void testAddServerWithPriorSnapshotCompleteTimeout() throws Exception {
468 LOG.info("testAddServerWithPriorSnapshotCompleteTimeout starting");
471 RaftActorContext initialActorContext = new MockRaftActorContext();
473 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
474 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
475 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
476 actorFactory.generateActorId(LEADER_ID));
478 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
479 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
481 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
483 // Drop commit message so the snapshot doesn't complete.
484 leaderRaftActor.setDropMessageOfType(COMMIT_MESSAGE_CLASS);
486 leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
488 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
490 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
491 assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
493 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
495 LOG.info("testAddServerWithPriorSnapshotCompleteTimeout ending");
499 public void testAddServerWithLeaderChangeBeforePriorSnapshotComplete() throws Exception {
500 LOG.info("testAddServerWithLeaderChangeBeforePriorSnapshotComplete starting");
503 RaftActorContext initialActorContext = new MockRaftActorContext();
505 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
506 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
507 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
508 actorFactory.generateActorId(LEADER_ID));
510 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
511 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
512 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
514 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
516 // Drop the commit message so the snapshot doesn't complete yet.
517 leaderRaftActor.setDropMessageOfType(COMMIT_MESSAGE_CLASS);
519 leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
521 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
523 Object commitMsg = expectFirstMatching(leaderCollectorActor, COMMIT_MESSAGE_CLASS);
525 // Change the leader behavior to follower
526 leaderActor.tell(new Follower(leaderActorContext), leaderActor);
528 // Drop CaptureSnapshotReply in case install snapshot is incorrectly initiated after the prior
529 // snapshot completes. This will prevent the invalid snapshot from completing and fail the
530 // isCapturing assertion below.
531 leaderRaftActor.setDropMessageOfType(CaptureSnapshotReply.class);
533 // Complete the prior snapshot - this should be a no-op b/c it's no longer the leader
534 leaderActor.tell(commitMsg, leaderActor);
536 leaderActor.tell(new RaftActorServerConfigurationSupport.ServerOperationTimeout(NEW_SERVER_ID), leaderActor);
538 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
539 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
541 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
542 assertEquals("isCapturing", false, leaderActorContext.getSnapshotManager().isCapturing());
544 LOG.info("testAddServerWithLeaderChangeBeforePriorSnapshotComplete ending");
548 public void testAddServerWithLeaderChangeDuringInstallSnapshot() throws Exception {
549 LOG.info("testAddServerWithLeaderChangeDuringInstallSnapshot starting");
552 RaftActorContext initialActorContext = new MockRaftActorContext();
554 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
555 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
556 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
557 actorFactory.generateActorId(LEADER_ID));
559 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
560 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
562 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(8);
564 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
566 // Drop the UnInitializedFollowerSnapshotReply to delay it.
567 leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
569 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
571 UnInitializedFollowerSnapshotReply snapshotReply = expectFirstMatching(leaderCollectorActor,
572 UnInitializedFollowerSnapshotReply.class);
574 // Prevent election timeout when the leader switches to follower
575 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
577 // Change the leader behavior to follower
578 leaderActor.tell(new Follower(leaderActorContext), leaderActor);
580 // Send the captured UnInitializedFollowerSnapshotReply - should be a no-op
581 leaderRaftActor.setDropMessageOfType(null);
582 leaderActor.tell(snapshotReply, leaderActor);
584 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
585 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
587 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
589 LOG.info("testAddServerWithLeaderChangeDuringInstallSnapshot ending");
593 public void testAddServerWithInstallSnapshotTimeout() throws Exception {
594 LOG.info("testAddServerWithInstallSnapshotTimeout starting");
597 RaftActorContext initialActorContext = new MockRaftActorContext();
599 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
600 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
601 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
602 actorFactory.generateActorId(LEADER_ID));
604 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
605 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
606 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
608 // Drop the InstallSnapshot message so it times out
609 newFollowerRaftActor.underlyingActor().setDropMessageOfType(InstallSnapshot.class);
611 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
613 leaderActor.tell(new UnInitializedFollowerSnapshotReply("bogus"), leaderActor);
615 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
616 assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
618 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
619 assertEquals("Leader followers size", 0,
620 ((AbstractLeader)leaderRaftActor.getCurrentBehavior()).getFollowerIds().size());
622 LOG.info("testAddServerWithInstallSnapshotTimeout ending");
626 public void testAddServerWithNoLeader() {
627 LOG.info("testAddServerWithNoLeader starting");
630 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
631 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
633 TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
634 MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
635 followerActor.path().toString())).config(configParams).persistent(Optional.of(false)).
636 props().withDispatcher(Dispatchers.DefaultDispatcherId()),
637 actorFactory.generateActorId(LEADER_ID));
638 noLeaderActor.underlyingActor().waitForInitializeBehaviorComplete();
640 noLeaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
641 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
642 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
644 LOG.info("testAddServerWithNoLeader ending");
648 public void testAddServerWithNoConsensusReached() {
649 LOG.info("testAddServerWithNoConsensusReached starting");
652 RaftActorContext initialActorContext = new MockRaftActorContext();
654 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
655 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
656 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
657 actorFactory.generateActorId(LEADER_ID));
659 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
660 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
662 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
664 // Drop UnInitializedFollowerSnapshotReply initially
665 leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
667 MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
668 TestActorRef<MessageCollectorActor> newFollowerCollectorActor =
669 newCollectorActor(newFollowerRaftActorInstance, NEW_SERVER_ID);
671 // Drop AppendEntries to the new follower so consensus isn't reached
672 newFollowerRaftActorInstance.setDropMessageOfType(AppendEntries.class);
674 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
676 // Capture the UnInitializedFollowerSnapshotReply
677 Object snapshotReply = expectFirstMatching(leaderCollectorActor, UnInitializedFollowerSnapshotReply.class);
679 // Send the UnInitializedFollowerSnapshotReply to resume the first request
680 leaderRaftActor.setDropMessageOfType(null);
681 leaderActor.tell(snapshotReply, leaderActor);
683 expectFirstMatching(newFollowerCollectorActor, AppendEntries.class);
685 // Send a second AddServer
686 leaderActor.tell(new AddServer(NEW_SERVER_ID2, "", false), testKit.getRef());
688 // The first AddServer should succeed with OK even though consensus wasn't reached
689 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
690 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
691 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
693 // Verify ServerConfigurationPayload entry in leader's log
694 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
695 votingServer(NEW_SERVER_ID));
697 // The second AddServer should fail since consensus wasn't reached for the first
698 addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
699 assertEquals("getStatus", ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT, addServerReply.getStatus());
701 // Re-send the second AddServer - should also fail
702 leaderActor.tell(new AddServer(NEW_SERVER_ID2, "", false), testKit.getRef());
703 addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
704 assertEquals("getStatus", ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT, addServerReply.getStatus());
706 LOG.info("testAddServerWithNoConsensusReached ending");
710 public void testAddServerWithExistingServer() {
711 LOG.info("testAddServerWithExistingServer starting");
713 RaftActorContext initialActorContext = new MockRaftActorContext();
715 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
716 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
717 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
718 actorFactory.generateActorId(LEADER_ID));
720 leaderActor.tell(new AddServer(FOLLOWER_ID, followerActor.path().toString(), true), testKit.getRef());
722 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
723 assertEquals("getStatus", ServerChangeStatus.ALREADY_EXISTS, addServerReply.getStatus());
725 LOG.info("testAddServerWithExistingServer ending");
729 public void testAddServerForwardedToLeader() {
730 LOG.info("testAddServerForwardedToLeader starting");
733 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
734 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
736 TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
737 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
738 actorFactory.generateActorId(LEADER_ID));
740 TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
741 MockRaftActor.builder().id(FOLLOWER_ID).peerAddresses(ImmutableMap.of(LEADER_ID,
742 leaderActor.path().toString())).config(configParams).persistent(Optional.of(false)).
743 props().withDispatcher(Dispatchers.DefaultDispatcherId()),
744 actorFactory.generateActorId(FOLLOWER_ID));
745 followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
747 followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
748 -1, -1, (short)0), leaderActor);
750 followerRaftActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
751 expectFirstMatching(leaderActor, AddServer.class);
753 LOG.info("testAddServerForwardedToLeader ending");
757 public void testOnApplyState() {
758 LOG.info("testOnApplyState starting");
760 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
761 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
762 TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
763 MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
764 followerActor.path().toString())).config(configParams).persistent(Optional.of(false)).
765 props().withDispatcher(Dispatchers.DefaultDispatcherId()),
766 actorFactory.generateActorId(LEADER_ID));
768 RaftActorServerConfigurationSupport support = new RaftActorServerConfigurationSupport(noLeaderActor.underlyingActor());
770 ReplicatedLogEntry serverConfigEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
771 new ServerConfigurationPayload(Collections.<ServerInfo>emptyList()));
772 boolean handled = support.handleMessage(new ApplyState(null, null, serverConfigEntry), ActorRef.noSender());
773 assertEquals("Message handled", true, handled);
775 ReplicatedLogEntry nonServerConfigEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
776 new MockRaftActorContext.MockPayload("1"));
777 handled = support.handleMessage(new ApplyState(null, null, nonServerConfigEntry), ActorRef.noSender());
778 assertEquals("Message handled", false, handled);
780 LOG.info("testOnApplyState ending");
784 public void testRemoveServerWithNoLeader() {
785 LOG.info("testRemoveServerWithNoLeader starting");
787 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
788 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
790 TestActorRef<MockRaftActor> leaderActor = actorFactory.createTestActor(
791 MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
792 followerActor.path().toString())).config(configParams).persistent(Optional.of(false)).
793 props().withDispatcher(Dispatchers.DefaultDispatcherId()),
794 actorFactory.generateActorId(LEADER_ID));
795 leaderActor.underlyingActor().waitForInitializeBehaviorComplete();
797 leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
798 RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
799 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, removeServerReply.getStatus());
801 LOG.info("testRemoveServerWithNoLeader ending");
805 public void testRemoveServerNonExistentServer() {
806 LOG.info("testRemoveServerNonExistentServer starting");
808 RaftActorContext initialActorContext = new MockRaftActorContext();
810 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
811 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
812 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
813 actorFactory.generateActorId(LEADER_ID));
815 leaderActor.tell(new RemoveServer(NEW_SERVER_ID), testKit.getRef());
816 RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
817 assertEquals("getStatus", ServerChangeStatus.DOES_NOT_EXIST, removeServerReply.getStatus());
819 LOG.info("testRemoveServerNonExistentServer ending");
823 public void testRemoveServerForwardToLeader() {
824 LOG.info("testRemoveServerForwardToLeader starting");
826 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
827 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
829 TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
830 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
831 actorFactory.generateActorId(LEADER_ID));
833 TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
834 MockRaftActor.builder().id(FOLLOWER_ID).peerAddresses(ImmutableMap.of(LEADER_ID,
835 leaderActor.path().toString())).config(configParams).persistent(Optional.of(false)).
836 props().withDispatcher(Dispatchers.DefaultDispatcherId()),
837 actorFactory.generateActorId(FOLLOWER_ID));
838 followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
840 followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
841 -1, -1, (short)0), leaderActor);
843 followerRaftActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
844 expectFirstMatching(leaderActor, RemoveServer.class);
846 LOG.info("testRemoveServerForwardToLeader ending");
850 public void testRemoveServer() {
851 LOG.info("testRemoveServer starting");
853 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
854 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
855 configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
857 final String followerActorId = actorFactory.generateActorId(FOLLOWER_ID);
858 final String followerActorPath = actorFactory.createTestActorPath(followerActorId);
859 RaftActorContext initialActorContext = new MockRaftActorContext();
861 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
862 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActorPath),
863 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
864 actorFactory.generateActorId(LEADER_ID));
866 TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
868 TestActorRef<MessageCollectorActor> collector =
869 actorFactory.createTestActor(MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
870 actorFactory.generateActorId("collector"));
871 TestActorRef<CollectingMockRaftActor> followerRaftActor = actorFactory.createTestActor(
872 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
873 configParams, NO_PERSISTENCE, collector).withDispatcher(Dispatchers.DefaultDispatcherId()),
876 leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
877 RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
878 assertEquals("getStatus", ServerChangeStatus.OK, removeServerReply.getStatus());
880 final ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
881 assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
882 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(), votingServer(LEADER_ID));
884 RaftActorBehavior currentBehavior = leaderActor.underlyingActor().getCurrentBehavior();
885 assertTrue("Expected Leader", currentBehavior instanceof Leader);
886 assertEquals("Follower ids size", 0, ((Leader)currentBehavior).getFollowerIds().size());
888 MessageCollectorActor.expectFirstMatching(collector, ServerRemoved.class);
890 LOG.info("testRemoveServer ending");
894 public void testRemoveServerLeader() {
895 LOG.info("testRemoveServerLeader starting");
897 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
898 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
899 configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
901 final String followerActorId = actorFactory.generateActorId(FOLLOWER_ID);
902 final String followerActorPath = actorFactory.createTestActorPath(followerActorId);
903 RaftActorContext initialActorContext = new MockRaftActorContext();
905 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
906 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActorPath),
907 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
908 actorFactory.generateActorId(LEADER_ID));
910 TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
912 TestActorRef<MessageCollectorActor> followerCollector = actorFactory.createTestActor(MessageCollectorActor.props().
913 withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId("collector"));
914 actorFactory.createTestActor(
915 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
916 configParams, NO_PERSISTENCE, followerCollector).withDispatcher(Dispatchers.DefaultDispatcherId()),
919 leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
920 RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
921 assertEquals("getStatus", ServerChangeStatus.OK, removeServerReply.getStatus());
923 final ApplyState applyState = MessageCollectorActor.expectFirstMatching(followerCollector, ApplyState.class);
924 assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
925 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
926 votingServer(FOLLOWER_ID));
928 MessageCollectorActor.expectFirstMatching(leaderCollector, ServerRemoved.class);
930 LOG.info("testRemoveServerLeader ending");
934 public void testRemoveServerLeaderWithNoFollowers() {
935 LOG.info("testRemoveServerLeaderWithNoFollowers starting");
937 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
938 MockLeaderRaftActor.props(Collections.<String, String>emptyMap(),
939 new MockRaftActorContext()).withDispatcher(Dispatchers.DefaultDispatcherId()),
940 actorFactory.generateActorId(LEADER_ID));
942 leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
943 RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
944 assertEquals("getStatus", ServerChangeStatus.NOT_SUPPORTED, removeServerReply.getStatus());
946 LOG.info("testRemoveServerLeaderWithNoFollowers ending");
950 public void testChangeServersVotingStatus() {
951 LOG.info("testChangeServersVotingStatus starting");
953 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
954 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
955 configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
957 final String follower1ActorId = actorFactory.generateActorId(FOLLOWER_ID);
958 final String follower1ActorPath = actorFactory.createTestActorPath(follower1ActorId);
959 final String follower2ActorId = actorFactory.generateActorId(FOLLOWER_ID2);
960 final String follower2ActorPath = actorFactory.createTestActorPath(follower2ActorId);
962 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
963 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, follower1ActorPath,
964 FOLLOWER_ID2, follower2ActorPath), new MockRaftActorContext()).
965 withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
966 TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
968 TestActorRef<MessageCollectorActor> follower1Collector = actorFactory.createTestActor(
969 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
970 actorFactory.generateActorId("collector"));
971 TestActorRef<CollectingMockRaftActor> follower1RaftActor = actorFactory.createTestActor(
972 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
973 FOLLOWER_ID2, follower2ActorPath), configParams, NO_PERSISTENCE, follower1Collector).
974 withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId);
976 TestActorRef<MessageCollectorActor> follower2Collector = actorFactory.createTestActor(
977 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
978 actorFactory.generateActorId("collector"));
979 TestActorRef<CollectingMockRaftActor> follower2RaftActor = actorFactory.createTestActor(
980 CollectingMockRaftActor.props(FOLLOWER_ID2, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
981 FOLLOWER_ID, follower1ActorPath), configParams, NO_PERSISTENCE, follower2Collector).
982 withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId);
984 // Send first ChangeServersVotingStatus message
986 leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(FOLLOWER_ID, false, FOLLOWER_ID2, false)),
988 ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
989 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
991 final ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
992 assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
993 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
994 votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
996 MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
997 verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
998 votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1000 MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
1001 verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1002 votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1004 MessageCollectorActor.clearMessages(leaderCollector);
1005 MessageCollectorActor.clearMessages(follower1Collector);
1006 MessageCollectorActor.clearMessages(follower2Collector);
1008 // Send second ChangeServersVotingStatus message
1010 leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(FOLLOWER_ID, true)), testKit.getRef());
1011 reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1012 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1014 MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
1015 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1016 votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1018 MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
1019 verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1020 votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1022 MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
1023 verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1024 votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1026 LOG.info("testChangeServersVotingStatus ending");
1030 public void testChangeLeaderToNonVoting() {
1031 LOG.info("testChangeLeaderToNonVoting starting");
1033 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1034 configParams.setHeartBeatInterval(new FiniteDuration(500, TimeUnit.MILLISECONDS));
1036 final String follower1ActorId = actorFactory.generateActorId(FOLLOWER_ID);
1037 final String follower1ActorPath = actorFactory.createTestActorPath(follower1ActorId);
1038 final String follower2ActorId = actorFactory.generateActorId(FOLLOWER_ID2);
1039 final String follower2ActorPath = actorFactory.createTestActorPath(follower2ActorId);
1041 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
1042 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, follower1ActorPath,
1043 FOLLOWER_ID2, follower2ActorPath), new MockRaftActorContext()).
1044 withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
1045 TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
1047 TestActorRef<MessageCollectorActor> follower1Collector = actorFactory.createTestActor(
1048 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1049 actorFactory.generateActorId("collector"));
1050 TestActorRef<CollectingMockRaftActor> follower1RaftActor = actorFactory.createTestActor(
1051 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
1052 FOLLOWER_ID2, follower2ActorPath), configParams, NO_PERSISTENCE, follower1Collector).
1053 withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId);
1055 TestActorRef<MessageCollectorActor> follower2Collector = actorFactory.createTestActor(
1056 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1057 actorFactory.generateActorId("collector"));
1058 TestActorRef<CollectingMockRaftActor> follower2RaftActor = actorFactory.createTestActor(
1059 CollectingMockRaftActor.props(FOLLOWER_ID2, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
1060 FOLLOWER_ID, follower1ActorPath), configParams, NO_PERSISTENCE, follower2Collector).
1061 withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId);
1063 // Send ChangeServersVotingStatus message
1065 leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(LEADER_ID, false)), testKit.getRef());
1066 ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1067 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1069 MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
1070 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1071 nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
1073 MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
1074 verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1075 nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
1077 MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
1078 verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1079 nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
1081 verifyRaftState(RaftState.Leader, follower1RaftActor.underlyingActor(), follower2RaftActor.underlyingActor());
1082 verifyRaftState(RaftState.Follower, leaderActor.underlyingActor());
1084 MessageCollectorActor.expectMatching(leaderCollector, AppendEntries.class, 2);
1086 LOG.info("testChangeLeaderToNonVoting ending");
1090 public void testChangeLeaderToNonVotingInSingleNode() {
1091 LOG.info("testChangeLeaderToNonVotingInSingleNode starting");
1093 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
1094 MockLeaderRaftActor.props(ImmutableMap.of(), new MockRaftActorContext()).
1095 withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
1097 leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(LEADER_ID, false)), testKit.getRef());
1098 ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1099 assertEquals("getStatus", ServerChangeStatus.INVALID_REQUEST, reply.getStatus());
1101 LOG.info("testChangeLeaderToNonVotingInSingleNode ending");
1105 public void testChangeToVotingWithNoLeader() {
1106 LOG.info("testChangeToVotingWithNoLeader starting");
1108 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1109 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1110 configParams.setElectionTimeoutFactor(5);
1112 final String node1ID = "node1";
1113 final String node2ID = "node2";
1115 // Set up a persisted ServerConfigurationPayload. Initially node1 and node2 will come up as non-voting.
1116 // via the server config. The server config will also contain 2 voting peers that are down (ie no
1119 ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1120 new ServerInfo(node1ID, false), new ServerInfo(node2ID, false),
1121 new ServerInfo("downNode1", true), new ServerInfo("downNode2", true)));
1122 ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, 1, persistedServerConfig);
1124 InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "downNode1"));
1125 InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1126 InMemoryJournal.addEntry(node1ID, 3, new ApplyJournalEntries(0));
1127 InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "downNode2"));
1128 InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1129 InMemoryJournal.addEntry(node2ID, 3, new ApplyJournalEntries(0));
1131 TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
1132 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1133 actorFactory.generateActorId("collector"));
1134 TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1135 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
1136 PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1137 CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1139 TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
1140 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1141 actorFactory.generateActorId("collector"));
1142 TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1143 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
1144 PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1145 CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1147 node1RaftActor.waitForInitializeBehaviorComplete();
1148 node2RaftActor.waitForInitializeBehaviorComplete();
1150 // Verify the intended server config was loaded and applied.
1151 verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1152 nonVotingServer(node1ID), nonVotingServer(node2ID), votingServer("downNode1"),
1153 votingServer("downNode2"));
1154 assertEquals("isVotingMember", false, node1RaftActor.getRaftActorContext().isVotingMember());
1155 assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1156 assertEquals("getLeaderId", null, node1RaftActor.getLeaderId());
1158 verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1159 nonVotingServer(node1ID), nonVotingServer(node2ID), votingServer("downNode1"),
1160 votingServer("downNode2"));
1161 assertEquals("isVotingMember", false, node2RaftActor.getRaftActorContext().isVotingMember());
1163 // For the test, we send a ChangeServersVotingStatus message to node1 to flip the voting states for
1164 // each server, ie node1 and node2 to voting and the 2 down nodes to non-voting. This should cause
1165 // node1 to try to elect itself as leader in order to apply the new server config. Since the 2
1166 // down nodes are switched to non-voting, node1 should only need a vote from node2.
1168 // First send the message such that node1 has no peer address for node2 - should fail.
1170 ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true,
1171 node2ID, true, "downNode1", false, "downNode2", false));
1172 node1RaftActorRef.tell(changeServers, testKit.getRef());
1173 ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1174 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, reply.getStatus());
1176 // Send an AppendEntries so node1 has a leaderId
1178 MessageCollectorActor.clearMessages(node1Collector);
1180 long term = node1RaftActor.getRaftActorContext().getTermInformation().getCurrentTerm();
1181 node1RaftActorRef.tell(new AppendEntries(term, "downNode1", -1L, -1L,
1182 Collections.<ReplicatedLogEntry>emptyList(), 0, -1, (short)1), ActorRef.noSender());
1184 // Wait for the ElectionTimeout to clear the leaderId. he leaderId must be null so on the
1185 // ChangeServersVotingStatus message, it will try to elect a leader.
1187 MessageCollectorActor.expectFirstMatching(node1Collector, ElectionTimeout.class);
1189 // Update node2's peer address and send the message again
1191 node1RaftActor.setPeerAddress(node2ID, node2RaftActorRef.path().toString());
1193 node1RaftActorRef.tell(changeServers, testKit.getRef());
1194 reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1195 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1197 ApplyJournalEntries apply = MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class);
1198 assertEquals("getToIndex", 1, apply.getToIndex());
1199 verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1200 votingServer(node1ID), votingServer(node2ID), nonVotingServer("downNode1"),
1201 nonVotingServer("downNode2"));
1202 assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
1203 assertEquals("getRaftState", RaftState.Leader, node1RaftActor.getRaftState());
1205 apply = MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
1206 assertEquals("getToIndex", 1, apply.getToIndex());
1207 verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1208 votingServer(node1ID), votingServer(node2ID), nonVotingServer("downNode1"),
1209 nonVotingServer("downNode2"));
1210 assertEquals("isVotingMember", true, node2RaftActor.getRaftActorContext().isVotingMember());
1211 assertEquals("getRaftState", RaftState.Follower, node2RaftActor.getRaftState());
1213 LOG.info("testChangeToVotingWithNoLeader ending");
1217 public void testChangeToVotingWithNoLeaderAndElectionTimeout() {
1218 LOG.info("testChangeToVotingWithNoLeaderAndElectionTimeout starting");
1220 final String node1ID = "node1";
1221 final String node2ID = "node2";
1223 PeerAddressResolver peerAddressResolver = peerId -> peerId.equals(node1ID) ? actorFactory.createTestActorPath(node1ID) :
1224 peerId.equals(node2ID) ? actorFactory.createTestActorPath(node2ID) : null;
1226 ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1227 new ServerInfo(node1ID, false), new ServerInfo(node2ID, true)));
1228 ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, 1, persistedServerConfig);
1230 InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
1231 InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1232 InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
1233 InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1235 DefaultConfigParamsImpl configParams1 = new DefaultConfigParamsImpl();
1236 configParams1.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1237 configParams1.setElectionTimeoutFactor(1);
1238 configParams1.setPeerAddressResolver(peerAddressResolver);
1239 TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
1240 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1241 actorFactory.generateActorId("collector"));
1242 TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1243 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams1,
1244 PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1245 CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1247 DefaultConfigParamsImpl configParams2 = new DefaultConfigParamsImpl();
1248 configParams2.setElectionTimeoutFactor(1000000);
1249 configParams2.setPeerAddressResolver(peerAddressResolver);
1250 TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
1251 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1252 actorFactory.generateActorId("collector"));
1253 TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1254 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams2,
1255 PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1256 CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1258 // Send a ChangeServersVotingStatus message to node1 to change mode1 to voting. This should cause
1259 // node1 to try to elect itself as leader in order to apply the new server config. But we'll drop
1260 // RequestVote messages in node2 which should cause node1 to time out and revert back to the previous
1261 // server config and fail with NO_LEADER. Note that node1 shouldn't forward the request to node2 b/c
1262 // node2 was previously voting.
1264 node2RaftActor.setDropMessageOfType(RequestVote.class);
1266 ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true));
1267 node1RaftActorRef.tell(changeServers, testKit.getRef());
1268 ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1269 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, reply.getStatus());
1271 assertEquals("Server config", Sets.newHashSet(nonVotingServer(node1ID), votingServer(node2ID)),
1272 Sets.newHashSet(node1RaftActor.getRaftActorContext().getPeerServerInfo(true).getServerConfig()));
1273 assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1275 LOG.info("testChangeToVotingWithNoLeaderAndElectionTimeout ending");
1279 public void testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout() {
1280 LOG.info("testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout starting");
1282 final String node1ID = "node1";
1283 final String node2ID = "node2";
1285 PeerAddressResolver peerAddressResolver = peerId -> peerId.equals(node1ID) ? actorFactory.createTestActorPath(node1ID) :
1286 peerId.equals(node2ID) ? actorFactory.createTestActorPath(node2ID) : null;
1288 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1289 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1290 configParams.setElectionTimeoutFactor(3);
1291 configParams.setPeerAddressResolver(peerAddressResolver);
1293 ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1294 new ServerInfo(node1ID, false), new ServerInfo(node2ID, false)));
1295 ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, 1, persistedServerConfig);
1297 InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
1298 InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1299 InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
1300 InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1301 InMemoryJournal.addEntry(node2ID, 3, new ReplicatedLogImplEntry(1, 1,
1302 new MockRaftActorContext.MockPayload("2")));
1303 InMemoryJournal.addEntry(node2ID, 4, new ApplyJournalEntries(1));
1305 TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
1306 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1307 actorFactory.generateActorId("collector"));
1308 TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1309 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
1310 PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1311 CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1313 TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
1314 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1315 actorFactory.generateActorId("collector"));
1316 TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1317 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
1318 PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1319 CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1321 // Send a ChangeServersVotingStatus message to node1 to change mode1 to voting. This should cause
1322 // node1 to try to elect itself as leader in order to apply the new server config. However node1's log
1323 // is behind node2's so node2 should not grant node1's vote. This should cause node1 to time out and
1324 // forward the request to node2.
1326 ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(
1327 ImmutableMap.of(node1ID, true, node2ID, true));
1328 node1RaftActorRef.tell(changeServers, testKit.getRef());
1329 ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1330 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1332 MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
1333 verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1334 votingServer(node1ID), votingServer(node2ID));
1335 assertEquals("getRaftState", RaftState.Leader, node2RaftActor.getRaftState());
1337 MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class);
1338 verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1339 votingServer(node1ID), votingServer(node2ID));
1340 assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
1341 assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1343 LOG.info("testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout ending");
1347 public void testChangeToVotingWithNoLeaderAndOtherLeaderElected() {
1348 LOG.info("testChangeToVotingWithNoLeaderAndOtherLeaderElected starting");
1350 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1351 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1352 configParams.setElectionTimeoutFactor(100000);
1354 final String node1ID = "node1";
1355 final String node2ID = "node2";
1357 configParams.setPeerAddressResolver(peerId -> peerId.equals(node1ID) ? actorFactory.createTestActorPath(node1ID) :
1358 peerId.equals(node2ID) ? actorFactory.createTestActorPath(node2ID) : null);
1360 ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1361 new ServerInfo(node1ID, false), new ServerInfo(node2ID, true)));
1362 ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, 1, persistedServerConfig);
1364 InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
1365 InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1366 InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
1367 InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1369 TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
1370 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1371 actorFactory.generateActorId("collector"));
1372 TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1373 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
1374 PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1375 CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1377 TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
1378 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1379 actorFactory.generateActorId("collector"));
1380 TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1381 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
1382 PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1383 CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1385 // Send a ChangeServersVotingStatus message to node1 to change node1 to voting. This should cause
1386 // node1 to try to elect itself as leader in order to apply the new server config. But we'll drop
1387 // RequestVote messages in node2 and make it the leader so node1 should forward the server change
1388 // request to node2 when node2 is elected.
1390 node2RaftActor.setDropMessageOfType(RequestVote.class);
1392 ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true,
1394 node1RaftActorRef.tell(changeServers, testKit.getRef());
1396 MessageCollectorActor.expectFirstMatching(node2Collector, RequestVote.class);
1398 node2RaftActorRef.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
1400 ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1401 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1403 MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class);
1404 verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1405 votingServer(node1ID), votingServer(node2ID));
1406 assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
1407 assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1409 MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
1410 verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1411 votingServer(node1ID), votingServer(node2ID));
1412 assertEquals("getRaftState", RaftState.Leader, node2RaftActor.getRaftState());
1414 LOG.info("testChangeToVotingWithNoLeaderAndOtherLeaderElected ending");
1417 private static void verifyRaftState(RaftState expState, RaftActor... raftActors) {
1418 Stopwatch sw = Stopwatch.createStarted();
1419 while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
1420 for(RaftActor raftActor: raftActors) {
1421 if(raftActor.getRaftState() == expState) {
1427 fail("None of the RaftActors have state " + expState);
1430 private static ServerInfo votingServer(String id) {
1431 return new ServerInfo(id, true);
1434 private static ServerInfo nonVotingServer(String id) {
1435 return new ServerInfo(id, false);
1438 private TestActorRef<MessageCollectorActor> newLeaderCollectorActor(MockLeaderRaftActor leaderRaftActor) {
1439 return newCollectorActor(leaderRaftActor, LEADER_ID);
1442 private TestActorRef<MessageCollectorActor> newCollectorActor(AbstractMockRaftActor raftActor, String id) {
1443 TestActorRef<MessageCollectorActor> collectorActor = actorFactory.createTestActor(
1444 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1445 actorFactory.generateActorId(id + "Collector"));
1446 raftActor.setCollectorActor(collectorActor);
1447 return collectorActor;
1450 private static void verifyServerConfigurationPayloadEntry(ReplicatedLog log, ServerInfo... expected) {
1451 ReplicatedLogEntry logEntry = log.get(log.lastIndex());
1452 assertEquals("Last log entry payload class", ServerConfigurationPayload.class, logEntry.getData().getClass());
1453 ServerConfigurationPayload payload = (ServerConfigurationPayload)logEntry.getData();
1454 assertEquals("Server config", Sets.newHashSet(expected), Sets.newHashSet(payload.getServerConfig()));
1457 private static RaftActorContextImpl newFollowerContext(String id, TestActorRef<? extends UntypedActor> actor) {
1458 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1459 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1460 configParams.setElectionTimeoutFactor(100000);
1461 NonPersistentDataProvider noPersistence = new NonPersistentDataProvider();
1462 ElectionTermImpl termInfo = new ElectionTermImpl(noPersistence, id, LOG);
1463 termInfo.update(1, LEADER_ID);
1464 return new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
1465 id, termInfo, -1, -1, ImmutableMap.of(LEADER_ID, ""), configParams, noPersistence, LOG);
1468 static abstract class AbstractMockRaftActor extends MockRaftActor {
1469 private volatile TestActorRef<MessageCollectorActor> collectorActor;
1470 private volatile Class<?> dropMessageOfType;
1472 AbstractMockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
1473 boolean persistent, TestActorRef<MessageCollectorActor> collectorActor) {
1474 super(builder().id(id).peerAddresses(peerAddresses).config(config.get()).
1475 persistent(Optional.of(persistent)));
1476 this.collectorActor = collectorActor;
1479 void setDropMessageOfType(Class<?> dropMessageOfType) {
1480 this.dropMessageOfType = dropMessageOfType;
1483 void setCollectorActor(TestActorRef<MessageCollectorActor> collectorActor) {
1484 this.collectorActor = collectorActor;
1488 public void handleCommand(Object message) {
1489 if(dropMessageOfType == null || !dropMessageOfType.equals(message.getClass())) {
1490 super.handleCommand(message);
1493 if(collectorActor != null) {
1494 collectorActor.tell(message, getSender());
1499 public static class CollectingMockRaftActor extends AbstractMockRaftActor {
1501 CollectingMockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
1502 boolean persistent, TestActorRef<MessageCollectorActor> collectorActor) {
1503 super(id, peerAddresses, config, persistent, collectorActor);
1504 snapshotCohortDelegate = new RaftActorSnapshotCohort() {
1506 public void createSnapshot(ActorRef actorRef) {
1507 actorRef.tell(new CaptureSnapshotReply(new byte[0]), actorRef);
1511 public void applySnapshot(byte[] snapshotBytes) {
1516 public static Props props(final String id, final Map<String, String> peerAddresses,
1517 ConfigParams config, boolean persistent, TestActorRef<MessageCollectorActor> collectorActor){
1519 return Props.create(CollectingMockRaftActor.class, id, peerAddresses, Optional.of(config),
1520 persistent, collectorActor);
1525 public static class MockLeaderRaftActor extends AbstractMockRaftActor {
1526 public MockLeaderRaftActor(Map<String, String> peerAddresses, ConfigParams config,
1527 RaftActorContext fromContext) {
1528 super(LEADER_ID, peerAddresses, Optional.of(config), NO_PERSISTENCE, null);
1529 setPersistence(false);
1531 RaftActorContext context = getRaftActorContext();
1532 for(int i = 0; i < fromContext.getReplicatedLog().size(); i++) {
1533 ReplicatedLogEntry entry = fromContext.getReplicatedLog().get(i);
1534 getState().add(entry.getData());
1535 context.getReplicatedLog().append(entry);
1538 context.setCommitIndex(fromContext.getCommitIndex());
1539 context.setLastApplied(fromContext.getLastApplied());
1540 context.getTermInformation().update(fromContext.getTermInformation().getCurrentTerm(),
1541 fromContext.getTermInformation().getVotedFor());
1545 protected void initializeBehavior() {
1546 changeCurrentBehavior(new Leader(getRaftActorContext()));
1547 initializeBehaviorComplete.countDown();
1551 public void createSnapshot(ActorRef actorRef) {
1553 actorRef.tell(new CaptureSnapshotReply(RaftActorTest.fromObject(getState()).toByteArray()), actorRef);
1554 } catch (Exception e) {
1555 LOG.error("createSnapshot failed", e);
1559 static Props props(Map<String, String> peerAddresses, RaftActorContext fromContext) {
1560 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1561 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1562 configParams.setElectionTimeoutFactor(10);
1563 return Props.create(MockLeaderRaftActor.class, peerAddresses, configParams, fromContext);
1567 public static class MockNewFollowerRaftActor extends AbstractMockRaftActor {
1568 public MockNewFollowerRaftActor(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
1569 super(NEW_SERVER_ID, Maps.<String, String>newHashMap(), Optional.of(config), NO_PERSISTENCE, collectorActor);
1570 setPersistence(false);
1573 static Props props(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
1574 return Props.create(MockNewFollowerRaftActor.class, config, collectorActor);