2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft;
10 import static org.junit.Assert.assertEquals;
11 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
12 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
13 import akka.actor.ActorRef;
14 import akka.actor.Props;
15 import akka.actor.UntypedActor;
16 import akka.dispatch.Dispatchers;
17 import akka.testkit.JavaTestKit;
18 import akka.testkit.TestActorRef;
19 import com.google.common.base.Optional;
20 import com.google.common.collect.ImmutableMap;
21 import com.google.common.collect.Maps;
22 import java.util.Collections;
24 import java.util.concurrent.TimeUnit;
25 import org.junit.After;
26 import org.junit.Before;
27 import org.junit.Test;
28 import org.opendaylight.controller.cluster.DataPersistenceProvider;
29 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
30 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
31 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
32 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
33 import org.opendaylight.controller.cluster.raft.messages.AddServer;
34 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
35 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
36 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
37 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
38 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
39 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42 import scala.concurrent.duration.FiniteDuration;
45 * Unit tests for RaftActorServerConfigurationSupport.
47 * @author Thomas Pantelis
49 public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
50 static final String LEADER_ID = "leader";
51 static final String FOLLOWER_ID = "follower";
52 static final String NEW_SERVER_ID = "new-server";
53 private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupportTest.class);
54 private static final DataPersistenceProvider NO_PERSISTENCE = new NonPersistentDataProvider();
56 private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
58 private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
59 Props.create(ForwardMessageToBehaviorActor.class).withDispatcher(Dispatchers.DefaultDispatcherId()),
60 actorFactory.generateActorId(FOLLOWER_ID));
62 private final TestActorRef<ForwardMessageToBehaviorActor> newServerActor = actorFactory.createTestActor(
63 Props.create(ForwardMessageToBehaviorActor.class).withDispatcher(Dispatchers.DefaultDispatcherId()),
64 actorFactory.generateActorId(NEW_SERVER_ID));
66 private RaftActorContext newServerActorContext;
67 private final JavaTestKit testKit = new JavaTestKit(getSystem());
71 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
72 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
73 configParams.setElectionTimeoutFactor(100000);
74 configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
75 newServerActorContext = new RaftActorContextImpl(newServerActor, newServerActor.underlyingActor().getContext(),
76 NEW_SERVER_ID, new ElectionTermImpl(NO_PERSISTENCE, NEW_SERVER_ID, LOG), -1, -1,
77 Maps.<String, String>newHashMap(), configParams, NO_PERSISTENCE, LOG);
78 newServerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
82 public void tearDown() throws Exception {
87 public void testAddServerWithFollower() throws Exception {
88 RaftActorContext followerActorContext = newFollowerContext(FOLLOWER_ID, followerActor);
89 followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
91 followerActorContext.setCommitIndex(3);
92 followerActorContext.setLastApplied(3);
94 Follower follower = new Follower(followerActorContext);
95 followerActor.underlyingActor().setBehavior(follower);
97 Follower newServer = new Follower(newServerActorContext);
98 newServerActor.underlyingActor().setBehavior(newServer);
100 TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
101 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
102 followerActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
103 actorFactory.generateActorId(LEADER_ID));
105 // Expect initial heartbeat from the leader.
106 expectFirstMatching(followerActor, AppendEntries.class);
107 clearMessages(followerActor);
109 MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
111 leaderActor.tell(new AddServer(NEW_SERVER_ID, newServerActor.path().toString(), true), testKit.getRef());
113 // leader should install snapshot - capture and verify ApplySnapshot contents
114 // ApplySnapshot applySnapshot = expectFirstMatching(followerActor, ApplySnapshot.class);
115 // List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
116 // assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
118 // leader should replicate new server config to both followers
119 // expectFirstMatching(followerActor, AppendEntries.class);
120 // expectFirstMatching(newServerActor, AppendEntries.class);
122 // verify ServerConfigurationPayload entry in leader's log
123 // RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
124 // assertEquals("Leader journal log size", 4, leaderActorContext.getReplicatedLog().size());
125 // assertEquals("Leader journal last index", 3, leaderActorContext.getReplicatedLog().lastIndex());
126 // ReplicatedLogEntry logEntry = leaderActorContext.getReplicatedLog().get(
127 // leaderActorContext.getReplicatedLog().lastIndex());
128 // verify logEntry contents
130 // Also verify ServerConfigurationPayload entry in both followers
132 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
133 assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
134 assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
138 public void testAddServerWithNoLeader() {
139 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
140 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
142 TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
143 MockRaftActor.props(LEADER_ID, ImmutableMap.<String,String>of(FOLLOWER_ID, followerActor.path().toString()),
144 Optional.<ConfigParams>of(configParams), NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
145 actorFactory.generateActorId(LEADER_ID));
146 noLeaderActor.underlyingActor().waitForInitializeBehaviorComplete();
148 noLeaderActor.tell(new AddServer(NEW_SERVER_ID, newServerActor.path().toString(), true), testKit.getRef());
149 AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
150 assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
154 public void testAddServerForwardedToLeader() {
155 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
156 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
158 TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
159 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
160 actorFactory.generateActorId(LEADER_ID));
162 TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
163 MockRaftActor.props(FOLLOWER_ID, ImmutableMap.<String,String>of(LEADER_ID, leaderActor.path().toString()),
164 Optional.<ConfigParams>of(configParams), NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
165 actorFactory.generateActorId(FOLLOWER_ID));
166 followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
168 followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
169 -1, -1, (short)0), leaderActor);
171 followerRaftActor.tell(new AddServer(NEW_SERVER_ID, newServerActor.path().toString(), true), testKit.getRef());
172 expectFirstMatching(leaderActor, AddServer.class);
175 private RaftActorContext newFollowerContext(String id, TestActorRef<? extends UntypedActor> actor) {
176 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
177 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
178 configParams.setElectionTimeoutFactor(100000);
179 RaftActorContext followerActorContext = new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
180 id, new ElectionTermImpl(NO_PERSISTENCE, id, LOG), -1, -1,
181 ImmutableMap.of(LEADER_ID, ""), configParams, NO_PERSISTENCE, LOG);
183 return followerActorContext;
186 public static class MockLeaderRaftActor extends MockRaftActor {
187 public MockLeaderRaftActor(Map<String, String> peerAddresses, ConfigParams config,
188 RaftActorContext fromContext) {
189 super(LEADER_ID, peerAddresses, Optional.of(config), NO_PERSISTENCE);
191 RaftActorContext context = getRaftActorContext();
192 for(int i = 0; i < fromContext.getReplicatedLog().size(); i++) {
193 ReplicatedLogEntry entry = fromContext.getReplicatedLog().get(i);
194 getState().add(entry.getData());
195 context.getReplicatedLog().append(entry);
198 context.setCommitIndex(fromContext.getCommitIndex());
199 context.setLastApplied(fromContext.getLastApplied());
203 protected void initializeBehavior() {
204 changeCurrentBehavior(new Leader(getRaftActorContext()));
205 initializeBehaviorComplete.countDown();
209 public void createSnapshot(ActorRef actorRef) {
211 actorRef.tell(new CaptureSnapshotReply(RaftActorTest.fromObject(getState()).toByteArray()), actorRef);
212 } catch (Exception e) {
213 LOG.error("createSnapshot failed", e);
217 static Props props(Map<String, String> peerAddresses, RaftActorContext fromContext) {
218 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
219 configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
220 configParams.setElectionTimeoutFactor(100000);
221 return Props.create(MockLeaderRaftActor.class, peerAddresses, configParams, fromContext);