Reduce use of scala.concurrent.duration.Duration
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / utils / MessageCollectorActor.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft.utils;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Props;
12 import akka.actor.UntypedAbstractActor;
13 import akka.dispatch.ControlMessage;
14 import akka.pattern.Patterns;
15 import akka.util.Timeout;
16 import com.google.common.base.Predicate;
17 import com.google.common.base.Predicates;
18 import com.google.common.base.Throwables;
19 import com.google.common.collect.Iterables;
20 import com.google.common.util.concurrent.Uninterruptibles;
21 import java.util.ArrayList;
22 import java.util.Collections;
23 import java.util.List;
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.TimeoutException;
26 import org.junit.Assert;
27 import scala.concurrent.Await;
28 import scala.concurrent.Future;
29 import scala.concurrent.duration.FiniteDuration;
30
31 public class MessageCollectorActor extends UntypedAbstractActor {
32     private static final String ARE_YOU_READY = "ARE_YOU_READY";
33     public static final String GET_ALL_MESSAGES = "messages";
34
35     private static final Object CLEAR_MESSAGES = new ControlMessage() {
36         @Override
37         public String toString() {
38             return "clear-messages";
39         }
40     };
41
42     private final List<Object> messages = new ArrayList<>();
43
44     @Override public void onReceive(final Object message) throws Exception {
45         if (ARE_YOU_READY.equals(message)) {
46             getSender().tell("yes", getSelf());
47         } else if (GET_ALL_MESSAGES.equals(message)) {
48             getSender().tell(new ArrayList<>(messages), getSelf());
49         } else if (CLEAR_MESSAGES.equals(message)) {
50             messages.clear();
51         } else if (message != null) {
52             messages.add(message);
53         }
54     }
55
56     @SuppressWarnings({"unchecked", "checkstyle:illegalCatch"})
57     public static List<Object> getAllMessages(final ActorRef actor) {
58         FiniteDuration operationDuration = FiniteDuration.create(5, TimeUnit.SECONDS);
59         Timeout operationTimeout = new Timeout(operationDuration);
60         Future<Object> future = Patterns.ask(actor, GET_ALL_MESSAGES, operationTimeout);
61
62         try {
63             return (List<Object>) Await.result(future, operationDuration);
64         } catch (RuntimeException e) {
65             throw e;
66         } catch (Exception e) {
67             throw new RuntimeException(e);
68         }
69     }
70
71     public static void clearMessages(final ActorRef actor) {
72         actor.tell(CLEAR_MESSAGES, ActorRef.noSender());
73     }
74
75     /**
76      * Get the first message that matches the specified class.
77      *
78      * @param actor the MessageCollectorActor reference
79      * @param clazz the class to match
80      * @return the first matching message
81      */
82     public static <T> T getFirstMatching(final ActorRef actor, final Class<T> clazz) {
83         List<Object> allMessages = getAllMessages(actor);
84
85         for (Object message : allMessages) {
86             if (message.getClass().equals(clazz)) {
87                 return clazz.cast(message);
88             }
89         }
90
91         return null;
92     }
93
94     @SuppressWarnings("checkstyle:IllegalCatch")
95     public static <T> List<T> expectMatching(final ActorRef actor, final Class<T> clazz, final int count) {
96         return expectMatching(actor, clazz, count, msg -> true);
97     }
98
99     @SuppressWarnings("checkstyle:IllegalCatch")
100     public static <T> List<T> expectMatching(final ActorRef actor, final Class<T> clazz, final int count,
101             final Predicate<T> matcher) {
102         int timeout = 5000;
103         Exception lastEx = null;
104         List<T> messages = Collections.emptyList();
105         for (int i = 0; i < timeout / 50; i++) {
106             try {
107                 messages = getAllMatching(actor, clazz);
108                 Iterables.removeIf(messages, Predicates.not(matcher));
109                 if (messages.size() >= count) {
110                     return messages;
111                 }
112
113                 lastEx = null;
114             } catch (Exception e)  {
115                 lastEx = e;
116             }
117
118             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
119         }
120
121         throw new AssertionError(String.format("Expected %d messages of type %s. Actual received was %d: %s", count,
122                 clazz, messages.size(), messages), lastEx);
123     }
124
125     public static <T> T expectFirstMatching(final ActorRef actor, final Class<T> clazz) {
126         return expectFirstMatching(actor, clazz, 5000);
127     }
128
129     @SuppressWarnings("checkstyle:IllegalCatch")
130     public static <T> T expectFirstMatching(final ActorRef actor, final Class<T> clazz, final long timeout) {
131         Exception lastEx = null;
132         int count = (int) (timeout / 50);
133         for (int i = 0; i < count; i++) {
134             try {
135                 T message = getFirstMatching(actor, clazz);
136                 if (message != null) {
137                     return message;
138                 }
139
140                 lastEx = null;
141             } catch (Exception e) {
142                 lastEx = e;
143             }
144
145             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
146         }
147
148         throw new AssertionError(actor + ": Did not receive message of type " + clazz + ", Actual received was "
149             + getAllMessages(actor), lastEx);
150     }
151
152     @SuppressWarnings("checkstyle:IllegalCatch")
153     public static <T> T expectFirstMatching(final ActorRef actor, final Class<T> clazz, final Predicate<T> matcher) {
154         int timeout = 5000;
155         Exception lastEx = null;
156         T lastMessage = null;
157         for (int i = 0; i < timeout / 50; i++) {
158             try {
159                 List<T> messages = getAllMatching(actor, clazz);
160                 for (T msg : messages) {
161                     if (matcher.apply(msg)) {
162                         return msg;
163                     }
164
165                     lastMessage = msg;
166                 }
167
168                 lastEx = null;
169             } catch (Exception e) {
170                 lastEx = e;
171             }
172
173             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
174         }
175
176         throw new AssertionError(String.format("Expected specific message of type %s. Last message received was: %s",
177                 clazz, lastMessage), lastEx);
178     }
179
180     public static <T> void assertNoneMatching(final ActorRef actor, final Class<T> clazz) {
181         assertNoneMatching(actor, clazz, 5000);
182     }
183
184     @SuppressWarnings("checkstyle:IllegalCatch")
185     public static <T> void assertNoneMatching(final ActorRef actor, final Class<T> clazz, final long timeout) {
186         Exception lastEx = null;
187         int count = (int) (timeout / 50);
188         for (int i = 0; i < count; i++) {
189             try {
190                 T message = getFirstMatching(actor, clazz);
191                 if (message != null) {
192                     Assert.fail("Unexpected message received" +  message.toString());
193                     return;
194                 }
195
196                 lastEx = null;
197             } catch (Exception e) {
198                 lastEx = e;
199             }
200
201             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
202         }
203
204         if (lastEx != null) {
205             Throwables.throwIfUnchecked(lastEx);
206             throw new RuntimeException(lastEx);
207         }
208
209         return;
210     }
211
212
213     public static <T> List<T> getAllMatching(final ActorRef actor, final Class<T> clazz) {
214         List<Object> allMessages = getAllMessages(actor);
215
216         List<T> output = new ArrayList<>();
217
218         for (Object message : allMessages) {
219             if (message.getClass().equals(clazz)) {
220                 output.add(clazz.cast(message));
221             }
222         }
223
224         return output;
225     }
226
227     public static void waitUntilReady(final ActorRef actor) throws TimeoutException, InterruptedException {
228         long timeout = 500;
229         FiniteDuration duration = FiniteDuration.create(timeout, TimeUnit.MILLISECONDS);
230         for (int i = 0; i < 10; i++) {
231             try {
232                 Await.ready(Patterns.ask(actor, ARE_YOU_READY, timeout), duration);
233                 return;
234             } catch (TimeoutException e) {
235                 // will fall through below
236             }
237         }
238
239         throw new TimeoutException("Actor not ready in time.");
240     }
241
242     public static Props props() {
243         return Props.create(MessageCollectorActor.class);
244     }
245 }