Merge "BUG-2288: deprecate old binding Notification API elements"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / AbstractLeaderElectionScenarioTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft.behaviors;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertNotNull;
12 import static org.junit.Assert.assertTrue;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSystem;
15 import akka.actor.Props;
16 import akka.dispatch.Dispatchers;
17 import akka.testkit.JavaTestKit;
18 import akka.testkit.TestActorRef;
19 import com.google.common.util.concurrent.Uninterruptibles;
20 import java.util.Map;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.CountDownLatch;
23 import java.util.concurrent.TimeUnit;
24 import org.junit.After;
25 import org.junit.Before;
26 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
27 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
28 import org.opendaylight.controller.cluster.raft.RaftActorContext;
29 import org.opendaylight.controller.cluster.raft.RaftState;
30 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
31 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
32 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35 import scala.concurrent.duration.FiniteDuration;
36
37 /**
38  * Abstract base for a leader election scenario test.
39  *
40  * @author Thomas Pantelis
41  */
42 public class AbstractLeaderElectionScenarioTest {
43     static final int HEARTBEAT_INTERVAL = 50;
44
45     static class MemberActor extends MessageCollectorActor {
46
47         volatile RaftActorBehavior behavior;
48         Map<Class<?>, CountDownLatch> messagesReceivedLatches = new ConcurrentHashMap<>();
49         Map<Class<?>, Boolean> dropMessagesToBehavior = new ConcurrentHashMap<>();
50         CountDownLatch behaviorStateChangeLatch;
51
52         public static Props props() {
53             return Props.create(MemberActor.class).withDispatcher(Dispatchers.DefaultDispatcherId());
54         }
55
56         @Override
57         public void onReceive(Object message) throws Exception {
58             // Ignore scheduled SendHeartBeat messages.
59             if(message instanceof SendHeartBeat) {
60                 return;
61             }
62
63             try {
64                 if(behavior != null && !dropMessagesToBehavior.containsKey(message.getClass())) {
65                     RaftActorBehavior oldBehavior = behavior;
66                     behavior = behavior.handleMessage(getSender(), message);
67                     if(behavior != oldBehavior && behaviorStateChangeLatch != null) {
68                         behaviorStateChangeLatch.countDown();
69                     }
70                 }
71             } finally {
72                 super.onReceive(message);
73
74                 CountDownLatch latch = messagesReceivedLatches.get(message.getClass());
75                 if(latch != null) {
76                     latch.countDown();
77                 }
78             }
79         }
80
81         void expectBehaviorStateChange() {
82             behaviorStateChangeLatch = new CountDownLatch(1);
83         }
84
85         void waitForBehaviorStateChange() {
86             assertTrue("Expected behavior state change",
87                     Uninterruptibles.awaitUninterruptibly(behaviorStateChangeLatch, 5, TimeUnit.SECONDS));
88         }
89
90         void expectMessageClass(Class<?> expClass, int expCount) {
91             messagesReceivedLatches.put(expClass, new CountDownLatch(expCount));
92         }
93
94         void waitForExpectedMessages(Class<?> expClass) {
95             CountDownLatch latch = messagesReceivedLatches.get(expClass);
96             assertNotNull("No messages received for " + expClass, latch);
97             assertTrue("Missing messages of type " + expClass,
98                     Uninterruptibles.awaitUninterruptibly(latch, 5, TimeUnit.SECONDS));
99         }
100
101         void dropMessagesToBehavior(Class<?> msgClass) {
102             dropMessagesToBehavior(msgClass, 1);
103         }
104
105         void dropMessagesToBehavior(Class<?> msgClass, int expCount) {
106             expectMessageClass(msgClass, expCount);
107             dropMessagesToBehavior.put(msgClass, Boolean.TRUE);
108         }
109
110         void clearDropMessagesToBehavior() {
111             dropMessagesToBehavior.clear();
112         }
113
114         @Override
115         public void clear() {
116             behaviorStateChangeLatch = null;
117             clearDropMessagesToBehavior();
118             messagesReceivedLatches.clear();
119             super.clear();
120         }
121
122         void forwardCapturedMessageToBehavior(Class<?> msgClass, ActorRef sender) throws Exception {
123             Object message = getFirstMatching(getSelf(), msgClass);
124             assertNotNull("Message of type " + msgClass + " not received", message);
125             getSelf().tell(message, sender);
126         }
127
128         void forwardCapturedMessagesToBehavior(Class<?> msgClass, ActorRef sender) throws Exception {
129             for(Object m: getAllMatching(getSelf(), msgClass)) {
130                 getSelf().tell(m, sender);
131             }
132         }
133
134         <T> T getCapturedMessage(Class<T> msgClass) throws Exception {
135             Object message = getFirstMatching(getSelf(), msgClass);
136             assertNotNull("Message of type " + msgClass + " not received", message);
137             return (T) message;
138         }
139     }
140
141     protected final Logger testLog = LoggerFactory.getLogger(MockRaftActorContext.class);
142     protected final ActorSystem system = ActorSystem.create("test");
143     protected TestActorRef<MemberActor> member1ActorRef;
144     protected TestActorRef<MemberActor> member2ActorRef;
145     protected TestActorRef<MemberActor> member3ActorRef;
146     protected MemberActor member1Actor;
147     protected MemberActor member2Actor;
148     protected MemberActor member3Actor;
149     protected MockRaftActorContext member1Context;
150     protected MockRaftActorContext member2Context;
151     protected MockRaftActorContext member3Context;
152
153     @Before
154     public void setup() throws Exception {
155         member1ActorRef = newMemberActor("member1");
156         member2ActorRef = newMemberActor("member2");
157         member3ActorRef = newMemberActor("member3");
158
159         member1Actor = member1ActorRef.underlyingActor();
160         member2Actor = member2ActorRef.underlyingActor();
161         member3Actor = member3ActorRef.underlyingActor();
162     }
163
164     @After
165     public void tearDown() {
166         JavaTestKit.shutdownActorSystem(system);
167     }
168
169     DefaultConfigParamsImpl newConfigParams() {
170         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
171         configParams.setHeartBeatInterval(new FiniteDuration(HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS));
172         configParams.setElectionTimeoutFactor(100000);
173         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
174         return configParams;
175     }
176
177     MockRaftActorContext newRaftActorContext(String id, ActorRef actor,
178             Map<String, String> peerAddresses) {
179         MockRaftActorContext context = new MockRaftActorContext(id, system, actor);
180         context.setPeerAddresses(peerAddresses);
181         context.getTermInformation().updateAndPersist(1, "");
182         return context;
183     }
184
185     void verifyBehaviorState(String name, MemberActor actor, RaftState expState) {
186         assertEquals(name + " behavior state", expState, actor.behavior.state());
187     }
188
189     void initializeLeaderBehavior(MemberActor actor, RaftActorContext context,
190             int numActiveFollowers) throws Exception {
191         // Leader sends immediate heartbeats - we don't care about it so ignore it.
192
193         actor.expectMessageClass(AppendEntriesReply.class, numActiveFollowers);
194         Leader leader = new Leader(context);
195         actor.waitForExpectedMessages(AppendEntriesReply.class);
196         actor.behavior = leader;
197
198         actor.forwardCapturedMessagesToBehavior(AppendEntriesReply.class, ActorRef.noSender());
199         actor.clear();
200     }
201
202     TestActorRef<MemberActor> newMemberActor(String name) throws Exception {
203         TestActorRef<MemberActor> actor = TestActorRef.create(system, MemberActor.props(), name);
204         MessageCollectorActor.waitUntilReady(actor);
205         return actor;
206     }
207
208     void sendHeartbeat(TestActorRef<MemberActor> leaderActor) {
209         Uninterruptibles.sleepUninterruptibly(HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS);
210         leaderActor.underlyingActor().behavior.handleMessage(leaderActor, new SendHeartBeat());
211     }
212 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.