2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft;
10 import static org.junit.Assert.assertEquals;
11 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
12 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
13 import akka.actor.ActorRef;
14 import akka.actor.Props;
15 import akka.actor.UntypedActor;
16 import akka.dispatch.Dispatchers;
17 import akka.testkit.JavaTestKit;
18 import akka.testkit.TestActorRef;
19 import com.google.common.base.Optional;
20 import com.google.common.collect.ImmutableMap;
21 import com.google.common.collect.Maps;
22 import com.google.common.collect.Sets;
23 import java.util.Collections;
24 import java.util.List;
26 import java.util.concurrent.TimeUnit;
27 import org.junit.After;
28 import org.junit.Before;
29 import org.junit.Test;
30 import org.opendaylight.controller.cluster.DataPersistenceProvider;
31 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
32 import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
33 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
34 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
35 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
36 import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
37 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
38 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
39 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
40 import org.opendaylight.controller.cluster.raft.messages.AddServer;
41 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
42 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
43 import org.opendaylight.controller.cluster.raft.messages.FollowerCatchUpTimeout;
44 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
45 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
46 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
47 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
48 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
49 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
50 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
51 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
54 import scala.concurrent.duration.FiniteDuration;
57 * Unit tests for RaftActorServerConfigurationSupport.
59 * @author Thomas Pantelis
61 public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
// Member ids used for the raft actors created by these tests.
62 static final String LEADER_ID = "leader";
63 static final String FOLLOWER_ID = "follower";
64 static final String NEW_SERVER_ID = "new-server";
65 static final String NEW_SERVER_ID2 = "new-server2";
66 private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupportTest.class);
// Shared NonPersistentDataProvider instance, passed wherever a DataPersistenceProvider is required.
67 private static final DataPersistenceProvider NO_PERSISTENCE = new NonPersistentDataProvider();
// Factory used to create every test actor in this class; it is closed in tearDown.
69 private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
// Actor that stands in for a follower; tests install a Follower behavior on it via setBehavior.
71 private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
72 Props.create(ForwardMessageToBehaviorActor.class).withDispatcher(Dispatchers.DefaultDispatcherId()),
73 actorFactory.generateActorId(FOLLOWER_ID));
// New-server actor, its message collector and its raft context; all three are assigned in the setup code.
75 private TestActorRef<MockNewFollowerRaftActor> newFollowerRaftActor;
76 private TestActorRef<MessageCollectorActor> newFollowerCollectorActor;
77 private RaftActorContext newFollowerActorContext;
// Test kit whose ref is used as the sender of AddServer requests so the AddServerReply can be asserted.
79 private final JavaTestKit testKit = new JavaTestKit(getSystem());
// NOTE(review): the enclosing method declaration is not visible in this view; this looks like the
// per-test setup (presumably annotated @Before) - confirm.
// Clear the in-memory journal and snapshot store.
83 InMemoryJournal.clear();
84 InMemorySnapshotStore.clear();
86 DefaultConfigParamsImpl configParams = newFollowerConfigParams();
// Collector actor that is handed to the new follower's props below.
88 newFollowerCollectorActor = actorFactory.createTestActor(
89 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
90 actorFactory.generateActorId(NEW_SERVER_ID + "Collector"));
// The raft actor that the tests add to the leader as NEW_SERVER_ID.
91 newFollowerRaftActor = actorFactory.createTestActor(MockNewFollowerRaftActor.props(
92 configParams, newFollowerCollectorActor).withDispatcher(Dispatchers.DefaultDispatcherId()),
93 actorFactory.generateActorId(NEW_SERVER_ID));
// Cache the new follower's raft context for the assertions in the tests.
96 newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
// The catch branch repeats the identical lookup, i.e. it acts as a single retry; the caught
// exception is discarded.
// NOTE(review): why the first lookup may throw is not evident from this view - confirm, and
// consider logging the exception instead of dropping it silently.
97 } catch (Exception e) {
98 newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
// Builds the config params used for the new follower raft actor: a 100 ms heartbeat interval, an
// election timeout factor of 100000 and DisableElectionsRaftPolicy as the custom raft policy class.
// NOTE(review): presumably the large factor and the policy keep the new follower from starting
// elections during a test - confirm. The return statement is not visible in this view.
102 private static DefaultConfigParamsImpl newFollowerConfigParams() {
103 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
104 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
105 configParams.setElectionTimeoutFactor(100000);
106 configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
// Closes the actor factory that created the test actors.
// NOTE(review): presumably run after each test (@After); the annotation is not visible in this view.
111 public void tearDown() throws Exception {
112 actorFactory.close();
// Scenario: NEW_SERVER_ID is added as a voting member to a leader that already has one follower
// (FOLLOWER_ID). Verifies the ApplySnapshot seen by the new follower's collector, the AddServerReply,
// the ServerConfigurationPayload entry at index 3 in all three logs, the resulting peer ids, the
// commit / last-applied indexes and the entries persisted to the in-memory journal.
116 public void testAddServerWithExistingFollower() throws Exception {
117 RaftActorContext followerActorContext = newFollowerContext(FOLLOWER_ID, followerActor);
// Seed the existing follower's log (the createEntries arguments are not visible in this view) and
// mark index 2 as committed and applied.
118 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
120 followerActorContext.setCommitIndex(2);
121 followerActorContext.setLastApplied(2);
// Make followerActor behave as a raft Follower backed by the context above.
123 Follower follower = new Follower(followerActorContext);
124 followerActor.underlyingActor().setBehavior(follower);
// The leader is created with FOLLOWER_ID as its only peer and is handed the follower's context.
126 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
127 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
128 followerActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
129 actorFactory.generateActorId(LEADER_ID));
131 // Expect initial heartbeat from the leader.
132 expectFirstMatching(followerActor, AppendEntries.class);
133 clearMessages(followerActor);
135 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
// Request the addition as a voting member (third argument true); the reply goes to testKit.
137 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
139 // Leader should install snapshot - capture and verify ApplySnapshot contents
141 ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
142 List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
// NOTE(review): JUnit's assertEquals is (message, expected, actual); the deserialized snapshot state
// is passed in the expected position here, so a failure message would label the values the wrong way round.
143 assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
145 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
146 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
147 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
149 // Verify ServerConfigurationPayload entry in leader's log
151 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
152 assertEquals("Leader journal last index", 3, leaderActorContext.getReplicatedLog().lastIndex());
153 assertEquals("Leader commit index", 3, leaderActorContext.getCommitIndex());
154 assertEquals("Leader last applied index", 3, leaderActorContext.getLastApplied());
155 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
156 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
158 // Verify ServerConfigurationPayload entry in both followers
160 assertEquals("Follower journal last index", 3, followerActorContext.getReplicatedLog().lastIndex());
161 verifyServerConfigurationPayloadEntry(followerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
162 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
164 assertEquals("New follower journal last index", 3, newFollowerActorContext.getReplicatedLog().lastIndex());
165 verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
166 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
168 // Verify new server config was applied in both followers
170 assertEquals("Follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID), followerActorContext.getPeerIds());
172 assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, FOLLOWER_ID), newFollowerActorContext.getPeerIds());
// Wait until both followers have seen an ApplyState before checking their commit / last-applied indexes.
174 expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
175 expectFirstMatching(followerActor, ApplyState.class);
177 assertEquals("Follower commit index", 3, followerActorContext.getCommitIndex());
178 assertEquals("Follower last applied index", 3, followerActorContext.getLastApplied());
179 assertEquals("New follower commit index", 3, newFollowerActorContext.getCommitIndex());
180 assertEquals("New follower last applied index", 3, newFollowerActorContext.getLastApplied());
// Exactly one ReplicatedLogImplEntry (term 1, index 3, ServerConfigurationPayload data) is expected in
// the in-memory journal for the leader ...
182 List<ReplicatedLogImplEntry> persistedLogEntries = InMemoryJournal.get(LEADER_ID, ReplicatedLogImplEntry.class);
183 assertEquals("Leader ReplicatedLogImplEntry entries", 1, persistedLogEntries.size());
184 ReplicatedLogImplEntry logEntry = persistedLogEntries.get(0);
185 assertEquals("Leader ReplicatedLogImplEntry getTerm", 1, logEntry.getTerm());
186 assertEquals("Leader ReplicatedLogImplEntry getIndex", 3, logEntry.getIndex());
187 assertEquals("Leader ReplicatedLogImplEntry getData", ServerConfigurationPayload.class, logEntry.getData().getClass());
// ... and likewise for the new server.
189 persistedLogEntries = InMemoryJournal.get(NEW_SERVER_ID, ReplicatedLogImplEntry.class);
190 assertEquals("New follower ReplicatedLogImplEntry entries", 1, persistedLogEntries.size());
191 logEntry = persistedLogEntries.get(0);
192 assertEquals("New follower ReplicatedLogImplEntry getTerm", 1, logEntry.getTerm());
193 assertEquals("New follower ReplicatedLogImplEntry getIndex", 3, logEntry.getIndex());
194 assertEquals("New follower ReplicatedLogImplEntry getData", ServerConfigurationPayload.class,
195 logEntry.getData().getClass());
// Scenario: NEW_SERVER_ID is added as a voting member to a leader that has no peers. Verifies the
// ApplySnapshot seen by the new follower's collector, the AddServerReply, the ServerConfigurationPayload
// entry at index 2 in the leader's and the new follower's logs, and the new follower's peer ids.
199 public void testAddServerWithNoExistingFollower() throws Exception {
// Initial leader state: commit index and last applied at 1 (the createEntries arguments are not
// visible in this view).
200 RaftActorContext initialActorContext = new MockRaftActorContext();
201 initialActorContext.setCommitIndex(1);
202 initialActorContext.setLastApplied(1);
203 initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
// The leader is created with an empty peer map.
206 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
207 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
208 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
209 actorFactory.generateActorId(LEADER_ID));
211 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
212 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
214 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
216 // Leader should install snapshot - capture and verify ApplySnapshot contents
218 ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
219 List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
// NOTE(review): JUnit's assertEquals is (message, expected, actual); the deserialized snapshot state
// is passed in the expected position here.
220 assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
222 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
223 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
224 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
226 // Verify ServerConfigurationPayload entry in leader's log
228 assertEquals("Leader journal last index", 2, leaderActorContext.getReplicatedLog().lastIndex());
229 assertEquals("Leader commit index", 2, leaderActorContext.getCommitIndex());
230 assertEquals("Leader last applied index", 2, leaderActorContext.getLastApplied());
231 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
232 votingServer(NEW_SERVER_ID));
234 // Verify ServerConfigurationPayload entry in the new follower
236 expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
237 assertEquals("New follower journal last index", 2, newFollowerActorContext.getReplicatedLog().lastIndex());
238 verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
239 votingServer(NEW_SERVER_ID));
241 // Verify new server config was applied in the new follower
243 assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
// Scenario: two servers are added one after the other as non-voting members (AddServer third
// argument false) to a leader that starts with no peers and commit / last-applied at -1. After the
// first addition the config entry is expected at index 0 and no InstallSnapshot may be seen by the new
// follower's collector; after the second the entry is expected at index 1 and lists all three servers.
247 public void testAddServersAsNonVoting() throws Exception {
248 RaftActorContext initialActorContext = new MockRaftActorContext();
249 initialActorContext.setCommitIndex(-1);
250 initialActorContext.setLastApplied(-1);
251 initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
253 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
254 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
255 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
256 actorFactory.generateActorId(LEADER_ID));
258 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
259 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
// First addition: NEW_SERVER_ID as non-voting.
261 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), false), testKit.getRef());
263 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
264 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
265 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
267 // Verify ServerConfigurationPayload entry in leader's log
269 assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
270 assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
271 assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
272 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
273 nonVotingServer(NEW_SERVER_ID));
275 // Verify ServerConfigurationPayload entry in the new follower
277 expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
278 assertEquals("New follower journal last index", 0, newFollowerActorContext.getReplicatedLog().lastIndex());
279 verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
280 nonVotingServer(NEW_SERVER_ID));
282 // Verify new server config was applied in the new follower
284 assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
// No InstallSnapshot is expected at the new follower's collector (the 500 argument is presumably a
// wait in milliseconds - confirm against MessageCollectorActor.assertNoneMatching).
286 MessageCollectorActor.assertNoneMatching(newFollowerCollectorActor, InstallSnapshot.class, 500);
288 // Add another non-voting server.
// followerActor is reused as NEW_SERVER_ID2 by giving it a fresh Follower behavior.
290 RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
291 Follower newFollower2 = new Follower(follower2ActorContext);
292 followerActor.underlyingActor().setBehavior(newFollower2);
294 leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit.getRef());
296 addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
297 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
298 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
300 assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
301 assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
302 assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
303 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
304 votingServer(LEADER_ID), nonVotingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
// Scenario: a second AddServer arrives while the first one is still in progress. The first request is
// stalled by making the new follower drop InstallSnapshot messages; once the captured InstallSnapshot
// is re-delivered, both requests are expected to reply OK and the leader's last config entry (index 1)
// is expected to list the leader, NEW_SERVER_ID (voting) and NEW_SERVER_ID2 (non-voting).
308 public void testAddServerWithOperationInProgress() throws Exception {
309 RaftActorContext initialActorContext = new MockRaftActorContext();
310 initialActorContext.setCommitIndex(-1);
311 initialActorContext.setLastApplied(-1);
312 initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
314 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
315 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
316 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
317 actorFactory.generateActorId(LEADER_ID));
319 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
320 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
// followerActor plays the second server (NEW_SERVER_ID2).
322 RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
323 Follower newFollower2 = new Follower(follower2ActorContext);
324 followerActor.underlyingActor().setBehavior(newFollower2);
// Have the new follower drop InstallSnapshot messages so the first AddServer cannot complete yet.
326 MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
327 newFollowerRaftActorInstance.setDropMessageOfType(InstallSnapshot.SERIALIZABLE_CLASS);
329 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
331 // Wait for leader's install snapshot and capture it
333 Object installSnapshot = expectFirstMatching(newFollowerCollectorActor, InstallSnapshot.class);
335 // Send a second AddServer - should get queued
// A separate test kit receives the second reply so the two replies can be told apart.
336 JavaTestKit testKit2 = new JavaTestKit(getSystem());
337 leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit2.getRef());
339 // Continue the first AddServer
// Stop dropping and re-deliver the captured InstallSnapshot with the leader as the sender.
340 newFollowerRaftActorInstance.setDropMessageOfType(null);
341 newFollowerRaftActor.tell(installSnapshot, leaderActor);
343 // Verify both complete successfully
344 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
345 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
347 addServerReply = testKit2.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
348 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
350 // Verify ServerConfigurationPayload entries in leader's log
352 assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
353 assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
354 assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
355 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
356 votingServer(LEADER_ID), votingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
358 // Verify ServerConfigurationPayload entry in the new follower
// Two ApplyState messages are awaited at the new follower's collector before checking its peers.
360 MessageCollectorActor.expectMatching(newFollowerCollectorActor, ApplyState.class, 2);
362 assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID2),
363 newFollowerActorContext.getPeerIds());
// Scenario: AddServer is issued while a previously initiated snapshot capture has not completed yet.
// The leader drops String messages (the snapshot commit message, per the comment below) until after
// the AddServer is sent; once the captured message is re-delivered the AddServer is expected to reply
// OK, the new follower to see an ApplySnapshot and the leader's config entry to be at index 0.
367 public void testAddServerWithPriorSnapshotInProgress() throws Exception {
368 RaftActorContext initialActorContext = new MockRaftActorContext();
369 initialActorContext.setCommitIndex(-1);
370 initialActorContext.setLastApplied(-1);
371 initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
373 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
374 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
375 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
376 actorFactory.generateActorId(LEADER_ID));
378 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
379 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
// Attach a collector to the leader so the dropped message can be captured from it below.
381 TestActorRef<MessageCollectorActor> leaderCollectorActor = actorFactory.createTestActor(
382 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
383 actorFactory.generateActorId(LEADER_ID + "Collector"));
384 leaderRaftActor.setCollectorActor(leaderCollectorActor);
386 // Drop commit message for now to delay snapshot completion
387 leaderRaftActor.setDropMessageOfType(String.class);
// Start the prior snapshot capture.
389 leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
391 String commitMsg = expectFirstMatching(leaderCollectorActor, String.class);
393 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
// Stop dropping and re-deliver the captured commit message.
395 leaderRaftActor.setDropMessageOfType(null);
396 leaderActor.tell(commitMsg, leaderActor);
398 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
399 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
400 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
402 expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
404 // Verify ServerConfigurationPayload entry in leader's log
406 assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
407 assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
408 assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
409 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
410 votingServer(NEW_SERVER_ID));
// Scenario: AddServer is issued while a prior snapshot capture never completes (the leader keeps
// dropping String messages). The AddServerReply is expected to carry TIMEOUT and the leader is
// expected to end up with no peers.
414 public void testAddServerWithPriorSnapshotCompleteTimeout() throws Exception {
415 RaftActorContext initialActorContext = new MockRaftActorContext();
416 initialActorContext.setCommitIndex(-1);
417 initialActorContext.setLastApplied(-1);
418 initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
420 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
421 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
422 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
423 actorFactory.generateActorId(LEADER_ID));
425 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
426 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
// Lower the leader's election timeout factor to 1.
// NOTE(review): presumably this shortens the timeout that produces the TIMEOUT reply - confirm.
428 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
430 TestActorRef<MessageCollectorActor> leaderCollectorActor = actorFactory.createTestActor(
431 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
432 actorFactory.generateActorId(LEADER_ID + "Collector"));
433 leaderRaftActor.setCollectorActor(leaderCollectorActor);
435 // Drop commit message so the snapshot doesn't complete.
436 leaderRaftActor.setDropMessageOfType(String.class);
438 leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
440 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
442 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
443 assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
// The new server must not have been added to the leader's peers.
445 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
// Scenario: AddServer is issued while a prior snapshot capture is pending, and the leader is switched
// to a Follower behavior before that snapshot completes. After the captured commit message and a
// FollowerCatchUpTimeout are delivered, the reply is expected to carry NO_LEADER, the leader to have
// no peers and its snapshot manager not to be capturing.
449 public void testAddServerWithLeaderChangeBeforePriorSnapshotComplete() throws Exception {
450 RaftActorContext initialActorContext = new MockRaftActorContext();
451 initialActorContext.setCommitIndex(-1);
452 initialActorContext.setLastApplied(-1);
453 initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
455 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
456 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
457 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
458 actorFactory.generateActorId(LEADER_ID));
460 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
461 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
// Set the leader's election timeout factor to 100.
462 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
464 TestActorRef<MessageCollectorActor> leaderCollectorActor = actorFactory.createTestActor(
465 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
466 actorFactory.generateActorId(LEADER_ID + "Collector"));
467 leaderRaftActor.setCollectorActor(leaderCollectorActor);
469 // Drop the commit message so the snapshot doesn't complete yet.
470 leaderRaftActor.setDropMessageOfType(String.class);
472 leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
474 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
// Capture the dropped commit message from the leader's collector for later re-delivery.
476 String commitMsg = expectFirstMatching(leaderCollectorActor, String.class);
478 // Change the leader behavior to follower
479 leaderActor.tell(new Follower(leaderActorContext), leaderActor);
481 // Drop CaptureSnapshotReply in case install snapshot is incorrectly initiated after the prior
482 // snapshot completes. This will prevent the invalid snapshot from completing and fail the
483 // isCapturing assertion below.
484 leaderRaftActor.setDropMessageOfType(CaptureSnapshotReply.class);
486 // Complete the prior snapshot - this should be a no-op b/c it's no longer the leader
487 leaderActor.tell(commitMsg, leaderActor);
// Deliver the catch-up timeout for the new server directly instead of waiting for it.
489 leaderActor.tell(new FollowerCatchUpTimeout(NEW_SERVER_ID), leaderActor);
491 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
492 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
494 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
495 assertEquals("isCapturing", false, leaderActorContext.getSnapshotManager().isCapturing());
// Scenario: the leader is switched to a Follower behavior while the snapshot install for the new
// server is in flight. The UnInitializedFollowerSnapshotReply is held back, re-delivered after the
// behavior change, and the AddServerReply is expected to carry NO_LEADER with no peers on the leader.
499 public void testAddServerWithLeaderChangeDuringInstallSnapshot() throws Exception {
500 RaftActorContext initialActorContext = new MockRaftActorContext();
501 initialActorContext.setCommitIndex(-1);
502 initialActorContext.setLastApplied(-1);
503 initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
505 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
506 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
507 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
508 actorFactory.generateActorId(LEADER_ID));
510 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
511 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
// Start with an election timeout factor of 8; it is raised to 100 further down.
513 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(8);
515 TestActorRef<MessageCollectorActor> leaderCollectorActor = actorFactory.createTestActor(
516 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
517 actorFactory.generateActorId(LEADER_ID + "Collector"));
518 leaderRaftActor.setCollectorActor(leaderCollectorActor);
520 // Drop the UnInitializedFollowerSnapshotReply to delay it.
521 leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
523 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
// Capture the dropped reply from the leader's collector for later re-delivery.
525 UnInitializedFollowerSnapshotReply snapshotReply = expectFirstMatching(leaderCollectorActor,
526 UnInitializedFollowerSnapshotReply.class);
528 // Prevent election timeout when the leader switches to follower
529 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
531 // Change the leader behavior to follower
532 leaderActor.tell(new Follower(leaderActorContext), leaderActor);
534 // Send the captured UnInitializedFollowerSnapshotReply - should be a no-op
535 leaderRaftActor.setDropMessageOfType(null);
536 leaderActor.tell(snapshotReply, leaderActor);
538 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
539 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
541 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
// Scenario: the new follower drops InstallSnapshot messages so the install never completes. The
// AddServerReply is expected to carry TIMEOUT, and afterwards the leader is expected to have neither
// peers nor follower ids.
545 public void testAddServerWithInstallSnapshotTimeout() throws Exception {
546 RaftActorContext initialActorContext = new MockRaftActorContext();
547 initialActorContext.setCommitIndex(-1);
548 initialActorContext.setLastApplied(-1);
549 initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
551 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
552 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
553 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
554 actorFactory.generateActorId(LEADER_ID));
556 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
557 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
// Lower the leader's election timeout factor to 1.
// NOTE(review): presumably this shortens the timeout that produces the TIMEOUT reply - confirm.
558 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
560 // Drop the InstallSnapshot message so it times out
561 newFollowerRaftActor.underlyingActor().setDropMessageOfType(InstallSnapshot.SERIALIZABLE_CLASS);
563 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
// Send a snapshot reply for an id ("bogus") that is not the server being added.
// NOTE(review): presumably checks that a reply for an unrelated follower is ignored - confirm.
565 leaderActor.tell(new UnInitializedFollowerSnapshotReply("bogus"), leaderActor);
567 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
568 assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
570 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
571 assertEquals("Leader followers size", 0,
572 ((AbstractLeader)leaderRaftActor.getCurrentBehavior()).getFollowerIds().size());
// Scenario: AddServer is sent to a MockRaftActor that is created with a one-day heartbeat interval and
// a single peer (FOLLOWER_ID); the AddServerReply is expected to carry NO_LEADER.
576 public void testAddServerWithNoLeader() {
577 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
578 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
// The actor is registered under LEADER_ID but is a plain MockRaftActor, not a MockLeaderRaftActor.
580 TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
581 MockRaftActor.props(LEADER_ID, ImmutableMap.<String,String>of(FOLLOWER_ID, followerActor.path().toString()),
582 Optional.<ConfigParams>of(configParams), NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
583 actorFactory.generateActorId(LEADER_ID));
// Block until the actor has finished initializing its behavior before sending the request.
584 noLeaderActor.underlyingActor().waitForInitializeBehaviorComplete();
586 noLeaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
587 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
588 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
// Scenario: AddServer is sent to a raft actor whose peer LEADER_ID is a message collector; the test
// expects the collector to receive an AddServer message, i.e. the request reaches the leader's address.
592 public void testAddServerForwardedToLeader() {
593 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
594 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
// A plain MessageCollectorActor stands in for the leader so forwarded messages can be inspected.
596 TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
597 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
598 actorFactory.generateActorId(LEADER_ID));
600 TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
601 MockRaftActor.props(FOLLOWER_ID, ImmutableMap.<String,String>of(LEADER_ID, leaderActor.path().toString()),
602 Optional.<ConfigParams>of(configParams), NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
603 actorFactory.generateActorId(FOLLOWER_ID));
604 followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
// Send an AppendEntries carrying LEADER_ID and no entries, with leaderActor as the sender.
// NOTE(review): presumably this makes the follower record LEADER_ID as its current leader - confirm.
606 followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
607 -1, -1, (short)0), leaderActor);
609 followerRaftActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
610 expectFirstMatching(leaderActor, AddServer.class);
613 private ServerInfo votingServer(String id) {
614 return new ServerInfo(id, true);
617 private ServerInfo nonVotingServer(String id) {
618 return new ServerInfo(id, false);
621 private static void verifyServerConfigurationPayloadEntry(ReplicatedLog log, ServerInfo... expected) {
622 ReplicatedLogEntry logEntry = log.get(log.lastIndex());
623 assertEquals("Last log entry payload class", ServerConfigurationPayload.class, logEntry.getData().getClass());
624 ServerConfigurationPayload payload = (ServerConfigurationPayload)logEntry.getData();
625 assertEquals("getNewServerConfig", Sets.newHashSet(expected), Sets.newHashSet(payload.getServerConfig()));
628 private static RaftActorContext newFollowerContext(String id, TestActorRef<? extends UntypedActor> actor) {
629 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
630 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
631 configParams.setElectionTimeoutFactor(100000);
632 ElectionTermImpl termInfo = new ElectionTermImpl(NO_PERSISTENCE, id, LOG);
633 termInfo.update(1, LEADER_ID);
634 RaftActorContext followerActorContext = new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
635 id, termInfo, -1, -1,
636 ImmutableMap.of(LEADER_ID, ""), configParams, NO_PERSISTENCE, LOG);
637 followerActorContext.setCommitIndex(-1);
638 followerActorContext.setLastApplied(-1);
639 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
641 return followerActorContext;
644 static abstract class AbstractMockRaftActor extends MockRaftActor {
645 private volatile TestActorRef<MessageCollectorActor> collectorActor;
646 private volatile Class<?> dropMessageOfType;
648 AbstractMockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
649 DataPersistenceProvider dataPersistenceProvider, TestActorRef<MessageCollectorActor> collectorActor) {
650 super(id, peerAddresses, config, dataPersistenceProvider);
651 this.collectorActor = collectorActor;
654 void setDropMessageOfType(Class<?> dropMessageOfType) {
655 this.dropMessageOfType = dropMessageOfType;
658 void setCollectorActor(TestActorRef<MessageCollectorActor> collectorActor) {
659 this.collectorActor = collectorActor;
663 public void handleCommand(Object message) {
664 if(dropMessageOfType == null || !dropMessageOfType.equals(message.getClass())) {
665 super.handleCommand(message);
668 if(collectorActor != null) {
669 collectorActor.tell(message, getSender());
674 public static class MockLeaderRaftActor extends AbstractMockRaftActor {
675 public MockLeaderRaftActor(Map<String, String> peerAddresses, ConfigParams config,
676 RaftActorContext fromContext) {
677 super(LEADER_ID, peerAddresses, Optional.of(config), NO_PERSISTENCE, null);
678 setPersistence(false);
680 RaftActorContext context = getRaftActorContext();
681 for(int i = 0; i < fromContext.getReplicatedLog().size(); i++) {
682 ReplicatedLogEntry entry = fromContext.getReplicatedLog().get(i);
683 getState().add(entry.getData());
684 context.getReplicatedLog().append(entry);
687 context.setCommitIndex(fromContext.getCommitIndex());
688 context.setLastApplied(fromContext.getLastApplied());
689 context.getTermInformation().update(fromContext.getTermInformation().getCurrentTerm(),
690 fromContext.getTermInformation().getVotedFor());
694 protected void initializeBehavior() {
695 changeCurrentBehavior(new Leader(getRaftActorContext()));
696 initializeBehaviorComplete.countDown();
700 public void createSnapshot(ActorRef actorRef) {
702 actorRef.tell(new CaptureSnapshotReply(RaftActorTest.fromObject(getState()).toByteArray()), actorRef);
703 } catch (Exception e) {
704 LOG.error("createSnapshot failed", e);
708 static Props props(Map<String, String> peerAddresses, RaftActorContext fromContext) {
709 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
710 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
711 configParams.setElectionTimeoutFactor(10);
712 return Props.create(MockLeaderRaftActor.class, peerAddresses, configParams, fromContext);
716 public static class MockNewFollowerRaftActor extends AbstractMockRaftActor {
717 public MockNewFollowerRaftActor(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
718 super(NEW_SERVER_ID, Maps.<String, String>newHashMap(), Optional.of(config), null, collectorActor);
719 setPersistence(false);
722 static Props props(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
723 return Props.create(MockNewFollowerRaftActor.class, config, collectorActor);