2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft;
10 import static org.junit.Assert.assertEquals;
11 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
12 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
13 import akka.actor.ActorRef;
14 import akka.actor.Props;
15 import akka.actor.UntypedActor;
16 import akka.dispatch.Dispatchers;
17 import akka.testkit.JavaTestKit;
18 import akka.testkit.TestActorRef;
19 import com.google.common.base.Optional;
20 import com.google.common.collect.ImmutableMap;
21 import com.google.common.collect.Maps;
22 import com.google.common.collect.Sets;
23 import java.util.Collections;
24 import java.util.List;
26 //import java.util.List;
27 import java.util.concurrent.TimeUnit;
28 import org.junit.After;
29 import org.junit.Before;
30 import org.junit.Test;
31 import org.opendaylight.controller.cluster.DataPersistenceProvider;
32 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
33 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
34 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
35 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
36 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
37 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
38 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
39 import org.opendaylight.controller.cluster.raft.messages.AddServer;
40 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
41 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
42 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
43 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
44 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
45 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
46 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
47 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
48 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
51 import scala.concurrent.duration.FiniteDuration;
54 * Unit tests for RaftActorServerConfigurationSupport.
56 * @author Thomas Pantelis
58 public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
59 static final String LEADER_ID = "leader";
60 static final String FOLLOWER_ID = "follower";
61 static final String NEW_SERVER_ID = "new-server";
62 private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupportTest.class);
63 private static final DataPersistenceProvider NO_PERSISTENCE = new NonPersistentDataProvider();
65 private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
67 private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
68 Props.create(ForwardMessageToBehaviorActor.class).withDispatcher(Dispatchers.DefaultDispatcherId()),
69 actorFactory.generateActorId(FOLLOWER_ID));
71 private TestActorRef<MockNewFollowerRaftActor> newFollowerRaftActor;
72 private TestActorRef<MessageCollectorActor> newFollowerCollectorActor;
74 private RaftActorContext newFollowerActorContext;
75 private final JavaTestKit testKit = new JavaTestKit(getSystem());
79 InMemoryJournal.clear();
80 InMemorySnapshotStore.clear();
82 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
83 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
84 configParams.setElectionTimeoutFactor(100000);
85 configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
87 newFollowerCollectorActor = actorFactory.createTestActor(
88 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
89 actorFactory.generateActorId(NEW_SERVER_ID + "Collector"));
90 newFollowerRaftActor = actorFactory.createTestActor(MockNewFollowerRaftActor.props(
91 configParams, newFollowerCollectorActor).withDispatcher(Dispatchers.DefaultDispatcherId()),
92 actorFactory.generateActorId(NEW_SERVER_ID));
93 newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
97 public void tearDown() throws Exception {
102 public void testAddServerWithExistingFollower() throws Exception {
103 RaftActorContext followerActorContext = newFollowerContext(FOLLOWER_ID, followerActor);
104 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
106 followerActorContext.setCommitIndex(2);
107 followerActorContext.setLastApplied(2);
109 Follower follower = new Follower(followerActorContext);
110 followerActor.underlyingActor().setBehavior(follower);
112 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
113 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
114 followerActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
115 actorFactory.generateActorId(LEADER_ID));
117 // Expect initial heartbeat from the leader.
118 expectFirstMatching(followerActor, AppendEntries.class);
119 clearMessages(followerActor);
121 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
123 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
125 // Leader should install snapshot - capture and verify ApplySnapshot contents
127 ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
128 List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
129 assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
131 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
132 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
133 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
135 // Verify ServerConfigurationPayload entry in leader's log
137 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
138 assertEquals("Leader journal last index", 3, leaderActorContext.getReplicatedLog().lastIndex());
139 assertEquals("Leader commit index", 3, leaderActorContext.getCommitIndex());
140 assertEquals("Leader last applied index", 3, leaderActorContext.getLastApplied());
141 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), LEADER_ID, FOLLOWER_ID, NEW_SERVER_ID);
143 // Verify ServerConfigurationPayload entry in both followers
145 assertEquals("Follower journal last index", 3, followerActorContext.getReplicatedLog().lastIndex());
146 verifyServerConfigurationPayloadEntry(followerActorContext.getReplicatedLog(), LEADER_ID, FOLLOWER_ID, NEW_SERVER_ID);
148 assertEquals("New follower journal last index", 3, newFollowerActorContext.getReplicatedLog().lastIndex());
149 verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), LEADER_ID, FOLLOWER_ID, NEW_SERVER_ID);
151 // Verify new server config was applied in both followers
153 assertEquals("Follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID),
154 followerActorContext.getPeerAddresses().keySet());
156 assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, FOLLOWER_ID),
157 newFollowerActorContext.getPeerAddresses().keySet());
159 clearMessages(followerActor);
160 clearMessages(newFollowerCollectorActor);
162 expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
163 expectFirstMatching(followerActor, ApplyState.class);
165 assertEquals("Follower commit index", 3, followerActorContext.getCommitIndex());
166 assertEquals("Follower last applied index", 3, followerActorContext.getLastApplied());
167 assertEquals("New follower commit index", 3, newFollowerActorContext.getCommitIndex());
168 assertEquals("New follower last applied index", 3, newFollowerActorContext.getLastApplied());
172 public void testAddServerWithNoExistingFollower() throws Exception {
173 RaftActorContext initialActorContext = new MockRaftActorContext();
174 initialActorContext.setCommitIndex(1);
175 initialActorContext.setLastApplied(1);
176 initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
179 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
180 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
181 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
182 actorFactory.generateActorId(LEADER_ID));
184 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
185 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
187 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
189 // Leader should install snapshot - capture and verify ApplySnapshot contents
191 ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
192 List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
193 assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
195 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
196 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
197 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
199 // Verify ServerConfigurationPayload entry in leader's log
201 assertEquals("Leader journal last index", 2, leaderActorContext.getReplicatedLog().lastIndex());
202 assertEquals("Leader commit index", 2, leaderActorContext.getCommitIndex());
203 assertEquals("Leader last applied index", 2, leaderActorContext.getLastApplied());
204 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), LEADER_ID, NEW_SERVER_ID);
206 // Verify ServerConfigurationPayload entry in the new follower
208 clearMessages(newFollowerCollectorActor);
210 expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
211 assertEquals("New follower journal last index", 2, newFollowerActorContext.getReplicatedLog().lastIndex());
212 verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), LEADER_ID, NEW_SERVER_ID);
214 // Verify new server config was applied in the new follower
216 assertEquals("New follower peers", Sets.newHashSet(LEADER_ID),
217 newFollowerActorContext.getPeerAddresses().keySet());
221 public void testAddServerAsNonVoting() throws Exception {
222 RaftActorContext initialActorContext = new MockRaftActorContext();
223 initialActorContext.setCommitIndex(-1);
224 initialActorContext.setLastApplied(-1);
225 initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
227 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
228 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
229 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
230 actorFactory.generateActorId(LEADER_ID));
232 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
233 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
235 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), false), testKit.getRef());
237 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
238 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
239 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
241 // Verify ServerConfigurationPayload entry in leader's log
243 assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
244 assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
245 assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
246 verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), LEADER_ID, NEW_SERVER_ID);
248 // Verify ServerConfigurationPayload entry in the new follower
250 expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
251 assertEquals("New follower journal last index", 0, newFollowerActorContext.getReplicatedLog().lastIndex());
252 verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), LEADER_ID, NEW_SERVER_ID);
254 // Verify new server config was applied in the new follower
256 assertEquals("New follower peers", Sets.newHashSet(LEADER_ID),
257 newFollowerActorContext.getPeerAddresses().keySet());
259 MessageCollectorActor.assertNoneMatching(newFollowerCollectorActor, InstallSnapshot.SERIALIZABLE_CLASS, 500);
263 public void testAddServerWithInstallSnapshotTimeout() throws Exception {
264 newFollowerRaftActor.underlyingActor().setDropMessageOfType(InstallSnapshot.SERIALIZABLE_CLASS);
266 RaftActorContext initialActorContext = new MockRaftActorContext();
267 initialActorContext.setCommitIndex(-1);
268 initialActorContext.setLastApplied(-1);
269 initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
271 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
272 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
273 initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
274 actorFactory.generateActorId(LEADER_ID));
276 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
277 RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
279 leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
281 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
282 assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
284 assertEquals("Leader peers size", 0, leaderActorContext.getPeerAddresses().keySet().size());
285 assertEquals("Leader followers size", 0,
286 ((AbstractLeader)leaderRaftActor.getCurrentBehavior()).getFollowerIds().size());
290 public void testAddServerWithNoLeader() {
291 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
292 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
294 TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
295 MockRaftActor.props(LEADER_ID, ImmutableMap.<String,String>of(FOLLOWER_ID, followerActor.path().toString()),
296 Optional.<ConfigParams>of(configParams), NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
297 actorFactory.generateActorId(LEADER_ID));
298 noLeaderActor.underlyingActor().waitForInitializeBehaviorComplete();
300 noLeaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
301 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
302 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
306 public void testAddServerForwardedToLeader() {
307 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
308 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
310 TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
311 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
312 actorFactory.generateActorId(LEADER_ID));
314 TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
315 MockRaftActor.props(FOLLOWER_ID, ImmutableMap.<String,String>of(LEADER_ID, leaderActor.path().toString()),
316 Optional.<ConfigParams>of(configParams), NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
317 actorFactory.generateActorId(FOLLOWER_ID));
318 followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
320 followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
321 -1, -1, (short)0), leaderActor);
323 followerRaftActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
324 expectFirstMatching(leaderActor, AddServer.class);
327 private void verifyServerConfigurationPayloadEntry(ReplicatedLog log, String... cNew) {
328 ReplicatedLogEntry logEntry = log.get(log.lastIndex());
329 assertEquals("Last log entry payload class", ServerConfigurationPayload.class, logEntry.getData().getClass());
330 ServerConfigurationPayload payload = (ServerConfigurationPayload)logEntry.getData();
331 assertEquals("getNewServerConfig", Sets.newHashSet(cNew), Sets.newHashSet(payload.getNewServerConfig()));
334 private RaftActorContext newFollowerContext(String id, TestActorRef<? extends UntypedActor> actor) {
335 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
336 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
337 configParams.setElectionTimeoutFactor(100000);
338 ElectionTermImpl termInfo = new ElectionTermImpl(NO_PERSISTENCE, id, LOG);
339 termInfo.update(1, LEADER_ID);
340 RaftActorContext followerActorContext = new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
341 id, termInfo, -1, -1,
342 ImmutableMap.of(LEADER_ID, ""), configParams, NO_PERSISTENCE, LOG);
344 return followerActorContext;
347 public static class MockLeaderRaftActor extends MockRaftActor {
348 public MockLeaderRaftActor(Map<String, String> peerAddresses, ConfigParams config,
349 RaftActorContext fromContext) {
350 super(LEADER_ID, peerAddresses, Optional.of(config), NO_PERSISTENCE);
352 RaftActorContext context = getRaftActorContext();
353 for(int i = 0; i < fromContext.getReplicatedLog().size(); i++) {
354 ReplicatedLogEntry entry = fromContext.getReplicatedLog().get(i);
355 getState().add(entry.getData());
356 context.getReplicatedLog().append(entry);
359 context.setCommitIndex(fromContext.getCommitIndex());
360 context.setLastApplied(fromContext.getLastApplied());
361 context.getTermInformation().update(fromContext.getTermInformation().getCurrentTerm(),
362 fromContext.getTermInformation().getVotedFor());
366 protected void initializeBehavior() {
367 changeCurrentBehavior(new Leader(getRaftActorContext()));
368 initializeBehaviorComplete.countDown();
372 public void createSnapshot(ActorRef actorRef) {
374 actorRef.tell(new CaptureSnapshotReply(RaftActorTest.fromObject(getState()).toByteArray()), actorRef);
375 } catch (Exception e) {
376 LOG.error("createSnapshot failed", e);
380 static Props props(Map<String, String> peerAddresses, RaftActorContext fromContext) {
381 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
382 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
383 configParams.setElectionTimeoutFactor(1);
384 return Props.create(MockLeaderRaftActor.class, peerAddresses, configParams, fromContext);
388 public static class MockNewFollowerRaftActor extends MockRaftActor {
389 private final TestActorRef<MessageCollectorActor> collectorActor;
390 private volatile Class<?> dropMessageOfType;
392 public MockNewFollowerRaftActor(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
393 super(NEW_SERVER_ID, Maps.<String, String>newHashMap(), Optional.of(config), null);
394 this.collectorActor = collectorActor;
397 void setDropMessageOfType(Class<?> dropMessageOfType) {
398 this.dropMessageOfType = dropMessageOfType;
402 public void handleCommand(Object message) {
403 if(dropMessageOfType != null && dropMessageOfType.equals(message.getClass())) {
407 super.handleCommand(message);
408 collectorActor.tell(message, getSender());
411 static Props props(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
412 return Props.create(MockNewFollowerRaftActor.class, config, collectorActor);