2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertTrue;
12 import static org.junit.Assert.fail;
13 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.assertNoneMatching;
14 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
15 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
16 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
17 import akka.actor.ActorRef;
18 import akka.actor.Props;
19 import akka.actor.UntypedActor;
20 import akka.dispatch.Dispatchers;
21 import akka.testkit.JavaTestKit;
22 import akka.testkit.TestActorRef;
23 import com.google.common.base.Optional;
24 import com.google.common.base.Stopwatch;
25 import com.google.common.collect.ImmutableMap;
26 import com.google.common.collect.Maps;
27 import com.google.common.collect.Sets;
28 import java.util.Arrays;
29 import java.util.Collections;
30 import java.util.List;
32 import java.util.concurrent.TimeUnit;
33 import org.junit.After;
34 import org.junit.Before;
35 import org.junit.Test;
36 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
37 import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
38 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
39 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
40 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
41 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
42 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
43 import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
44 import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete;
45 import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
46 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
47 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
48 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
49 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
50 import org.opendaylight.controller.cluster.raft.messages.AddServer;
51 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
52 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
53 import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
54 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
55 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
56 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
57 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
58 import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
59 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
60 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
61 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
62 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
63 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
64 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
65 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
66 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
67 import org.slf4j.Logger;
68 import org.slf4j.LoggerFactory;
69 import scala.concurrent.duration.FiniteDuration;
72 * Unit tests for RaftActorServerConfigurationSupport.
74 * @author Thomas Pantelis
76 public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
77 static final String LEADER_ID = "leader";
78 static final String FOLLOWER_ID = "follower";
79 static final String FOLLOWER_ID2 = "follower2";
80 static final String NEW_SERVER_ID = "new-server";
81 static final String NEW_SERVER_ID2 = "new-server2";
82 private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupportTest.class);
83 private static final boolean NO_PERSISTENCE = false;
84 private static final boolean PERSISTENT = true;
86 private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
88 private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
89 Props.create(ForwardMessageToBehaviorActor.class).withDispatcher(Dispatchers.DefaultDispatcherId()),
90 actorFactory.generateActorId(FOLLOWER_ID));
92 private TestActorRef<MockNewFollowerRaftActor> newFollowerRaftActor;
93 private TestActorRef<MessageCollectorActor> newFollowerCollectorActor;
94 private RaftActorContext newFollowerActorContext;
96 private final JavaTestKit testKit = new JavaTestKit(getSystem());
100 InMemoryJournal.clear();
101 InMemorySnapshotStore.clear();
104 private void setupNewFollower() {
105 DefaultConfigParamsImpl configParams = newFollowerConfigParams();
107 newFollowerCollectorActor = actorFactory.createTestActor(
108 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
109 actorFactory.generateActorId(NEW_SERVER_ID + "Collector"));
110 newFollowerRaftActor = actorFactory.createTestActor(MockNewFollowerRaftActor.props(
111 configParams, newFollowerCollectorActor).withDispatcher(Dispatchers.DefaultDispatcherId()),
112 actorFactory.generateActorId(NEW_SERVER_ID));
115 newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
116 } catch (Exception e) {
117 newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
121 private static DefaultConfigParamsImpl newFollowerConfigParams() {
122 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
123 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
124 configParams.setElectionTimeoutFactor(100000);
125 configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
130 public void tearDown() throws Exception {
131 actorFactory.close();
135 public void testAddServerWithExistingFollower() throws Exception {
136 LOG.info("testAddServerWithExistingFollower starting");
138 RaftActorContextImpl followerActorContext = newFollowerContext(FOLLOWER_ID, followerActor);
139 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
141 followerActorContext.setCommitIndex(2);
142 followerActorContext.setLastApplied(2);
144 Follower follower = new Follower(followerActorContext);
145 followerActor.underlyingActor().setBehavior(follower);
146 followerActorContext.setCurrentBehavior(follower);
148 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
149 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
150 followerActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
151 actorFactory.generateActorId(LEADER_ID));
153 // Expect initial heartbeat from the leader.
154 expectFirstMatching(followerActor, AppendEntries.class);
155 clearMessages(followerActor);
157 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
158 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
160 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
162 // Leader should install snapshot - capture and verify ApplySnapshot contents
164 ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
165 @SuppressWarnings("unchecked")
166 List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
167 assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
169 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
170 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
171 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
173 // Verify ServerConfigurationPayload entry in leader's log
175 expectFirstMatching(leaderCollectorActor, ApplyState.class);
176 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
177 assertEquals("Leader journal last index", 3, leaderActorContext.getReplicatedLog().lastIndex());
178 assertEquals("Leader commit index", 3, leaderActorContext.getCommitIndex());
179 assertEquals("Leader last applied index", 3, leaderActorContext.getLastApplied());
180 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
181 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
183 // Verify ServerConfigurationPayload entry in both followers
185 expectFirstMatching(followerActor, ApplyState.class);
186 assertEquals("Follower journal last index", 3, followerActorContext.getReplicatedLog().lastIndex());
187 verifyServerConfigurationPayloadEntry(followerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
188 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
190 expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
191 assertEquals("New follower journal last index", 3, newFollowerActorContext.getReplicatedLog().lastIndex());
192 verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
193 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
195 // Verify new server config was applied in both followers
197 assertEquals("Follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID), followerActorContext.getPeerIds());
199 assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, FOLLOWER_ID), newFollowerActorContext.getPeerIds());
201 expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
202 expectFirstMatching(followerActor, ApplyState.class);
204 assertEquals("Follower commit index", 3, followerActorContext.getCommitIndex());
205 assertEquals("Follower last applied index", 3, followerActorContext.getLastApplied());
206 assertEquals("New follower commit index", 3, newFollowerActorContext.getCommitIndex());
207 assertEquals("New follower last applied index", 3, newFollowerActorContext.getLastApplied());
209 List<ReplicatedLogImplEntry> persistedLogEntries = InMemoryJournal.get(LEADER_ID, ReplicatedLogImplEntry.class);
210 assertEquals("Leader ReplicatedLogImplEntry entries", 1, persistedLogEntries.size());
211 ReplicatedLogImplEntry logEntry = persistedLogEntries.get(0);
212 assertEquals("Leader ReplicatedLogImplEntry getTerm", 1, logEntry.getTerm());
213 assertEquals("Leader ReplicatedLogImplEntry getIndex", 3, logEntry.getIndex());
214 assertEquals("Leader ReplicatedLogImplEntry getData", ServerConfigurationPayload.class, logEntry.getData().getClass());
216 persistedLogEntries = InMemoryJournal.get(NEW_SERVER_ID, ReplicatedLogImplEntry.class);
217 assertEquals("New follower ReplicatedLogImplEntry entries", 1, persistedLogEntries.size());
218 logEntry = persistedLogEntries.get(0);
219 assertEquals("New follower ReplicatedLogImplEntry getTerm", 1, logEntry.getTerm());
220 assertEquals("New follower ReplicatedLogImplEntry getIndex", 3, logEntry.getIndex());
221 assertEquals("New follower ReplicatedLogImplEntry getData", ServerConfigurationPayload.class,
222 logEntry.getData().getClass());
224 LOG.info("testAddServerWithExistingFollower ending");
228 public void testAddServerWithNoExistingFollower() throws Exception {
229 LOG.info("testAddServerWithNoExistingFollower starting");
232 RaftActorContext initialActorContext = new MockRaftActorContext();
233 initialActorContext.setCommitIndex(1);
234 initialActorContext.setLastApplied(1);
235 initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
238 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
239 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
240 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
241 actorFactory.generateActorId(LEADER_ID));
243 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
244 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
246 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
248 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
250 // Leader should install snapshot - capture and verify ApplySnapshot contents
252 ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
253 @SuppressWarnings("unchecked")
254 List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
255 assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
257 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
258 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
259 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
261 // Verify ServerConfigurationPayload entry in leader's log
263 expectFirstMatching(leaderCollectorActor, ApplyState.class);
264 assertEquals("Leader journal last index", 2, leaderActorContext.getReplicatedLog().lastIndex());
265 assertEquals("Leader commit index", 2, leaderActorContext.getCommitIndex());
266 assertEquals("Leader last applied index", 2, leaderActorContext.getLastApplied());
267 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
268 votingServer(NEW_SERVER_ID));
270 // Verify ServerConfigurationPayload entry in the new follower
272 expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
273 assertEquals("New follower journal last index", 2, newFollowerActorContext.getReplicatedLog().lastIndex());
274 verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
275 votingServer(NEW_SERVER_ID));
277 // Verify new server config was applied in the new follower
279 assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
281 LOG.info("testAddServerWithNoExistingFollower ending");
285 public void testAddServersAsNonVoting() throws Exception {
286 LOG.info("testAddServersAsNonVoting starting");
289 RaftActorContext initialActorContext = new MockRaftActorContext();
291 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
292 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
293 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
294 actorFactory.generateActorId(LEADER_ID));
296 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
297 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
299 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
301 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), false), testKit.getRef());
303 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
304 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
305 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
307 // Verify ServerConfigurationPayload entry in leader's log
309 expectFirstMatching(leaderCollectorActor, ApplyState.class);
311 assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
312 assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
313 assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
314 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
315 nonVotingServer(NEW_SERVER_ID));
317 // Verify ServerConfigurationPayload entry in the new follower
319 expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
320 assertEquals("New follower journal last index", 0, newFollowerActorContext.getReplicatedLog().lastIndex());
321 verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
322 nonVotingServer(NEW_SERVER_ID));
324 // Verify new server config was applied in the new follower
326 assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
328 assertNoneMatching(newFollowerCollectorActor, InstallSnapshot.class, 500);
330 // Add another non-voting server.
332 clearMessages(leaderCollectorActor);
334 RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
335 Follower newFollower2 = new Follower(follower2ActorContext);
336 followerActor.underlyingActor().setBehavior(newFollower2);
338 leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit.getRef());
340 addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
341 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
342 assertEquals("getLeaderHint", java.util.Optional.of(LEADER_ID), addServerReply.getLeaderHint());
344 expectFirstMatching(leaderCollectorActor, ApplyState.class);
345 assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
346 assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
347 assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
348 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
349 votingServer(LEADER_ID), nonVotingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
351 LOG.info("testAddServersAsNonVoting ending");
355 public void testAddServerWithOperationInProgress() throws Exception {
356 LOG.info("testAddServerWithOperationInProgress starting");
359 RaftActorContext initialActorContext = new MockRaftActorContext();
361 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
362 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
363 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
364 actorFactory.generateActorId(LEADER_ID));
366 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
367 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
369 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
371 RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
372 Follower newFollower2 = new Follower(follower2ActorContext);
373 followerActor.underlyingActor().setBehavior(newFollower2);
375 MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
376 newFollowerRaftActorInstance.setDropMessageOfType(InstallSnapshot.SERIALIZABLE_CLASS);
378 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
380 // Wait for leader's install snapshot and capture it
382 InstallSnapshot installSnapshot = expectFirstMatching(newFollowerCollectorActor, InstallSnapshot.class);
384 // Send a second AddServer - should get queued
385 JavaTestKit testKit2 = new JavaTestKit(getSystem());
386 leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit2.getRef());
388 // Continue the first AddServer
389 newFollowerRaftActorInstance.setDropMessageOfType(null);
390 newFollowerRaftActor.tell(installSnapshot, leaderActor);
392 // Verify both complete successfully
393 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
394 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
396 addServerReply = testKit2.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
397 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
399 // Verify ServerConfigurationPayload entries in leader's log
401 expectMatching(leaderCollectorActor, ApplyState.class, 2);
402 assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
403 assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
404 assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
405 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
406 votingServer(LEADER_ID), votingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
408 // Verify ServerConfigurationPayload entry in the new follower
410 expectMatching(newFollowerCollectorActor, ApplyState.class, 2);
411 assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID2),
412 newFollowerActorContext.getPeerIds());
414 LOG.info("testAddServerWithOperationInProgress ending");
418 public void testAddServerWithPriorSnapshotInProgress() throws Exception {
419 LOG.info("testAddServerWithPriorSnapshotInProgress starting");
422 RaftActorContext initialActorContext = new MockRaftActorContext();
424 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
425 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
426 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
427 actorFactory.generateActorId(LEADER_ID));
429 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
430 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
432 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
434 // Drop commit message for now to delay snapshot completion
435 leaderRaftActor.setDropMessageOfType(String.class);
437 leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
439 String commitMsg = expectFirstMatching(leaderCollectorActor, String.class);
441 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
443 leaderRaftActor.setDropMessageOfType(null);
444 leaderActor.tell(commitMsg, leaderActor);
446 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
447 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
448 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
450 expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
452 // Verify ServerConfigurationPayload entry in leader's log
454 expectFirstMatching(leaderCollectorActor, ApplyState.class);
455 assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
456 assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
457 assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
458 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
459 votingServer(NEW_SERVER_ID));
461 LOG.info("testAddServerWithPriorSnapshotInProgress ending");
465 public void testAddServerWithPriorSnapshotCompleteTimeout() throws Exception {
466 LOG.info("testAddServerWithPriorSnapshotCompleteTimeout starting");
469 RaftActorContext initialActorContext = new MockRaftActorContext();
471 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
472 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
473 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
474 actorFactory.generateActorId(LEADER_ID));
476 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
477 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
479 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
481 // Drop commit message so the snapshot doesn't complete.
482 leaderRaftActor.setDropMessageOfType(String.class);
484 leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
486 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
488 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
489 assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
491 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
493 LOG.info("testAddServerWithPriorSnapshotCompleteTimeout ending");
497 public void testAddServerWithLeaderChangeBeforePriorSnapshotComplete() throws Exception {
498 LOG.info("testAddServerWithLeaderChangeBeforePriorSnapshotComplete starting");
501 RaftActorContext initialActorContext = new MockRaftActorContext();
503 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
504 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
505 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
506 actorFactory.generateActorId(LEADER_ID));
508 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
509 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
510 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
512 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
514 // Drop the commit message so the snapshot doesn't complete yet.
515 leaderRaftActor.setDropMessageOfType(String.class);
517 leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
519 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
521 String commitMsg = expectFirstMatching(leaderCollectorActor, String.class);
523 // Change the leader behavior to follower
524 leaderActor.tell(new Follower(leaderActorContext), leaderActor);
526 // Drop CaptureSnapshotReply in case install snapshot is incorrectly initiated after the prior
527 // snapshot completes. This will prevent the invalid snapshot from completing and fail the
528 // isCapturing assertion below.
529 leaderRaftActor.setDropMessageOfType(CaptureSnapshotReply.class);
531 // Complete the prior snapshot - this should be a no-op b/c it's no longer the leader
532 leaderActor.tell(commitMsg, leaderActor);
534 leaderActor.tell(new RaftActorServerConfigurationSupport.ServerOperationTimeout(NEW_SERVER_ID), leaderActor);
536 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
537 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
539 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
540 assertEquals("isCapturing", false, leaderActorContext.getSnapshotManager().isCapturing());
542 LOG.info("testAddServerWithLeaderChangeBeforePriorSnapshotComplete ending");
546 public void testAddServerWithLeaderChangeDuringInstallSnapshot() throws Exception {
547 LOG.info("testAddServerWithLeaderChangeDuringInstallSnapshot starting");
550 RaftActorContext initialActorContext = new MockRaftActorContext();
552 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
553 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
554 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
555 actorFactory.generateActorId(LEADER_ID));
557 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
558 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
560 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(8);
562 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
564 // Drop the UnInitializedFollowerSnapshotReply to delay it.
565 leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
567 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
569 UnInitializedFollowerSnapshotReply snapshotReply = expectFirstMatching(leaderCollectorActor,
570 UnInitializedFollowerSnapshotReply.class);
572 // Prevent election timeout when the leader switches to follower
573 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
575 // Change the leader behavior to follower
576 leaderActor.tell(new Follower(leaderActorContext), leaderActor);
578 // Send the captured UnInitializedFollowerSnapshotReply - should be a no-op
579 leaderRaftActor.setDropMessageOfType(null);
580 leaderActor.tell(snapshotReply, leaderActor);
582 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
583 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
585 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
587 LOG.info("testAddServerWithLeaderChangeDuringInstallSnapshot ending");
591 public void testAddServerWithInstallSnapshotTimeout() throws Exception {
592 LOG.info("testAddServerWithInstallSnapshotTimeout starting");
595 RaftActorContext initialActorContext = new MockRaftActorContext();
597 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
598 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
599 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
600 actorFactory.generateActorId(LEADER_ID));
602 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
603 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
604 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
606 // Drop the InstallSnapshot message so it times out
607 newFollowerRaftActor.underlyingActor().setDropMessageOfType(InstallSnapshot.SERIALIZABLE_CLASS);
609 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
611 leaderActor.tell(new UnInitializedFollowerSnapshotReply("bogus"), leaderActor);
613 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
614 assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
616 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
617 assertEquals("Leader followers size", 0,
618 ((AbstractLeader)leaderRaftActor.getCurrentBehavior()).getFollowerIds().size());
620 LOG.info("testAddServerWithInstallSnapshotTimeout ending");
624 public void testAddServerWithNoLeader() {
625 LOG.info("testAddServerWithNoLeader starting");
628 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
629 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
631 TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
632 MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
633 followerActor.path().toString())).config(configParams).persistent(Optional.of(false)).
634 props().withDispatcher(Dispatchers.DefaultDispatcherId()),
635 actorFactory.generateActorId(LEADER_ID));
636 noLeaderActor.underlyingActor().waitForInitializeBehaviorComplete();
638 noLeaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
639 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
640 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
642 LOG.info("testAddServerWithNoLeader ending");
646 public void testAddServerWithNoConsensusReached() {
647 LOG.info("testAddServerWithNoConsensusReached starting");
650 RaftActorContext initialActorContext = new MockRaftActorContext();
652 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
653 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
654 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
655 actorFactory.generateActorId(LEADER_ID));
657 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
658 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
660 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
662 // Drop UnInitializedFollowerSnapshotReply initially
663 leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
665 MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
666 TestActorRef<MessageCollectorActor> newFollowerCollectorActor =
667 newCollectorActor(newFollowerRaftActorInstance, NEW_SERVER_ID);
669 // Drop AppendEntries to the new follower so consensus isn't reached
670 newFollowerRaftActorInstance.setDropMessageOfType(AppendEntries.class);
672 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
674 // Capture the UnInitializedFollowerSnapshotReply
675 Object snapshotReply = expectFirstMatching(leaderCollectorActor, UnInitializedFollowerSnapshotReply.class);
677 // Send the UnInitializedFollowerSnapshotReply to resume the first request
678 leaderRaftActor.setDropMessageOfType(null);
679 leaderActor.tell(snapshotReply, leaderActor);
681 expectFirstMatching(newFollowerCollectorActor, AppendEntries.class);
683 // Send a second AddServer
684 leaderActor.tell(new AddServer(NEW_SERVER_ID2, "", false), testKit.getRef());
686 // The first AddServer should succeed with OK even though consensus wasn't reached
687 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
688 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
689 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
691 // Verify ServerConfigurationPayload entry in leader's log
692 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
693 votingServer(NEW_SERVER_ID));
695 // The second AddServer should fail since consensus wasn't reached for the first
696 addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
697 assertEquals("getStatus", ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT, addServerReply.getStatus());
699 // Re-send the second AddServer - should also fail
700 leaderActor.tell(new AddServer(NEW_SERVER_ID2, "", false), testKit.getRef());
701 addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
702 assertEquals("getStatus", ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT, addServerReply.getStatus());
704 LOG.info("testAddServerWithNoConsensusReached ending");
708 public void testAddServerWithExistingServer() {
709 LOG.info("testAddServerWithExistingServer starting");
711 RaftActorContext initialActorContext = new MockRaftActorContext();
713 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
714 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
715 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
716 actorFactory.generateActorId(LEADER_ID));
718 leaderActor.tell(new AddServer(FOLLOWER_ID, followerActor.path().toString(), true), testKit.getRef());
720 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
721 assertEquals("getStatus", ServerChangeStatus.ALREADY_EXISTS, addServerReply.getStatus());
723 LOG.info("testAddServerWithExistingServer ending");
727 public void testAddServerForwardedToLeader() {
728 LOG.info("testAddServerForwardedToLeader starting");
731 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
732 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
734 TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
735 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
736 actorFactory.generateActorId(LEADER_ID));
738 TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
739 MockRaftActor.builder().id(FOLLOWER_ID).peerAddresses(ImmutableMap.of(LEADER_ID,
740 leaderActor.path().toString())).config(configParams).persistent(Optional.of(false)).
741 props().withDispatcher(Dispatchers.DefaultDispatcherId()),
742 actorFactory.generateActorId(FOLLOWER_ID));
743 followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
745 followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
746 -1, -1, (short)0), leaderActor);
748 followerRaftActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
749 expectFirstMatching(leaderActor, AddServer.class);
751 LOG.info("testAddServerForwardedToLeader ending");
755 public void testOnApplyState() {
756 LOG.info("testOnApplyState starting");
758 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
759 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
760 TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
761 MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
762 followerActor.path().toString())).config(configParams).persistent(Optional.of(false)).
763 props().withDispatcher(Dispatchers.DefaultDispatcherId()),
764 actorFactory.generateActorId(LEADER_ID));
766 RaftActorServerConfigurationSupport support = new RaftActorServerConfigurationSupport(noLeaderActor.underlyingActor());
768 ReplicatedLogEntry serverConfigEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
769 new ServerConfigurationPayload(Collections.<ServerInfo>emptyList()));
770 boolean handled = support.handleMessage(new ApplyState(null, null, serverConfigEntry), ActorRef.noSender());
771 assertEquals("Message handled", true, handled);
773 ReplicatedLogEntry nonServerConfigEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
774 new MockRaftActorContext.MockPayload("1"));
775 handled = support.handleMessage(new ApplyState(null, null, nonServerConfigEntry), ActorRef.noSender());
776 assertEquals("Message handled", false, handled);
778 LOG.info("testOnApplyState ending");
782 public void testRemoveServerWithNoLeader() {
783 LOG.info("testRemoveServerWithNoLeader starting");
785 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
786 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
788 TestActorRef<MockRaftActor> leaderActor = actorFactory.createTestActor(
789 MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
790 followerActor.path().toString())).config(configParams).persistent(Optional.of(false)).
791 props().withDispatcher(Dispatchers.DefaultDispatcherId()),
792 actorFactory.generateActorId(LEADER_ID));
793 leaderActor.underlyingActor().waitForInitializeBehaviorComplete();
795 leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
796 RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
797 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, removeServerReply.getStatus());
799 LOG.info("testRemoveServerWithNoLeader ending");
803 public void testRemoveServerNonExistentServer() {
804 LOG.info("testRemoveServerNonExistentServer starting");
806 RaftActorContext initialActorContext = new MockRaftActorContext();
808 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
809 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
810 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
811 actorFactory.generateActorId(LEADER_ID));
813 leaderActor.tell(new RemoveServer(NEW_SERVER_ID), testKit.getRef());
814 RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
815 assertEquals("getStatus", ServerChangeStatus.DOES_NOT_EXIST, removeServerReply.getStatus());
817 LOG.info("testRemoveServerNonExistentServer ending");
821 public void testRemoveServerForwardToLeader() {
822 LOG.info("testRemoveServerForwardToLeader starting");
824 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
825 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
827 TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
828 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
829 actorFactory.generateActorId(LEADER_ID));
831 TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
832 MockRaftActor.builder().id(FOLLOWER_ID).peerAddresses(ImmutableMap.of(LEADER_ID,
833 leaderActor.path().toString())).config(configParams).persistent(Optional.of(false)).
834 props().withDispatcher(Dispatchers.DefaultDispatcherId()),
835 actorFactory.generateActorId(FOLLOWER_ID));
836 followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
838 followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
839 -1, -1, (short)0), leaderActor);
841 followerRaftActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
842 expectFirstMatching(leaderActor, RemoveServer.class);
844 LOG.info("testRemoveServerForwardToLeader ending");
848 public void testRemoveServer() {
849 LOG.info("testRemoveServer starting");
851 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
852 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
853 configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
855 final String followerActorId = actorFactory.generateActorId(FOLLOWER_ID);
856 final String followerActorPath = actorFactory.createTestActorPath(followerActorId);
857 RaftActorContext initialActorContext = new MockRaftActorContext();
859 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
860 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActorPath),
861 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
862 actorFactory.generateActorId(LEADER_ID));
864 TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
866 TestActorRef<MessageCollectorActor> collector =
867 actorFactory.createTestActor(MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
868 actorFactory.generateActorId("collector"));
869 TestActorRef<CollectingMockRaftActor> followerRaftActor = actorFactory.createTestActor(
870 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
871 configParams, NO_PERSISTENCE, collector).withDispatcher(Dispatchers.DefaultDispatcherId()),
874 leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
875 RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
876 assertEquals("getStatus", ServerChangeStatus.OK, removeServerReply.getStatus());
878 final ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
879 assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
880 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(), votingServer(LEADER_ID));
882 RaftActorBehavior currentBehavior = leaderActor.underlyingActor().getCurrentBehavior();
883 assertTrue("Expected Leader", currentBehavior instanceof Leader);
884 assertEquals("Follower ids size", 0, ((Leader)currentBehavior).getFollowerIds().size());
886 MessageCollectorActor.expectFirstMatching(collector, ServerRemoved.class);
888 LOG.info("testRemoveServer ending");
892 public void testRemoveServerLeader() {
893 LOG.info("testRemoveServerLeader starting");
895 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
896 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
897 configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
899 final String followerActorId = actorFactory.generateActorId(FOLLOWER_ID);
900 final String followerActorPath = actorFactory.createTestActorPath(followerActorId);
901 RaftActorContext initialActorContext = new MockRaftActorContext();
903 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
904 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActorPath),
905 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
906 actorFactory.generateActorId(LEADER_ID));
908 TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
910 TestActorRef<MessageCollectorActor> followerCollector = actorFactory.createTestActor(MessageCollectorActor.props().
911 withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId("collector"));
912 actorFactory.createTestActor(
913 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
914 configParams, NO_PERSISTENCE, followerCollector).withDispatcher(Dispatchers.DefaultDispatcherId()),
917 leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
918 RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
919 assertEquals("getStatus", ServerChangeStatus.OK, removeServerReply.getStatus());
921 final ApplyState applyState = MessageCollectorActor.expectFirstMatching(followerCollector, ApplyState.class);
922 assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
923 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
924 votingServer(FOLLOWER_ID));
926 MessageCollectorActor.expectFirstMatching(leaderCollector, ServerRemoved.class);
928 LOG.info("testRemoveServerLeader ending");
932 public void testRemoveServerLeaderWithNoFollowers() {
933 LOG.info("testRemoveServerLeaderWithNoFollowers starting");
935 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
936 MockLeaderRaftActor.props(Collections.<String, String>emptyMap(),
937 new MockRaftActorContext()).withDispatcher(Dispatchers.DefaultDispatcherId()),
938 actorFactory.generateActorId(LEADER_ID));
940 leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
941 RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
942 assertEquals("getStatus", ServerChangeStatus.NOT_SUPPORTED, removeServerReply.getStatus());
944 LOG.info("testRemoveServerLeaderWithNoFollowers ending");
948 public void testChangeServersVotingStatus() {
949 LOG.info("testChangeServersVotingStatus starting");
951 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
952 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
953 configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
955 final String follower1ActorId = actorFactory.generateActorId(FOLLOWER_ID);
956 final String follower1ActorPath = actorFactory.createTestActorPath(follower1ActorId);
957 final String follower2ActorId = actorFactory.generateActorId(FOLLOWER_ID2);
958 final String follower2ActorPath = actorFactory.createTestActorPath(follower2ActorId);
960 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
961 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, follower1ActorPath,
962 FOLLOWER_ID2, follower2ActorPath), new MockRaftActorContext()).
963 withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
964 TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
966 TestActorRef<MessageCollectorActor> follower1Collector = actorFactory.createTestActor(
967 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
968 actorFactory.generateActorId("collector"));
969 TestActorRef<CollectingMockRaftActor> follower1RaftActor = actorFactory.createTestActor(
970 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
971 FOLLOWER_ID2, follower2ActorPath), configParams, NO_PERSISTENCE, follower1Collector).
972 withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId);
974 TestActorRef<MessageCollectorActor> follower2Collector = actorFactory.createTestActor(
975 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
976 actorFactory.generateActorId("collector"));
977 TestActorRef<CollectingMockRaftActor> follower2RaftActor = actorFactory.createTestActor(
978 CollectingMockRaftActor.props(FOLLOWER_ID2, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
979 FOLLOWER_ID, follower1ActorPath), configParams, NO_PERSISTENCE, follower2Collector).
980 withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId);
982 // Send first ChangeServersVotingStatus message
984 leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(FOLLOWER_ID, false, FOLLOWER_ID2, false)),
986 ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
987 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
989 final ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
990 assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
991 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
992 votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
994 MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
995 verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
996 votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
998 MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
999 verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1000 votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1002 MessageCollectorActor.clearMessages(leaderCollector);
1003 MessageCollectorActor.clearMessages(follower1Collector);
1004 MessageCollectorActor.clearMessages(follower2Collector);
1006 // Send second ChangeServersVotingStatus message
1008 leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(FOLLOWER_ID, true)), testKit.getRef());
1009 reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1010 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1012 MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
1013 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1014 votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1016 MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
1017 verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1018 votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1020 MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
1021 verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1022 votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1024 LOG.info("testChangeServersVotingStatus ending");
1028 public void testChangeLeaderToNonVoting() {
1029 LOG.info("testChangeLeaderToNonVoting starting");
1031 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1032 configParams.setHeartBeatInterval(new FiniteDuration(500, TimeUnit.MILLISECONDS));
1034 final String follower1ActorId = actorFactory.generateActorId(FOLLOWER_ID);
1035 final String follower1ActorPath = actorFactory.createTestActorPath(follower1ActorId);
1036 final String follower2ActorId = actorFactory.generateActorId(FOLLOWER_ID2);
1037 final String follower2ActorPath = actorFactory.createTestActorPath(follower2ActorId);
1039 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
1040 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, follower1ActorPath,
1041 FOLLOWER_ID2, follower2ActorPath), new MockRaftActorContext()).
1042 withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
1043 TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
1045 TestActorRef<MessageCollectorActor> follower1Collector = actorFactory.createTestActor(
1046 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1047 actorFactory.generateActorId("collector"));
1048 TestActorRef<CollectingMockRaftActor> follower1RaftActor = actorFactory.createTestActor(
1049 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
1050 FOLLOWER_ID2, follower2ActorPath), configParams, NO_PERSISTENCE, follower1Collector).
1051 withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId);
1053 TestActorRef<MessageCollectorActor> follower2Collector = actorFactory.createTestActor(
1054 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1055 actorFactory.generateActorId("collector"));
1056 TestActorRef<CollectingMockRaftActor> follower2RaftActor = actorFactory.createTestActor(
1057 CollectingMockRaftActor.props(FOLLOWER_ID2, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
1058 FOLLOWER_ID, follower1ActorPath), configParams, NO_PERSISTENCE, follower2Collector).
1059 withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId);
1061 // Send ChangeServersVotingStatus message
1063 leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(LEADER_ID, false)), testKit.getRef());
1064 ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1065 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1067 MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
1068 verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1069 nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
1071 MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
1072 verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1073 nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
1075 MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
1076 verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1077 nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
1079 verifyRaftState(RaftState.Leader, follower1RaftActor.underlyingActor(), follower2RaftActor.underlyingActor());
1080 verifyRaftState(RaftState.Follower, leaderActor.underlyingActor());
1082 MessageCollectorActor.expectMatching(leaderCollector, AppendEntries.class, 2);
1084 LOG.info("testChangeLeaderToNonVoting ending");
1088 public void testChangeToVotingWithNoLeader() {
1089 LOG.info("testChangeToVotingWithNoLeader starting");
1091 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1092 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1094 final String node1ID = "node1";
1095 final String node2ID = "node2";
1097 // Set up a persisted ServerConfigurationPayload. Initially node1 and node2 will come up as non-voting.
1098 // via the server config. The server config will also contain 2 voting peers that are down (ie no
1101 ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1102 new ServerInfo(node1ID, false), new ServerInfo(node2ID, false),
1103 new ServerInfo("downNode1", true), new ServerInfo("downNode2", true)));
1104 ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, 1, persistedServerConfig);
1106 InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "downNode1"));
1107 InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1108 InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "downNode2"));
1109 InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1111 TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
1112 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1113 actorFactory.generateActorId("collector"));
1114 TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1115 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
1116 PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1117 CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1119 TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
1120 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1121 actorFactory.generateActorId("collector"));
1122 TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1123 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
1124 PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1125 CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1127 // Wait for snapshot after recovery
1128 MessageCollectorActor.expectFirstMatching(node1Collector, SnapshotComplete.class);
1130 // Verify the intended server config was loaded and applied.
1131 verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1132 nonVotingServer(node1ID), nonVotingServer(node2ID), votingServer("downNode1"),
1133 votingServer("downNode2"));
1134 assertEquals("isVotingMember", false, node1RaftActor.getRaftActorContext().isVotingMember());
1135 assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1136 assertEquals("getLeaderId", null, node1RaftActor.getLeaderId());
1138 MessageCollectorActor.expectFirstMatching(node2Collector, SnapshotComplete.class);
1139 assertEquals("isVotingMember", false, node2RaftActor.getRaftActorContext().isVotingMember());
1141 // For the test, we send a ChangeServersVotingStatus message to node1 to flip the voting states for
1142 // each server, ie node1 and node2 to voting and the 2 down nodes to non-voting. This should cause
1143 // node1 to try to elect itself as leader in order to apply the new server config. Since the 2
1144 // down nodes are switched to non-voting, node1 should only need a vote from node2.
1146 // First send the message such that node1 has no peer address for node2 - should fail.
1148 ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true,
1149 node2ID, true, "downNode1", false, "downNode2", false));
1150 node1RaftActorRef.tell(changeServers, testKit.getRef());
1151 ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1152 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, reply.getStatus());
1154 // Update node2's peer address and send the message again
1156 node1RaftActor.setPeerAddress(node2ID, node2RaftActorRef.path().toString());
1158 node1RaftActorRef.tell(changeServers, testKit.getRef());
1159 reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1160 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1162 ApplyJournalEntries apply = MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class);
1163 assertEquals("getToIndex", 1, apply.getToIndex());
1164 verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1165 votingServer(node1ID), votingServer(node2ID), nonVotingServer("downNode1"),
1166 nonVotingServer("downNode2"));
1167 assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
1168 assertEquals("getRaftState", RaftState.Leader, node1RaftActor.getRaftState());
1170 apply = MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
1171 assertEquals("getToIndex", 1, apply.getToIndex());
1172 verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1173 votingServer(node1ID), votingServer(node2ID), nonVotingServer("downNode1"),
1174 nonVotingServer("downNode2"));
1175 assertEquals("isVotingMember", true, node2RaftActor.getRaftActorContext().isVotingMember());
1176 assertEquals("getRaftState", RaftState.Follower, node2RaftActor.getRaftState());
1178 LOG.info("testChangeToVotingWithNoLeader ending");
1182 public void testChangeToVotingWithNoLeaderAndElectionTimeout() {
1183 LOG.info("testChangeToVotingWithNoLeaderAndElectionTimeout starting");
1185 final String node1ID = "node1";
1186 final String node2ID = "node2";
1188 PeerAddressResolver peerAddressResolver = new PeerAddressResolver() {
1190 public String resolve(String peerId) {
1191 return peerId.equals(node1ID) ? actorFactory.createTestActorPath(node1ID) :
1192 peerId.equals(node2ID) ? actorFactory.createTestActorPath(node2ID) : null;
1196 ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1197 new ServerInfo(node1ID, false), new ServerInfo(node2ID, true)));
1198 ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, 1, persistedServerConfig);
1200 InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
1201 InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1202 InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
1203 InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1205 DefaultConfigParamsImpl configParams1 = new DefaultConfigParamsImpl();
1206 configParams1.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1207 configParams1.setElectionTimeoutFactor(1);
1208 configParams1.setPeerAddressResolver(peerAddressResolver);
1209 TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
1210 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1211 actorFactory.generateActorId("collector"));
1212 TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1213 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams1,
1214 PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1215 CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1217 DefaultConfigParamsImpl configParams2 = new DefaultConfigParamsImpl();
1218 configParams2.setElectionTimeoutFactor(1000000);
1219 configParams2.setPeerAddressResolver(peerAddressResolver);
1220 TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
1221 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1222 actorFactory.generateActorId("collector"));
1223 TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1224 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams2,
1225 PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1226 CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1228 // Wait for snapshot after recovery
1229 MessageCollectorActor.expectFirstMatching(node1Collector, SnapshotComplete.class);
1231 // Send a ChangeServersVotingStatus message to node1 to change mode1 to voting. This should cause
1232 // node1 to try to elect itself as leader in order to apply the new server config. But we'll drop
1233 // RequestVote messages in node2 which should cause node1 to time out and revert back to the previous
1234 // server config and fail with NO_LEADER. Note that node1 shouldn't forward the request to node2 b/c
1235 // node2 was previously voting.
1237 node2RaftActor.setDropMessageOfType(RequestVote.class);
1239 ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true));
1240 node1RaftActorRef.tell(changeServers, testKit.getRef());
1241 ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1242 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, reply.getStatus());
1244 assertEquals("Server config", Sets.newHashSet(nonVotingServer(node1ID), votingServer(node2ID)),
1245 Sets.newHashSet(node1RaftActor.getRaftActorContext().getPeerServerInfo(true).getServerConfig()));
1246 assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1248 LOG.info("testChangeToVotingWithNoLeaderAndElectionTimeout ending");
1252 public void testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout() {
1253 LOG.info("testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout starting");
1255 final String node1ID = "node1";
1256 final String node2ID = "node2";
1258 PeerAddressResolver peerAddressResolver = new PeerAddressResolver() {
1260 public String resolve(String peerId) {
1261 return peerId.equals(node1ID) ? actorFactory.createTestActorPath(node1ID) :
1262 peerId.equals(node2ID) ? actorFactory.createTestActorPath(node2ID) : null;
1266 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1267 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1268 configParams.setElectionTimeoutFactor(3);
1269 configParams.setPeerAddressResolver(peerAddressResolver);
1271 ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1272 new ServerInfo(node1ID, false), new ServerInfo(node2ID, false)));
1273 ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, 1, persistedServerConfig);
1275 InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
1276 InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1277 InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
1278 InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1279 InMemoryJournal.addEntry(node2ID, 3, new ReplicatedLogImplEntry(1, 1,
1280 new MockRaftActorContext.MockPayload("2")));
1282 TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
1283 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1284 actorFactory.generateActorId("collector"));
1285 TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1286 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
1287 PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1288 CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1290 TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
1291 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1292 actorFactory.generateActorId("collector"));
1293 TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1294 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
1295 PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1296 CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1298 // Wait for snapshot after recovery
1299 MessageCollectorActor.expectFirstMatching(node1Collector, SnapshotComplete.class);
1301 // Send a ChangeServersVotingStatus message to node1 to change mode1 to voting. This should cause
1302 // node1 to try to elect itself as leader in order to apply the new server config. However node1's log
1303 // is behind node2's so node2 should not grant node1's vote. This should cause node1 to time out and
1304 // forward the request to node2.
1306 ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(
1307 ImmutableMap.of(node1ID, true, node2ID, true));
1308 node1RaftActorRef.tell(changeServers, testKit.getRef());
1309 ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1310 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1312 MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
1313 verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1314 votingServer(node1ID), votingServer(node2ID));
1315 assertEquals("getRaftState", RaftState.Leader, node2RaftActor.getRaftState());
1317 MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class);
1318 verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1319 votingServer(node1ID), votingServer(node2ID));
1320 assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
1321 assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1323 LOG.info("testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout ending");
1327 public void testChangeToVotingWithNoLeaderAndOtherLeaderElected() {
1328 LOG.info("testChangeToVotingWithNoLeaderAndOtherLeaderElected starting");
1330 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1331 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1332 configParams.setElectionTimeoutFactor(100000);
1334 final String node1ID = "node1";
1335 final String node2ID = "node2";
1337 configParams.setPeerAddressResolver(new PeerAddressResolver() {
1339 public String resolve(String peerId) {
1340 return peerId.equals(node1ID) ? actorFactory.createTestActorPath(node1ID) :
1341 peerId.equals(node2ID) ? actorFactory.createTestActorPath(node2ID) : null;
1345 ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1346 new ServerInfo(node1ID, false), new ServerInfo(node2ID, true)));
1347 ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, 1, persistedServerConfig);
1349 InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
1350 InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1351 InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
1352 InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1354 TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
1355 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1356 actorFactory.generateActorId("collector"));
1357 TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1358 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
1359 PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1360 CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1362 TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
1363 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1364 actorFactory.generateActorId("collector"));
1365 TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1366 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
1367 PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1368 CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1370 // Wait for snapshot after recovery
1371 MessageCollectorActor.expectFirstMatching(node1Collector, SnapshotComplete.class);
1373 // Send a ChangeServersVotingStatus message to node1 to change node1 to voting. This should cause
1374 // node1 to try to elect itself as leader in order to apply the new server config. But we'll drop
1375 // RequestVote messages in node2 and make it the leader so node1 should forward the server change
1376 // request to node2 when node2 is elected.
1378 node2RaftActor.setDropMessageOfType(RequestVote.class);
1380 ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true,
1382 node1RaftActorRef.tell(changeServers, testKit.getRef());
1384 MessageCollectorActor.expectFirstMatching(node2Collector, RequestVote.class);
1386 node2RaftActorRef.tell(ElectionTimeout.INSTANCE, ActorRef.noSender());
1388 ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1389 assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1391 MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class);
1392 verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1393 votingServer(node1ID), votingServer(node2ID));
1394 assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
1395 assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1397 MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
1398 verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1399 votingServer(node1ID), votingServer(node2ID));
1400 assertEquals("getRaftState", RaftState.Leader, node2RaftActor.getRaftState());
1402 LOG.info("testChangeToVotingWithNoLeaderAndOtherLeaderElected ending");
1405 private void verifyRaftState(RaftState expState, RaftActor... raftActors) {
1406 Stopwatch sw = Stopwatch.createStarted();
1407 while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
1408 for(RaftActor raftActor: raftActors) {
1409 if(raftActor.getRaftState() == expState) {
1415 fail("None of the RaftActors have state " + expState);
1418 private static ServerInfo votingServer(String id) {
1419 return new ServerInfo(id, true);
1422 private static ServerInfo nonVotingServer(String id) {
1423 return new ServerInfo(id, false);
1426 private TestActorRef<MessageCollectorActor> newLeaderCollectorActor(MockLeaderRaftActor leaderRaftActor) {
1427 return newCollectorActor(leaderRaftActor, LEADER_ID);
1430 private TestActorRef<MessageCollectorActor> newCollectorActor(AbstractMockRaftActor raftActor, String id) {
1431 TestActorRef<MessageCollectorActor> collectorActor = actorFactory.createTestActor(
1432 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1433 actorFactory.generateActorId(id + "Collector"));
1434 raftActor.setCollectorActor(collectorActor);
1435 return collectorActor;
1438 private static void verifyServerConfigurationPayloadEntry(ReplicatedLog log, ServerInfo... expected) {
1439 ReplicatedLogEntry logEntry = log.get(log.lastIndex());
1440 assertEquals("Last log entry payload class", ServerConfigurationPayload.class, logEntry.getData().getClass());
1441 ServerConfigurationPayload payload = (ServerConfigurationPayload)logEntry.getData();
1442 assertEquals("Server config", Sets.newHashSet(expected), Sets.newHashSet(payload.getServerConfig()));
1445 private static RaftActorContextImpl newFollowerContext(String id, TestActorRef<? extends UntypedActor> actor) {
1446 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1447 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1448 configParams.setElectionTimeoutFactor(100000);
1449 NonPersistentDataProvider noPersistence = new NonPersistentDataProvider();
1450 ElectionTermImpl termInfo = new ElectionTermImpl(noPersistence, id, LOG);
1451 termInfo.update(1, LEADER_ID);
1452 return new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
1453 id, termInfo, -1, -1, ImmutableMap.of(LEADER_ID, ""), configParams, noPersistence, LOG);
1456 static abstract class AbstractMockRaftActor extends MockRaftActor {
1457 private volatile TestActorRef<MessageCollectorActor> collectorActor;
1458 private volatile Class<?> dropMessageOfType;
1460 AbstractMockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
1461 boolean persistent, TestActorRef<MessageCollectorActor> collectorActor) {
1462 super(builder().id(id).peerAddresses(peerAddresses).config(config.get()).
1463 persistent(Optional.of(persistent)));
1464 this.collectorActor = collectorActor;
1467 void setDropMessageOfType(Class<?> dropMessageOfType) {
1468 this.dropMessageOfType = dropMessageOfType;
1471 void setCollectorActor(TestActorRef<MessageCollectorActor> collectorActor) {
1472 this.collectorActor = collectorActor;
1476 public void handleCommand(Object message) {
1477 if(dropMessageOfType == null || !dropMessageOfType.equals(message.getClass())) {
1478 super.handleCommand(message);
1481 if(collectorActor != null) {
1482 collectorActor.tell(message, getSender());
1487 public static class CollectingMockRaftActor extends AbstractMockRaftActor {
1489 CollectingMockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
1490 boolean persistent, TestActorRef<MessageCollectorActor> collectorActor) {
1491 super(id, peerAddresses, config, persistent, collectorActor);
1492 snapshotCohortDelegate = new RaftActorSnapshotCohort() {
1494 public void createSnapshot(ActorRef actorRef) {
1495 actorRef.tell(new CaptureSnapshotReply(new byte[0]), actorRef);
1499 public void applySnapshot(byte[] snapshotBytes) {
1504 public static Props props(final String id, final Map<String, String> peerAddresses,
1505 ConfigParams config, boolean persistent, TestActorRef<MessageCollectorActor> collectorActor){
1507 return Props.create(CollectingMockRaftActor.class, id, peerAddresses, Optional.of(config),
1508 persistent, collectorActor);
1513 public static class MockLeaderRaftActor extends AbstractMockRaftActor {
1514 public MockLeaderRaftActor(Map<String, String> peerAddresses, ConfigParams config,
1515 RaftActorContext fromContext) {
1516 super(LEADER_ID, peerAddresses, Optional.of(config), NO_PERSISTENCE, null);
1517 setPersistence(false);
1519 RaftActorContext context = getRaftActorContext();
1520 for(int i = 0; i < fromContext.getReplicatedLog().size(); i++) {
1521 ReplicatedLogEntry entry = fromContext.getReplicatedLog().get(i);
1522 getState().add(entry.getData());
1523 context.getReplicatedLog().append(entry);
1526 context.setCommitIndex(fromContext.getCommitIndex());
1527 context.setLastApplied(fromContext.getLastApplied());
1528 context.getTermInformation().update(fromContext.getTermInformation().getCurrentTerm(),
1529 fromContext.getTermInformation().getVotedFor());
1533 protected void initializeBehavior() {
1534 changeCurrentBehavior(new Leader(getRaftActorContext()));
1535 initializeBehaviorComplete.countDown();
1539 public void createSnapshot(ActorRef actorRef) {
1541 actorRef.tell(new CaptureSnapshotReply(RaftActorTest.fromObject(getState()).toByteArray()), actorRef);
1542 } catch (Exception e) {
1543 LOG.error("createSnapshot failed", e);
1547 static Props props(Map<String, String> peerAddresses, RaftActorContext fromContext) {
1548 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1549 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1550 configParams.setElectionTimeoutFactor(10);
1551 return Props.create(MockLeaderRaftActor.class, peerAddresses, configParams, fromContext);
1555 public static class MockNewFollowerRaftActor extends AbstractMockRaftActor {
1556 public MockNewFollowerRaftActor(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
1557 super(NEW_SERVER_ID, Maps.<String, String>newHashMap(), Optional.of(config), NO_PERSISTENCE, collectorActor);
1558 setPersistence(false);
1561 static Props props(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
1562 return Props.create(MockNewFollowerRaftActor.class, config, collectorActor);