2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft;
10 import static org.junit.Assert.assertEquals;
11 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.assertNoneMatching;
12 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
13 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
14 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
15 import akka.actor.ActorRef;
16 import akka.actor.Props;
17 import akka.actor.UntypedActor;
18 import akka.dispatch.Dispatchers;
19 import akka.testkit.JavaTestKit;
20 import akka.testkit.TestActorRef;
21 import com.google.common.base.Optional;
22 import com.google.common.collect.ImmutableMap;
23 import com.google.common.collect.Maps;
24 import com.google.common.collect.Sets;
25 import java.util.Collections;
26 import java.util.List;
28 import java.util.concurrent.TimeUnit;
29 import org.junit.After;
30 import org.junit.Before;
31 import org.junit.Test;
32 import org.opendaylight.controller.cluster.DataPersistenceProvider;
33 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
34 import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
35 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
36 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
37 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
38 import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
39 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
40 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
41 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
42 import org.opendaylight.controller.cluster.raft.messages.AddServer;
43 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
44 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
45 import org.opendaylight.controller.cluster.raft.messages.FollowerCatchUpTimeout;
46 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
47 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
48 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
49 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
50 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
51 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
52 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
53 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
56 import scala.concurrent.duration.FiniteDuration;
59 * Unit tests for RaftActorServerConfigurationSupport.
61 * @author Thomas Pantelis
63 public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
// Member ids used to name the test actors and to build AddServer requests and ServerInfo expectations.
64 static final String LEADER_ID = "leader";
65 static final String FOLLOWER_ID = "follower";
66 static final String NEW_SERVER_ID = "new-server";
67 static final String NEW_SERVER_ID2 = "new-server2";
68 private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupportTest.class);
// Passed to MockRaftActor.props(...) by the no-leader and forwarded-to-leader tests.
69 private static final DataPersistenceProvider NO_PERSISTENCE = new NonPersistentDataProvider();
// Creates every actor used by the tests; closed in tearDown().
71 private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
// Pre-existing follower actor; individual tests install a Follower behavior on it via setBehavior(...).
73 private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
74 Props.create(ForwardMessageToBehaviorActor.class).withDispatcher(Dispatchers.DefaultDispatcherId()),
75 actorFactory.generateActorId(FOLLOWER_ID));
// The server that most tests add to the leader, the collector actor handed to MockNewFollowerRaftActor.props(...),
// and the new server's RaftActorContext. All three are assigned by the fixture initialization code.
77 private TestActorRef<MockNewFollowerRaftActor> newFollowerRaftActor;
78 private TestActorRef<MessageCollectorActor> newFollowerCollectorActor;
79 private RaftActorContext newFollowerActorContext;
// Its ref is used as the sender of AddServer requests, so AddServerReply messages are expected on it.
81 private final JavaTestKit testKit = new JavaTestKit(getSystem());
// Fixture initialization (the enclosing method declaration is not visible in this extract; presumably the
// JUnit @Before method - TODO confirm). Clears the in-memory journal and snapshot store, then creates the
// collector actor and the MockNewFollowerRaftActor that the tests add to the leader.
85 InMemoryJournal.clear();
86 InMemorySnapshotStore.clear();
88 DefaultConfigParamsImpl configParams = newFollowerConfigParams();
90 newFollowerCollectorActor = actorFactory.createTestActor(
91 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
92 actorFactory.generateActorId(NEW_SERVER_ID + "Collector"));
93 newFollowerRaftActor = actorFactory.createTestActor(MockNewFollowerRaftActor.props(
94 configParams, newFollowerCollectorActor).withDispatcher(Dispatchers.DefaultDispatcherId()),
95 actorFactory.generateActorId(NEW_SERVER_ID));
// NOTE(review): the catch branch below repeats exactly the call made in the try branch and discards the
// caught exception. This looks like a single retry in case the first access fails - confirm the intent.
98 newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
99 } catch (Exception e) {
100 newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
/**
 * Builds the configuration used for the new-follower actor: a 100 ms heartbeat interval, an election
 * timeout factor of 100000 and DisableElectionsRaftPolicy as the custom raft policy.
 * Presumably this keeps the new follower from ever starting an election during a test - TODO confirm.
 *
 * @return the populated DefaultConfigParamsImpl (the return statement is not visible in this extract)
 */
104 private static DefaultConfigParamsImpl newFollowerConfigParams() {
105 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
106 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
107 configParams.setElectionTimeoutFactor(100000);
108 configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
/**
 * Closes the actor factory that created the test actors.
 *
 * @throws Exception if closing the factory fails
 */
113 public void tearDown() throws Exception {
114 actorFactory.close();
/**
 * Adds NEW_SERVER_ID as a voting member to a leader that already has FOLLOWER_ID as a peer.
 * Expects an ApplySnapshot on the new follower, an OK AddServerReply naming LEADER_ID as leader hint,
 * a ServerConfigurationPayload entry at index 3 on the leader and both followers, updated peer ids on
 * both followers, and exactly one persisted ReplicatedLogImplEntry for the leader and the new server.
 */
118 public void testAddServerWithExistingFollower() throws Exception {
119 RaftActorContext followerActorContext = newFollowerContext(FOLLOWER_ID, followerActor);
// NOTE(review): the createEntries(...) argument list is not visible in this extract.
120 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
122 followerActorContext.setCommitIndex(2);
123 followerActorContext.setLastApplied(2);
125 Follower follower = new Follower(followerActorContext);
126 followerActor.underlyingActor().setBehavior(follower);
// The leader is created with the follower's context, so it starts from the same log and indexes.
128 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
129 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
130 followerActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
131 actorFactory.generateActorId(LEADER_ID));
133 // Expect initial heartbeat from the leader.
134 expectFirstMatching(followerActor, AppendEntries.class);
135 clearMessages(followerActor);
137 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
138 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
140 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
142 // Leader should install snapshot - capture and verify ApplySnapshot contents
144 ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
145 @SuppressWarnings("unchecked")
146 List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
// NOTE(review): JUnit's assertEquals takes (message, expected, actual); here the value under test
// (snapshotState) is in the expected position. Harmless for the comparison, but failure output is reversed.
147 assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
149 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
150 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
151 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
153 // Verify ServerConfigurationPayload entry in leader's log
155 expectFirstMatching(leaderCollectorActor, ApplyState.class);
156 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
157 assertEquals("Leader journal last index", 3, leaderActorContext.getReplicatedLog().lastIndex());
158 assertEquals("Leader commit index", 3, leaderActorContext.getCommitIndex());
159 assertEquals("Leader last applied index", 3, leaderActorContext.getLastApplied());
160 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
161 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
163 // Verify ServerConfigurationPayload entry in both followers
165 expectFirstMatching(followerActor, ApplyState.class);
166 assertEquals("Follower journal last index", 3, followerActorContext.getReplicatedLog().lastIndex());
167 verifyServerConfigurationPayloadEntry(followerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
168 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
170 expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
171 assertEquals("New follower journal last index", 3, newFollowerActorContext.getReplicatedLog().lastIndex());
172 verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
173 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
175 // Verify new server config was applied in both followers
177 assertEquals("Follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID), followerActorContext.getPeerIds());
179 assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, FOLLOWER_ID), newFollowerActorContext.getPeerIds());
// NOTE(review): these two expectations repeat the ones made above without clearing the collected
// messages in between, so they presumably match the same ApplyState messages again - confirm they are needed.
181 expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
182 expectFirstMatching(followerActor, ApplyState.class);
184 assertEquals("Follower commit index", 3, followerActorContext.getCommitIndex());
185 assertEquals("Follower last applied index", 3, followerActorContext.getLastApplied());
186 assertEquals("New follower commit index", 3, newFollowerActorContext.getCommitIndex());
187 assertEquals("New follower last applied index", 3, newFollowerActorContext.getLastApplied());
// The journal checks below expect a single persisted entry per member: the ServerConfigurationPayload at index 3.
189 List<ReplicatedLogImplEntry> persistedLogEntries = InMemoryJournal.get(LEADER_ID, ReplicatedLogImplEntry.class);
190 assertEquals("Leader ReplicatedLogImplEntry entries", 1, persistedLogEntries.size());
191 ReplicatedLogImplEntry logEntry = persistedLogEntries.get(0);
192 assertEquals("Leader ReplicatedLogImplEntry getTerm", 1, logEntry.getTerm());
193 assertEquals("Leader ReplicatedLogImplEntry getIndex", 3, logEntry.getIndex());
194 assertEquals("Leader ReplicatedLogImplEntry getData", ServerConfigurationPayload.class, logEntry.getData().getClass());
196 persistedLogEntries = InMemoryJournal.get(NEW_SERVER_ID, ReplicatedLogImplEntry.class);
197 assertEquals("New follower ReplicatedLogImplEntry entries", 1, persistedLogEntries.size());
198 logEntry = persistedLogEntries.get(0);
199 assertEquals("New follower ReplicatedLogImplEntry getTerm", 1, logEntry.getTerm());
200 assertEquals("New follower ReplicatedLogImplEntry getIndex", 3, logEntry.getIndex());
201 assertEquals("New follower ReplicatedLogImplEntry getData", ServerConfigurationPayload.class,
202 logEntry.getData().getClass());
/**
 * Adds NEW_SERVER_ID as a voting member to a leader that has no peers. The leader starts with commit
 * index and last applied set to 1. Expects an ApplySnapshot on the new follower, an OK AddServerReply,
 * a ServerConfigurationPayload entry at index 2 on the leader and the new follower, and LEADER_ID as
 * the new follower's only peer.
 */
206 public void testAddServerWithNoExistingFollower() throws Exception {
207 RaftActorContext initialActorContext = new MockRaftActorContext();
208 initialActorContext.setCommitIndex(1);
209 initialActorContext.setLastApplied(1);
// NOTE(review): the createEntries(...) argument list is not visible in this extract.
210 initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
213 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
214 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
215 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
216 actorFactory.generateActorId(LEADER_ID));
218 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
219 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
221 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
223 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
225 // Leader should install snapshot - capture and verify ApplySnapshot contents
227 ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
228 @SuppressWarnings("unchecked")
229 List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
// NOTE(review): JUnit's assertEquals takes (message, expected, actual); here the value under test
// (snapshotState) is in the expected position.
230 assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
232 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
233 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
234 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
236 // Verify ServerConfigurationPayload entry in leader's log
238 expectFirstMatching(leaderCollectorActor, ApplyState.class);
239 assertEquals("Leader journal last index", 2, leaderActorContext.getReplicatedLog().lastIndex());
240 assertEquals("Leader commit index", 2, leaderActorContext.getCommitIndex());
241 assertEquals("Leader last applied index", 2, leaderActorContext.getLastApplied());
242 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
243 votingServer(NEW_SERVER_ID));
245 // Verify ServerConfigurationPayload entry in the new follower
247 expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
248 assertEquals("New follower journal last index", 2, newFollowerActorContext.getReplicatedLog().lastIndex());
249 verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
250 votingServer(NEW_SERVER_ID));
252 // Verify new server config was applied in the new follower
254 assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
/**
 * Adds two servers as non-voting members, one after the other, to a leader with no peers.
 * For the first (NEW_SERVER_ID) it expects an OK reply, a ServerConfigurationPayload entry at index 0
 * on the leader and the new follower, and no InstallSnapshot collected for the new follower.
 * For the second (NEW_SERVER_ID2, backed by followerActor) it expects an OK reply and a
 * ServerConfigurationPayload entry at index 1 on the leader listing both non-voting servers.
 */
258 public void testAddServersAsNonVoting() throws Exception {
259 RaftActorContext initialActorContext = new MockRaftActorContext();
261 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
262 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
263 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
264 actorFactory.generateActorId(LEADER_ID));
266 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
267 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
269 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
// The third AddServer argument is false here, i.e. the server is added as non-voting.
271 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), false), testKit.getRef());
273 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
274 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
275 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
277 // Verify ServerConfigurationPayload entry in leader's log
279 expectFirstMatching(leaderCollectorActor, ApplyState.class);
281 assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
282 assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
283 assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
284 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
285 nonVotingServer(NEW_SERVER_ID));
287 // Verify ServerConfigurationPayload entry in the new follower
289 expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
290 assertEquals("New follower journal last index", 0, newFollowerActorContext.getReplicatedLog().lastIndex());
291 verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
292 nonVotingServer(NEW_SERVER_ID));
294 // Verify new server config was applied in the new follower
296 assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
// A non-voting server is expected not to receive a snapshot (500 is presumably a wait in milliseconds - TODO confirm).
298 assertNoneMatching(newFollowerCollectorActor, InstallSnapshot.class, 500);
300 // Add another non-voting server.
302 clearMessages(leaderCollectorActor);
304 RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
305 Follower newFollower2 = new Follower(follower2ActorContext);
306 followerActor.underlyingActor().setBehavior(newFollower2);
308 leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit.getRef());
310 addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
311 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
312 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
314 expectFirstMatching(leaderCollectorActor, ApplyState.class);
315 assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
316 assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
317 assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
318 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
319 votingServer(LEADER_ID), nonVotingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
/**
 * Sends a second AddServer request while the first one is still outstanding.
 * The new follower is told to drop InstallSnapshot so the first request cannot finish; the test then
 * sends the second request from a separate test kit, re-delivers the captured InstallSnapshot, and
 * expects OK replies on both test kits, a leader log ending at index 1 with all three servers in the
 * configuration, and two ApplyState messages on the new follower.
 */
323 public void testAddServerWithOperationInProgress() throws Exception {
324 RaftActorContext initialActorContext = new MockRaftActorContext();
326 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
327 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
328 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
329 actorFactory.generateActorId(LEADER_ID));
331 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
332 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
334 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
336 RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
337 Follower newFollower2 = new Follower(follower2ActorContext);
338 followerActor.underlyingActor().setBehavior(newFollower2);
// Hold up the first AddServer by having the new follower drop the leader's InstallSnapshot.
340 MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
341 newFollowerRaftActorInstance.setDropMessageOfType(InstallSnapshot.SERIALIZABLE_CLASS);
343 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
345 // Wait for leader's install snapshot and capture it
347 Object installSnapshot = expectFirstMatching(newFollowerCollectorActor, InstallSnapshot.class);
349 // Send a second AddServer - should get queued
350 JavaTestKit testKit2 = new JavaTestKit(getSystem());
351 leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit2.getRef());
353 // Continue the first AddServer
354 newFollowerRaftActorInstance.setDropMessageOfType(null);
355 newFollowerRaftActor.tell(installSnapshot, leaderActor);
357 // Verify both complete successfully
358 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
359 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
361 addServerReply = testKit2.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
362 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
364 // Verify ServerConfigurationPayload entries in leader's log
366 expectMatching(leaderCollectorActor, ApplyState.class, 2);
367 assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
368 assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
369 assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
370 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
371 votingServer(LEADER_ID), votingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
373 // Verify ServerConfigurationPayload entry in the new follower
375 expectMatching(newFollowerCollectorActor, ApplyState.class, 2);
376 assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID2),
377 newFollowerActorContext.getPeerIds());
/**
 * Sends AddServer while a snapshot capture started by InitiateCaptureSnapshot has not yet completed.
 * The leader drops String messages so the capture stays open; the test captures that String from the
 * leader's collector, sends AddServer, then re-delivers the String and expects an OK reply, an
 * ApplySnapshot on the new follower and a ServerConfigurationPayload entry at index 0 on the leader.
 */
381 public void testAddServerWithPriorSnapshotInProgress() throws Exception {
382 RaftActorContext initialActorContext = new MockRaftActorContext();
384 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
385 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
386 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
387 actorFactory.generateActorId(LEADER_ID));
389 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
390 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
392 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
394 // Drop commit message for now to delay snapshot completion
395 leaderRaftActor.setDropMessageOfType(String.class);
397 leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
// The dropped message still reaches the collector, so it can be captured here and replayed later.
399 String commitMsg = expectFirstMatching(leaderCollectorActor, String.class);
401 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
// Stop dropping and replay the captured commit message to let the prior snapshot complete.
403 leaderRaftActor.setDropMessageOfType(null);
404 leaderActor.tell(commitMsg, leaderActor);
406 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
407 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
408 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
410 expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
412 // Verify ServerConfigurationPayload entry in leader's log
414 expectFirstMatching(leaderCollectorActor, ApplyState.class);
415 assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
416 assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
417 assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
418 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
419 votingServer(NEW_SERVER_ID));
/**
 * Sends AddServer while a prior snapshot capture never completes, because the leader keeps dropping
 * String messages. Expects a TIMEOUT AddServerReply and no peers on the leader afterwards.
 */
423 public void testAddServerWithPriorSnapshotCompleteTimeout() throws Exception {
424 RaftActorContext initialActorContext = new MockRaftActorContext();
426 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
427 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
428 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
429 actorFactory.generateActorId(LEADER_ID));
431 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
432 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
// Presumably shortens the leader's wait so the TIMEOUT reply arrives within the 5 second expectation - TODO confirm.
434 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
436 // Drop commit message so the snapshot doesn't complete.
437 leaderRaftActor.setDropMessageOfType(String.class);
439 leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
441 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
443 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
444 assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
446 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
/**
 * Sends AddServer while a prior snapshot capture is open, then switches the leader actor to a Follower
 * behavior before the capture is allowed to complete. Expects a NO_LEADER AddServerReply, no peers on
 * the (former) leader, and the snapshot manager not capturing at the end.
 */
450 public void testAddServerWithLeaderChangeBeforePriorSnapshotComplete() throws Exception {
451 RaftActorContext initialActorContext = new MockRaftActorContext();
453 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
454 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
455 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
456 actorFactory.generateActorId(LEADER_ID));
458 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
459 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
460 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
462 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
464 // Drop the commit message so the snapshot doesn't complete yet.
465 leaderRaftActor.setDropMessageOfType(String.class);
467 leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
469 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
471 String commitMsg = expectFirstMatching(leaderCollectorActor, String.class);
473 // Change the leader behavior to follower
474 leaderActor.tell(new Follower(leaderActorContext), leaderActor);
476 // Drop CaptureSnapshotReply in case install snapshot is incorrectly initiated after the prior
477 // snapshot completes. This will prevent the invalid snapshot from completing and fail the
478 // isCapturing assertion below.
479 leaderRaftActor.setDropMessageOfType(CaptureSnapshotReply.class);
481 // Complete the prior snapshot - this should be a no-op b/c it's no longer the leader
482 leaderActor.tell(commitMsg, leaderActor);
// The catch-up timeout is delivered explicitly by the test rather than waiting for a timer to fire.
484 leaderActor.tell(new FollowerCatchUpTimeout(NEW_SERVER_ID), leaderActor);
486 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
487 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
489 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
490 assertEquals("isCapturing", false, leaderActorContext.getSnapshotManager().isCapturing());
/**
 * Switches the leader actor to a Follower behavior while an AddServer request is waiting for the
 * UnInitializedFollowerSnapshotReply. The reply is dropped by the leader, captured from its collector,
 * and re-delivered after the behavior change. Expects a NO_LEADER AddServerReply and no peers on the
 * (former) leader.
 */
494 public void testAddServerWithLeaderChangeDuringInstallSnapshot() throws Exception {
495 RaftActorContext initialActorContext = new MockRaftActorContext();
497 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
498 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
499 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
500 actorFactory.generateActorId(LEADER_ID));
502 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
503 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
505 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(8);
507 TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
509 // Drop the UnInitializedFollowerSnapshotReply to delay it.
510 leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
512 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
514 UnInitializedFollowerSnapshotReply snapshotReply = expectFirstMatching(leaderCollectorActor,
515 UnInitializedFollowerSnapshotReply.class);
517 // Prevent election timeout when the leader switches to follower
518 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
520 // Change the leader behavior to follower
521 leaderActor.tell(new Follower(leaderActorContext), leaderActor);
523 // Send the captured UnInitializedFollowerSnapshotReply - should be a no-op
524 leaderRaftActor.setDropMessageOfType(null);
525 leaderActor.tell(snapshotReply, leaderActor);
527 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
528 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
530 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
/**
 * Sends AddServer while the new follower drops every InstallSnapshot, so the install never completes.
 * Expects a TIMEOUT AddServerReply, no peers in the leader's context and no follower ids in the
 * leader's current behavior.
 */
534 public void testAddServerWithInstallSnapshotTimeout() throws Exception {
535 RaftActorContext initialActorContext = new MockRaftActorContext();
537 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
538 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
539 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
540 actorFactory.generateActorId(LEADER_ID));
542 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
543 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
// Presumably shortens the leader's wait so the TIMEOUT reply arrives within the 5 second expectation - TODO confirm.
544 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
546 // Drop the InstallSnapshot message so it times out
547 newFollowerRaftActor.underlyingActor().setDropMessageOfType(InstallSnapshot.SERIALIZABLE_CLASS);
549 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
// A snapshot reply carrying an id that matches no pending server; presumably checks that such a reply
// is ignored and does not complete the pending AddServer - TODO confirm.
551 leaderActor.tell(new UnInitializedFollowerSnapshotReply("bogus"), leaderActor);
553 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
554 assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
556 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
557 assertEquals("Leader followers size", 0,
558 ((AbstractLeader)leaderRaftActor.getCurrentBehavior()).getFollowerIds().size());
/**
 * Sends AddServer to a plain MockRaftActor (not a MockLeaderRaftActor) configured with a one-day
 * heartbeat interval, and expects a NO_LEADER AddServerReply.
 */
562 public void testAddServerWithNoLeader() {
563 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
// Presumably long enough that no election happens during the test, so the actor never becomes leader - TODO confirm.
564 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
566 TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
567 MockRaftActor.props(LEADER_ID, ImmutableMap.<String,String>of(FOLLOWER_ID, followerActor.path().toString()),
568 configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
569 actorFactory.generateActorId(LEADER_ID));
570 noLeaderActor.underlyingActor().waitForInitializeBehaviorComplete();
572 noLeaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
573 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
574 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
/**
 * Sends AddServer while the new follower drops every AppendEntries message, then checks the reply
 * status and that the leader's log holds a ServerConfigurationPayload entry listing both servers.
 */
578 public void testAddServerWithNoConsensusReached() {
579 RaftActorContext initialActorContext = new MockRaftActorContext();
581 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
582 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
583 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
584 actorFactory.generateActorId(LEADER_ID));
586 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
587 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
// With AppendEntries dropped, the new follower cannot acknowledge the new configuration entry.
589 newFollowerRaftActor.underlyingActor().setDropMessageOfType(AppendEntries.class);
591 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
593 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
// NOTE(review): the test name says no consensus is reached, yet OK is asserted here - confirm that OK is
// the intended reply status for this scenario.
594 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
595 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
597 // Verify ServerConfigurationPayload entry in leader's log
599 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
600 votingServer(NEW_SERVER_ID));
/**
 * Sends AddServer for FOLLOWER_ID, which is already one of the leader's peers, and expects an
 * ALREADY_EXISTS AddServerReply.
 */
604 public void testAddServerWithExistingServer() {
605 RaftActorContext initialActorContext = new MockRaftActorContext();
607 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
608 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
609 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
610 actorFactory.generateActorId(LEADER_ID));
612 leaderActor.tell(new AddServer(FOLLOWER_ID, followerActor.path().toString(), true), testKit.getRef());
614 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
615 assertEquals("getStatus", ServerChangeStatus.ALREADY_EXISTS, addServerReply.getStatus());
/**
 * Sends AddServer to a follower MockRaftActor whose only peer, LEADER_ID, is a MessageCollectorActor,
 * and expects the AddServer message to show up in that collector.
 */
619 public void testAddServerForwardedToLeader() {
620 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
621 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
// The "leader" is only a message collector, so the forwarded AddServer can be observed directly.
623 TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
624 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
625 actorFactory.generateActorId(LEADER_ID));
627 TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
628 MockRaftActor.props(FOLLOWER_ID, ImmutableMap.<String,String>of(LEADER_ID, leaderActor.path().toString()),
629 configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
630 actorFactory.generateActorId(FOLLOWER_ID));
631 followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
// An AppendEntries naming LEADER_ID is sent first; presumably this is how the follower learns who the
// leader is before the AddServer arrives - TODO confirm.
633 followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
634 -1, -1, (short)0), leaderActor);
636 followerRaftActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
637 expectFirstMatching(leaderActor, AddServer.class);
641 public void testOnApplyState() {
642 RaftActorServerConfigurationSupport support = new RaftActorServerConfigurationSupport(new MockRaftActorContext());
644 ReplicatedLogEntry serverConfigEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
645 new ServerConfigurationPayload(Collections.<ServerInfo>emptyList()));
646 boolean handled = support.handleMessage(new ApplyState(null, null, serverConfigEntry), null, ActorRef.noSender());
647 assertEquals("Message handled", true, handled);
649 ReplicatedLogEntry nonServerConfigEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
650 new MockRaftActorContext.MockPayload("1"));
651 handled = support.handleMessage(new ApplyState(null, null, nonServerConfigEntry), null, ActorRef.noSender());
652 assertEquals("Message handled", false, handled);
655 private ServerInfo votingServer(String id) {
656 return new ServerInfo(id, true);
659 private ServerInfo nonVotingServer(String id) {
660 return new ServerInfo(id, false);
663 private TestActorRef<MessageCollectorActor> newLeaderCollectorActor(MockLeaderRaftActor leaderRaftActor) {
664 TestActorRef<MessageCollectorActor> leaderCollectorActor = actorFactory.createTestActor(
665 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
666 actorFactory.generateActorId(LEADER_ID + "Collector"));
667 leaderRaftActor.setCollectorActor(leaderCollectorActor);
668 return leaderCollectorActor;
671 private static void verifyServerConfigurationPayloadEntry(ReplicatedLog log, ServerInfo... expected) {
672 ReplicatedLogEntry logEntry = log.get(log.lastIndex());
673 assertEquals("Last log entry payload class", ServerConfigurationPayload.class, logEntry.getData().getClass());
674 ServerConfigurationPayload payload = (ServerConfigurationPayload)logEntry.getData();
675 assertEquals("getNewServerConfig", Sets.newHashSet(expected), Sets.newHashSet(payload.getServerConfig()));
678 private static RaftActorContext newFollowerContext(String id, TestActorRef<? extends UntypedActor> actor) {
679 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
680 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
681 configParams.setElectionTimeoutFactor(100000);
682 ElectionTermImpl termInfo = new ElectionTermImpl(NO_PERSISTENCE, id, LOG);
683 termInfo.update(1, LEADER_ID);
684 return new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
685 id, termInfo, -1, -1, ImmutableMap.of(LEADER_ID, ""), configParams, NO_PERSISTENCE, LOG);
688 static abstract class AbstractMockRaftActor extends MockRaftActor {
689 private volatile TestActorRef<MessageCollectorActor> collectorActor;
690 private volatile Class<?> dropMessageOfType;
692 AbstractMockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
693 DataPersistenceProvider dataPersistenceProvider, TestActorRef<MessageCollectorActor> collectorActor) {
694 super(builder().id(id).peerAddresses(peerAddresses).config(config.get()).
695 dataPersistenceProvider(dataPersistenceProvider));
696 this.collectorActor = collectorActor;
699 void setDropMessageOfType(Class<?> dropMessageOfType) {
700 this.dropMessageOfType = dropMessageOfType;
703 void setCollectorActor(TestActorRef<MessageCollectorActor> collectorActor) {
704 this.collectorActor = collectorActor;
708 public void handleCommand(Object message) {
709 if(dropMessageOfType == null || !dropMessageOfType.equals(message.getClass())) {
710 super.handleCommand(message);
713 if(collectorActor != null) {
714 collectorActor.tell(message, getSender());
719 public static class MockLeaderRaftActor extends AbstractMockRaftActor {
720 public MockLeaderRaftActor(Map<String, String> peerAddresses, ConfigParams config,
721 RaftActorContext fromContext) {
722 super(LEADER_ID, peerAddresses, Optional.of(config), NO_PERSISTENCE, null);
723 setPersistence(false);
725 RaftActorContext context = getRaftActorContext();
726 for(int i = 0; i < fromContext.getReplicatedLog().size(); i++) {
727 ReplicatedLogEntry entry = fromContext.getReplicatedLog().get(i);
728 getState().add(entry.getData());
729 context.getReplicatedLog().append(entry);
732 context.setCommitIndex(fromContext.getCommitIndex());
733 context.setLastApplied(fromContext.getLastApplied());
734 context.getTermInformation().update(fromContext.getTermInformation().getCurrentTerm(),
735 fromContext.getTermInformation().getVotedFor());
739 protected void initializeBehavior() {
740 changeCurrentBehavior(new Leader(getRaftActorContext()));
741 initializeBehaviorComplete.countDown();
745 public void createSnapshot(ActorRef actorRef) {
747 actorRef.tell(new CaptureSnapshotReply(RaftActorTest.fromObject(getState()).toByteArray()), actorRef);
748 } catch (Exception e) {
749 LOG.error("createSnapshot failed", e);
753 static Props props(Map<String, String> peerAddresses, RaftActorContext fromContext) {
754 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
755 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
756 configParams.setElectionTimeoutFactor(10);
757 return Props.create(MockLeaderRaftActor.class, peerAddresses, configParams, fromContext);
761 public static class MockNewFollowerRaftActor extends AbstractMockRaftActor {
762 public MockNewFollowerRaftActor(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
763 super(NEW_SERVER_ID, Maps.<String, String>newHashMap(), Optional.of(config), null, collectorActor);
764 setPersistence(false);
767 static Props props(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
768 return Props.create(MockNewFollowerRaftActor.class, config, collectorActor);