Bug 3194: Dynamically update PrimaryShardInfo cache when leader changes
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / utils / ActorContextTest.java
1 package org.opendaylight.controller.cluster.datastore.utils;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotEquals;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertSame;
8 import static org.junit.Assert.assertTrue;
9 import static org.junit.Assert.fail;
10 import static org.mockito.Mockito.doReturn;
11 import static org.mockito.Mockito.mock;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSelection;
14 import akka.actor.ActorSystem;
15 import akka.actor.Address;
16 import akka.actor.Props;
17 import akka.actor.UntypedActor;
18 import akka.dispatch.Futures;
19 import akka.japi.Creator;
20 import akka.testkit.JavaTestKit;
21 import akka.testkit.TestActorRef;
22 import akka.util.Timeout;
23 import com.google.common.base.Optional;
24 import com.google.common.collect.Maps;
25 import com.google.common.collect.Sets;
26 import com.google.common.util.concurrent.Uninterruptibles;
27 import com.typesafe.config.ConfigFactory;
28 import java.util.Arrays;
29 import java.util.Map;
30 import java.util.concurrent.TimeUnit;
31 import org.junit.Assert;
32 import org.junit.Test;
33 import org.mockito.Mockito;
34 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
35 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
36 import org.opendaylight.controller.cluster.datastore.Configuration;
37 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
38 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
39 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
40 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
41 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
42 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
43 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
44 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
45 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
46 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
47 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
48 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
51 import scala.concurrent.Await;
52 import scala.concurrent.Future;
53 import scala.concurrent.duration.Duration;
54 import scala.concurrent.duration.FiniteDuration;
55
56 public class ActorContextTest extends AbstractActorTest{
57
    // Shared SLF4J logger; declared without 'private' and also used by the nested MockShardManager actor.
    static final Logger log = LoggerFactory.getLogger(ActorContextTest.class);

    /** Empty marker message; testBroadcast sends an instance and checks that the shard actors received it. */
    private static class TestMessage {
    }
62
63     private static class MockShardManager extends UntypedActor {
64
65         private final boolean found;
66         private final ActorRef actorRef;
67         private final Map<String,Object> findPrimaryResponses = Maps.newHashMap();
68
69         private MockShardManager(boolean found, ActorRef actorRef){
70
71             this.found = found;
72             this.actorRef = actorRef;
73         }
74
75         @Override public void onReceive(Object message) throws Exception {
76             if(message instanceof FindPrimary) {
77                 FindPrimary fp = (FindPrimary)message;
78                 Object resp = findPrimaryResponses.get(fp.getShardName());
79                 if(resp == null) {
80                     log.error("No expected FindPrimary response found for shard name {}", fp.getShardName());
81                 } else {
82                     getSender().tell(resp, getSelf());
83                 }
84
85                 return;
86             }
87
88             if(found){
89                 getSender().tell(new LocalShardFound(actorRef), getSelf());
90             } else {
91                 getSender().tell(new LocalShardNotFound(((FindLocalShard) message).getShardName()), getSelf());
92             }
93         }
94
95         void addFindPrimaryResp(String shardName, Object resp) {
96             findPrimaryResponses.put(shardName, resp);
97         }
98
99         private static Props props(final boolean found, final ActorRef actorRef){
100             return Props.create(new MockShardManagerCreator(found, actorRef) );
101         }
102
103         private static Props props(){
104             return Props.create(new MockShardManagerCreator() );
105         }
106
107         @SuppressWarnings("serial")
108         private static class MockShardManagerCreator implements Creator<MockShardManager> {
109             final boolean found;
110             final ActorRef actorRef;
111
112             MockShardManagerCreator() {
113                 this.found = false;
114                 this.actorRef = null;
115             }
116
117             MockShardManagerCreator(boolean found, ActorRef actorRef) {
118                 this.found = found;
119                 this.actorRef = actorRef;
120             }
121
122             @Override
123             public MockShardManager create() throws Exception {
124                 return new MockShardManager(found, actorRef);
125             }
126         }
127     }
128
129     @Test
130     public void testFindLocalShardWithShardFound(){
131         new JavaTestKit(getSystem()) {{
132
133             new Within(duration("1 seconds")) {
134                 @Override
135                 protected void run() {
136
137                     ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
138
139                     ActorRef shardManagerActorRef = getSystem()
140                         .actorOf(MockShardManager.props(true, shardActorRef));
141
142                     ActorContext actorContext =
143                         new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
144                             mock(Configuration.class));
145
146                     Optional<ActorRef> out = actorContext.findLocalShard("default");
147
148                     assertEquals(shardActorRef, out.get());
149
150
151                     expectNoMsg();
152                 }
153             };
154         }};
155
156     }
157
158     @Test
159     public void testFindLocalShardWithShardNotFound(){
160         new JavaTestKit(getSystem()) {{
161             ActorRef shardManagerActorRef = getSystem()
162                     .actorOf(MockShardManager.props(false, null));
163
164             ActorContext actorContext =
165                     new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
166                             mock(Configuration.class));
167
168             Optional<ActorRef> out = actorContext.findLocalShard("default");
169             assertTrue(!out.isPresent());
170         }};
171
172     }
173
174     @Test
175     public void testExecuteRemoteOperation() {
176         new JavaTestKit(getSystem()) {{
177             ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
178
179             ActorRef shardManagerActorRef = getSystem()
180                     .actorOf(MockShardManager.props(true, shardActorRef));
181
182             ActorContext actorContext =
183                     new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
184                             mock(Configuration.class));
185
186             ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
187
188             Object out = actorContext.executeOperation(actor, "hello");
189
190             assertEquals("hello", out);
191         }};
192     }
193
194     @Test
195     public void testExecuteRemoteOperationAsync() {
196         new JavaTestKit(getSystem()) {{
197             ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
198
199             ActorRef shardManagerActorRef = getSystem()
200                     .actorOf(MockShardManager.props(true, shardActorRef));
201
202             ActorContext actorContext =
203                     new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
204                             mock(Configuration.class));
205
206             ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
207
208             Future<Object> future = actorContext.executeOperationAsync(actor, "hello");
209
210             try {
211                 Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS));
212                 assertEquals("Result", "hello", result);
213             } catch(Exception e) {
214                 throw new AssertionError(e);
215             }
216         }};
217     }
218
219     @Test
220     public void testIsPathLocal() {
221         MockClusterWrapper clusterWrapper = new MockClusterWrapper();
222         ActorContext actorContext = null;
223
224         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
225         assertEquals(false, actorContext.isPathLocal(null));
226         assertEquals(false, actorContext.isPathLocal(""));
227
228         clusterWrapper.setSelfAddress(null);
229         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
230         assertEquals(false, actorContext.isPathLocal(""));
231
232         // even if the path is in local format, match the primary path (first 3 elements) and return true
233         clusterWrapper.setSelfAddress(new Address("akka", "test"));
234         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
235         assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
236
237         clusterWrapper.setSelfAddress(new Address("akka", "test"));
238         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
239         assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
240
241         clusterWrapper.setSelfAddress(new Address("akka", "test"));
242         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
243         assertEquals(true, actorContext.isPathLocal("akka://test/user/token2/token3/$a"));
244
245         // self address of remote format,but Tx path local format.
246         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
247         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
248         assertEquals(true, actorContext.isPathLocal(
249             "akka://system/user/shardmanager/shard/transaction"));
250
251         // self address of local format,but Tx path remote format.
252         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system"));
253         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
254         assertEquals(false, actorContext.isPathLocal(
255             "akka://system@127.0.0.1:2550/user/shardmanager/shard/transaction"));
256
257         //local path but not same
258         clusterWrapper.setSelfAddress(new Address("akka", "test"));
259         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
260         assertEquals(true, actorContext.isPathLocal("akka://test1/user/$a"));
261
262         //ip and port same
263         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
264         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
265         assertEquals(true, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550/"));
266
267         // forward-slash missing in address
268         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
269         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
270         assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550"));
271
272         //ips differ
273         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
274         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
275         assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.1.0.1:2550/"));
276
277         //ports differ
278         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
279         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
280         assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2551/"));
281     }
282
283     @Test
284     public void testResolvePathForRemoteActor() {
285         ActorContext actorContext =
286                 new ActorContext(getSystem(), mock(ActorRef.class), mock(
287                         ClusterWrapper.class),
288                         mock(Configuration.class));
289
290         String actual = actorContext.resolvePath(
291                 "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard",
292                 "akka://system/user/shardmanager/shard/transaction");
293
294         String expected = "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard/transaction";
295
296         assertEquals(expected, actual);
297     }
298
299     @Test
300     public void testResolvePathForLocalActor() {
301         ActorContext actorContext =
302                 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
303                         mock(Configuration.class));
304
305         String actual = actorContext.resolvePath(
306                 "akka://system/user/shardmanager/shard",
307                 "akka://system/user/shardmanager/shard/transaction");
308
309         String expected = "akka://system/user/shardmanager/shard/transaction";
310
311         assertEquals(expected, actual);
312     }
313
314     @Test
315     public void testResolvePathForRemoteActorWithProperRemoteAddress() {
316         ActorContext actorContext =
317                 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
318                         mock(Configuration.class));
319
320         String actual = actorContext.resolvePath(
321                 "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard",
322                 "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction");
323
324         String expected = "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction";
325
326         assertEquals(expected, actual);
327     }
328
329
330     @Test
331     public void testClientDispatcherIsGlobalDispatcher(){
332         ActorContext actorContext =
333                 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
334                         mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
335
336         assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
337
338     }
339
340     @Test
341     public void testClientDispatcherIsNotGlobalDispatcher(){
342         ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers", ConfigFactory.load("application-with-custom-dispatchers.conf"));
343
344         ActorContext actorContext =
345                 new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class),
346                         mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
347
348         assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
349
350         actorSystem.shutdown();
351
352     }
353
354     @Test
355     public void testSetDatastoreContext() {
356         new JavaTestKit(getSystem()) {{
357             ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class),
358                             mock(Configuration.class), DatastoreContext.newBuilder().
359                                 operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build(), new PrimaryShardInfoFutureCache());
360
361             assertEquals("getOperationDuration", 5, actorContext.getOperationDuration().toSeconds());
362             assertEquals("getTransactionCommitOperationTimeout", 7,
363                     actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
364
365             DatastoreContext newContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(6).
366                     shardTransactionCommitTimeoutInSeconds(8).build();
367
368             actorContext.setDatastoreContext(newContext);
369
370             expectMsgClass(duration("5 seconds"), DatastoreContext.class);
371
372             Assert.assertSame("getDatastoreContext", newContext, actorContext.getDatastoreContext());
373
374             assertEquals("getOperationDuration", 6, actorContext.getOperationDuration().toSeconds());
375             assertEquals("getTransactionCommitOperationTimeout", 8,
376                     actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
377         }};
378     }
379
380     @Test
381     public void testFindPrimaryShardAsyncRemotePrimaryFound() throws Exception {
382
383             TestActorRef<MessageCollectorActor> shardManager =
384                     TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
385
386             DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
387                     shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
388
389             final String expPrimaryPath = "akka://test-system/find-primary-shard";
390             ActorContext actorContext =
391                     new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
392                             mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
393                         @Override
394                         protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
395                             return Futures.successful((Object) new RemotePrimaryShardFound(expPrimaryPath));
396                         }
397                     };
398
399             Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
400             PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
401
402             assertNotNull(actual);
403             assertEquals("LocalShardDataTree present", false, actual.getLocalShardDataTree().isPresent());
404             assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
405                     expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
406
407             Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
408
409             PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
410
411             assertEquals(cachedInfo, actual);
412
413             actorContext.getPrimaryShardInfoCache().remove("foobar");
414
415             cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
416
417             assertNull(cached);
418     }
419
420     @Test
421     public void testFindPrimaryShardAsyncLocalPrimaryFound() throws Exception {
422
423             TestActorRef<MessageCollectorActor> shardManager =
424                     TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
425
426             DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
427                     shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
428
429             final DataTree mockDataTree = Mockito.mock(DataTree.class);
430             final String expPrimaryPath = "akka://test-system/find-primary-shard";
431             ActorContext actorContext =
432                     new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
433                             mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
434                         @Override
435                         protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
436                             return Futures.successful((Object) new LocalPrimaryShardFound(expPrimaryPath, mockDataTree));
437                         }
438                     };
439
440             Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
441             PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
442
443             assertNotNull(actual);
444             assertEquals("LocalShardDataTree present", true, actual.getLocalShardDataTree().isPresent());
445             assertSame("LocalShardDataTree", mockDataTree, actual.getLocalShardDataTree().get());
446             assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
447                     expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
448
449             Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
450
451             PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
452
453             assertEquals(cachedInfo, actual);
454
455             actorContext.getPrimaryShardInfoCache().remove("foobar");
456
457             cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
458
459             assertNull(cached);
460     }
461
462     @Test
463     public void testFindPrimaryShardAsyncPrimaryNotFound() throws Exception {
464
465             TestActorRef<MessageCollectorActor> shardManager =
466                     TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
467
468             DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
469                     shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
470
471             ActorContext actorContext =
472                     new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
473                             mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
474                         @Override
475                         protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
476                             return Futures.successful((Object) new PrimaryNotFoundException("not found"));
477                         }
478                     };
479
480
481             Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
482
483             try {
484                 Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
485                 fail("Expected PrimaryNotFoundException");
486             } catch(PrimaryNotFoundException e){
487
488             }
489
490             Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
491
492             assertNull(cached);
493     }
494
495     @Test
496     public void testFindPrimaryShardAsyncActorNotInitialized() throws Exception {
497
498             TestActorRef<MessageCollectorActor> shardManager =
499                     TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
500
501             DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
502                     shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
503
504             ActorContext actorContext =
505                     new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
506                             mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
507                         @Override
508                         protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
509                             return Futures.successful((Object) new NotInitializedException("not iniislized"));
510                         }
511                     };
512
513
514             Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
515
516             try {
517                 Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
518                 fail("Expected NotInitializedException");
519             } catch(NotInitializedException e){
520
521             }
522
523             Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
524
525             assertNull(cached);
526     }
527
528     @Test
529     public void testBroadcast() {
530         new JavaTestKit(getSystem()) {{
531             ActorRef shardActorRef1 = getSystem().actorOf(Props.create(MessageCollectorActor.class));
532             ActorRef shardActorRef2 = getSystem().actorOf(Props.create(MessageCollectorActor.class));
533
534             TestActorRef<MockShardManager> shardManagerActorRef = TestActorRef.create(getSystem(), MockShardManager.props());
535             MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor();
536             shardManagerActor.addFindPrimaryResp("shard1", new RemotePrimaryShardFound(shardActorRef1.path().toString()));
537             shardManagerActor.addFindPrimaryResp("shard2", new RemotePrimaryShardFound(shardActorRef2.path().toString()));
538             shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found"));
539
540             Configuration mockConfig = mock(Configuration.class);
541             doReturn(Sets.newLinkedHashSet(Arrays.asList("shard1", "shard2", "shard3"))).
542                     when(mockConfig).getAllShardNames();
543
544             ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
545                     mock(ClusterWrapper.class), mockConfig,
546                     DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(), new PrimaryShardInfoFutureCache());
547
548             actorContext.broadcast(new TestMessage());
549
550             expectFirstMatching(shardActorRef1, TestMessage.class);
551             expectFirstMatching(shardActorRef2, TestMessage.class);
552         }};
553     }
554
555     private <T> T expectFirstMatching(ActorRef actor, Class<T> clazz) {
556         int count = 5000 / 50;
557         for(int i = 0; i < count; i++) {
558             try {
559                 T message = (T) MessageCollectorActor.getFirstMatching(actor, clazz);
560                 if(message != null) {
561                     return message;
562                 }
563             } catch (Exception e) {}
564
565             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
566         }
567
568         Assert.fail("Did not receive message of type " + clazz);
569         return null;
570     }
571 }