Fix intermittent PreLeaderScenarioTest failure
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / utils / MessageCollectorActor.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.utils;
10
11 import akka.actor.ActorRef;
12 import akka.actor.Props;
13 import akka.actor.UntypedActor;
14 import akka.dispatch.ControlMessage;
15 import akka.pattern.Patterns;
16 import akka.util.Timeout;
17 import com.google.common.base.Predicate;
18 import com.google.common.base.Predicates;
19 import com.google.common.base.Throwables;
20 import com.google.common.collect.Iterables;
21 import com.google.common.util.concurrent.Uninterruptibles;
22 import java.util.ArrayList;
23 import java.util.Collections;
24 import java.util.List;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.TimeoutException;
27 import org.junit.Assert;
28 import scala.concurrent.Await;
29 import scala.concurrent.Future;
30 import scala.concurrent.duration.Duration;
31 import scala.concurrent.duration.FiniteDuration;
32
33 public class MessageCollectorActor extends UntypedActor {
34     private static final String ARE_YOU_READY = "ARE_YOU_READY";
35     public static final String GET_ALL_MESSAGES = "messages";
36
37     private static final Object CLEAR_MESSAGES = new ControlMessage() {
38         @Override
39         public String toString() {
40             return "clear-messages";
41         }
42     };
43
44     private final List<Object> messages = new ArrayList<>();
45
46     @Override public void onReceive(final Object message) throws Exception {
47         if (ARE_YOU_READY.equals(message)) {
48             getSender().tell("yes", getSelf());
49         } else if (GET_ALL_MESSAGES.equals(message)) {
50             getSender().tell(new ArrayList<>(messages), getSelf());
51         } else if (CLEAR_MESSAGES.equals(message)) {
52             messages.clear();
53         } else if (message != null) {
54             messages.add(message);
55         }
56     }
57
58     @SuppressWarnings({"unchecked", "checkstyle:illegalCatch"})
59     public static List<Object> getAllMessages(final ActorRef actor) {
60         FiniteDuration operationDuration = Duration.create(5, TimeUnit.SECONDS);
61         Timeout operationTimeout = new Timeout(operationDuration);
62         Future<Object> future = Patterns.ask(actor, GET_ALL_MESSAGES, operationTimeout);
63
64         try {
65             return (List<Object>) Await.result(future, operationDuration);
66         } catch (RuntimeException e) {
67             throw e;
68         } catch (Exception e) {
69             throw new RuntimeException(e);
70         }
71     }
72
73     public static void clearMessages(final ActorRef actor) {
74         actor.tell(CLEAR_MESSAGES, ActorRef.noSender());
75     }
76
77     /**
78      * Get the first message that matches the specified class.
79      *
80      * @param actor the MessageCollectorActor reference
81      * @param clazz the class to match
82      * @return the first matching message
83      */
84     public static <T> T getFirstMatching(final ActorRef actor, final Class<T> clazz) {
85         List<Object> allMessages = getAllMessages(actor);
86
87         for (Object message : allMessages) {
88             if (message.getClass().equals(clazz)) {
89                 return clazz.cast(message);
90             }
91         }
92
93         return null;
94     }
95
96     @SuppressWarnings("checkstyle:IllegalCatch")
97     public static <T> List<T> expectMatching(final ActorRef actor, final Class<T> clazz, final int count) {
98         return expectMatching(actor, clazz, count, msg -> true);
99     }
100
101     @SuppressWarnings("checkstyle:IllegalCatch")
102     public static <T> List<T> expectMatching(final ActorRef actor, final Class<T> clazz, final int count,
103             final Predicate<T> matcher) {
104         int timeout = 5000;
105         Exception lastEx = null;
106         List<T> messages = Collections.emptyList();
107         for (int i = 0; i < timeout / 50; i++) {
108             try {
109                 messages = getAllMatching(actor, clazz);
110                 Iterables.removeIf(messages, Predicates.not(matcher));
111                 if (messages.size() >= count) {
112                     return messages;
113                 }
114
115                 lastEx = null;
116             } catch (Exception e)  {
117                 lastEx = e;
118             }
119
120             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
121         }
122
123         throw new AssertionError(String.format("Expected %d messages of type %s. Actual received was %d: %s", count,
124                 clazz, messages.size(), messages), lastEx);
125     }
126
127     public static <T> T expectFirstMatching(final ActorRef actor, final Class<T> clazz) {
128         return expectFirstMatching(actor, clazz, 5000);
129     }
130
131     @SuppressWarnings("checkstyle:IllegalCatch")
132     public static <T> T expectFirstMatching(final ActorRef actor, final Class<T> clazz, final long timeout) {
133         Exception lastEx = null;
134         int count = (int) (timeout / 50);
135         for (int i = 0; i < count; i++) {
136             try {
137                 T message = getFirstMatching(actor, clazz);
138                 if (message != null) {
139                     return message;
140                 }
141
142                 lastEx = null;
143             } catch (Exception e) {
144                 lastEx = e;
145             }
146
147             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
148         }
149
150         throw new AssertionError(actor + ": Did not receive message of type " + clazz + ", Actual received was "
151             + getAllMessages(actor), lastEx);
152     }
153
154     @SuppressWarnings("checkstyle:IllegalCatch")
155     public static <T> T expectFirstMatching(final ActorRef actor, final Class<T> clazz, final Predicate<T> matcher) {
156         int timeout = 5000;
157         Exception lastEx = null;
158         T lastMessage = null;
159         for (int i = 0; i < timeout / 50; i++) {
160             try {
161                 List<T> messages = getAllMatching(actor, clazz);
162                 for (T msg : messages) {
163                     if (matcher.apply(msg)) {
164                         return msg;
165                     }
166
167                     lastMessage = msg;
168                 }
169
170                 lastEx = null;
171             } catch (Exception e) {
172                 lastEx = e;
173             }
174
175             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
176         }
177
178         throw new AssertionError(String.format("Expected specific message of type %s. Last message received was: %s",
179                 clazz, lastMessage), lastEx);
180     }
181
182     public static <T> void assertNoneMatching(final ActorRef actor, final Class<T> clazz) {
183         assertNoneMatching(actor, clazz, 5000);
184     }
185
186     @SuppressWarnings("checkstyle:IllegalCatch")
187     public static <T> void assertNoneMatching(final ActorRef actor, final Class<T> clazz, final long timeout) {
188         Exception lastEx = null;
189         int count = (int) (timeout / 50);
190         for (int i = 0; i < count; i++) {
191             try {
192                 T message = getFirstMatching(actor, clazz);
193                 if (message != null) {
194                     Assert.fail("Unexpected message received" +  message.toString());
195                     return;
196                 }
197
198                 lastEx = null;
199             } catch (Exception e) {
200                 lastEx = e;
201             }
202
203             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
204         }
205
206         if (lastEx != null) {
207             Throwables.throwIfUnchecked(lastEx);
208             throw new RuntimeException(lastEx);
209         }
210
211         return;
212     }
213
214
215     public static <T> List<T> getAllMatching(final ActorRef actor, final Class<T> clazz) {
216         List<Object> allMessages = getAllMessages(actor);
217
218         List<T> output = new ArrayList<>();
219
220         for (Object message : allMessages) {
221             if (message.getClass().equals(clazz)) {
222                 output.add(clazz.cast(message));
223             }
224         }
225
226         return output;
227     }
228
229     public static void waitUntilReady(final ActorRef actor) throws TimeoutException, InterruptedException {
230         long timeout = 500;
231         FiniteDuration duration = Duration.create(timeout, TimeUnit.MILLISECONDS);
232         for (int i = 0; i < 10; i++) {
233             try {
234                 Await.ready(Patterns.ask(actor, ARE_YOU_READY, timeout), duration);
235                 return;
236             } catch (TimeoutException e) {
237                 // will fall through below
238             }
239         }
240
241         throw new TimeoutException("Actor not ready in time.");
242     }
243
244     public static Props props() {
245         return Props.create(MessageCollectorActor.class);
246     }
247 }