BUG 3125 : Set Rate Limit just before acquiring a permit to avoid contention
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / utils / ActorContextTest.java
1 package org.opendaylight.controller.cluster.datastore.utils;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotEquals;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertSame;
8 import static org.junit.Assert.assertTrue;
9 import static org.junit.Assert.fail;
10 import static org.mockito.Mockito.doReturn;
11 import static org.mockito.Mockito.mock;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSelection;
14 import akka.actor.ActorSystem;
15 import akka.actor.Address;
16 import akka.actor.Props;
17 import akka.actor.UntypedActor;
18 import akka.dispatch.Futures;
19 import akka.japi.Creator;
20 import akka.testkit.JavaTestKit;
21 import akka.testkit.TestActorRef;
22 import akka.util.Timeout;
23 import com.google.common.base.Optional;
24 import com.google.common.collect.Maps;
25 import com.google.common.collect.Sets;
26 import com.google.common.util.concurrent.Uninterruptibles;
27 import com.typesafe.config.ConfigFactory;
28 import java.util.Arrays;
29 import java.util.Map;
30 import java.util.concurrent.TimeUnit;
31 import org.junit.Assert;
32 import org.junit.Test;
33 import org.mockito.Mockito;
34 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
35 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
36 import org.opendaylight.controller.cluster.datastore.Configuration;
37 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
38 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
39 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
40 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
41 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
42 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
43 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
44 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
45 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
46 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
47 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
48 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
51 import scala.concurrent.Await;
52 import scala.concurrent.Future;
53 import scala.concurrent.duration.Duration;
54 import scala.concurrent.duration.FiniteDuration;
55
56 public class ActorContextTest extends AbstractActorTest{
57
58     static final Logger log = LoggerFactory.getLogger(ActorContextTest.class);
59
60     private static class TestMessage {
61     }
62
63     private static class MockShardManager extends UntypedActor {
64
65         private final boolean found;
66         private final ActorRef actorRef;
67         private final Map<String,Object> findPrimaryResponses = Maps.newHashMap();
68
69         private MockShardManager(boolean found, ActorRef actorRef){
70
71             this.found = found;
72             this.actorRef = actorRef;
73         }
74
75         @Override public void onReceive(Object message) throws Exception {
76             if(message instanceof FindPrimary) {
77                 FindPrimary fp = (FindPrimary)message;
78                 Object resp = findPrimaryResponses.get(fp.getShardName());
79                 if(resp == null) {
80                     log.error("No expected FindPrimary response found for shard name {}", fp.getShardName());
81                 } else {
82                     getSender().tell(resp, getSelf());
83                 }
84
85                 return;
86             }
87
88             if(found){
89                 getSender().tell(new LocalShardFound(actorRef), getSelf());
90             } else {
91                 getSender().tell(new LocalShardNotFound(((FindLocalShard) message).getShardName()), getSelf());
92             }
93         }
94
95         void addFindPrimaryResp(String shardName, Object resp) {
96             findPrimaryResponses.put(shardName, resp);
97         }
98
99         private static Props props(final boolean found, final ActorRef actorRef){
100             return Props.create(new MockShardManagerCreator(found, actorRef) );
101         }
102
103         private static Props props(){
104             return Props.create(new MockShardManagerCreator() );
105         }
106
107         @SuppressWarnings("serial")
108         private static class MockShardManagerCreator implements Creator<MockShardManager> {
109             final boolean found;
110             final ActorRef actorRef;
111
112             MockShardManagerCreator() {
113                 this.found = false;
114                 this.actorRef = null;
115             }
116
117             MockShardManagerCreator(boolean found, ActorRef actorRef) {
118                 this.found = found;
119                 this.actorRef = actorRef;
120             }
121
122             @Override
123             public MockShardManager create() throws Exception {
124                 return new MockShardManager(found, actorRef);
125             }
126         }
127     }
128
129     @Test
130     public void testFindLocalShardWithShardFound(){
131         new JavaTestKit(getSystem()) {{
132
133             new Within(duration("1 seconds")) {
134                 @Override
135                 protected void run() {
136
137                     ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
138
139                     ActorRef shardManagerActorRef = getSystem()
140                         .actorOf(MockShardManager.props(true, shardActorRef));
141
142                     ActorContext actorContext =
143                         new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
144                             mock(Configuration.class));
145
146                     Optional<ActorRef> out = actorContext.findLocalShard("default");
147
148                     assertEquals(shardActorRef, out.get());
149
150
151                     expectNoMsg();
152                 }
153             };
154         }};
155
156     }
157
158     @Test
159     public void testFindLocalShardWithShardNotFound(){
160         new JavaTestKit(getSystem()) {{
161             ActorRef shardManagerActorRef = getSystem()
162                     .actorOf(MockShardManager.props(false, null));
163
164             ActorContext actorContext =
165                     new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
166                             mock(Configuration.class));
167
168             Optional<ActorRef> out = actorContext.findLocalShard("default");
169             assertTrue(!out.isPresent());
170         }};
171
172     }
173
174     @Test
175     public void testExecuteRemoteOperation() {
176         new JavaTestKit(getSystem()) {{
177             ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
178
179             ActorRef shardManagerActorRef = getSystem()
180                     .actorOf(MockShardManager.props(true, shardActorRef));
181
182             ActorContext actorContext =
183                     new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
184                             mock(Configuration.class));
185
186             ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
187
188             Object out = actorContext.executeOperation(actor, "hello");
189
190             assertEquals("hello", out);
191         }};
192     }
193
194     @Test
195     public void testExecuteRemoteOperationAsync() {
196         new JavaTestKit(getSystem()) {{
197             ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
198
199             ActorRef shardManagerActorRef = getSystem()
200                     .actorOf(MockShardManager.props(true, shardActorRef));
201
202             ActorContext actorContext =
203                     new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
204                             mock(Configuration.class));
205
206             ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
207
208             Future<Object> future = actorContext.executeOperationAsync(actor, "hello");
209
210             try {
211                 Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS));
212                 assertEquals("Result", "hello", result);
213             } catch(Exception e) {
214                 throw new AssertionError(e);
215             }
216         }};
217     }
218
219     @Test
220     public void testIsPathLocal() {
221         MockClusterWrapper clusterWrapper = new MockClusterWrapper();
222         ActorContext actorContext = null;
223
224         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
225         assertEquals(false, actorContext.isPathLocal(null));
226         assertEquals(false, actorContext.isPathLocal(""));
227
228         clusterWrapper.setSelfAddress(null);
229         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
230         assertEquals(false, actorContext.isPathLocal(""));
231
232         // even if the path is in local format, match the primary path (first 3 elements) and return true
233         clusterWrapper.setSelfAddress(new Address("akka", "test"));
234         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
235         assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
236
237         clusterWrapper.setSelfAddress(new Address("akka", "test"));
238         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
239         assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
240
241         clusterWrapper.setSelfAddress(new Address("akka", "test"));
242         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
243         assertEquals(true, actorContext.isPathLocal("akka://test/user/token2/token3/$a"));
244
245         // self address of remote format,but Tx path local format.
246         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
247         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
248         assertEquals(true, actorContext.isPathLocal(
249             "akka://system/user/shardmanager/shard/transaction"));
250
251         // self address of local format,but Tx path remote format.
252         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system"));
253         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
254         assertEquals(false, actorContext.isPathLocal(
255             "akka://system@127.0.0.1:2550/user/shardmanager/shard/transaction"));
256
257         //local path but not same
258         clusterWrapper.setSelfAddress(new Address("akka", "test"));
259         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
260         assertEquals(true, actorContext.isPathLocal("akka://test1/user/$a"));
261
262         //ip and port same
263         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
264         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
265         assertEquals(true, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550/"));
266
267         // forward-slash missing in address
268         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
269         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
270         assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550"));
271
272         //ips differ
273         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
274         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
275         assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.1.0.1:2550/"));
276
277         //ports differ
278         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
279         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
280         assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2551/"));
281     }
282
283     @Test
284     public void testResolvePathForRemoteActor() {
285         ActorContext actorContext =
286                 new ActorContext(getSystem(), mock(ActorRef.class), mock(
287                         ClusterWrapper.class),
288                         mock(Configuration.class));
289
290         String actual = actorContext.resolvePath(
291                 "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard",
292                 "akka://system/user/shardmanager/shard/transaction");
293
294         String expected = "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard/transaction";
295
296         assertEquals(expected, actual);
297     }
298
299     @Test
300     public void testResolvePathForLocalActor() {
301         ActorContext actorContext =
302                 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
303                         mock(Configuration.class));
304
305         String actual = actorContext.resolvePath(
306                 "akka://system/user/shardmanager/shard",
307                 "akka://system/user/shardmanager/shard/transaction");
308
309         String expected = "akka://system/user/shardmanager/shard/transaction";
310
311         assertEquals(expected, actual);
312     }
313
314     @Test
315     public void testResolvePathForRemoteActorWithProperRemoteAddress() {
316         ActorContext actorContext =
317                 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
318                         mock(Configuration.class));
319
320         String actual = actorContext.resolvePath(
321                 "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard",
322                 "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction");
323
324         String expected = "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction";
325
326         assertEquals(expected, actual);
327     }
328
329
330     @Test
331     public void testClientDispatcherIsGlobalDispatcher(){
332         ActorContext actorContext =
333                 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
334                         mock(Configuration.class), DatastoreContext.newBuilder().build());
335
336         assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
337
338     }
339
340     @Test
341     public void testClientDispatcherIsNotGlobalDispatcher(){
342         ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers", ConfigFactory.load("application-with-custom-dispatchers.conf"));
343
344         ActorContext actorContext =
345                 new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class),
346                         mock(Configuration.class), DatastoreContext.newBuilder().build());
347
348         assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
349
350         actorSystem.shutdown();
351
352     }
353
354     @Test
355     public void testSetDatastoreContext() {
356         new JavaTestKit(getSystem()) {{
357             ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class),
358                             mock(Configuration.class), DatastoreContext.newBuilder().
359                                 operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build());
360
361             assertEquals("getOperationDuration", 5, actorContext.getOperationDuration().toSeconds());
362             assertEquals("getTransactionCommitOperationTimeout", 7,
363                     actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
364
365             DatastoreContext newContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(6).
366                     shardTransactionCommitTimeoutInSeconds(8).build();
367
368             actorContext.setDatastoreContext(newContext);
369
370             expectMsgClass(duration("5 seconds"), DatastoreContext.class);
371
372             Assert.assertSame("getDatastoreContext", newContext, actorContext.getDatastoreContext());
373
374             assertEquals("getOperationDuration", 6, actorContext.getOperationDuration().toSeconds());
375             assertEquals("getTransactionCommitOperationTimeout", 8,
376                     actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
377         }};
378     }
379
380     @Test
381     public void testFindPrimaryShardAsyncRemotePrimaryFound() throws Exception {
382
383             TestActorRef<MessageCollectorActor> shardManager =
384                     TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
385
386             DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
387                     shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
388
389             final String expPrimaryPath = "akka://test-system/find-primary-shard";
390             ActorContext actorContext =
391                     new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
392                             mock(Configuration.class), dataStoreContext) {
393                         @Override
394                         protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
395                             return Futures.successful((Object) new RemotePrimaryShardFound(expPrimaryPath));
396                         }
397                     };
398
399             Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
400             PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
401
402             assertNotNull(actual);
403             assertEquals("LocalShardDataTree present", false, actual.getLocalShardDataTree().isPresent());
404             assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
405                     expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
406
407             Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
408
409             PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
410
411             assertEquals(cachedInfo, actual);
412
413             // Wait for 200 Milliseconds. The cached entry should have been removed.
414
415             Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
416
417             cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
418
419             assertNull(cached);
420     }
421
422     @Test
423     public void testFindPrimaryShardAsyncLocalPrimaryFound() throws Exception {
424
425             TestActorRef<MessageCollectorActor> shardManager =
426                     TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
427
428             DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
429                     shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
430
431             final DataTree mockDataTree = Mockito.mock(DataTree.class);
432             final String expPrimaryPath = "akka://test-system/find-primary-shard";
433             ActorContext actorContext =
434                     new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
435                             mock(Configuration.class), dataStoreContext) {
436                         @Override
437                         protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
438                             return Futures.successful((Object) new LocalPrimaryShardFound(expPrimaryPath, mockDataTree));
439                         }
440                     };
441
442             Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
443             PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
444
445             assertNotNull(actual);
446             assertEquals("LocalShardDataTree present", true, actual.getLocalShardDataTree().isPresent());
447             assertSame("LocalShardDataTree", mockDataTree, actual.getLocalShardDataTree().get());
448             assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
449                     expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
450
451             Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
452
453             PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
454
455             assertEquals(cachedInfo, actual);
456
457             // Wait for 200 Milliseconds. The cached entry should have been removed.
458
459             Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
460
461             cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
462
463             assertNull(cached);
464     }
465
466     @Test
467     public void testFindPrimaryShardAsyncPrimaryNotFound() throws Exception {
468
469             TestActorRef<MessageCollectorActor> shardManager =
470                     TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
471
472             DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
473                     shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
474
475             ActorContext actorContext =
476                     new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
477                             mock(Configuration.class), dataStoreContext) {
478                         @Override
479                         protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
480                             return Futures.successful((Object) new PrimaryNotFoundException("not found"));
481                         }
482                     };
483
484
485             Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
486
487             try {
488                 Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
489                 fail("Expected PrimaryNotFoundException");
490             } catch(PrimaryNotFoundException e){
491
492             }
493
494             Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
495
496             assertNull(cached);
497     }
498
499     @Test
500     public void testFindPrimaryShardAsyncActorNotInitialized() throws Exception {
501
502             TestActorRef<MessageCollectorActor> shardManager =
503                     TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
504
505             DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
506                     shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
507
508             ActorContext actorContext =
509                     new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
510                             mock(Configuration.class), dataStoreContext) {
511                         @Override
512                         protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
513                             return Futures.successful((Object) new NotInitializedException("not iniislized"));
514                         }
515                     };
516
517
518             Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
519
520             try {
521                 Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
522                 fail("Expected NotInitializedException");
523             } catch(NotInitializedException e){
524
525             }
526
527             Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
528
529             assertNull(cached);
530     }
531
532     @Test
533     public void testBroadcast() {
534         new JavaTestKit(getSystem()) {{
535             ActorRef shardActorRef1 = getSystem().actorOf(Props.create(MessageCollectorActor.class));
536             ActorRef shardActorRef2 = getSystem().actorOf(Props.create(MessageCollectorActor.class));
537
538             TestActorRef<MockShardManager> shardManagerActorRef = TestActorRef.create(getSystem(), MockShardManager.props());
539             MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor();
540             shardManagerActor.addFindPrimaryResp("shard1", new RemotePrimaryShardFound(shardActorRef1.path().toString()));
541             shardManagerActor.addFindPrimaryResp("shard2", new RemotePrimaryShardFound(shardActorRef2.path().toString()));
542             shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found"));
543
544             Configuration mockConfig = mock(Configuration.class);
545             doReturn(Sets.newLinkedHashSet(Arrays.asList("shard1", "shard2", "shard3"))).
546                     when(mockConfig).getAllShardNames();
547
548             ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
549                     mock(ClusterWrapper.class), mockConfig,
550                     DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build());
551
552             actorContext.broadcast(new TestMessage());
553
554             expectFirstMatching(shardActorRef1, TestMessage.class);
555             expectFirstMatching(shardActorRef2, TestMessage.class);
556         }};
557     }
558
559     private <T> T expectFirstMatching(ActorRef actor, Class<T> clazz) {
560         int count = 5000 / 50;
561         for(int i = 0; i < count; i++) {
562             try {
563                 T message = (T) MessageCollectorActor.getFirstMatching(actor, clazz);
564                 if(message != null) {
565                     return message;
566                 }
567             } catch (Exception e) {}
568
569             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
570         }
571
572         Assert.fail("Did not receive message of type " + clazz);
573         return null;
574     }
575 }