2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertTrue;
12 import static org.junit.Assert.fail;
13 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.assertNoneMatching;
14 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
15 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
16 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
17 import akka.actor.ActorRef;
18 import akka.actor.Props;
19 import akka.actor.UntypedActor;
20 import akka.dispatch.Dispatchers;
21 import akka.testkit.JavaTestKit;
22 import akka.testkit.TestActorRef;
23 import com.google.common.base.Optional;
24 import com.google.common.base.Stopwatch;
25 import com.google.common.collect.ImmutableMap;
26 import com.google.common.collect.Maps;
27 import com.google.common.collect.Sets;
28 import java.util.Arrays;
29 import java.util.Collections;
30 import java.util.List;
32 import java.util.concurrent.TimeUnit;
33 import org.junit.After;
34 import org.junit.Before;
35 import org.junit.Test;
36 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
37 import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
38 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
39 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
40 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
41 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
42 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
43 import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
44 import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete;
45 import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
46 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
47 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
48 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
49 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
50 import org.opendaylight.controller.cluster.raft.messages.AddServer;
51 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
52 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
53 import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
54 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
55 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
56 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
57 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
58 import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
59 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
60 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
61 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
62 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
63 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
64 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
65 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
66 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
67 import org.slf4j.Logger;
68 import org.slf4j.LoggerFactory;
69 import scala.concurrent.duration.FiniteDuration;
72 * Unit tests for RaftActorServerConfigurationSupport.
74 * @author Thomas Pantelis
76 public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
// Server ids used by the tests below; several are also passed to actorFactory.generateActorId()
// when the corresponding test actors are created.
77 static final String LEADER_ID = "leader";
78 static final String FOLLOWER_ID = "follower";
79 static final String FOLLOWER_ID2 = "follower2";
80 static final String NEW_SERVER_ID = "new-server";
81 static final String NEW_SERVER_ID2 = "new-server2";
82 private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupportTest.class);
// Named boolean flags; no use of them is visible in this part of the file.
83 private static final boolean NO_PERSISTENCE = false;
84 private static final boolean PERSISTENT = true;
// Factory through which every test actor in this class is created; closed in tearDown().
86 private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
// Pre-created follower endpoint. Tests install a Follower behavior into it via
// underlyingActor().setBehavior(...) and read collected messages back from it.
88 private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
89 Props.create(ForwardMessageToBehaviorActor.class).withDispatcher(Dispatchers.DefaultDispatcherId()),
90 actorFactory.generateActorId(FOLLOWER_ID));
// The three fields below are assigned by setupNewFollower().
92 private TestActorRef<MockNewFollowerRaftActor> newFollowerRaftActor;
93 private TestActorRef<MessageCollectorActor> newFollowerCollectorActor;
94 private RaftActorContext newFollowerActorContext;
// Its ref is used as the sender of AddServer requests, so the AddServerReply is read from this kit.
96 private final JavaTestKit testKit = new JavaTestKit(getSystem());
100 InMemoryJournal.clear();
101 InMemorySnapshotStore.clear();
104 private void setupNewFollower() {
// Creates the message collector and the MockNewFollowerRaftActor for NEW_SERVER_ID, then caches the
// new follower's RaftActorContext in newFollowerActorContext.
105 DefaultConfigParamsImpl configParams = newFollowerConfigParams();
107 newFollowerCollectorActor = actorFactory.createTestActor(
108 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
109 actorFactory.generateActorId(NEW_SERVER_ID + "Collector"));
// The collector is handed to the raft actor's props; presumably the raft actor forwards the messages
// it receives to the collector - confirm against MockNewFollowerRaftActor.
110 newFollowerRaftActor = actorFactory.createTestActor(MockNewFollowerRaftActor.props(
111 configParams, newFollowerCollectorActor).withDispatcher(Dispatchers.DefaultDispatcherId()),
112 actorFactory.generateActorId(NEW_SERVER_ID));
115 newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
116 } catch (Exception e) {
// NOTE(review): the handler repeats the statement that precedes the catch and never uses 'e'.
// This looks like a single blind retry - confirm it is intentional.
117 newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
121 private static DefaultConfigParamsImpl newFollowerConfigParams() {
// Builds the config used for the new follower: 100 ms heartbeat interval, an election timeout factor
// of 100000 and the DisableElectionsRaftPolicy as the custom raft policy.
122 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
123 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
// Presumably large enough that the new follower never reaches an election timeout during a test - confirm.
124 configParams.setElectionTimeoutFactor(100000);
125 configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
130 public void tearDown() throws Exception {
// Closes the actor factory; presumably this stops the actors created through it - confirm against TestActorFactory.
131 actorFactory.close();
135 public void testAddServerWithExistingFollower() throws Exception {
// Scenario: a leader with one existing follower receives AddServer for NEW_SERVER_ID (third argument true).
// Checks the snapshot applied on the new server, the OK reply, the ServerConfigurationPayload entry at
// index 3 on all three nodes, the resulting peer sets, and the journal entries persisted for the leader
// and the new server.
136 LOG.info("testAddServerWithExistingFollower starting");
138 RaftActorContext followerActorContext = newFollowerContext(FOLLOWER_ID, followerActor);
139 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
141 followerActorContext.setCommitIndex(2);
142 followerActorContext.setLastApplied(2);
144 Follower follower = new Follower(followerActorContext);
145 followerActor.underlyingActor().setBehavior(follower);
// The leader is created with the existing follower as its only peer and is given followerActorContext
// as its second props argument.
147 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
148 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
149 followerActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
150 actorFactory.generateActorId(LEADER_ID));
152 // Expect initial heartbeat from the leader.
153 expectFirstMatching(followerActor, AppendEntries.class);
154 clearMessages(followerActor);
156 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
157 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
// testKit is the sender, so the AddServerReply is read from testKit below.
159 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
161 // Leader should install snapshot - capture and verify ApplySnapshot contents
163 ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
164 @SuppressWarnings("unchecked")
165 List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
// NOTE(review): JUnit's assertEquals takes (message, expected, actual); the captured snapshot state is
// passed in the 'expected' position here, which only affects the wording of a failure message.
166 assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
168 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
169 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
170 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
172 // Verify ServerConfigurationPayload entry in leader's log
174 expectFirstMatching(leaderCollectorActor, ApplyState.class);
175 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
176 assertEquals("Leader journal last index", 3, leaderActorContext.getReplicatedLog().lastIndex());
177 assertEquals("Leader commit index", 3, leaderActorContext.getCommitIndex());
178 assertEquals("Leader last applied index", 3, leaderActorContext.getLastApplied());
179 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
180 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
182 // Verify ServerConfigurationPayload entry in both followers
184 expectFirstMatching(followerActor, ApplyState.class);
185 assertEquals("Follower journal last index", 3, followerActorContext.getReplicatedLog().lastIndex());
186 verifyServerConfigurationPayloadEntry(followerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
187 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
189 expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
190 assertEquals("New follower journal last index", 3, newFollowerActorContext.getReplicatedLog().lastIndex());
191 verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
192 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
194 // Verify new server config was applied in both followers
// Each node's peer set is expected to contain the other two servers and not itself.
196 assertEquals("Follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID), followerActorContext.getPeerIds());
198 assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, FOLLOWER_ID), newFollowerActorContext.getPeerIds());
200 expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
201 expectFirstMatching(followerActor, ApplyState.class);
203 assertEquals("Follower commit index", 3, followerActorContext.getCommitIndex());
204 assertEquals("Follower last applied index", 3, followerActorContext.getLastApplied());
205 assertEquals("New follower commit index", 3, newFollowerActorContext.getCommitIndex());
206 assertEquals("New follower last applied index", 3, newFollowerActorContext.getLastApplied());
// Exactly one ReplicatedLogImplEntry is expected in the in-memory journal for the leader and for the new
// server: the ServerConfigurationPayload at index 3, term 1.
208 List<ReplicatedLogImplEntry> persistedLogEntries = InMemoryJournal.get(LEADER_ID, ReplicatedLogImplEntry.class);
209 assertEquals("Leader ReplicatedLogImplEntry entries", 1, persistedLogEntries.size());
210 ReplicatedLogImplEntry logEntry = persistedLogEntries.get(0);
211 assertEquals("Leader ReplicatedLogImplEntry getTerm", 1, logEntry.getTerm());
212 assertEquals("Leader ReplicatedLogImplEntry getIndex", 3, logEntry.getIndex());
213 assertEquals("Leader ReplicatedLogImplEntry getData", ServerConfigurationPayload.class, logEntry.getData().getClass());
215 persistedLogEntries = InMemoryJournal.get(NEW_SERVER_ID, ReplicatedLogImplEntry.class);
216 assertEquals("New follower ReplicatedLogImplEntry entries", 1, persistedLogEntries.size());
217 logEntry = persistedLogEntries.get(0);
218 assertEquals("New follower ReplicatedLogImplEntry getTerm", 1, logEntry.getTerm());
219 assertEquals("New follower ReplicatedLogImplEntry getIndex", 3, logEntry.getIndex());
220 assertEquals("New follower ReplicatedLogImplEntry getData", ServerConfigurationPayload.class,
221 logEntry.getData().getClass());
223 LOG.info("testAddServerWithExistingFollower ending");
227 public void testAddServerWithNoExistingFollower() throws Exception {
// Scenario: a leader with no peers receives AddServer for NEW_SERVER_ID (third argument true).
// Checks the snapshot applied on the new server, the OK reply, and the ServerConfigurationPayload entry
// at index 2 on both the leader and the new server.
228 LOG.info("testAddServerWithNoExistingFollower starting");
231 RaftActorContext initialActorContext = new MockRaftActorContext();
232 initialActorContext.setCommitIndex(1);
233 initialActorContext.setLastApplied(1);
234 initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
// Empty peer map: the leader starts as the only server.
237 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
238 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
239 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
240 actorFactory.generateActorId(LEADER_ID));
242 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
243 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
245 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
247 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
249 // Leader should install snapshot - capture and verify ApplySnapshot contents
251 ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
252 @SuppressWarnings("unchecked")
253 List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
// NOTE(review): JUnit's assertEquals takes (message, expected, actual); the captured snapshot state is
// passed in the 'expected' position here, which only affects the wording of a failure message.
254 assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
256 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
257 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
258 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
260 // Verify ServerConfigurationPayload entry in leader's log
262 expectFirstMatching(leaderCollectorActor, ApplyState.class);
263 assertEquals("Leader journal last index", 2, leaderActorContext.getReplicatedLog().lastIndex());
264 assertEquals("Leader commit index", 2, leaderActorContext.getCommitIndex());
265 assertEquals("Leader last applied index", 2, leaderActorContext.getLastApplied());
266 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
267 votingServer(NEW_SERVER_ID));
269 // Verify ServerConfigurationPayload entry in the new follower
271 expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
272 assertEquals("New follower journal last index", 2, newFollowerActorContext.getReplicatedLog().lastIndex());
273 verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
274 votingServer(NEW_SERVER_ID));
276 // Verify new server config was applied in the new follower
278 assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
280 LOG.info("testAddServerWithNoExistingFollower ending");
284 public void testAddServersAsNonVoting() throws Exception {
// Scenario: two servers are added one after the other with the third AddServer argument set to false.
// Each add is expected to reply OK and to append one ServerConfigurationPayload entry to the leader's
// log (index 0, then index 1) listing the added servers as non-voting.
285 LOG.info("testAddServersAsNonVoting starting");
288 RaftActorContext initialActorContext = new MockRaftActorContext();
290 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
291 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
292 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
293 actorFactory.generateActorId(LEADER_ID));
295 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
296 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
298 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
300 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), false), testKit.getRef());
302 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
303 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
304 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
306 // Verify ServerConfigurationPayload entry in leader's log
308 expectFirstMatching(leaderCollectorActor, ApplyState.class);
310 assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
311 assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
312 assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
313 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
314 nonVotingServer(NEW_SERVER_ID));
316 // Verify ServerConfigurationPayload entry in the new follower
318 expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
319 assertEquals("New follower journal last index", 0, newFollowerActorContext.getReplicatedLog().lastIndex());
320 verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
321 nonVotingServer(NEW_SERVER_ID));
323 // Verify new server config was applied in the new follower
325 assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
// Asserts that the new server's collector saw no InstallSnapshot for this non-voting add; the third
// argument is presumably a wait time in milliseconds - confirm against MessageCollectorActor.
327 assertNoneMatching(newFollowerCollectorActor, InstallSnapshot.class, 500);
329 // Add another non-voting server.
331 clearMessages(leaderCollectorActor);
// followerActor is reused as the endpoint for NEW_SERVER_ID2 by installing a Follower behavior built
// from a fresh context for that id.
333 RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
334 Follower newFollower2 = new Follower(follower2ActorContext);
335 followerActor.underlyingActor().setBehavior(newFollower2);
337 leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit.getRef());
339 addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
340 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
// NOTE(review): the other leader-hint assertions visible in this file unwrap with getLeaderHint().get();
// this one compares against java.util.Optional.of(...). The file imports com.google.common.base.Optional,
// so confirm getLeaderHint() really returns java.util.Optional - otherwise this equality cannot hold.
341 assertEquals("getLeaderHint", java.util.Optional.of(LEADER_ID), addServerReply.getLeaderHint());
343 expectFirstMatching(leaderCollectorActor, ApplyState.class);
344 assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
345 assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
346 assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
347 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
348 votingServer(LEADER_ID), nonVotingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
350 LOG.info("testAddServersAsNonVoting ending");
354 public void testAddServerWithOperationInProgress() throws Exception {
// Scenario: a second AddServer arrives while the first is still waiting on its snapshot install.
// The first request is stalled by making the new follower drop InstallSnapshot, the second request is
// sent from a separate test kit, and then the captured InstallSnapshot is re-delivered. Both requests
// are expected to reply OK and both configuration entries to be applied.
355 LOG.info("testAddServerWithOperationInProgress starting");
358 RaftActorContext initialActorContext = new MockRaftActorContext();
360 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
361 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
362 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
363 actorFactory.generateActorId(LEADER_ID));
365 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
366 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
368 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
// followerActor serves as the endpoint for the second server, NEW_SERVER_ID2.
370 RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
371 Follower newFollower2 = new Follower(follower2ActorContext);
372 followerActor.underlyingActor().setBehavior(newFollower2);
// Dropped messages are still visible to the collector: the InstallSnapshot is captured from it below.
374 MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
375 newFollowerRaftActorInstance.setDropMessageOfType(InstallSnapshot.SERIALIZABLE_CLASS);
377 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
379 // Wait for leader's install snapshot and capture it
381 InstallSnapshot installSnapshot = expectFirstMatching(newFollowerCollectorActor, InstallSnapshot.class);
383 // Send a second AddServer - should get queued
// A separate test kit is the sender so the two replies can be told apart.
384 JavaTestKit testKit2 = new JavaTestKit(getSystem());
385 leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit2.getRef());
387 // Continue the first AddServer
388 newFollowerRaftActorInstance.setDropMessageOfType(null);
389 newFollowerRaftActor.tell(installSnapshot, leaderActor);
391 // Verify both complete successfully
392 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
393 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
395 addServerReply = testKit2.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
396 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
398 // Verify ServerConfigurationPayload entries in leader's log
// Two ApplyState messages are awaited, one per AddServer; the last log index is therefore 1.
400 expectMatching(leaderCollectorActor, ApplyState.class, 2);
401 assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
402 assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
403 assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
404 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
405 votingServer(LEADER_ID), votingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
407 // Verify ServerConfigurationPayload entry in the new follower
409 expectMatching(newFollowerCollectorActor, ApplyState.class, 2);
410 assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID2),
411 newFollowerActorContext.getPeerIds());
413 LOG.info("testAddServerWithOperationInProgress ending");
417 public void testAddServerWithPriorSnapshotInProgress() throws Exception {
// Scenario: AddServer is sent while a snapshot capture started by InitiateCaptureSnapshot has not yet
// completed. The leader drops String messages to hold the capture open; the captured String is then
// re-sent to let it finish. The AddServer is expected to reply OK afterwards.
418 LOG.info("testAddServerWithPriorSnapshotInProgress starting");
421 RaftActorContext initialActorContext = new MockRaftActorContext();
423 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
424 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
425 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
426 actorFactory.generateActorId(LEADER_ID));
428 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
429 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
431 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
433 // Drop commit message for now to delay snapshot completion
434 leaderRaftActor.setDropMessageOfType(String.class);
436 leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
// The dropped String is still recorded by the leader's collector, so it can be replayed later.
438 String commitMsg = expectFirstMatching(leaderCollectorActor, String.class);
440 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
// Stop dropping and replay the captured message so the prior snapshot can complete.
442 leaderRaftActor.setDropMessageOfType(null);
443 leaderActor.tell(commitMsg, leaderActor);
445 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
446 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
447 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
449 expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
451 // Verify ServerConfigurationPayload entry in leader's log
453 expectFirstMatching(leaderCollectorActor, ApplyState.class);
454 assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
455 assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
456 assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
457 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
458 votingServer(NEW_SERVER_ID));
460 LOG.info("testAddServerWithPriorSnapshotInProgress ending");
464 public void testAddServerWithPriorSnapshotCompleteTimeout() throws Exception {
// Scenario: AddServer is sent while a prior snapshot capture never completes, because the leader keeps
// dropping String messages. The request is expected to reply TIMEOUT and the leader to gain no peers.
465 LOG.info("testAddServerWithPriorSnapshotCompleteTimeout starting");
468 RaftActorContext initialActorContext = new MockRaftActorContext();
470 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
471 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
472 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
473 actorFactory.generateActorId(LEADER_ID));
475 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
476 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
// Presumably shortens the server-operation timeout so the TIMEOUT reply arrives within the 5 second
// wait below - confirm how the timeout is derived from the election timeout factor.
478 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
480 // Drop commit message so the snapshot doesn't complete.
481 leaderRaftActor.setDropMessageOfType(String.class);
483 leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
485 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
487 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
488 assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
490 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
492 LOG.info("testAddServerWithPriorSnapshotCompleteTimeout ending");
496 public void testAddServerWithLeaderChangeBeforePriorSnapshotComplete() throws Exception {
// Scenario: AddServer is queued behind a prior snapshot capture, and the actor is switched to a Follower
// behavior before that capture completes. The request is expected to reply NO_LEADER, the leader to
// gain no peers, and no new snapshot capture to be in progress.
497 LOG.info("testAddServerWithLeaderChangeBeforePriorSnapshotComplete starting");
500 RaftActorContext initialActorContext = new MockRaftActorContext();
502 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
503 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
504 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
505 actorFactory.generateActorId(LEADER_ID));
507 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
508 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
509 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
511 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
513 // Drop the commit message so the snapshot doesn't complete yet.
514 leaderRaftActor.setDropMessageOfType(String.class);
516 leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
518 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
// The dropped String is still recorded by the leader's collector, so it can be replayed later.
520 String commitMsg = expectFirstMatching(leaderCollectorActor, String.class);
522 // Change the leader behavior to follower
523 leaderActor.tell(new Follower(leaderActorContext), leaderActor);
525 // Drop CaptureSnapshotReply in case install snapshot is incorrectly initiated after the prior
526 // snapshot completes. This will prevent the invalid snapshot from completing and fail the
527 // isCapturing assertion below.
528 leaderRaftActor.setDropMessageOfType(CaptureSnapshotReply.class);
530 // Complete the prior snapshot - this should be a no-op b/c it's no longer the leader
531 leaderActor.tell(commitMsg, leaderActor);
// The ServerOperationTimeout for NEW_SERVER_ID is delivered by hand rather than by waiting for a timer.
533 leaderActor.tell(new RaftActorServerConfigurationSupport.ServerOperationTimeout(NEW_SERVER_ID), leaderActor);
535 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
536 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
538 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
539 assertEquals("isCapturing", false, leaderActorContext.getSnapshotManager().isCapturing());
541 LOG.info("testAddServerWithLeaderChangeBeforePriorSnapshotComplete ending");
545 public void testAddServerWithLeaderChangeDuringInstallSnapshot() throws Exception {
// Scenario: the actor is switched to a Follower behavior after the snapshot install has started but
// before the leader processes the UnInitializedFollowerSnapshotReply. Replaying the captured reply is
// expected to have no effect; the request replies NO_LEADER and the leader gains no peers.
546 LOG.info("testAddServerWithLeaderChangeDuringInstallSnapshot starting");
549 RaftActorContext initialActorContext = new MockRaftActorContext();
551 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
552 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
553 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
554 actorFactory.generateActorId(LEADER_ID));
556 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
557 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
// NOTE(review): the reason for the specific factor 8 is not visible here; it is raised to 100 further
// down, before the behavior switch.
559 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(8);
561 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
563 // Drop the UnInitializedFollowerSnapshotReply to delay it.
564 leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
566 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
// The dropped reply is still recorded by the leader's collector, so it can be replayed later.
568 UnInitializedFollowerSnapshotReply snapshotReply = expectFirstMatching(leaderCollectorActor,
569 UnInitializedFollowerSnapshotReply.class);
571 // Prevent election timeout when the leader switches to follower
572 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
574 // Change the leader behavior to follower
575 leaderActor.tell(new Follower(leaderActorContext), leaderActor);
577 // Send the captured UnInitializedFollowerSnapshotReply - should be a no-op
578 leaderRaftActor.setDropMessageOfType(null);
579 leaderActor.tell(snapshotReply, leaderActor);
581 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
582 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
584 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
586 LOG.info("testAddServerWithLeaderChangeDuringInstallSnapshot ending");
590 public void testAddServerWithInstallSnapshotTimeout() throws Exception {
// Scenario: the new follower drops InstallSnapshot, so the install never finishes. The request is
// expected to reply TIMEOUT, and the leader to end up with no peers and no followers.
591 LOG.info("testAddServerWithInstallSnapshotTimeout starting");
594 RaftActorContext initialActorContext = new MockRaftActorContext();
596 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
597 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
598 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
599 actorFactory.generateActorId(LEADER_ID));
601 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
602 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
// Presumably shortens the server-operation timeout so the TIMEOUT reply arrives within the 5 second
// wait below - confirm how the timeout is derived from the election timeout factor.
603 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
605 // Drop the InstallSnapshot message so it times out
606 newFollowerRaftActor.underlyingActor().setDropMessageOfType(InstallSnapshot.SERIALIZABLE_CLASS);
608 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
// A snapshot reply carrying an id other than the server being added; the request is still expected to
// time out afterwards.
610 leaderActor.tell(new UnInitializedFollowerSnapshotReply("bogus"), leaderActor);
612 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
613 assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
615 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
616 assertEquals("Leader followers size", 0,
617 ((AbstractLeader)leaderRaftActor.getCurrentBehavior()).getFollowerIds().size());
619 LOG.info("testAddServerWithInstallSnapshotTimeout ending");
623 public void testAddServerWithNoLeader() {
// Scenario: AddServer is sent to a plain MockRaftActor (not a MockLeaderRaftActor) that has one
// configured peer; the request is expected to reply NO_LEADER.
624 LOG.info("testAddServerWithNoLeader starting");
627 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
// Presumably a one-day heartbeat interval keeps any election from happening during the test, so the
// actor never becomes leader - confirm.
628 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
630 TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
631 MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
632 followerActor.path().toString())).config(configParams).persistent(Optional.of(false)).
633 props().withDispatcher(Dispatchers.DefaultDispatcherId()),
634 actorFactory.generateActorId(LEADER_ID));
// Wait for the actor's initial behavior to be set before sending the request.
635 noLeaderActor.underlyingActor().waitForInitializeBehaviorComplete();
637 noLeaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
638 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
639 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
641 LOG.info("testAddServerWithNoLeader ending");
// Verifies AddServer handling when the new follower never processes AppendEntries: the first
// AddServer is still answered with OK, while a second AddServer issued afterwards (and a
// re-send of it) is answered with PRIOR_REQUEST_CONSENSUS_TIMEOUT.
645 public void testAddServerWithNoConsensusReached() {
646 LOG.info("testAddServerWithNoConsensusReached starting");
649 RaftActorContext initialActorContext = new MockRaftActorContext();
// Leader is created with an empty peer map.
651 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
652 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
653 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
654 actorFactory.generateActorId(LEADER_ID));
656 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
657 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
659 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
661 // Drop UnInitializedFollowerSnapshotReply initially
662 leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
664 MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
665 TestActorRef<MessageCollectorActor> newFollowerCollectorActor =
666 newCollectorActor(newFollowerRaftActorInstance, NEW_SERVER_ID);
668 // Drop AppendEntries to the new follower so consensus isn't reached
669 newFollowerRaftActorInstance.setDropMessageOfType(AppendEntries.class);
671 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
673 // Capture the UnInitializedFollowerSnapshotReply
674 Object snapshotReply = expectFirstMatching(leaderCollectorActor, UnInitializedFollowerSnapshotReply.class);
676 // Send the UnInitializedFollowerSnapshotReply to resume the first request
677 leaderRaftActor.setDropMessageOfType(null);
678 leaderActor.tell(snapshotReply, leaderActor);
// Wait until the new follower's collector has seen an AppendEntries before issuing the second request.
680 expectFirstMatching(newFollowerCollectorActor, AppendEntries.class);
682 // Send a second AddServer
683 leaderActor.tell(new AddServer(NEW_SERVER_ID2, "", false), testKit.getRef());
685 // The first AddServer should succeed with OK even though consensus wasn't reached
686 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
687 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
688 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
690 // Verify ServerConfigurationPayload entry in leader's log
691 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
692 votingServer(NEW_SERVER_ID));
694 // The second AddServer should fail since consensus wasn't reached for the first
695 addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
696 assertEquals("getStatus", ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT, addServerReply.getStatus());
698 // Re-send the second AddServer - should also fail
699 leaderActor.tell(new AddServer(NEW_SERVER_ID2, "", false), testKit.getRef());
700 addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
701 assertEquals("getStatus", ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT, addServerReply.getStatus());
703 LOG.info("testAddServerWithNoConsensusReached ending");
// Verifies that an AddServer request for a server id that is already in the leader's peer map
// (FOLLOWER_ID) is answered with status ALREADY_EXISTS.
707 public void testAddServerWithExistingServer() {
708 LOG.info("testAddServerWithExistingServer starting");
710 RaftActorContext initialActorContext = new MockRaftActorContext();
// Leader is created with FOLLOWER_ID already configured as a peer.
712 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
713 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
714 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
715 actorFactory.generateActorId(LEADER_ID));
// Request to add the same id again; the reply is delivered to the test kit.
717 leaderActor.tell(new AddServer(FOLLOWER_ID, followerActor.path().toString(), true), testKit.getRef());
719 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
720 assertEquals("getStatus", ServerChangeStatus.ALREADY_EXISTS, addServerReply.getStatus());
722 LOG.info("testAddServerWithExistingServer ending");
// Verifies that an AddServer request sent to a follower ends up at the leader: the "leader"
// here is a plain MessageCollectorActor and the test expects it to receive an AddServer message.
726 public void testAddServerForwardedToLeader() {
727 LOG.info("testAddServerForwardedToLeader starting");
730 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
731 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
// Stand-in leader that only records the messages it receives.
733 TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
734 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
735 actorFactory.generateActorId(LEADER_ID));
// Follower raft actor whose only peer address is the stand-in leader.
737 TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
738 MockRaftActor.builder().id(FOLLOWER_ID).peerAddresses(ImmutableMap.of(LEADER_ID,
739 leaderActor.path().toString())).config(configParams).persistent(Optional.of(false)).
740 props().withDispatcher(Dispatchers.DefaultDispatcherId()),
741 actorFactory.generateActorId(FOLLOWER_ID));
742 followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
// Send an empty AppendEntries naming LEADER_ID, with the stand-in leader as sender.
// NOTE(review): presumably this is what makes the follower recognize LEADER_ID as leader - confirm.
744 followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
745 -1, -1, (short)0), leaderActor);
747 followerRaftActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
// The request must show up in the stand-in leader's collected messages.
748 expectFirstMatching(leaderActor, AddServer.class);
750 LOG.info("testAddServerForwardedToLeader ending");
// Verifies RaftActorServerConfigurationSupport.handleMessage for ApplyState messages: it returns
// true when the log entry wraps a ServerConfigurationPayload and false when it wraps a MockPayload.
754 public void testOnApplyState() {
755 LOG.info("testOnApplyState starting");
757 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
758 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
759 TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
760 MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
761 followerActor.path().toString())).config(configParams).persistent(Optional.of(false)).
762 props().withDispatcher(Dispatchers.DefaultDispatcherId()),
763 actorFactory.generateActorId(LEADER_ID));
// The support object is constructed directly on the underlying actor and invoked without
// going through the actor's mailbox.
765 RaftActorServerConfigurationSupport support = new RaftActorServerConfigurationSupport(noLeaderActor.underlyingActor());
// Case 1: entry carrying an (empty) ServerConfigurationPayload - expected to be handled.
767 ReplicatedLogEntry serverConfigEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
768 new ServerConfigurationPayload(Collections.<ServerInfo>emptyList()));
769 boolean handled = support.handleMessage(new ApplyState(null, null, serverConfigEntry), ActorRef.noSender());
770 assertEquals("Message handled", true, handled);
// Case 2: entry carrying a MockPayload - expected not to be handled.
772 ReplicatedLogEntry nonServerConfigEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
773 new MockRaftActorContext.MockPayload("1"));
774 handled = support.handleMessage(new ApplyState(null, null, nonServerConfigEntry), ActorRef.noSender());
775 assertEquals("Message handled", false, handled);
777 LOG.info("testOnApplyState ending");
// Verifies that a RemoveServer request sent to a raft actor that currently has no leader is
// answered with a RemoveServerReply whose status is NO_LEADER.
781 public void testRemoveServerWithNoLeader() {
782 LOG.info("testRemoveServerWithNoLeader starting");
// NOTE(review): the one-day heartbeat interval presumably keeps timers from firing during the
// test so the actor stays leaderless - confirm.
784 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
785 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
// Despite the variable name, this is a plain MockRaftActor (not MockLeaderRaftActor) with id LEADER_ID.
787 TestActorRef<MockRaftActor> leaderActor = actorFactory.createTestActor(
788 MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
789 followerActor.path().toString())).config(configParams).persistent(Optional.of(false)).
790 props().withDispatcher(Dispatchers.DefaultDispatcherId()),
791 actorFactory.generateActorId(LEADER_ID));
792 leaderActor.underlyingActor().waitForInitializeBehaviorComplete();
// Send the request with the test kit as sender so the reply can be asserted on.
794 leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
795 RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
796 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, removeServerReply.getStatus());
798 LOG.info("testRemoveServerWithNoLeader ending");
// Verifies that a RemoveServer request for an id that is not in the leader's peer map
// (NEW_SERVER_ID; only FOLLOWER_ID is configured) is answered with status DOES_NOT_EXIST.
802 public void testRemoveServerNonExistentServer() {
803 LOG.info("testRemoveServerNonExistentServer starting");
805 RaftActorContext initialActorContext = new MockRaftActorContext();
// Leader is created with FOLLOWER_ID as its only peer.
807 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
808 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
809 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
810 actorFactory.generateActorId(LEADER_ID));
// Ask to remove a server id the leader was never configured with.
812 leaderActor.tell(new RemoveServer(NEW_SERVER_ID), testKit.getRef());
813 RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
814 assertEquals("getStatus", ServerChangeStatus.DOES_NOT_EXIST, removeServerReply.getStatus());
816 LOG.info("testRemoveServerNonExistentServer ending");
// Verifies that a RemoveServer request sent to a follower ends up at the leader: the "leader"
// here is a plain MessageCollectorActor and the test expects it to receive a RemoveServer message.
820 public void testRemoveServerForwardToLeader() {
821 LOG.info("testRemoveServerForwardToLeader starting");
823 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
824 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
// Stand-in leader that only records the messages it receives.
826 TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
827 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
828 actorFactory.generateActorId(LEADER_ID));
// Follower raft actor whose only peer address is the stand-in leader.
830 TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
831 MockRaftActor.builder().id(FOLLOWER_ID).peerAddresses(ImmutableMap.of(LEADER_ID,
832 leaderActor.path().toString())).config(configParams).persistent(Optional.of(false)).
833 props().withDispatcher(Dispatchers.DefaultDispatcherId()),
834 actorFactory.generateActorId(FOLLOWER_ID));
835 followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
// Send an empty AppendEntries naming LEADER_ID, with the stand-in leader as sender.
// NOTE(review): presumably this is what makes the follower recognize LEADER_ID as leader - confirm.
837 followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
838 -1, -1, (short)0), leaderActor);
840 followerRaftActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
// The request must show up in the stand-in leader's collected messages.
841 expectFirstMatching(leaderActor, RemoveServer.class);
843 LOG.info("testRemoveServerForwardToLeader ending");
// Verifies removal of a follower through the leader: the RemoveServer reply is OK, the leader
// applies a log entry at index 0 whose server configuration contains only the leader, the leader
// behavior reports no follower ids, and the removed follower's collector receives ServerRemoved.
847 public void testRemoveServer() {
848 LOG.info("testRemoveServer starting");
// Follower configuration: one-day heartbeat interval and the DisableElectionsRaftPolicy.
850 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
851 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
852 configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
// The follower's actor id and path are generated up front so the leader can be given the
// follower's address before the follower actor is created.
854 final String followerActorId = actorFactory.generateActorId(FOLLOWER_ID);
855 final String followerActorPath = actorFactory.createTestActorPath(followerActorId);
856 RaftActorContext initialActorContext = new MockRaftActorContext();
858 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
859 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActorPath),
860 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
861 actorFactory.generateActorId(LEADER_ID));
863 TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
// Collector handed to the follower's CollectingMockRaftActor; ServerRemoved is asserted on it below.
865 TestActorRef<MessageCollectorActor> collector =
866 actorFactory.createTestActor(MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
867 actorFactory.generateActorId("collector"));
868 TestActorRef<CollectingMockRaftActor> followerRaftActor = actorFactory.createTestActor(
869 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
870 configParams, NO_PERSISTENCE, collector).withDispatcher(Dispatchers.DefaultDispatcherId()),
873 leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
874 RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
875 assertEquals("getStatus", ServerChangeStatus.OK, removeServerReply.getStatus());
// The leader must have applied the new configuration entry (index 0) listing only itself.
877 final ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
878 assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
879 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(), votingServer(LEADER_ID));
// The node stays Leader and no longer tracks any followers.
881 RaftActorBehavior currentBehavior = leaderActor.underlyingActor().getCurrentBehavior();
882 assertTrue("Expected Leader", currentBehavior instanceof Leader);
883 assertEquals("Follower ids size", 0, ((Leader)currentBehavior).getFollowerIds().size());
// The removed follower's collector must see a ServerRemoved message.
885 MessageCollectorActor.expectFirstMatching(collector, ServerRemoved.class);
887 LOG.info("testRemoveServer ending");
// Verifies removal of the leader itself: the RemoveServer reply is OK, the follower applies a
// log entry at index 0, the leader's log holds a server configuration containing only the
// follower, and the leader's collector receives ServerRemoved.
891 public void testRemoveServerLeader() {
892 LOG.info("testRemoveServerLeader starting");
// Follower configuration: one-day heartbeat interval and the DisableElectionsRaftPolicy.
894 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
895 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
896 configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
// The follower's actor id and path are generated up front so the leader can be given the
// follower's address before the follower actor is created.
898 final String followerActorId = actorFactory.generateActorId(FOLLOWER_ID);
899 final String followerActorPath = actorFactory.createTestActorPath(followerActorId);
900 RaftActorContext initialActorContext = new MockRaftActorContext();
902 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
903 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActorPath),
904 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
905 actorFactory.generateActorId(LEADER_ID));
907 TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
// Collector handed to the follower's CollectingMockRaftActor; ApplyState is asserted on it below.
909 TestActorRef<MessageCollectorActor> followerCollector = actorFactory.createTestActor(MessageCollectorActor.props().
910 withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId("collector"));
911 actorFactory.createTestActor(
912 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
913 configParams, NO_PERSISTENCE, followerCollector).withDispatcher(Dispatchers.DefaultDispatcherId()),
916 leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
917 RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
918 assertEquals("getStatus", ServerChangeStatus.OK, removeServerReply.getStatus());
// The follower must apply the configuration entry (index 0); the leader's log must list only the follower.
920 final ApplyState applyState = MessageCollectorActor.expectFirstMatching(followerCollector, ApplyState.class);
921 assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
922 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
923 votingServer(FOLLOWER_ID));
// The removed leader's collector must see a ServerRemoved message.
925 MessageCollectorActor.expectFirstMatching(leaderCollector, ServerRemoved.class);
927 LOG.info("testRemoveServerLeader ending");
// Verifies that asking a leader with an empty peer map to remove itself is answered with
// status NOT_SUPPORTED.
931 public void testRemoveServerLeaderWithNoFollowers() {
932 LOG.info("testRemoveServerLeaderWithNoFollowers starting");
// Leader is created with no peers at all.
934 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
935 MockLeaderRaftActor.props(Collections.<String, String>emptyMap(),
936 new MockRaftActorContext()).withDispatcher(Dispatchers.DefaultDispatcherId()),
937 actorFactory.generateActorId(LEADER_ID));
// Request removal of the leader's own id; the reply is delivered to the test kit.
939 leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
940 RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
941 assertEquals("getStatus", ServerChangeStatus.NOT_SUPPORTED, removeServerReply.getStatus());
943 LOG.info("testRemoveServerLeaderWithNoFollowers ending");
// Verifies ChangeServersVotingStatus on a leader with two followers in two rounds:
// (1) both followers are switched to non-voting, (2) FOLLOWER_ID is switched back to voting.
// After each round the reply must be OK and the leader and both followers must have applied a
// log entry whose server configuration matches the requested voting states.
947 public void testChangeServersVotingStatus() {
948 LOG.info("testChangeServersVotingStatus starting");
// Follower configuration: one-day heartbeat interval and the DisableElectionsRaftPolicy.
950 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
951 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
952 configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
// Actor ids and paths are generated up front so every node can be given its peers' addresses
// before the peer actors are created.
954 final String follower1ActorId = actorFactory.generateActorId(FOLLOWER_ID);
955 final String follower1ActorPath = actorFactory.createTestActorPath(follower1ActorId);
956 final String follower2ActorId = actorFactory.generateActorId(FOLLOWER_ID2);
957 final String follower2ActorPath = actorFactory.createTestActorPath(follower2ActorId);
959 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
960 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, follower1ActorPath,
961 FOLLOWER_ID2, follower2ActorPath), new MockRaftActorContext()).
962 withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
963 TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
// Each follower gets its own collector actor so ApplyState can be awaited per node.
965 TestActorRef<MessageCollectorActor> follower1Collector = actorFactory.createTestActor(
966 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
967 actorFactory.generateActorId("collector"));
968 TestActorRef<CollectingMockRaftActor> follower1RaftActor = actorFactory.createTestActor(
969 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
970 FOLLOWER_ID2, follower2ActorPath), configParams, NO_PERSISTENCE, follower1Collector).
971 withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId);
973 TestActorRef<MessageCollectorActor> follower2Collector = actorFactory.createTestActor(
974 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
975 actorFactory.generateActorId("collector"));
976 TestActorRef<CollectingMockRaftActor> follower2RaftActor = actorFactory.createTestActor(
977 CollectingMockRaftActor.props(FOLLOWER_ID2, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
978 FOLLOWER_ID, follower1ActorPath), configParams, NO_PERSISTENCE, follower2Collector).
979 withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId);
981 // Send first ChangeServersVotingStatus message
983 leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(FOLLOWER_ID, false, FOLLOWER_ID2, false)),
985 ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
986 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
// Round 1: leader applies the entry at index 0; all three logs must show both followers as non-voting.
988 final ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
989 assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
990 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
991 votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
993 MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
994 verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
995 votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
997 MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
998 verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
999 votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
// Clear the collectors so the ApplyState waits in round 2 cannot match messages from round 1.
1001 MessageCollectorActor.clearMessages(leaderCollector);
1002 MessageCollectorActor.clearMessages(follower1Collector);
1003 MessageCollectorActor.clearMessages(follower2Collector);
1005 // Send second ChangeServersVotingStatus message
1007 leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(FOLLOWER_ID, true)), testKit.getRef());
1008 reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1009 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
// Round 2: FOLLOWER_ID is voting again while FOLLOWER_ID2 remains non-voting on all three nodes.
1011 MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
1012 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1013 votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1015 MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
1016 verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1017 votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1019 MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
1020 verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1021 votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1023 LOG.info("testChangeServersVotingStatus ending");
// Verifies that switching the leader itself to non-voting succeeds: the reply is OK, all three
// nodes apply a configuration with the leader non-voting and both followers voting, the former
// leader ends up in the Follower state, and its collector receives AppendEntries messages.
1027 public void testChangeLeaderToNonVoting() {
1028 LOG.info("testChangeLeaderToNonVoting starting");
// Unlike the neighbouring tests this uses a 500 ms heartbeat interval and no custom raft policy.
// NOTE(review): presumably so that the followers can hold a real election - confirm.
1030 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1031 configParams.setHeartBeatInterval(new FiniteDuration(500, TimeUnit.MILLISECONDS));
// Actor ids and paths are generated up front so every node can be given its peers' addresses
// before the peer actors are created.
1033 final String follower1ActorId = actorFactory.generateActorId(FOLLOWER_ID);
1034 final String follower1ActorPath = actorFactory.createTestActorPath(follower1ActorId);
1035 final String follower2ActorId = actorFactory.generateActorId(FOLLOWER_ID2);
1036 final String follower2ActorPath = actorFactory.createTestActorPath(follower2ActorId);
1038 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
1039 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, follower1ActorPath,
1040 FOLLOWER_ID2, follower2ActorPath), new MockRaftActorContext()).
1041 withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
1042 TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
// Each follower gets its own collector actor so ApplyState can be awaited per node.
1044 TestActorRef<MessageCollectorActor> follower1Collector = actorFactory.createTestActor(
1045 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1046 actorFactory.generateActorId("collector"));
1047 TestActorRef<CollectingMockRaftActor> follower1RaftActor = actorFactory.createTestActor(
1048 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
1049 FOLLOWER_ID2, follower2ActorPath), configParams, NO_PERSISTENCE, follower1Collector).
1050 withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId);
1052 TestActorRef<MessageCollectorActor> follower2Collector = actorFactory.createTestActor(
1053 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1054 actorFactory.generateActorId("collector"));
1055 TestActorRef<CollectingMockRaftActor> follower2RaftActor = actorFactory.createTestActor(
1056 CollectingMockRaftActor.props(FOLLOWER_ID2, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
1057 FOLLOWER_ID, follower1ActorPath), configParams, NO_PERSISTENCE, follower2Collector).
1058 withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId);
1060 // Send ChangeServersVotingStatus message
1062 leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(LEADER_ID, false)), testKit.getRef());
1063 ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1064 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
// All three logs must show the leader as non-voting and both followers as voting.
1066 MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
1067 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1068 nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
1070 MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
1071 verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1072 nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
1074 MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
1075 verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1076 nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
// NOTE(review): verifyRaftState is defined outside this view; with two actors passed it
// presumably checks that one of them reaches the given state - confirm.
1078 verifyRaftState(RaftState.Leader, follower1RaftActor.underlyingActor(), follower2RaftActor.underlyingActor());
1079 verifyRaftState(RaftState.Follower, leaderActor.underlyingActor());
// Expect two AppendEntries messages in the former leader's collector.
1081 MessageCollectorActor.expectMatching(leaderCollector, AppendEntries.class, 2);
1083 LOG.info("testChangeLeaderToNonVoting ending");
// Verifies ChangeServersVotingStatus on a non-voting node when there is no leader: the request
// is answered NO_LEADER while node1 has no peer address for node2, and OK once the address is
// set. Afterwards both nodes hold the flipped voting configuration, node1 reports Leader and
// node2 reports Follower.
1087 public void testChangeToVotingWithNoLeader() {
1088 LOG.info("testChangeToVotingWithNoLeader starting");
1090 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1091 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1093 final String node1ID = "node1";
1094 final String node2ID = "node2";
1096 // Set up a persisted ServerConfigurationPayload. Initially node1 and node2 will come up as non-voting
1097 // via the server config. The server config will also contain 2 voting peers that are down (ie no
1100 ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1101 new ServerInfo(node1ID, false), new ServerInfo(node2ID, false),
1102 new ServerInfo("downNode1", true), new ServerInfo("downNode2", true)));
1103 ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, 1, persistedServerConfig);
// Seed each node's in-memory journal with an UpdateElectionTerm followed by the server config entry.
1105 InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "downNode1"));
1106 InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1107 InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "downNode2"));
1108 InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
// Both nodes are created with PERSISTENT and an empty peer-address map.
1110 TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
1111 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1112 actorFactory.generateActorId("collector"));
1113 TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1114 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
1115 PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1116 CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1118 TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
1119 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1120 actorFactory.generateActorId("collector"));
1121 TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1122 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
1123 PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1124 CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1126 // Wait for snapshot after recovery
1127 MessageCollectorActor.expectFirstMatching(node1Collector, SnapshotComplete.class);
1129 // Verify the intended server config was loaded and applied.
1130 verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1131 nonVotingServer(node1ID), nonVotingServer(node2ID), votingServer("downNode1"),
1132 votingServer("downNode2"));
1133 assertEquals("isVotingMember", false, node1RaftActor.getRaftActorContext().isVotingMember());
1134 assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1135 assertEquals("getLeaderId", null, node1RaftActor.getLeaderId());
1137 MessageCollectorActor.expectFirstMatching(node2Collector, SnapshotComplete.class);
1138 assertEquals("isVotingMember", false, node2RaftActor.getRaftActorContext().isVotingMember());
1140 // For the test, we send a ChangeServersVotingStatus message to node1 to flip the voting states for
1141 // each server, ie node1 and node2 to voting and the 2 down nodes to non-voting. This should cause
1142 // node1 to try to elect itself as leader in order to apply the new server config. Since the 2
1143 // down nodes are switched to non-voting, node1 should only need a vote from node2.
1145 // First send the message such that node1 has no peer address for node2 - should fail.
1147 ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true,
1148 node2ID, true, "downNode1", false, "downNode2", false));
1149 node1RaftActorRef.tell(changeServers, testKit.getRef());
1150 ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1151 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, reply.getStatus());
1153 // Update node2's peer address and send the message again
1155 node1RaftActor.setPeerAddress(node2ID, node2RaftActorRef.path().toString());
1157 node1RaftActorRef.tell(changeServers, testKit.getRef());
1158 reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1159 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
// node1 must apply journal entries up to index 1, hold the flipped configuration, and be Leader.
1161 ApplyJournalEntries apply = MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class);
1162 assertEquals("getToIndex", 1, apply.getToIndex());
1163 verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1164 votingServer(node1ID), votingServer(node2ID), nonVotingServer("downNode1"),
1165 nonVotingServer("downNode2"));
1166 assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
1167 assertEquals("getRaftState", RaftState.Leader, node1RaftActor.getRaftState());
// node2 must apply the same entries and configuration but remain a Follower.
1169 apply = MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
1170 assertEquals("getToIndex", 1, apply.getToIndex());
1171 verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1172 votingServer(node1ID), votingServer(node2ID), nonVotingServer("downNode1"),
1173 nonVotingServer("downNode2"));
1174 assertEquals("isVotingMember", true, node2RaftActor.getRaftActorContext().isVotingMember());
1175 assertEquals("getRaftState", RaftState.Follower, node2RaftActor.getRaftState());
1177 LOG.info("testChangeToVotingWithNoLeader ending");
1181 public void testChangeToVotingWithNoLeaderAndElectionTimeout() {
1182 LOG.info("testChangeToVotingWithNoLeaderAndElectionTimeout starting");
1184 final String node1ID = "node1";
1185 final String node2ID = "node2";
1187 PeerAddressResolver peerAddressResolver = new PeerAddressResolver() {
1189 public String resolve(String peerId) {
1190 return peerId.equals(node1ID) ? actorFactory.createTestActorPath(node1ID) :
1191 peerId.equals(node2ID) ? actorFactory.createTestActorPath(node2ID) : null;
1195 ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1196 new ServerInfo(node1ID, false), new ServerInfo(node2ID, true)));
1197 ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, 1, persistedServerConfig);
1199 InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
1200 InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1201 InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
1202 InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1204 DefaultConfigParamsImpl configParams1 = new DefaultConfigParamsImpl();
1205 configParams1.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1206 configParams1.setElectionTimeoutFactor(1);
1207 configParams1.setPeerAddressResolver(peerAddressResolver);
1208 TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
1209 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1210 actorFactory.generateActorId("collector"));
1211 TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1212 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams1,
1213 PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1214 CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1216 DefaultConfigParamsImpl configParams2 = new DefaultConfigParamsImpl();
1217 configParams2.setElectionTimeoutFactor(1000000);
1218 configParams2.setPeerAddressResolver(peerAddressResolver);
1219 TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
1220 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1221 actorFactory.generateActorId("collector"));
1222 TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1223 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams2,
1224 PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1225 CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1227 // Wait for snapshot after recovery
1228 MessageCollectorActor.expectFirstMatching(node1Collector, SnapshotComplete.class);
1230 // Send a ChangeServersVotingStatus message to node1 to change mode1 to voting. This should cause
1231 // node1 to try to elect itself as leader in order to apply the new server config. But we'll drop
1232 // RequestVote messages in node2 which should cause node1 to time out and revert back to the previous
1233 // server config and fail with NO_LEADER. Note that node1 shouldn't forward the request to node2 b/c
1234 // node2 was previously voting.
1236 node2RaftActor.setDropMessageOfType(RequestVote.class);
1238 ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true));
1239 node1RaftActorRef.tell(changeServers, testKit.getRef());
1240 ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1241 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, reply.getStatus());
1243 assertEquals("Server config", Sets.newHashSet(nonVotingServer(node1ID), votingServer(node2ID)),
1244 Sets.newHashSet(node1RaftActor.getRaftActorContext().getPeerServerInfo(true).getServerConfig()));
1245 assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1247 LOG.info("testChangeToVotingWithNoLeaderAndElectionTimeout ending");
1251 public void testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout() {
1252 LOG.info("testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout starting");
1254 final String node1ID = "node1";
1255 final String node2ID = "node2";
1257 PeerAddressResolver peerAddressResolver = new PeerAddressResolver() {
1259 public String resolve(String peerId) {
1260 return peerId.equals(node1ID) ? actorFactory.createTestActorPath(node1ID) :
1261 peerId.equals(node2ID) ? actorFactory.createTestActorPath(node2ID) : null;
1265 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1266 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1267 configParams.setElectionTimeoutFactor(3);
1268 configParams.setPeerAddressResolver(peerAddressResolver);
1270 ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1271 new ServerInfo(node1ID, false), new ServerInfo(node2ID, false)));
1272 ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, 1, persistedServerConfig);
1274 InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
1275 InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1276 InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
1277 InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1278 InMemoryJournal.addEntry(node2ID, 3, new ReplicatedLogImplEntry(1, 1,
1279 new MockRaftActorContext.MockPayload("2")));
1281 TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
1282 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1283 actorFactory.generateActorId("collector"));
1284 TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1285 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
1286 PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1287 CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1289 TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
1290 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1291 actorFactory.generateActorId("collector"));
1292 TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1293 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
1294 PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1295 CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1297 // Wait for snapshot after recovery
1298 MessageCollectorActor.expectFirstMatching(node1Collector, SnapshotComplete.class);
1300 // Send a ChangeServersVotingStatus message to node1 to change mode1 to voting. This should cause
1301 // node1 to try to elect itself as leader in order to apply the new server config. However node1's log
1302 // is behind node2's so node2 should not grant node1's vote. This should cause node1 to time out and
1303 // forward the request to node2.
1305 ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(
1306 ImmutableMap.of(node1ID, true, node2ID, true));
1307 node1RaftActorRef.tell(changeServers, testKit.getRef());
1308 ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1309 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1311 MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
1312 verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1313 votingServer(node1ID), votingServer(node2ID));
1314 assertEquals("getRaftState", RaftState.Leader, node2RaftActor.getRaftState());
1316 MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class);
1317 verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1318 votingServer(node1ID), votingServer(node2ID));
1319 assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
1320 assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1322 LOG.info("testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout ending");
1326 public void testChangeToVotingWithNoLeaderAndOtherLeaderElected() {
1327 LOG.info("testChangeToVotingWithNoLeaderAndOtherLeaderElected starting");
1329 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1330 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1331 configParams.setElectionTimeoutFactor(100000);
1333 final String node1ID = "node1";
1334 final String node2ID = "node2";
1336 configParams.setPeerAddressResolver(new PeerAddressResolver() {
1338 public String resolve(String peerId) {
1339 return peerId.equals(node1ID) ? actorFactory.createTestActorPath(node1ID) :
1340 peerId.equals(node2ID) ? actorFactory.createTestActorPath(node2ID) : null;
1344 ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1345 new ServerInfo(node1ID, false), new ServerInfo(node2ID, true)));
1346 ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, 1, persistedServerConfig);
1348 InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
1349 InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1350 InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
1351 InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1353 TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
1354 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1355 actorFactory.generateActorId("collector"));
1356 TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1357 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
1358 PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1359 CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1361 TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
1362 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1363 actorFactory.generateActorId("collector"));
1364 TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1365 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
1366 PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1367 CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1369 // Wait for snapshot after recovery
1370 MessageCollectorActor.expectFirstMatching(node1Collector, SnapshotComplete.class);
1372 // Send a ChangeServersVotingStatus message to node1 to change node1 to voting. This should cause
1373 // node1 to try to elect itself as leader in order to apply the new server config. But we'll drop
1374 // RequestVote messages in node2 and make it the leader so node1 should forward the server change
1375 // request to node2 when node2 is elected.
1377 node2RaftActor.setDropMessageOfType(RequestVote.class);
1379 ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true,
1381 node1RaftActorRef.tell(changeServers, testKit.getRef());
1383 MessageCollectorActor.expectFirstMatching(node2Collector, RequestVote.class);
1385 node2RaftActorRef.tell(ElectionTimeout.INSTANCE, ActorRef.noSender());
1387 ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1388 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1390 MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class);
1391 verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1392 votingServer(node1ID), votingServer(node2ID));
1393 assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
1394 assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1396 MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
1397 verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1398 votingServer(node1ID), votingServer(node2ID));
1399 assertEquals("getRaftState", RaftState.Leader, node2RaftActor.getRaftState());
1401 LOG.info("testChangeToVotingWithNoLeaderAndOtherLeaderElected ending");
1404 private void verifyRaftState(RaftState expState, RaftActor... raftActors) {
1405 Stopwatch sw = Stopwatch.createStarted();
1406 while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
1407 for(RaftActor raftActor: raftActors) {
1408 if(raftActor.getRaftState() == expState) {
1414 fail("None of the RaftActors have state " + expState);
1417 private static ServerInfo votingServer(String id) {
1418 return new ServerInfo(id, true);
1421 private static ServerInfo nonVotingServer(String id) {
1422 return new ServerInfo(id, false);
1425 private TestActorRef<MessageCollectorActor> newLeaderCollectorActor(MockLeaderRaftActor leaderRaftActor) {
1426 return newCollectorActor(leaderRaftActor, LEADER_ID);
1429 private TestActorRef<MessageCollectorActor> newCollectorActor(AbstractMockRaftActor raftActor, String id) {
1430 TestActorRef<MessageCollectorActor> collectorActor = actorFactory.createTestActor(
1431 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1432 actorFactory.generateActorId(id + "Collector"));
1433 raftActor.setCollectorActor(collectorActor);
1434 return collectorActor;
1437 private static void verifyServerConfigurationPayloadEntry(ReplicatedLog log, ServerInfo... expected) {
1438 ReplicatedLogEntry logEntry = log.get(log.lastIndex());
1439 assertEquals("Last log entry payload class", ServerConfigurationPayload.class, logEntry.getData().getClass());
1440 ServerConfigurationPayload payload = (ServerConfigurationPayload)logEntry.getData();
1441 assertEquals("Server config", Sets.newHashSet(expected), Sets.newHashSet(payload.getServerConfig()));
1444 private static RaftActorContext newFollowerContext(String id, TestActorRef<? extends UntypedActor> actor) {
1445 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1446 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1447 configParams.setElectionTimeoutFactor(100000);
1448 NonPersistentDataProvider noPersistence = new NonPersistentDataProvider();
1449 ElectionTermImpl termInfo = new ElectionTermImpl(noPersistence, id, LOG);
1450 termInfo.update(1, LEADER_ID);
1451 return new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
1452 id, termInfo, -1, -1, ImmutableMap.of(LEADER_ID, ""), configParams, noPersistence, LOG);
1455 static abstract class AbstractMockRaftActor extends MockRaftActor {
1456 private volatile TestActorRef<MessageCollectorActor> collectorActor;
1457 private volatile Class<?> dropMessageOfType;
1459 AbstractMockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
1460 boolean persistent, TestActorRef<MessageCollectorActor> collectorActor) {
1461 super(builder().id(id).peerAddresses(peerAddresses).config(config.get()).
1462 persistent(Optional.of(persistent)));
1463 this.collectorActor = collectorActor;
1466 void setDropMessageOfType(Class<?> dropMessageOfType) {
1467 this.dropMessageOfType = dropMessageOfType;
1470 void setCollectorActor(TestActorRef<MessageCollectorActor> collectorActor) {
1471 this.collectorActor = collectorActor;
1475 public void handleCommand(Object message) {
1476 if(dropMessageOfType == null || !dropMessageOfType.equals(message.getClass())) {
1477 super.handleCommand(message);
1480 if(collectorActor != null) {
1481 collectorActor.tell(message, getSender());
1486 public static class CollectingMockRaftActor extends AbstractMockRaftActor {
1488 CollectingMockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
1489 boolean persistent, TestActorRef<MessageCollectorActor> collectorActor) {
1490 super(id, peerAddresses, config, persistent, collectorActor);
1491 snapshotCohortDelegate = new RaftActorSnapshotCohort() {
1493 public void createSnapshot(ActorRef actorRef) {
1494 actorRef.tell(new CaptureSnapshotReply(new byte[0]), actorRef);
1498 public void applySnapshot(byte[] snapshotBytes) {
1503 public static Props props(final String id, final Map<String, String> peerAddresses,
1504 ConfigParams config, boolean persistent, TestActorRef<MessageCollectorActor> collectorActor){
1506 return Props.create(CollectingMockRaftActor.class, id, peerAddresses, Optional.of(config),
1507 persistent, collectorActor);
1512 public static class MockLeaderRaftActor extends AbstractMockRaftActor {
1513 public MockLeaderRaftActor(Map<String, String> peerAddresses, ConfigParams config,
1514 RaftActorContext fromContext) {
1515 super(LEADER_ID, peerAddresses, Optional.of(config), NO_PERSISTENCE, null);
1516 setPersistence(false);
1518 RaftActorContext context = getRaftActorContext();
1519 for(int i = 0; i < fromContext.getReplicatedLog().size(); i++) {
1520 ReplicatedLogEntry entry = fromContext.getReplicatedLog().get(i);
1521 getState().add(entry.getData());
1522 context.getReplicatedLog().append(entry);
1525 context.setCommitIndex(fromContext.getCommitIndex());
1526 context.setLastApplied(fromContext.getLastApplied());
1527 context.getTermInformation().update(fromContext.getTermInformation().getCurrentTerm(),
1528 fromContext.getTermInformation().getVotedFor());
1532 protected void initializeBehavior() {
1533 changeCurrentBehavior(new Leader(getRaftActorContext()));
1534 initializeBehaviorComplete.countDown();
1538 public void createSnapshot(ActorRef actorRef) {
1540 actorRef.tell(new CaptureSnapshotReply(RaftActorTest.fromObject(getState()).toByteArray()), actorRef);
1541 } catch (Exception e) {
1542 LOG.error("createSnapshot failed", e);
1546 static Props props(Map<String, String> peerAddresses, RaftActorContext fromContext) {
1547 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1548 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1549 configParams.setElectionTimeoutFactor(10);
1550 return Props.create(MockLeaderRaftActor.class, peerAddresses, configParams, fromContext);
1554 public static class MockNewFollowerRaftActor extends AbstractMockRaftActor {
1555 public MockNewFollowerRaftActor(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
1556 super(NEW_SERVER_ID, Maps.<String, String>newHashMap(), Optional.of(config), NO_PERSISTENCE, collectorActor);
1557 setPersistence(false);
1560 static Props props(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
1561 return Props.create(MockNewFollowerRaftActor.class, config, collectorActor);