BUG-2187: Add Server - Leader Implementation
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorServerConfigurationSupportTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 //import static org.junit.Assert.assertEquals;
11 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
12 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
13 import akka.actor.ActorRef;
14 import akka.actor.Props;
15 import akka.actor.UntypedActor;
16 import akka.dispatch.Dispatchers;
17 import akka.testkit.JavaTestKit;
18 import akka.testkit.TestActorRef;
19 import com.google.common.base.Optional;
20 import com.google.common.collect.ImmutableMap;
21 import com.google.common.collect.Maps;
22 import java.util.Collections;
23 import java.util.Map;
24 //import java.util.List;
25 import java.util.concurrent.TimeUnit;
26 import org.junit.After;
27 import org.junit.Before;
28 import org.junit.Test;
29 import org.opendaylight.controller.cluster.DataPersistenceProvider;
30 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
31 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
32 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
33 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
34 import org.opendaylight.controller.cluster.raft.messages.AddServer;
35 //import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
36 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
37 //import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
38 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
39 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
40 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43 import scala.concurrent.duration.FiniteDuration;
44 //import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
45 /**
46  * Unit tests for RaftActorServerConfigurationSupport.
47  *
48  * @author Thomas Pantelis
49  */
50 public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
51     static final String LEADER_ID = "leader";
52     static final String FOLLOWER_ID = "follower";
53     static final String NEW_SERVER_ID = "new-server";
54     private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupportTest.class);
55     private static final DataPersistenceProvider NO_PERSISTENCE = new NonPersistentDataProvider();
56
57     private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
58
59     private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
60             Props.create(ForwardMessageToBehaviorActor.class).withDispatcher(Dispatchers.DefaultDispatcherId()),
61             actorFactory.generateActorId(FOLLOWER_ID));
62
63     private final TestActorRef<ForwardMessageToBehaviorActor> newServerActor = actorFactory.createTestActor(
64             Props.create(ForwardMessageToBehaviorActor.class).withDispatcher(Dispatchers.DefaultDispatcherId()),
65             actorFactory.generateActorId(NEW_SERVER_ID));
66
67     private RaftActorContext newServerActorContext;
68     private final JavaTestKit testKit = new JavaTestKit(getSystem());
69
70     @Before
71     public void setup() {
72         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
73         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
74         configParams.setElectionTimeoutFactor(100000);
75         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
76         newServerActorContext = new RaftActorContextImpl(newServerActor, newServerActor.underlyingActor().getContext(),
77                 NEW_SERVER_ID, new ElectionTermImpl(NO_PERSISTENCE, NEW_SERVER_ID, LOG), -1, -1,
78                 Maps.<String, String>newHashMap(), configParams, NO_PERSISTENCE, LOG);
79         newServerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
80     }
81
82     @After
83     public void tearDown() throws Exception {
84         actorFactory.close();
85     }
86
87     @Test
88     public void testAddServerWithFollower() throws Exception {
89         RaftActorContext followerActorContext = newFollowerContext(FOLLOWER_ID, followerActor);
90         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
91                 0, 3, 1).build());
92         followerActorContext.setCommitIndex(3);
93         followerActorContext.setLastApplied(3);
94
95         Follower follower = new Follower(followerActorContext);
96         followerActor.underlyingActor().setBehavior(follower);
97
98         Follower newServer = new Follower(newServerActorContext);
99         newServerActor.underlyingActor().setBehavior(newServer);
100
101         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
102                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
103                         followerActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
104                 actorFactory.generateActorId(LEADER_ID));
105
106         // Expect initial heartbeat from the leader.
107         expectFirstMatching(followerActor, AppendEntries.class);
108         clearMessages(followerActor);
109
110         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
111
112         leaderActor.tell(new AddServer(NEW_SERVER_ID, newServerActor.path().toString(), true), testKit.getRef());
113
114         // leader should install snapshot - capture and verify ApplySnapshot contents
115         //ApplySnapshot applySnapshot = expectFirstMatching(followerActor, ApplySnapshot.class);
116         //List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
117         //assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
118
119         // leader should replicate new server config to both followers
120         //expectFirstMatching(followerActor, AppendEntries.class);
121         //expectFirstMatching(newServerActor, AppendEntries.class);
122
123         // verify ServerConfigurationPayload entry in leader's log
124         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
125         //assertEquals("Leader journal log size", 4, leaderActorContext.getReplicatedLog().size());
126         //assertEquals("Leader journal last index", 3, leaderActorContext.getReplicatedLog().lastIndex());
127         ReplicatedLogEntry logEntry = leaderActorContext.getReplicatedLog().get(
128                 leaderActorContext.getReplicatedLog().lastIndex());
129         // verify logEntry contents
130
131         // Also verify ServerConfigurationPayload entry in both followers
132
133         //AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
134         //assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
135         //assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
136     }
137
138     //@Test
139     public void testAddServerWithNoLeader() {
140         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
141         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
142
143         TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
144                 MockRaftActor.props(LEADER_ID, ImmutableMap.<String,String>of(FOLLOWER_ID, followerActor.path().toString()),
145                         Optional.<ConfigParams>of(configParams), NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
146                 actorFactory.generateActorId(LEADER_ID));
147         noLeaderActor.underlyingActor().waitForInitializeBehaviorComplete();
148
149         noLeaderActor.tell(new AddServer(NEW_SERVER_ID, newServerActor.path().toString(), true), testKit.getRef());
150         //AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
151         //assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
152     }
153
154     //@Test
155     public void testAddServerForwardedToLeader() {
156         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
157         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
158
159         TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
160                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
161                 actorFactory.generateActorId(LEADER_ID));
162
163         TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
164                 MockRaftActor.props(FOLLOWER_ID, ImmutableMap.<String,String>of(LEADER_ID, leaderActor.path().toString()),
165                         Optional.<ConfigParams>of(configParams), NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
166                 actorFactory.generateActorId(FOLLOWER_ID));
167         followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
168
169         followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
170                 -1, -1, (short)0), leaderActor);
171
172         followerRaftActor.tell(new AddServer(NEW_SERVER_ID, newServerActor.path().toString(), true), testKit.getRef());
173         //expectFirstMatching(leaderActor, AddServer.class);
174     }
175
176     private RaftActorContext newFollowerContext(String id, TestActorRef<? extends UntypedActor> actor) {
177         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
178         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
179         configParams.setElectionTimeoutFactor(100000);
180         RaftActorContext followerActorContext = new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
181                 id, new ElectionTermImpl(NO_PERSISTENCE, id, LOG), -1, -1,
182                 ImmutableMap.of(LEADER_ID, ""), configParams, NO_PERSISTENCE, LOG);
183
184         return followerActorContext;
185     }
186
187     public static class MockLeaderRaftActor extends MockRaftActor {
188         public MockLeaderRaftActor(Map<String, String> peerAddresses, ConfigParams config,
189                 RaftActorContext fromContext) {
190             super(LEADER_ID, peerAddresses, Optional.of(config), NO_PERSISTENCE);
191
192             RaftActorContext context = getRaftActorContext();
193             for(int i = 0; i < fromContext.getReplicatedLog().size(); i++) {
194                 ReplicatedLogEntry entry = fromContext.getReplicatedLog().get(i);
195                 getState().add(entry.getData());
196                 context.getReplicatedLog().append(entry);
197             }
198
199             context.setCommitIndex(fromContext.getCommitIndex());
200             context.setLastApplied(fromContext.getLastApplied());
201         }
202
203         @Override
204         protected void initializeBehavior() {
205             changeCurrentBehavior(new Leader(getRaftActorContext()));
206             initializeBehaviorComplete.countDown();
207         }
208
209         @Override
210         public void createSnapshot(ActorRef actorRef) {
211             try {
212                 actorRef.tell(new CaptureSnapshotReply(RaftActorTest.fromObject(getState()).toByteArray()), actorRef);
213             } catch (Exception e) {
214                 LOG.error("createSnapshot failed", e);
215             }
216         }
217
218         static Props props(Map<String, String> peerAddresses, RaftActorContext fromContext) {
219             DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
220             configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
221             configParams.setElectionTimeoutFactor(100000);
222             return Props.create(MockLeaderRaftActor.class, peerAddresses, configParams, fromContext);
223         }
224     }
225 }