7e484ebde3d79bcadbb182162c55c23c56675d78
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / utils / ActorContextTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.utils;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotEquals;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.junit.Assert.fail;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.mock;
20 import akka.actor.ActorRef;
21 import akka.actor.ActorSelection;
22 import akka.actor.ActorSystem;
23 import akka.actor.Address;
24 import akka.actor.Props;
25 import akka.actor.UntypedActor;
26 import akka.dispatch.Futures;
27 import akka.japi.Creator;
28 import akka.testkit.JavaTestKit;
29 import akka.testkit.TestActorRef;
30 import akka.util.Timeout;
31 import com.google.common.base.Optional;
32 import com.google.common.collect.Maps;
33 import com.google.common.collect.Sets;
34 import com.google.common.util.concurrent.Uninterruptibles;
35 import com.typesafe.config.ConfigFactory;
36 import java.util.Arrays;
37 import java.util.Map;
38 import java.util.concurrent.TimeUnit;
39 import org.junit.Assert;
40 import org.junit.Test;
41 import org.mockito.Mockito;
42 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
43 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
44 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
45 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
46 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
47 import org.opendaylight.controller.cluster.datastore.config.Configuration;
48 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
49 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
50 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
51 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
52 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
53 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
54 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
55 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
56 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
57 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
58 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
59 import org.slf4j.Logger;
60 import org.slf4j.LoggerFactory;
61 import scala.concurrent.Await;
62 import scala.concurrent.Future;
63 import scala.concurrent.duration.Duration;
64 import scala.concurrent.duration.FiniteDuration;
65
66 public class ActorContextTest extends AbstractActorTest{
67
68     static final Logger log = LoggerFactory.getLogger(ActorContextTest.class);
69
70     private static class TestMessage {
71     }
72
73     private static class MockShardManager extends UntypedActor {
74
75         private final boolean found;
76         private final ActorRef actorRef;
77         private final Map<String,Object> findPrimaryResponses = Maps.newHashMap();
78
79         private MockShardManager(boolean found, ActorRef actorRef){
80
81             this.found = found;
82             this.actorRef = actorRef;
83         }
84
85         @Override public void onReceive(Object message) throws Exception {
86             if(message instanceof FindPrimary) {
87                 FindPrimary fp = (FindPrimary)message;
88                 Object resp = findPrimaryResponses.get(fp.getShardName());
89                 if(resp == null) {
90                     log.error("No expected FindPrimary response found for shard name {}", fp.getShardName());
91                 } else {
92                     getSender().tell(resp, getSelf());
93                 }
94
95                 return;
96             }
97
98             if(found){
99                 getSender().tell(new LocalShardFound(actorRef), getSelf());
100             } else {
101                 getSender().tell(new LocalShardNotFound(((FindLocalShard) message).getShardName()), getSelf());
102             }
103         }
104
105         void addFindPrimaryResp(String shardName, Object resp) {
106             findPrimaryResponses.put(shardName, resp);
107         }
108
109         private static Props props(final boolean found, final ActorRef actorRef){
110             return Props.create(new MockShardManagerCreator(found, actorRef) );
111         }
112
113         private static Props props(){
114             return Props.create(new MockShardManagerCreator() );
115         }
116
117         @SuppressWarnings("serial")
118         private static class MockShardManagerCreator implements Creator<MockShardManager> {
119             final boolean found;
120             final ActorRef actorRef;
121
122             MockShardManagerCreator() {
123                 this.found = false;
124                 this.actorRef = null;
125             }
126
127             MockShardManagerCreator(boolean found, ActorRef actorRef) {
128                 this.found = found;
129                 this.actorRef = actorRef;
130             }
131
132             @Override
133             public MockShardManager create() throws Exception {
134                 return new MockShardManager(found, actorRef);
135             }
136         }
137     }
138
139     @Test
140     public void testFindLocalShardWithShardFound(){
141         new JavaTestKit(getSystem()) {{
142
143             new Within(duration("1 seconds")) {
144                 @Override
145                 protected void run() {
146
147                     ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
148
149                     ActorRef shardManagerActorRef = getSystem()
150                         .actorOf(MockShardManager.props(true, shardActorRef));
151
152                     ActorContext actorContext =
153                         new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
154                             mock(Configuration.class));
155
156                     Optional<ActorRef> out = actorContext.findLocalShard("default");
157
158                     assertEquals(shardActorRef, out.get());
159
160
161                     expectNoMsg();
162                 }
163             };
164         }};
165
166     }
167
168     @Test
169     public void testFindLocalShardWithShardNotFound(){
170         new JavaTestKit(getSystem()) {{
171             ActorRef shardManagerActorRef = getSystem()
172                     .actorOf(MockShardManager.props(false, null));
173
174             ActorContext actorContext =
175                     new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
176                             mock(Configuration.class));
177
178             Optional<ActorRef> out = actorContext.findLocalShard("default");
179             assertTrue(!out.isPresent());
180         }};
181
182     }
183
184     @Test
185     public void testExecuteRemoteOperation() {
186         new JavaTestKit(getSystem()) {{
187             ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
188
189             ActorRef shardManagerActorRef = getSystem()
190                     .actorOf(MockShardManager.props(true, shardActorRef));
191
192             ActorContext actorContext =
193                     new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
194                             mock(Configuration.class));
195
196             ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
197
198             Object out = actorContext.executeOperation(actor, "hello");
199
200             assertEquals("hello", out);
201         }};
202     }
203
204     @Test
205     public void testExecuteRemoteOperationAsync() {
206         new JavaTestKit(getSystem()) {{
207             ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
208
209             ActorRef shardManagerActorRef = getSystem()
210                     .actorOf(MockShardManager.props(true, shardActorRef));
211
212             ActorContext actorContext =
213                     new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
214                             mock(Configuration.class));
215
216             ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
217
218             Future<Object> future = actorContext.executeOperationAsync(actor, "hello");
219
220             try {
221                 Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS));
222                 assertEquals("Result", "hello", result);
223             } catch(Exception e) {
224                 throw new AssertionError(e);
225             }
226         }};
227     }
228
229     @Test
230     public void testIsPathLocal() {
231         MockClusterWrapper clusterWrapper = new MockClusterWrapper();
232         ActorContext actorContext = null;
233
234         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
235         assertEquals(false, actorContext.isPathLocal(null));
236         assertEquals(false, actorContext.isPathLocal(""));
237
238         clusterWrapper.setSelfAddress(null);
239         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
240         assertEquals(false, actorContext.isPathLocal(""));
241
242         // even if the path is in local format, match the primary path (first 3 elements) and return true
243         clusterWrapper.setSelfAddress(new Address("akka", "test"));
244         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
245         assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
246
247         clusterWrapper.setSelfAddress(new Address("akka", "test"));
248         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
249         assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
250
251         clusterWrapper.setSelfAddress(new Address("akka", "test"));
252         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
253         assertEquals(true, actorContext.isPathLocal("akka://test/user/token2/token3/$a"));
254
255         // self address of remote format,but Tx path local format.
256         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
257         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
258         assertEquals(true, actorContext.isPathLocal(
259             "akka://system/user/shardmanager/shard/transaction"));
260
261         // self address of local format,but Tx path remote format.
262         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system"));
263         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
264         assertEquals(false, actorContext.isPathLocal(
265             "akka://system@127.0.0.1:2550/user/shardmanager/shard/transaction"));
266
267         //local path but not same
268         clusterWrapper.setSelfAddress(new Address("akka", "test"));
269         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
270         assertEquals(true, actorContext.isPathLocal("akka://test1/user/$a"));
271
272         //ip and port same
273         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
274         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
275         assertEquals(true, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550/"));
276
277         // forward-slash missing in address
278         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
279         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
280         assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550"));
281
282         //ips differ
283         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
284         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
285         assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.1.0.1:2550/"));
286
287         //ports differ
288         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
289         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
290         assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2551/"));
291     }
292
293     @Test
294     public void testResolvePathForRemoteActor() {
295         ActorContext actorContext =
296                 new ActorContext(getSystem(), mock(ActorRef.class), mock(
297                         ClusterWrapper.class),
298                         mock(Configuration.class));
299
300         String actual = actorContext.resolvePath(
301                 "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard",
302                 "akka://system/user/shardmanager/shard/transaction");
303
304         String expected = "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard/transaction";
305
306         assertEquals(expected, actual);
307     }
308
309     @Test
310     public void testResolvePathForLocalActor() {
311         ActorContext actorContext =
312                 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
313                         mock(Configuration.class));
314
315         String actual = actorContext.resolvePath(
316                 "akka://system/user/shardmanager/shard",
317                 "akka://system/user/shardmanager/shard/transaction");
318
319         String expected = "akka://system/user/shardmanager/shard/transaction";
320
321         assertEquals(expected, actual);
322     }
323
324     @Test
325     public void testResolvePathForRemoteActorWithProperRemoteAddress() {
326         ActorContext actorContext =
327                 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
328                         mock(Configuration.class));
329
330         String actual = actorContext.resolvePath(
331                 "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard",
332                 "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction");
333
334         String expected = "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction";
335
336         assertEquals(expected, actual);
337     }
338
339
340     @Test
341     public void testClientDispatcherIsGlobalDispatcher(){
342         ActorContext actorContext =
343                 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
344                         mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
345
346         assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
347
348     }
349
350     @Test
351     public void testClientDispatcherIsNotGlobalDispatcher(){
352         ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers", ConfigFactory.load("application-with-custom-dispatchers.conf"));
353
354         ActorContext actorContext =
355                 new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class),
356                         mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
357
358         assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
359
360         actorSystem.shutdown();
361
362     }
363
364     @Test
365     public void testSetDatastoreContext() {
366         new JavaTestKit(getSystem()) {{
367             ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class),
368                             mock(Configuration.class), DatastoreContext.newBuilder().
369                                 operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build(), new PrimaryShardInfoFutureCache());
370
371             assertEquals("getOperationDuration", 5, actorContext.getOperationDuration().toSeconds());
372             assertEquals("getTransactionCommitOperationTimeout", 7,
373                     actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
374
375             DatastoreContext newContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(6).
376                     shardTransactionCommitTimeoutInSeconds(8).build();
377
378             DatastoreContextFactory mockContextFactory = mock(DatastoreContextFactory.class);
379             Mockito.doReturn(newContext).when(mockContextFactory).getBaseDatastoreContext();
380
381             actorContext.setDatastoreContext(mockContextFactory);
382
383             expectMsgClass(duration("5 seconds"), DatastoreContextFactory.class);
384
385             Assert.assertSame("getDatastoreContext", newContext, actorContext.getDatastoreContext());
386
387             assertEquals("getOperationDuration", 6, actorContext.getOperationDuration().toSeconds());
388             assertEquals("getTransactionCommitOperationTimeout", 8,
389                     actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
390         }};
391     }
392
393     @Test
394     public void testFindPrimaryShardAsyncRemotePrimaryFound() throws Exception {
395
396             TestActorRef<MessageCollectorActor> shardManager =
397                     TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
398
399             DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
400                     shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
401
402             final String expPrimaryPath = "akka://test-system/find-primary-shard";
403             final short expPrimaryVersion = DataStoreVersions.CURRENT_VERSION;
404             ActorContext actorContext =
405                     new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
406                             mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
407                         @Override
408                         protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
409                             return Futures.successful((Object) new RemotePrimaryShardFound(expPrimaryPath, expPrimaryVersion));
410                         }
411                     };
412
413             Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
414             PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
415
416             assertNotNull(actual);
417             assertEquals("LocalShardDataTree present", false, actual.getLocalShardDataTree().isPresent());
418             assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
419                     expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
420             assertEquals("getPrimaryShardVersion", expPrimaryVersion, actual.getPrimaryShardVersion());
421
422             Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
423
424             PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
425
426             assertEquals(cachedInfo, actual);
427
428             actorContext.getPrimaryShardInfoCache().remove("foobar");
429
430             cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
431
432             assertNull(cached);
433     }
434
435     @Test
436     public void testFindPrimaryShardAsyncLocalPrimaryFound() throws Exception {
437
438             TestActorRef<MessageCollectorActor> shardManager =
439                     TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
440
441             DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
442                     shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
443
444             final DataTree mockDataTree = Mockito.mock(DataTree.class);
445             final String expPrimaryPath = "akka://test-system/find-primary-shard";
446             ActorContext actorContext =
447                     new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
448                             mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
449                         @Override
450                         protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
451                             return Futures.successful((Object) new LocalPrimaryShardFound(expPrimaryPath, mockDataTree));
452                         }
453                     };
454
455             Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
456             PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
457
458             assertNotNull(actual);
459             assertEquals("LocalShardDataTree present", true, actual.getLocalShardDataTree().isPresent());
460             assertSame("LocalShardDataTree", mockDataTree, actual.getLocalShardDataTree().get());
461             assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
462                     expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
463             assertEquals("getPrimaryShardVersion", DataStoreVersions.CURRENT_VERSION, actual.getPrimaryShardVersion());
464
465             Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
466
467             PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
468
469             assertEquals(cachedInfo, actual);
470
471             actorContext.getPrimaryShardInfoCache().remove("foobar");
472
473             cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
474
475             assertNull(cached);
476     }
477
478     @Test
479     public void testFindPrimaryShardAsyncPrimaryNotFound() throws Exception {
480         testFindPrimaryExceptions(new PrimaryNotFoundException("not found"));
481     }
482
483     @Test
484     public void testFindPrimaryShardAsyncActorNotInitialized() throws Exception {
485         testFindPrimaryExceptions(new NotInitializedException("not initialized"));
486     }
487
488     private void testFindPrimaryExceptions(final Object expectedException) throws Exception {
489         TestActorRef<MessageCollectorActor> shardManager =
490             TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
491
492         DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
493             shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
494
495         ActorContext actorContext =
496             new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
497                 mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
498                 @Override
499                 protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
500                     return Futures.successful(expectedException);
501                 }
502             };
503
504         Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
505
506         try {
507             Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
508             fail("Expected" + expectedException.getClass().toString());
509         } catch(Exception e){
510             if(!expectedException.getClass().isInstance(e)) {
511                 fail("Expected Exception of type " + expectedException.getClass().toString());
512             }
513         }
514
515         Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
516
517         assertNull(cached);
518     }
519
520     @Test
521     public void testBroadcast() {
522         new JavaTestKit(getSystem()) {{
523             ActorRef shardActorRef1 = getSystem().actorOf(Props.create(MessageCollectorActor.class));
524             ActorRef shardActorRef2 = getSystem().actorOf(Props.create(MessageCollectorActor.class));
525
526             TestActorRef<MockShardManager> shardManagerActorRef = TestActorRef.create(getSystem(), MockShardManager.props());
527             MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor();
528             shardManagerActor.addFindPrimaryResp("shard1", new RemotePrimaryShardFound(shardActorRef1.path().toString(),
529                     DataStoreVersions.CURRENT_VERSION));
530             shardManagerActor.addFindPrimaryResp("shard2", new RemotePrimaryShardFound(shardActorRef2.path().toString(),
531                     DataStoreVersions.CURRENT_VERSION));
532             shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found"));
533
534             Configuration mockConfig = mock(Configuration.class);
535             doReturn(Sets.newLinkedHashSet(Arrays.asList("shard1", "shard2", "shard3"))).
536                     when(mockConfig).getAllShardNames();
537
538             ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
539                     mock(ClusterWrapper.class), mockConfig,
540                     DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(), new PrimaryShardInfoFutureCache());
541
542             actorContext.broadcast(new TestMessage());
543
544             expectFirstMatching(shardActorRef1, TestMessage.class);
545             expectFirstMatching(shardActorRef2, TestMessage.class);
546         }};
547     }
548
549     private static <T> T expectFirstMatching(ActorRef actor, Class<T> clazz) {
550         int count = 5000 / 50;
551         for(int i = 0; i < count; i++) {
552             try {
553                 @SuppressWarnings("unchecked")
554                 T message = (T) MessageCollectorActor.getFirstMatching(actor, clazz);
555                 if(message != null) {
556                     return message;
557                 }
558             } catch (Exception e) {}
559
560             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
561         }
562
563         Assert.fail("Did not receive message of type " + clazz);
564         return null;
565     }
566 }