Bug 2187: AddServer unit test and bug fixes
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorServerConfigurationSupportTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
12 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
13 import akka.actor.ActorRef;
14 import akka.actor.Props;
15 import akka.actor.UntypedActor;
16 import akka.dispatch.Dispatchers;
17 import akka.testkit.JavaTestKit;
18 import akka.testkit.TestActorRef;
19 import com.google.common.base.Optional;
20 import com.google.common.collect.ImmutableMap;
21 import com.google.common.collect.Maps;
22 import com.google.common.collect.Sets;
23 import java.util.Collections;
24 import java.util.List;
25 import java.util.Map;
26 //import java.util.List;
27 import java.util.concurrent.TimeUnit;
28 import org.junit.After;
29 import org.junit.Before;
30 import org.junit.Test;
31 import org.opendaylight.controller.cluster.DataPersistenceProvider;
32 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
33 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
34 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
35 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
36 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
37 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
38 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
39 import org.opendaylight.controller.cluster.raft.messages.AddServer;
40 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
41 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
42 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
43 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
44 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
45 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
46 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
47 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
48 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
51 import scala.concurrent.duration.FiniteDuration;
52
53 /**
54  * Unit tests for RaftActorServerConfigurationSupport.
55  *
56  * @author Thomas Pantelis
57  */
58 public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
59     static final String LEADER_ID = "leader";
60     static final String FOLLOWER_ID = "follower";
61     static final String NEW_SERVER_ID = "new-server";
62     private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupportTest.class);
63     private static final DataPersistenceProvider NO_PERSISTENCE = new NonPersistentDataProvider();
64
65     private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
66
67     private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
68             Props.create(ForwardMessageToBehaviorActor.class).withDispatcher(Dispatchers.DefaultDispatcherId()),
69             actorFactory.generateActorId(FOLLOWER_ID));
70
71     private TestActorRef<MockNewFollowerRaftActor> newFollowerRaftActor;
72     private TestActorRef<MessageCollectorActor> newFollowerCollectorActor;
73
74     private RaftActorContext newFollowerActorContext;
75     private final JavaTestKit testKit = new JavaTestKit(getSystem());
76
77     @Before
78     public void setup() {
79         InMemoryJournal.clear();
80         InMemorySnapshotStore.clear();
81
82         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
83         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
84         configParams.setElectionTimeoutFactor(100000);
85         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
86
87         newFollowerCollectorActor = actorFactory.createTestActor(
88                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
89                 actorFactory.generateActorId(NEW_SERVER_ID + "Collector"));
90         newFollowerRaftActor = actorFactory.createTestActor(MockNewFollowerRaftActor.props(
91                 configParams, newFollowerCollectorActor).withDispatcher(Dispatchers.DefaultDispatcherId()),
92                 actorFactory.generateActorId(NEW_SERVER_ID));
93         newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
94     }
95
96     @After
97     public void tearDown() throws Exception {
98         actorFactory.close();
99     }
100
101     @Test
102     public void testAddServerWithExistingFollower() throws Exception {
103         RaftActorContext followerActorContext = newFollowerContext(FOLLOWER_ID, followerActor);
104         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
105                 0, 3, 1).build());
106         followerActorContext.setCommitIndex(2);
107         followerActorContext.setLastApplied(2);
108
109         Follower follower = new Follower(followerActorContext);
110         followerActor.underlyingActor().setBehavior(follower);
111
112         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
113                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
114                         followerActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
115                 actorFactory.generateActorId(LEADER_ID));
116
117         // Expect initial heartbeat from the leader.
118         expectFirstMatching(followerActor, AppendEntries.class);
119         clearMessages(followerActor);
120
121         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
122
123         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
124
125         // Leader should install snapshot - capture and verify ApplySnapshot contents
126
127         ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
128         List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
129         assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
130
131         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
132         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
133         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
134
135         // Verify ServerConfigurationPayload entry in leader's log
136
137         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
138         assertEquals("Leader journal last index", 3, leaderActorContext.getReplicatedLog().lastIndex());
139         assertEquals("Leader commit index", 3, leaderActorContext.getCommitIndex());
140         assertEquals("Leader last applied index", 3, leaderActorContext.getLastApplied());
141         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), LEADER_ID, FOLLOWER_ID, NEW_SERVER_ID);
142
143         // Verify ServerConfigurationPayload entry in both followers
144
145         assertEquals("Follower journal last index", 3, followerActorContext.getReplicatedLog().lastIndex());
146         verifyServerConfigurationPayloadEntry(followerActorContext.getReplicatedLog(), LEADER_ID, FOLLOWER_ID, NEW_SERVER_ID);
147
148         assertEquals("New follower journal last index", 3, newFollowerActorContext.getReplicatedLog().lastIndex());
149         verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), LEADER_ID, FOLLOWER_ID, NEW_SERVER_ID);
150
151         // Verify new server config was applied in both followers
152
153         assertEquals("Follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID),
154                 followerActorContext.getPeerAddresses().keySet());
155
156         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, FOLLOWER_ID),
157                 newFollowerActorContext.getPeerAddresses().keySet());
158
159         clearMessages(followerActor);
160         clearMessages(newFollowerCollectorActor);
161
162         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
163         expectFirstMatching(followerActor, ApplyState.class);
164
165         assertEquals("Follower commit index", 3, followerActorContext.getCommitIndex());
166         assertEquals("Follower last applied index", 3, followerActorContext.getLastApplied());
167         assertEquals("New follower commit index", 3, newFollowerActorContext.getCommitIndex());
168         assertEquals("New follower last applied index", 3, newFollowerActorContext.getLastApplied());
169     }
170
171     @Test
172     public void testAddServerWithNoExistingFollower() throws Exception {
173         RaftActorContext initialActorContext = new MockRaftActorContext();
174         initialActorContext.setCommitIndex(1);
175         initialActorContext.setLastApplied(1);
176         initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
177                 0, 2, 1).build());
178
179         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
180                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
181                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
182                 actorFactory.generateActorId(LEADER_ID));
183
184         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
185         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
186
187         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
188
189         // Leader should install snapshot - capture and verify ApplySnapshot contents
190
191         ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
192         List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
193         assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
194
195         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
196         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
197         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
198
199         // Verify ServerConfigurationPayload entry in leader's log
200
201         assertEquals("Leader journal last index", 2, leaderActorContext.getReplicatedLog().lastIndex());
202         assertEquals("Leader commit index", 2, leaderActorContext.getCommitIndex());
203         assertEquals("Leader last applied index", 2, leaderActorContext.getLastApplied());
204         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), LEADER_ID, NEW_SERVER_ID);
205
206         // Verify ServerConfigurationPayload entry in the new follower
207
208         clearMessages(newFollowerCollectorActor);
209
210         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
211         assertEquals("New follower journal last index", 2, newFollowerActorContext.getReplicatedLog().lastIndex());
212         verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), LEADER_ID, NEW_SERVER_ID);
213
214         // Verify new server config was applied in the new follower
215
216         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID),
217                 newFollowerActorContext.getPeerAddresses().keySet());
218     }
219
220     @Test
221     public void testAddServerAsNonVoting() throws Exception {
222         RaftActorContext initialActorContext = new MockRaftActorContext();
223         initialActorContext.setCommitIndex(-1);
224         initialActorContext.setLastApplied(-1);
225         initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
226
227         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
228                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
229                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
230                 actorFactory.generateActorId(LEADER_ID));
231
232         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
233         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
234
235         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), false), testKit.getRef());
236
237         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
238         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
239         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
240
241         // Verify ServerConfigurationPayload entry in leader's log
242
243         assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
244         assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
245         assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
246         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), LEADER_ID, NEW_SERVER_ID);
247
248         // Verify ServerConfigurationPayload entry in the new follower
249
250         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
251         assertEquals("New follower journal last index", 0, newFollowerActorContext.getReplicatedLog().lastIndex());
252         verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), LEADER_ID, NEW_SERVER_ID);
253
254         // Verify new server config was applied in the new follower
255
256         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID),
257                 newFollowerActorContext.getPeerAddresses().keySet());
258
259         MessageCollectorActor.assertNoneMatching(newFollowerCollectorActor, InstallSnapshot.SERIALIZABLE_CLASS, 500);
260     }
261
262     @Test
263     public void testAddServerWithInstallSnapshotTimeout() throws Exception {
264         newFollowerRaftActor.underlyingActor().setDropMessageOfType(InstallSnapshot.SERIALIZABLE_CLASS);
265
266         RaftActorContext initialActorContext = new MockRaftActorContext();
267         initialActorContext.setCommitIndex(-1);
268         initialActorContext.setLastApplied(-1);
269         initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
270
271         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
272                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
273                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
274                 actorFactory.generateActorId(LEADER_ID));
275
276         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
277         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
278
279         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
280
281         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
282         assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
283
284         assertEquals("Leader peers size", 0, leaderActorContext.getPeerAddresses().keySet().size());
285         assertEquals("Leader followers size", 0,
286                 ((AbstractLeader)leaderRaftActor.getCurrentBehavior()).getFollowerIds().size());
287     }
288
289     @Test
290     public void testAddServerWithNoLeader() {
291         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
292         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
293
294         TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
295                 MockRaftActor.props(LEADER_ID, ImmutableMap.<String,String>of(FOLLOWER_ID, followerActor.path().toString()),
296                         Optional.<ConfigParams>of(configParams), NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
297                 actorFactory.generateActorId(LEADER_ID));
298         noLeaderActor.underlyingActor().waitForInitializeBehaviorComplete();
299
300         noLeaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
301         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
302         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
303     }
304
305     @Test
306     public void testAddServerForwardedToLeader() {
307         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
308         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
309
310         TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
311                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
312                 actorFactory.generateActorId(LEADER_ID));
313
314         TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
315                 MockRaftActor.props(FOLLOWER_ID, ImmutableMap.<String,String>of(LEADER_ID, leaderActor.path().toString()),
316                         Optional.<ConfigParams>of(configParams), NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
317                 actorFactory.generateActorId(FOLLOWER_ID));
318         followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
319
320         followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
321                 -1, -1, (short)0), leaderActor);
322
323         followerRaftActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
324         expectFirstMatching(leaderActor, AddServer.class);
325     }
326
327     private void verifyServerConfigurationPayloadEntry(ReplicatedLog log, String... cNew) {
328         ReplicatedLogEntry logEntry = log.get(log.lastIndex());
329         assertEquals("Last log entry payload class", ServerConfigurationPayload.class, logEntry.getData().getClass());
330         ServerConfigurationPayload payload = (ServerConfigurationPayload)logEntry.getData();
331         assertEquals("getNewServerConfig", Sets.newHashSet(cNew), Sets.newHashSet(payload.getNewServerConfig()));
332     }
333
334     private RaftActorContext newFollowerContext(String id, TestActorRef<? extends UntypedActor> actor) {
335         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
336         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
337         configParams.setElectionTimeoutFactor(100000);
338         ElectionTermImpl termInfo = new ElectionTermImpl(NO_PERSISTENCE, id, LOG);
339         termInfo.update(1, LEADER_ID);
340         RaftActorContext followerActorContext = new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
341                 id, termInfo, -1, -1,
342                 ImmutableMap.of(LEADER_ID, ""), configParams, NO_PERSISTENCE, LOG);
343
344         return followerActorContext;
345     }
346
347     public static class MockLeaderRaftActor extends MockRaftActor {
348         public MockLeaderRaftActor(Map<String, String> peerAddresses, ConfigParams config,
349                 RaftActorContext fromContext) {
350             super(LEADER_ID, peerAddresses, Optional.of(config), NO_PERSISTENCE);
351
352             RaftActorContext context = getRaftActorContext();
353             for(int i = 0; i < fromContext.getReplicatedLog().size(); i++) {
354                 ReplicatedLogEntry entry = fromContext.getReplicatedLog().get(i);
355                 getState().add(entry.getData());
356                 context.getReplicatedLog().append(entry);
357             }
358
359             context.setCommitIndex(fromContext.getCommitIndex());
360             context.setLastApplied(fromContext.getLastApplied());
361             context.getTermInformation().update(fromContext.getTermInformation().getCurrentTerm(),
362                     fromContext.getTermInformation().getVotedFor());
363         }
364
365         @Override
366         protected void initializeBehavior() {
367             changeCurrentBehavior(new Leader(getRaftActorContext()));
368             initializeBehaviorComplete.countDown();
369         }
370
371         @Override
372         public void createSnapshot(ActorRef actorRef) {
373             try {
374                 actorRef.tell(new CaptureSnapshotReply(RaftActorTest.fromObject(getState()).toByteArray()), actorRef);
375             } catch (Exception e) {
376                 LOG.error("createSnapshot failed", e);
377             }
378         }
379
380         static Props props(Map<String, String> peerAddresses, RaftActorContext fromContext) {
381             DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
382             configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
383             configParams.setElectionTimeoutFactor(1);
384             return Props.create(MockLeaderRaftActor.class, peerAddresses, configParams, fromContext);
385         }
386     }
387
388     public static class MockNewFollowerRaftActor extends MockRaftActor {
389         private final TestActorRef<MessageCollectorActor> collectorActor;
390         private volatile Class<?> dropMessageOfType;
391
392         public MockNewFollowerRaftActor(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
393             super(NEW_SERVER_ID, Maps.<String, String>newHashMap(), Optional.of(config), null);
394             this.collectorActor = collectorActor;
395         }
396
397         void setDropMessageOfType(Class<?> dropMessageOfType) {
398             this.dropMessageOfType = dropMessageOfType;
399         }
400
401         @Override
402         public void handleCommand(Object message) {
403             if(dropMessageOfType != null && dropMessageOfType.equals(message.getClass())) {
404                 return;
405             }
406
407             super.handleCommand(message);
408             collectorActor.tell(message, getSender());
409         }
410
411         static Props props(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
412             return Props.create(MockNewFollowerRaftActor.class, config, collectorActor);
413         }
414     }
415 }