348b3d9acdd2e260de06edb222ddcf971ec609e9
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / AbstractLeaderElectionScenarioTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft.behaviors;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertNotNull;
12 import static org.junit.Assert.assertTrue;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSystem;
15 import akka.actor.Props;
16 import akka.dispatch.Dispatchers;
17 import akka.testkit.JavaTestKit;
18 import akka.testkit.TestActorRef;
19 import com.google.common.util.concurrent.Uninterruptibles;
20 import java.util.Map;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.CountDownLatch;
23 import java.util.concurrent.TimeUnit;
24 import org.junit.After;
25 import org.junit.Before;
26 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
27 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
28 import org.opendaylight.controller.cluster.raft.RaftState;
29 import org.opendaylight.controller.cluster.raft.TestActorFactory;
30 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
31 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
32 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35 import scala.concurrent.duration.FiniteDuration;
36
37 /**
38  * Abstract base for a leader election scenario test.
39  *
40  * @author Thomas Pantelis
41  */
42 public class AbstractLeaderElectionScenarioTest {
43     static final int HEARTBEAT_INTERVAL = 50;
44
45     static class MemberActor extends MessageCollectorActor {
46
47         volatile RaftActorBehavior behavior;
48         Map<Class<?>, CountDownLatch> messagesReceivedLatches = new ConcurrentHashMap<>();
49         Map<Class<?>, Boolean> dropMessagesToBehavior = new ConcurrentHashMap<>();
50         CountDownLatch behaviorStateChangeLatch;
51
52         public static Props props() {
53             return Props.create(MemberActor.class).withDispatcher(Dispatchers.DefaultDispatcherId());
54         }
55
56         @Override
57         public void onReceive(Object message) throws Exception {
58             // Ignore scheduled SendHeartBeat messages.
59             if(message instanceof SendHeartBeat) {
60                 return;
61             }
62
63             try {
64                 if(behavior != null && !dropMessagesToBehavior.containsKey(message.getClass())) {
65                     final RaftActorBehavior nextBehavior = behavior.handleMessage(getSender(), message);
66                     if (nextBehavior != null) {
67                         RaftActorBehavior oldBehavior = behavior;
68                         behavior = nextBehavior;
69                         if(behavior != oldBehavior && behaviorStateChangeLatch != null) {
70                             behaviorStateChangeLatch.countDown();
71                         }
72                     }
73                 }
74             } finally {
75                 super.onReceive(message);
76
77                 CountDownLatch latch = messagesReceivedLatches.get(message.getClass());
78                 if(latch != null) {
79                     latch.countDown();
80                 }
81             }
82         }
83
84         void expectBehaviorStateChange() {
85             behaviorStateChangeLatch = new CountDownLatch(1);
86         }
87
88         void waitForBehaviorStateChange() {
89             assertTrue("Expected behavior state change",
90                     Uninterruptibles.awaitUninterruptibly(behaviorStateChangeLatch, 5, TimeUnit.SECONDS));
91         }
92
93         void expectMessageClass(Class<?> expClass, int expCount) {
94             messagesReceivedLatches.put(expClass, new CountDownLatch(expCount));
95         }
96
97         void waitForExpectedMessages(Class<?> expClass) {
98             CountDownLatch latch = messagesReceivedLatches.get(expClass);
99             assertNotNull("No messages received for " + expClass, latch);
100             assertTrue("Missing messages of type " + expClass,
101                     Uninterruptibles.awaitUninterruptibly(latch, 5, TimeUnit.SECONDS));
102         }
103
104         void dropMessagesToBehavior(Class<?> msgClass) {
105             dropMessagesToBehavior(msgClass, 1);
106         }
107
108         void dropMessagesToBehavior(Class<?> msgClass, int expCount) {
109             expectMessageClass(msgClass, expCount);
110             dropMessagesToBehavior.put(msgClass, Boolean.TRUE);
111         }
112
113         void clearDropMessagesToBehavior() {
114             dropMessagesToBehavior.clear();
115         }
116
117         @Override
118         public void clear() {
119             behaviorStateChangeLatch = null;
120             clearDropMessagesToBehavior();
121             messagesReceivedLatches.clear();
122             super.clear();
123         }
124
125         void forwardCapturedMessageToBehavior(Class<?> msgClass, ActorRef sender) throws Exception {
126             Object message = getFirstMatching(getSelf(), msgClass);
127             assertNotNull("Message of type " + msgClass + " not received", message);
128             getSelf().tell(message, sender);
129         }
130
131         void forwardCapturedMessagesToBehavior(Class<?> msgClass, ActorRef sender) throws Exception {
132             for(Object m: getAllMatching(getSelf(), msgClass)) {
133                 getSelf().tell(m, sender);
134             }
135         }
136
137         <T> T getCapturedMessage(Class<T> msgClass) throws Exception {
138             T message = getFirstMatching(getSelf(), msgClass);
139             assertNotNull("Message of type " + msgClass + " not received", message);
140             return message;
141         }
142     }
143
144     protected final Logger testLog = LoggerFactory.getLogger(MockRaftActorContext.class);
145     protected final ActorSystem system = ActorSystem.create("test");
146     protected final TestActorFactory factory = new TestActorFactory(system);
147     protected TestActorRef<MemberActor> member1ActorRef;
148     protected TestActorRef<MemberActor> member2ActorRef;
149     protected TestActorRef<MemberActor> member3ActorRef;
150     protected MemberActor member1Actor;
151     protected MemberActor member2Actor;
152     protected MemberActor member3Actor;
153     protected MockRaftActorContext member1Context;
154     protected MockRaftActorContext member2Context;
155     protected MockRaftActorContext member3Context;
156
157     @Before
158     public void setup() throws Exception {
159         member1ActorRef = newMemberActor("member1");
160         member2ActorRef = newMemberActor("member2");
161         member3ActorRef = newMemberActor("member3");
162
163         member1Actor = member1ActorRef.underlyingActor();
164         member2Actor = member2ActorRef.underlyingActor();
165         member3Actor = member3ActorRef.underlyingActor();
166     }
167
168     @After
169     public void tearDown() throws Exception {
170
171         if (member1Actor.behavior != null) {
172             member1Actor.behavior.close();
173         }
174         if (member2Actor.behavior != null) {
175             member2Actor.behavior.close();
176         }
177         if (member3Actor.behavior != null) {
178             member3Actor.behavior.close();
179         }
180
181         JavaTestKit.shutdownActorSystem(system);
182     }
183
184     DefaultConfigParamsImpl newConfigParams() {
185         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
186         configParams.setHeartBeatInterval(new FiniteDuration(HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS));
187         configParams.setElectionTimeoutFactor(100000);
188         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
189         return configParams;
190     }
191
192     MockRaftActorContext newRaftActorContext(String id, ActorRef actor,
193             Map<String, String> peerAddresses) {
194         MockRaftActorContext context = new MockRaftActorContext(id, system, actor);
195         context.setPeerAddresses(peerAddresses);
196         context.getTermInformation().updateAndPersist(1, "");
197         return context;
198     }
199
200     void verifyBehaviorState(String name, MemberActor actor, RaftState expState) {
201         assertEquals(name + " behavior state", expState, actor.behavior.state());
202     }
203
204     void initializeLeaderBehavior(MemberActor actor, MockRaftActorContext context, int numActiveFollowers) throws Exception {
205         // Leader sends immediate heartbeats - we don't care about it so ignore it.
206         // Sometimes the initial AppendEntries messages go to dead letters, probably b/c the follower actors
207         // haven't been fully created/initialized by akka. So we try up to 3 times to create the Leader as
208         // a workaround.
209
210         Leader leader = null;
211         AssertionError lastAssertError = null;
212         for(int i = 1; i <= 3; i++) {
213             actor.expectMessageClass(AppendEntriesReply.class, numActiveFollowers);
214
215             leader = new Leader(context);
216             try {
217                 actor.waitForExpectedMessages(AppendEntriesReply.class);
218                 lastAssertError = null;
219                 break;
220             } catch (AssertionError e) {
221                 lastAssertError = e;
222             }
223         }
224
225         if(lastAssertError != null) {
226             throw lastAssertError;
227         }
228
229         context.setCurrentBehavior(leader);
230
231         // Delay assignment here so the AppendEntriesReply isn't forwarded to the behavior.
232         actor.behavior = leader;
233
234         actor.forwardCapturedMessagesToBehavior(AppendEntriesReply.class, ActorRef.noSender());
235         actor.clear();
236
237     }
238
239     TestActorRef<MemberActor> newMemberActor(String name) throws Exception {
240         TestActorRef<MemberActor> actor = factory.createTestActor(MemberActor.props().
241                 withDispatcher(Dispatchers.DefaultDispatcherId()), name);
242         MessageCollectorActor.waitUntilReady(actor);
243         return actor;
244     }
245
246     void sendHeartbeat(TestActorRef<MemberActor> leaderActor) {
247         Uninterruptibles.sleepUninterruptibly(HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS);
248         leaderActor.underlyingActor().behavior.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
249     }
250 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.