2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft;
10 import static org.junit.Assert.assertEquals;
11 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
12 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
13 import akka.actor.ActorRef;
14 import akka.actor.Props;
15 import akka.actor.UntypedActor;
16 import akka.dispatch.Dispatchers;
17 import akka.testkit.JavaTestKit;
18 import akka.testkit.TestActorRef;
19 import com.google.common.base.Optional;
20 import com.google.common.collect.ImmutableMap;
21 import com.google.common.collect.Maps;
22 import com.google.common.collect.Sets;
23 import java.util.Collections;
24 import java.util.List;
26 import java.util.concurrent.TimeUnit;
27 import org.junit.After;
28 import org.junit.Before;
29 import org.junit.Test;
30 import org.opendaylight.controller.cluster.DataPersistenceProvider;
31 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
32 import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
33 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
34 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
35 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
36 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
37 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
38 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
39 import org.opendaylight.controller.cluster.raft.messages.AddServer;
40 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
41 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
42 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
43 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
44 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
45 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
46 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
47 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
48 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
51 import scala.concurrent.duration.FiniteDuration;
54 * Unit tests for RaftActorServerConfigurationSupport.
56 * @author Thomas Pantelis
58 public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
59 static final String LEADER_ID = "leader";
60 static final String FOLLOWER_ID = "follower";
61 static final String NEW_SERVER_ID = "new-server";
62 static final String NEW_SERVER_ID2 = "new-server2";
63 private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupportTest.class);
64 private static final DataPersistenceProvider NO_PERSISTENCE = new NonPersistentDataProvider();
66 private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
68 private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
69 Props.create(ForwardMessageToBehaviorActor.class).withDispatcher(Dispatchers.DefaultDispatcherId()),
70 actorFactory.generateActorId(FOLLOWER_ID));
72 private TestActorRef<MockNewFollowerRaftActor> newFollowerRaftActor;
73 private TestActorRef<MessageCollectorActor> newFollowerCollectorActor;
74 private RaftActorContext newFollowerActorContext;
76 private final JavaTestKit testKit = new JavaTestKit(getSystem());
80 InMemoryJournal.clear();
81 InMemorySnapshotStore.clear();
83 DefaultConfigParamsImpl configParams = newFollowerConfigParams();
85 newFollowerCollectorActor = actorFactory.createTestActor(
86 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
87 actorFactory.generateActorId(NEW_SERVER_ID + "Collector"));
88 newFollowerRaftActor = actorFactory.createTestActor(MockNewFollowerRaftActor.props(
89 configParams, newFollowerCollectorActor).withDispatcher(Dispatchers.DefaultDispatcherId()),
90 actorFactory.generateActorId(NEW_SERVER_ID));
93 newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
94 } catch (Exception e) {
95 newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
99 private static DefaultConfigParamsImpl newFollowerConfigParams() {
100 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
101 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
102 configParams.setElectionTimeoutFactor(100000);
103 configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
108 public void tearDown() throws Exception {
109 actorFactory.close();
113 public void testAddServerWithExistingFollower() throws Exception {
114 RaftActorContext followerActorContext = newFollowerContext(FOLLOWER_ID, followerActor);
115 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
117 followerActorContext.setCommitIndex(2);
118 followerActorContext.setLastApplied(2);
120 Follower follower = new Follower(followerActorContext);
121 followerActor.underlyingActor().setBehavior(follower);
123 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
124 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
125 followerActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
126 actorFactory.generateActorId(LEADER_ID));
128 // Expect initial heartbeat from the leader.
129 expectFirstMatching(followerActor, AppendEntries.class);
130 clearMessages(followerActor);
132 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
134 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
136 // Leader should install snapshot - capture and verify ApplySnapshot contents
138 ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
139 List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
140 assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
142 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
143 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
144 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
146 // Verify ServerConfigurationPayload entry in leader's log
148 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
149 assertEquals("Leader journal last index", 3, leaderActorContext.getReplicatedLog().lastIndex());
150 assertEquals("Leader commit index", 3, leaderActorContext.getCommitIndex());
151 assertEquals("Leader last applied index", 3, leaderActorContext.getLastApplied());
152 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
153 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
155 // Verify ServerConfigurationPayload entry in both followers
157 assertEquals("Follower journal last index", 3, followerActorContext.getReplicatedLog().lastIndex());
158 verifyServerConfigurationPayloadEntry(followerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
159 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
161 assertEquals("New follower journal last index", 3, newFollowerActorContext.getReplicatedLog().lastIndex());
162 verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
163 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
165 // Verify new server config was applied in both followers
167 assertEquals("Follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID), followerActorContext.getPeerIds());
169 assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, FOLLOWER_ID), newFollowerActorContext.getPeerIds());
171 expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
172 expectFirstMatching(followerActor, ApplyState.class);
174 assertEquals("Follower commit index", 3, followerActorContext.getCommitIndex());
175 assertEquals("Follower last applied index", 3, followerActorContext.getLastApplied());
176 assertEquals("New follower commit index", 3, newFollowerActorContext.getCommitIndex());
177 assertEquals("New follower last applied index", 3, newFollowerActorContext.getLastApplied());
181 public void testAddServerWithNoExistingFollower() throws Exception {
182 RaftActorContext initialActorContext = new MockRaftActorContext();
183 initialActorContext.setCommitIndex(1);
184 initialActorContext.setLastApplied(1);
185 initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
188 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
189 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
190 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
191 actorFactory.generateActorId(LEADER_ID));
193 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
194 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
196 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
198 // Leader should install snapshot - capture and verify ApplySnapshot contents
200 ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
201 List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
202 assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
204 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
205 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
206 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
208 // Verify ServerConfigurationPayload entry in leader's log
210 assertEquals("Leader journal last index", 2, leaderActorContext.getReplicatedLog().lastIndex());
211 assertEquals("Leader commit index", 2, leaderActorContext.getCommitIndex());
212 assertEquals("Leader last applied index", 2, leaderActorContext.getLastApplied());
213 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
214 votingServer(NEW_SERVER_ID));
216 // Verify ServerConfigurationPayload entry in the new follower
218 expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
219 assertEquals("New follower journal last index", 2, newFollowerActorContext.getReplicatedLog().lastIndex());
220 verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
221 votingServer(NEW_SERVER_ID));
223 // Verify new server config was applied in the new follower
225 assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
229 public void testAddServersAsNonVoting() throws Exception {
230 RaftActorContext initialActorContext = new MockRaftActorContext();
231 initialActorContext.setCommitIndex(-1);
232 initialActorContext.setLastApplied(-1);
233 initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
235 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
236 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
237 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
238 actorFactory.generateActorId(LEADER_ID));
240 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
241 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
243 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), false), testKit.getRef());
245 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
246 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
247 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
249 // Verify ServerConfigurationPayload entry in leader's log
251 assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
252 assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
253 assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
254 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
255 nonVotingServer(NEW_SERVER_ID));
257 // Verify ServerConfigurationPayload entry in the new follower
259 expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
260 assertEquals("New follower journal last index", 0, newFollowerActorContext.getReplicatedLog().lastIndex());
261 verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
262 nonVotingServer(NEW_SERVER_ID));
264 // Verify new server config was applied in the new follower
266 assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
268 MessageCollectorActor.assertNoneMatching(newFollowerCollectorActor, InstallSnapshot.class, 500);
270 // Add another non-voting server.
272 RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
273 Follower newFollower2 = new Follower(follower2ActorContext);
274 followerActor.underlyingActor().setBehavior(newFollower2);
276 leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit.getRef());
278 addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
279 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
280 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
282 assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
283 assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
284 assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
285 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
286 votingServer(LEADER_ID), nonVotingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
290 public void testAddServerWithOperationInProgress() throws Exception {
291 RaftActorContext initialActorContext = new MockRaftActorContext();
292 initialActorContext.setCommitIndex(-1);
293 initialActorContext.setLastApplied(-1);
294 initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
296 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
297 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
298 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
299 actorFactory.generateActorId(LEADER_ID));
301 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
302 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
304 RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
305 Follower newFollower2 = new Follower(follower2ActorContext);
306 followerActor.underlyingActor().setBehavior(newFollower2);
308 MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
309 newFollowerRaftActorInstance.setDropMessageOfType(InstallSnapshot.SERIALIZABLE_CLASS);
311 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
313 // Wait for leader's install snapshot and capture it
315 Object installSnapshot = expectFirstMatching(newFollowerCollectorActor, InstallSnapshot.class);
317 JavaTestKit testKit2 = new JavaTestKit(getSystem());
318 leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit2.getRef());
320 newFollowerRaftActorInstance.setDropMessageOfType(null);
321 newFollowerRaftActor.tell(installSnapshot, leaderActor);
323 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
324 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
326 addServerReply = testKit2.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
327 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
329 // Verify ServerConfigurationPayload entries in leader's log
331 assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
332 assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
333 assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
334 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
335 votingServer(LEADER_ID), votingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
337 // Verify ServerConfigurationPayload entry in the new follower
339 MessageCollectorActor.expectMatching(newFollowerCollectorActor, ApplyState.class, 2);
341 assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID2),
342 newFollowerActorContext.getPeerIds());
346 public void testAddServerWithInstallSnapshotTimeout() throws Exception {
347 newFollowerRaftActor.underlyingActor().setDropMessageOfType(InstallSnapshot.SERIALIZABLE_CLASS);
349 RaftActorContext initialActorContext = new MockRaftActorContext();
350 initialActorContext.setCommitIndex(-1);
351 initialActorContext.setLastApplied(-1);
352 initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
354 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
355 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
356 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
357 actorFactory.generateActorId(LEADER_ID));
359 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
360 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
361 ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
363 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
365 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
366 assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
368 assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
369 assertEquals("Leader followers size", 0,
370 ((AbstractLeader)leaderRaftActor.getCurrentBehavior()).getFollowerIds().size());
374 public void testAddServerWithNoLeader() {
375 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
376 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
378 TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
379 MockRaftActor.props(LEADER_ID, ImmutableMap.<String,String>of(FOLLOWER_ID, followerActor.path().toString()),
380 Optional.<ConfigParams>of(configParams), NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
381 actorFactory.generateActorId(LEADER_ID));
382 noLeaderActor.underlyingActor().waitForInitializeBehaviorComplete();
384 noLeaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
385 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
386 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
390 public void testAddServerForwardedToLeader() {
391 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
392 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
394 TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
395 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
396 actorFactory.generateActorId(LEADER_ID));
398 TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
399 MockRaftActor.props(FOLLOWER_ID, ImmutableMap.<String,String>of(LEADER_ID, leaderActor.path().toString()),
400 Optional.<ConfigParams>of(configParams), NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
401 actorFactory.generateActorId(FOLLOWER_ID));
402 followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
404 followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
405 -1, -1, (short)0), leaderActor);
407 followerRaftActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
408 expectFirstMatching(leaderActor, AddServer.class);
411 private ServerInfo votingServer(String id) {
412 return new ServerInfo(id, true);
415 private ServerInfo nonVotingServer(String id) {
416 return new ServerInfo(id, false);
419 private static void verifyServerConfigurationPayloadEntry(ReplicatedLog log, ServerInfo... expected) {
420 ReplicatedLogEntry logEntry = log.get(log.lastIndex());
421 assertEquals("Last log entry payload class", ServerConfigurationPayload.class, logEntry.getData().getClass());
422 ServerConfigurationPayload payload = (ServerConfigurationPayload)logEntry.getData();
423 assertEquals("getNewServerConfig", Sets.newHashSet(expected), Sets.newHashSet(payload.getServerConfig()));
426 private static RaftActorContext newFollowerContext(String id, TestActorRef<? extends UntypedActor> actor) {
427 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
428 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
429 configParams.setElectionTimeoutFactor(100000);
430 ElectionTermImpl termInfo = new ElectionTermImpl(NO_PERSISTENCE, id, LOG);
431 termInfo.update(1, LEADER_ID);
432 RaftActorContext followerActorContext = new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
433 id, termInfo, -1, -1,
434 ImmutableMap.of(LEADER_ID, ""), configParams, NO_PERSISTENCE, LOG);
435 followerActorContext.setCommitIndex(-1);
436 followerActorContext.setLastApplied(-1);
437 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
439 return followerActorContext;
442 public static class MockLeaderRaftActor extends MockRaftActor {
443 public MockLeaderRaftActor(Map<String, String> peerAddresses, ConfigParams config,
444 RaftActorContext fromContext) {
445 super(LEADER_ID, peerAddresses, Optional.of(config), NO_PERSISTENCE);
447 RaftActorContext context = getRaftActorContext();
448 for(int i = 0; i < fromContext.getReplicatedLog().size(); i++) {
449 ReplicatedLogEntry entry = fromContext.getReplicatedLog().get(i);
450 getState().add(entry.getData());
451 context.getReplicatedLog().append(entry);
454 context.setCommitIndex(fromContext.getCommitIndex());
455 context.setLastApplied(fromContext.getLastApplied());
456 context.getTermInformation().update(fromContext.getTermInformation().getCurrentTerm(),
457 fromContext.getTermInformation().getVotedFor());
461 protected void initializeBehavior() {
462 changeCurrentBehavior(new Leader(getRaftActorContext()));
463 initializeBehaviorComplete.countDown();
467 public void createSnapshot(ActorRef actorRef) {
469 actorRef.tell(new CaptureSnapshotReply(RaftActorTest.fromObject(getState()).toByteArray()), actorRef);
470 } catch (Exception e) {
471 LOG.error("createSnapshot failed", e);
475 static Props props(Map<String, String> peerAddresses, RaftActorContext fromContext) {
476 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
477 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
478 configParams.setElectionTimeoutFactor(10);
479 return Props.create(MockLeaderRaftActor.class, peerAddresses, configParams, fromContext);
483 public static class MockNewFollowerRaftActor extends MockRaftActor {
484 private final TestActorRef<MessageCollectorActor> collectorActor;
485 private volatile Class<?> dropMessageOfType;
487 public MockNewFollowerRaftActor(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
488 super(NEW_SERVER_ID, Maps.<String, String>newHashMap(), Optional.of(config), null);
489 this.collectorActor = collectorActor;
492 void setDropMessageOfType(Class<?> dropMessageOfType) {
493 this.dropMessageOfType = dropMessageOfType;
497 public void handleCommand(Object message) {
498 if(dropMessageOfType == null || !dropMessageOfType.equals(message.getClass())) {
499 super.handleCommand(message);
502 collectorActor.tell(message, getSender());
505 static Props props(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
506 return Props.create(MockNewFollowerRaftActor.class, config, collectorActor);