efe73d38ad526e9a4c312949c79e0d5d64b7c7f8
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / utils / ActorContextTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.utils;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotEquals;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.junit.Assert.fail;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.mock;
20 import akka.actor.ActorRef;
21 import akka.actor.ActorSelection;
22 import akka.actor.ActorSystem;
23 import akka.actor.Address;
24 import akka.actor.Props;
25 import akka.actor.UntypedActor;
26 import akka.dispatch.Futures;
27 import akka.japi.Creator;
28 import akka.testkit.JavaTestKit;
29 import akka.testkit.TestActorRef;
30 import akka.util.Timeout;
31 import com.google.common.base.Optional;
32 import com.google.common.collect.Maps;
33 import com.google.common.collect.Sets;
34 import com.google.common.util.concurrent.Uninterruptibles;
35 import com.typesafe.config.ConfigFactory;
36 import java.util.Arrays;
37 import java.util.Map;
38 import java.util.concurrent.TimeUnit;
39 import org.junit.Assert;
40 import org.junit.Test;
41 import org.mockito.Mockito;
42 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
43 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
44 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
45 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
46 import org.opendaylight.controller.cluster.datastore.config.Configuration;
47 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
48 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
49 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
50 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
51 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
52 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
53 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
54 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
55 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
56 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
57 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
58 import org.slf4j.Logger;
59 import org.slf4j.LoggerFactory;
60 import scala.concurrent.Await;
61 import scala.concurrent.Future;
62 import scala.concurrent.duration.Duration;
63 import scala.concurrent.duration.FiniteDuration;
64
65 public class ActorContextTest extends AbstractActorTest{
66
67     static final Logger log = LoggerFactory.getLogger(ActorContextTest.class);
68
69     private static class TestMessage {
70     }
71
72     private static class MockShardManager extends UntypedActor {
73
74         private final boolean found;
75         private final ActorRef actorRef;
76         private final Map<String,Object> findPrimaryResponses = Maps.newHashMap();
77
78         private MockShardManager(boolean found, ActorRef actorRef){
79
80             this.found = found;
81             this.actorRef = actorRef;
82         }
83
84         @Override public void onReceive(Object message) throws Exception {
85             if(message instanceof FindPrimary) {
86                 FindPrimary fp = (FindPrimary)message;
87                 Object resp = findPrimaryResponses.get(fp.getShardName());
88                 if(resp == null) {
89                     log.error("No expected FindPrimary response found for shard name {}", fp.getShardName());
90                 } else {
91                     getSender().tell(resp, getSelf());
92                 }
93
94                 return;
95             }
96
97             if(found){
98                 getSender().tell(new LocalShardFound(actorRef), getSelf());
99             } else {
100                 getSender().tell(new LocalShardNotFound(((FindLocalShard) message).getShardName()), getSelf());
101             }
102         }
103
104         void addFindPrimaryResp(String shardName, Object resp) {
105             findPrimaryResponses.put(shardName, resp);
106         }
107
108         private static Props props(final boolean found, final ActorRef actorRef){
109             return Props.create(new MockShardManagerCreator(found, actorRef) );
110         }
111
112         private static Props props(){
113             return Props.create(new MockShardManagerCreator() );
114         }
115
116         @SuppressWarnings("serial")
117         private static class MockShardManagerCreator implements Creator<MockShardManager> {
118             final boolean found;
119             final ActorRef actorRef;
120
121             MockShardManagerCreator() {
122                 this.found = false;
123                 this.actorRef = null;
124             }
125
126             MockShardManagerCreator(boolean found, ActorRef actorRef) {
127                 this.found = found;
128                 this.actorRef = actorRef;
129             }
130
131             @Override
132             public MockShardManager create() throws Exception {
133                 return new MockShardManager(found, actorRef);
134             }
135         }
136     }
137
138     @Test
139     public void testFindLocalShardWithShardFound(){
140         new JavaTestKit(getSystem()) {{
141
142             new Within(duration("1 seconds")) {
143                 @Override
144                 protected void run() {
145
146                     ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
147
148                     ActorRef shardManagerActorRef = getSystem()
149                         .actorOf(MockShardManager.props(true, shardActorRef));
150
151                     ActorContext actorContext =
152                         new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
153                             mock(Configuration.class));
154
155                     Optional<ActorRef> out = actorContext.findLocalShard("default");
156
157                     assertEquals(shardActorRef, out.get());
158
159
160                     expectNoMsg();
161                 }
162             };
163         }};
164
165     }
166
167     @Test
168     public void testFindLocalShardWithShardNotFound(){
169         new JavaTestKit(getSystem()) {{
170             ActorRef shardManagerActorRef = getSystem()
171                     .actorOf(MockShardManager.props(false, null));
172
173             ActorContext actorContext =
174                     new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
175                             mock(Configuration.class));
176
177             Optional<ActorRef> out = actorContext.findLocalShard("default");
178             assertTrue(!out.isPresent());
179         }};
180
181     }
182
183     @Test
184     public void testExecuteRemoteOperation() {
185         new JavaTestKit(getSystem()) {{
186             ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
187
188             ActorRef shardManagerActorRef = getSystem()
189                     .actorOf(MockShardManager.props(true, shardActorRef));
190
191             ActorContext actorContext =
192                     new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
193                             mock(Configuration.class));
194
195             ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
196
197             Object out = actorContext.executeOperation(actor, "hello");
198
199             assertEquals("hello", out);
200         }};
201     }
202
203     @Test
204     public void testExecuteRemoteOperationAsync() {
205         new JavaTestKit(getSystem()) {{
206             ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
207
208             ActorRef shardManagerActorRef = getSystem()
209                     .actorOf(MockShardManager.props(true, shardActorRef));
210
211             ActorContext actorContext =
212                     new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
213                             mock(Configuration.class));
214
215             ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
216
217             Future<Object> future = actorContext.executeOperationAsync(actor, "hello");
218
219             try {
220                 Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS));
221                 assertEquals("Result", "hello", result);
222             } catch(Exception e) {
223                 throw new AssertionError(e);
224             }
225         }};
226     }
227
228     @Test
229     public void testIsPathLocal() {
230         MockClusterWrapper clusterWrapper = new MockClusterWrapper();
231         ActorContext actorContext = null;
232
233         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
234         assertEquals(false, actorContext.isPathLocal(null));
235         assertEquals(false, actorContext.isPathLocal(""));
236
237         clusterWrapper.setSelfAddress(null);
238         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
239         assertEquals(false, actorContext.isPathLocal(""));
240
241         // even if the path is in local format, match the primary path (first 3 elements) and return true
242         clusterWrapper.setSelfAddress(new Address("akka", "test"));
243         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
244         assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
245
246         clusterWrapper.setSelfAddress(new Address("akka", "test"));
247         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
248         assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
249
250         clusterWrapper.setSelfAddress(new Address("akka", "test"));
251         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
252         assertEquals(true, actorContext.isPathLocal("akka://test/user/token2/token3/$a"));
253
254         // self address of remote format,but Tx path local format.
255         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
256         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
257         assertEquals(true, actorContext.isPathLocal(
258             "akka://system/user/shardmanager/shard/transaction"));
259
260         // self address of local format,but Tx path remote format.
261         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system"));
262         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
263         assertEquals(false, actorContext.isPathLocal(
264             "akka://system@127.0.0.1:2550/user/shardmanager/shard/transaction"));
265
266         //local path but not same
267         clusterWrapper.setSelfAddress(new Address("akka", "test"));
268         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
269         assertEquals(true, actorContext.isPathLocal("akka://test1/user/$a"));
270
271         //ip and port same
272         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
273         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
274         assertEquals(true, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550/"));
275
276         // forward-slash missing in address
277         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
278         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
279         assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550"));
280
281         //ips differ
282         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
283         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
284         assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.1.0.1:2550/"));
285
286         //ports differ
287         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
288         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
289         assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2551/"));
290     }
291
292     @Test
293     public void testResolvePathForRemoteActor() {
294         ActorContext actorContext =
295                 new ActorContext(getSystem(), mock(ActorRef.class), mock(
296                         ClusterWrapper.class),
297                         mock(Configuration.class));
298
299         String actual = actorContext.resolvePath(
300                 "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard",
301                 "akka://system/user/shardmanager/shard/transaction");
302
303         String expected = "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard/transaction";
304
305         assertEquals(expected, actual);
306     }
307
308     @Test
309     public void testResolvePathForLocalActor() {
310         ActorContext actorContext =
311                 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
312                         mock(Configuration.class));
313
314         String actual = actorContext.resolvePath(
315                 "akka://system/user/shardmanager/shard",
316                 "akka://system/user/shardmanager/shard/transaction");
317
318         String expected = "akka://system/user/shardmanager/shard/transaction";
319
320         assertEquals(expected, actual);
321     }
322
323     @Test
324     public void testResolvePathForRemoteActorWithProperRemoteAddress() {
325         ActorContext actorContext =
326                 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
327                         mock(Configuration.class));
328
329         String actual = actorContext.resolvePath(
330                 "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard",
331                 "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction");
332
333         String expected = "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction";
334
335         assertEquals(expected, actual);
336     }
337
338
339     @Test
340     public void testClientDispatcherIsGlobalDispatcher(){
341         ActorContext actorContext =
342                 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
343                         mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
344
345         assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
346
347     }
348
349     @Test
350     public void testClientDispatcherIsNotGlobalDispatcher(){
351         ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers", ConfigFactory.load("application-with-custom-dispatchers.conf"));
352
353         ActorContext actorContext =
354                 new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class),
355                         mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
356
357         assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
358
359         actorSystem.shutdown();
360
361     }
362
363     @Test
364     public void testSetDatastoreContext() {
365         new JavaTestKit(getSystem()) {{
366             ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class),
367                             mock(Configuration.class), DatastoreContext.newBuilder().
368                                 operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build(), new PrimaryShardInfoFutureCache());
369
370             assertEquals("getOperationDuration", 5, actorContext.getOperationDuration().toSeconds());
371             assertEquals("getTransactionCommitOperationTimeout", 7,
372                     actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
373
374             DatastoreContext newContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(6).
375                     shardTransactionCommitTimeoutInSeconds(8).build();
376
377             actorContext.setDatastoreContext(newContext);
378
379             expectMsgClass(duration("5 seconds"), DatastoreContext.class);
380
381             Assert.assertSame("getDatastoreContext", newContext, actorContext.getDatastoreContext());
382
383             assertEquals("getOperationDuration", 6, actorContext.getOperationDuration().toSeconds());
384             assertEquals("getTransactionCommitOperationTimeout", 8,
385                     actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
386         }};
387     }
388
389     @Test
390     public void testFindPrimaryShardAsyncRemotePrimaryFound() throws Exception {
391
392             TestActorRef<MessageCollectorActor> shardManager =
393                     TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
394
395             DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
396                     shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
397
398             final String expPrimaryPath = "akka://test-system/find-primary-shard";
399             final short expPrimaryVersion = DataStoreVersions.CURRENT_VERSION;
400             ActorContext actorContext =
401                     new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
402                             mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
403                         @Override
404                         protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
405                             return Futures.successful((Object) new RemotePrimaryShardFound(expPrimaryPath, expPrimaryVersion));
406                         }
407                     };
408
409             Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
410             PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
411
412             assertNotNull(actual);
413             assertEquals("LocalShardDataTree present", false, actual.getLocalShardDataTree().isPresent());
414             assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
415                     expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
416             assertEquals("getPrimaryShardVersion", expPrimaryVersion, actual.getPrimaryShardVersion());
417
418             Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
419
420             PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
421
422             assertEquals(cachedInfo, actual);
423
424             actorContext.getPrimaryShardInfoCache().remove("foobar");
425
426             cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
427
428             assertNull(cached);
429     }
430
431     @Test
432     public void testFindPrimaryShardAsyncLocalPrimaryFound() throws Exception {
433
434             TestActorRef<MessageCollectorActor> shardManager =
435                     TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
436
437             DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
438                     shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
439
440             final DataTree mockDataTree = Mockito.mock(DataTree.class);
441             final String expPrimaryPath = "akka://test-system/find-primary-shard";
442             ActorContext actorContext =
443                     new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
444                             mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
445                         @Override
446                         protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
447                             return Futures.successful((Object) new LocalPrimaryShardFound(expPrimaryPath, mockDataTree));
448                         }
449                     };
450
451             Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
452             PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
453
454             assertNotNull(actual);
455             assertEquals("LocalShardDataTree present", true, actual.getLocalShardDataTree().isPresent());
456             assertSame("LocalShardDataTree", mockDataTree, actual.getLocalShardDataTree().get());
457             assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
458                     expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
459             assertEquals("getPrimaryShardVersion", DataStoreVersions.CURRENT_VERSION, actual.getPrimaryShardVersion());
460
461             Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
462
463             PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
464
465             assertEquals(cachedInfo, actual);
466
467             actorContext.getPrimaryShardInfoCache().remove("foobar");
468
469             cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
470
471             assertNull(cached);
472     }
473
474     @Test
475     public void testFindPrimaryShardAsyncPrimaryNotFound() throws Exception {
476         testFindPrimaryExceptions(new PrimaryNotFoundException("not found"));
477     }
478
479     @Test
480     public void testFindPrimaryShardAsyncActorNotInitialized() throws Exception {
481         testFindPrimaryExceptions(new NotInitializedException("not initialized"));
482     }
483
484     private void testFindPrimaryExceptions(final Object expectedException) throws Exception {
485         TestActorRef<MessageCollectorActor> shardManager =
486             TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
487
488         DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
489             shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
490
491         ActorContext actorContext =
492             new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
493                 mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
494                 @Override
495                 protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
496                     return Futures.successful(expectedException);
497                 }
498             };
499
500         Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
501
502         try {
503             Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
504             fail("Expected" + expectedException.getClass().toString());
505         } catch(Exception e){
506             if(!expectedException.getClass().isInstance(e)) {
507                 fail("Expected Exception of type " + expectedException.getClass().toString());
508             }
509         }
510
511         Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
512
513         assertNull(cached);
514     }
515
516     @Test
517     public void testBroadcast() {
518         new JavaTestKit(getSystem()) {{
519             ActorRef shardActorRef1 = getSystem().actorOf(Props.create(MessageCollectorActor.class));
520             ActorRef shardActorRef2 = getSystem().actorOf(Props.create(MessageCollectorActor.class));
521
522             TestActorRef<MockShardManager> shardManagerActorRef = TestActorRef.create(getSystem(), MockShardManager.props());
523             MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor();
524             shardManagerActor.addFindPrimaryResp("shard1", new RemotePrimaryShardFound(shardActorRef1.path().toString(),
525                     DataStoreVersions.CURRENT_VERSION));
526             shardManagerActor.addFindPrimaryResp("shard2", new RemotePrimaryShardFound(shardActorRef2.path().toString(),
527                     DataStoreVersions.CURRENT_VERSION));
528             shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found"));
529
530             Configuration mockConfig = mock(Configuration.class);
531             doReturn(Sets.newLinkedHashSet(Arrays.asList("shard1", "shard2", "shard3"))).
532                     when(mockConfig).getAllShardNames();
533
534             ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
535                     mock(ClusterWrapper.class), mockConfig,
536                     DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(), new PrimaryShardInfoFutureCache());
537
538             actorContext.broadcast(new TestMessage());
539
540             expectFirstMatching(shardActorRef1, TestMessage.class);
541             expectFirstMatching(shardActorRef2, TestMessage.class);
542         }};
543     }
544
545     private <T> T expectFirstMatching(ActorRef actor, Class<T> clazz) {
546         int count = 5000 / 50;
547         for(int i = 0; i < count; i++) {
548             try {
549                 T message = (T) MessageCollectorActor.getFirstMatching(actor, clazz);
550                 if(message != null) {
551                     return message;
552                 }
553             } catch (Exception e) {}
554
555             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
556         }
557
558         Assert.fail("Did not receive message of type " + clazz);
559         return null;
560     }
561 }