Merge "Add missing copyright text"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / utils / ActorContextTest.java
1 package org.opendaylight.controller.cluster.datastore.utils;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotEquals;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertSame;
8 import static org.junit.Assert.assertTrue;
9 import static org.junit.Assert.fail;
10 import static org.mockito.Mockito.doReturn;
11 import static org.mockito.Mockito.mock;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSelection;
14 import akka.actor.ActorSystem;
15 import akka.actor.Address;
16 import akka.actor.Props;
17 import akka.actor.UntypedActor;
18 import akka.dispatch.Futures;
19 import akka.japi.Creator;
20 import akka.testkit.JavaTestKit;
21 import akka.testkit.TestActorRef;
22 import akka.util.Timeout;
23 import com.google.common.base.Optional;
24 import com.google.common.collect.Maps;
25 import com.google.common.collect.Sets;
26 import com.google.common.util.concurrent.Uninterruptibles;
27 import com.typesafe.config.ConfigFactory;
28 import java.util.Arrays;
29 import java.util.Map;
30 import java.util.concurrent.TimeUnit;
31 import org.apache.commons.lang.time.StopWatch;
32 import org.junit.Assert;
33 import org.junit.Test;
34 import org.mockito.Mockito;
35 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
36 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
37 import org.opendaylight.controller.cluster.datastore.Configuration;
38 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
39 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
40 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
41 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
42 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
43 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
44 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
45 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
46 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
47 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
48 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
49 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
52 import scala.concurrent.Await;
53 import scala.concurrent.Future;
54 import scala.concurrent.duration.Duration;
55 import scala.concurrent.duration.FiniteDuration;
56
57 public class ActorContextTest extends AbstractActorTest{
58
59     static final Logger log = LoggerFactory.getLogger(ActorContextTest.class);
60
61     private static class TestMessage {
62     }
63
64     private static class MockShardManager extends UntypedActor {
65
66         private final boolean found;
67         private final ActorRef actorRef;
68         private final Map<String,Object> findPrimaryResponses = Maps.newHashMap();
69
70         private MockShardManager(boolean found, ActorRef actorRef){
71
72             this.found = found;
73             this.actorRef = actorRef;
74         }
75
76         @Override public void onReceive(Object message) throws Exception {
77             if(message instanceof FindPrimary) {
78                 FindPrimary fp = (FindPrimary)message;
79                 Object resp = findPrimaryResponses.get(fp.getShardName());
80                 if(resp == null) {
81                     log.error("No expected FindPrimary response found for shard name {}", fp.getShardName());
82                 } else {
83                     getSender().tell(resp, getSelf());
84                 }
85
86                 return;
87             }
88
89             if(found){
90                 getSender().tell(new LocalShardFound(actorRef), getSelf());
91             } else {
92                 getSender().tell(new LocalShardNotFound(((FindLocalShard) message).getShardName()), getSelf());
93             }
94         }
95
96         void addFindPrimaryResp(String shardName, Object resp) {
97             findPrimaryResponses.put(shardName, resp);
98         }
99
100         private static Props props(final boolean found, final ActorRef actorRef){
101             return Props.create(new MockShardManagerCreator(found, actorRef) );
102         }
103
104         private static Props props(){
105             return Props.create(new MockShardManagerCreator() );
106         }
107
108         @SuppressWarnings("serial")
109         private static class MockShardManagerCreator implements Creator<MockShardManager> {
110             final boolean found;
111             final ActorRef actorRef;
112
113             MockShardManagerCreator() {
114                 this.found = false;
115                 this.actorRef = null;
116             }
117
118             MockShardManagerCreator(boolean found, ActorRef actorRef) {
119                 this.found = found;
120                 this.actorRef = actorRef;
121             }
122
123             @Override
124             public MockShardManager create() throws Exception {
125                 return new MockShardManager(found, actorRef);
126             }
127         }
128     }
129
130     @Test
131     public void testFindLocalShardWithShardFound(){
132         new JavaTestKit(getSystem()) {{
133
134             new Within(duration("1 seconds")) {
135                 @Override
136                 protected void run() {
137
138                     ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
139
140                     ActorRef shardManagerActorRef = getSystem()
141                         .actorOf(MockShardManager.props(true, shardActorRef));
142
143                     ActorContext actorContext =
144                         new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
145                             mock(Configuration.class));
146
147                     Optional<ActorRef> out = actorContext.findLocalShard("default");
148
149                     assertEquals(shardActorRef, out.get());
150
151
152                     expectNoMsg();
153                 }
154             };
155         }};
156
157     }
158
159     @Test
160     public void testFindLocalShardWithShardNotFound(){
161         new JavaTestKit(getSystem()) {{
162             ActorRef shardManagerActorRef = getSystem()
163                     .actorOf(MockShardManager.props(false, null));
164
165             ActorContext actorContext =
166                     new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
167                             mock(Configuration.class));
168
169             Optional<ActorRef> out = actorContext.findLocalShard("default");
170             assertTrue(!out.isPresent());
171         }};
172
173     }
174
175     @Test
176     public void testExecuteRemoteOperation() {
177         new JavaTestKit(getSystem()) {{
178             ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
179
180             ActorRef shardManagerActorRef = getSystem()
181                     .actorOf(MockShardManager.props(true, shardActorRef));
182
183             ActorContext actorContext =
184                     new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
185                             mock(Configuration.class));
186
187             ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
188
189             Object out = actorContext.executeOperation(actor, "hello");
190
191             assertEquals("hello", out);
192         }};
193     }
194
195     @Test
196     public void testExecuteRemoteOperationAsync() {
197         new JavaTestKit(getSystem()) {{
198             ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
199
200             ActorRef shardManagerActorRef = getSystem()
201                     .actorOf(MockShardManager.props(true, shardActorRef));
202
203             ActorContext actorContext =
204                     new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
205                             mock(Configuration.class));
206
207             ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
208
209             Future<Object> future = actorContext.executeOperationAsync(actor, "hello");
210
211             try {
212                 Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS));
213                 assertEquals("Result", "hello", result);
214             } catch(Exception e) {
215                 throw new AssertionError(e);
216             }
217         }};
218     }
219
220     @Test
221     public void testIsPathLocal() {
222         MockClusterWrapper clusterWrapper = new MockClusterWrapper();
223         ActorContext actorContext = null;
224
225         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
226         assertEquals(false, actorContext.isPathLocal(null));
227         assertEquals(false, actorContext.isPathLocal(""));
228
229         clusterWrapper.setSelfAddress(null);
230         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
231         assertEquals(false, actorContext.isPathLocal(""));
232
233         // even if the path is in local format, match the primary path (first 3 elements) and return true
234         clusterWrapper.setSelfAddress(new Address("akka", "test"));
235         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
236         assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
237
238         clusterWrapper.setSelfAddress(new Address("akka", "test"));
239         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
240         assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
241
242         clusterWrapper.setSelfAddress(new Address("akka", "test"));
243         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
244         assertEquals(true, actorContext.isPathLocal("akka://test/user/token2/token3/$a"));
245
246         // self address of remote format,but Tx path local format.
247         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
248         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
249         assertEquals(true, actorContext.isPathLocal(
250             "akka://system/user/shardmanager/shard/transaction"));
251
252         // self address of local format,but Tx path remote format.
253         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system"));
254         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
255         assertEquals(false, actorContext.isPathLocal(
256             "akka://system@127.0.0.1:2550/user/shardmanager/shard/transaction"));
257
258         //local path but not same
259         clusterWrapper.setSelfAddress(new Address("akka", "test"));
260         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
261         assertEquals(true, actorContext.isPathLocal("akka://test1/user/$a"));
262
263         //ip and port same
264         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
265         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
266         assertEquals(true, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550/"));
267
268         // forward-slash missing in address
269         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
270         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
271         assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550"));
272
273         //ips differ
274         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
275         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
276         assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.1.0.1:2550/"));
277
278         //ports differ
279         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
280         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
281         assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2551/"));
282     }
283
284     @Test
285     public void testResolvePathForRemoteActor() {
286         ActorContext actorContext =
287                 new ActorContext(getSystem(), mock(ActorRef.class), mock(
288                         ClusterWrapper.class),
289                         mock(Configuration.class));
290
291         String actual = actorContext.resolvePath(
292                 "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard",
293                 "akka://system/user/shardmanager/shard/transaction");
294
295         String expected = "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard/transaction";
296
297         assertEquals(expected, actual);
298     }
299
300     @Test
301     public void testResolvePathForLocalActor() {
302         ActorContext actorContext =
303                 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
304                         mock(Configuration.class));
305
306         String actual = actorContext.resolvePath(
307                 "akka://system/user/shardmanager/shard",
308                 "akka://system/user/shardmanager/shard/transaction");
309
310         String expected = "akka://system/user/shardmanager/shard/transaction";
311
312         assertEquals(expected, actual);
313     }
314
315     @Test
316     public void testResolvePathForRemoteActorWithProperRemoteAddress() {
317         ActorContext actorContext =
318                 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
319                         mock(Configuration.class));
320
321         String actual = actorContext.resolvePath(
322                 "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard",
323                 "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction");
324
325         String expected = "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction";
326
327         assertEquals(expected, actual);
328     }
329
330     @Test
331     public void testRateLimiting(){
332         DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
333                 transactionCreationInitialRateLimit(155L).build();
334
335         ActorContext actorContext =
336                 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
337                         mock(Configuration.class), dataStoreContext);
338
339         // Check that the initial value is being picked up from DataStoreContext
340         assertEquals(dataStoreContext.getTransactionCreationInitialRateLimit(), actorContext.getTxCreationLimit(), 1e-15);
341
342         actorContext.setTxCreationLimit(1.0);
343
344         assertEquals(1.0, actorContext.getTxCreationLimit(), 1e-15);
345
346
347         StopWatch watch = new StopWatch();
348
349         watch.start();
350
351         actorContext.acquireTxCreationPermit();
352         actorContext.acquireTxCreationPermit();
353         actorContext.acquireTxCreationPermit();
354
355         watch.stop();
356
357         assertTrue("did not take as much time as expected", watch.getTime() > 1000);
358     }
359
360     @Test
361     public void testClientDispatcherIsGlobalDispatcher(){
362         ActorContext actorContext =
363                 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
364                         mock(Configuration.class), DatastoreContext.newBuilder().build());
365
366         assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
367
368     }
369
370     @Test
371     public void testClientDispatcherIsNotGlobalDispatcher(){
372         ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers", ConfigFactory.load("application-with-custom-dispatchers.conf"));
373
374         ActorContext actorContext =
375                 new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class),
376                         mock(Configuration.class), DatastoreContext.newBuilder().build());
377
378         assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
379
380         actorSystem.shutdown();
381
382     }
383
384     @Test
385     public void testSetDatastoreContext() {
386         new JavaTestKit(getSystem()) {{
387             ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class),
388                             mock(Configuration.class), DatastoreContext.newBuilder().
389                                 operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build());
390
391             assertEquals("getOperationDuration", 5, actorContext.getOperationDuration().toSeconds());
392             assertEquals("getTransactionCommitOperationTimeout", 7,
393                     actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
394
395             DatastoreContext newContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(6).
396                     shardTransactionCommitTimeoutInSeconds(8).build();
397
398             actorContext.setDatastoreContext(newContext);
399
400             expectMsgClass(duration("5 seconds"), DatastoreContext.class);
401
402             Assert.assertSame("getDatastoreContext", newContext, actorContext.getDatastoreContext());
403
404             assertEquals("getOperationDuration", 6, actorContext.getOperationDuration().toSeconds());
405             assertEquals("getTransactionCommitOperationTimeout", 8,
406                     actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
407         }};
408     }
409
410     @Test
411     public void testFindPrimaryShardAsyncRemotePrimaryFound() throws Exception {
412
413             TestActorRef<MessageCollectorActor> shardManager =
414                     TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
415
416             DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
417                     shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
418
419             final String expPrimaryPath = "akka://test-system/find-primary-shard";
420             ActorContext actorContext =
421                     new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
422                             mock(Configuration.class), dataStoreContext) {
423                         @Override
424                         protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
425                             return Futures.successful((Object) new RemotePrimaryShardFound(expPrimaryPath));
426                         }
427                     };
428
429             Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
430             PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
431
432             assertNotNull(actual);
433             assertEquals("LocalShardDataTree present", false, actual.getLocalShardDataTree().isPresent());
434             assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
435                     expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
436
437             Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
438
439             PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
440
441             assertEquals(cachedInfo, actual);
442
443             // Wait for 200 Milliseconds. The cached entry should have been removed.
444
445             Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
446
447             cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
448
449             assertNull(cached);
450     }
451
452     @Test
453     public void testFindPrimaryShardAsyncLocalPrimaryFound() throws Exception {
454
455             TestActorRef<MessageCollectorActor> shardManager =
456                     TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
457
458             DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
459                     shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
460
461             final DataTree mockDataTree = Mockito.mock(DataTree.class);
462             final String expPrimaryPath = "akka://test-system/find-primary-shard";
463             ActorContext actorContext =
464                     new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
465                             mock(Configuration.class), dataStoreContext) {
466                         @Override
467                         protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
468                             return Futures.successful((Object) new LocalPrimaryShardFound(expPrimaryPath, mockDataTree));
469                         }
470                     };
471
472             Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
473             PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
474
475             assertNotNull(actual);
476             assertEquals("LocalShardDataTree present", true, actual.getLocalShardDataTree().isPresent());
477             assertSame("LocalShardDataTree", mockDataTree, actual.getLocalShardDataTree().get());
478             assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
479                     expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
480
481             Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
482
483             PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
484
485             assertEquals(cachedInfo, actual);
486
487             // Wait for 200 Milliseconds. The cached entry should have been removed.
488
489             Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
490
491             cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
492
493             assertNull(cached);
494     }
495
496     @Test
497     public void testFindPrimaryShardAsyncPrimaryNotFound() throws Exception {
498
499             TestActorRef<MessageCollectorActor> shardManager =
500                     TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
501
502             DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
503                     shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
504
505             ActorContext actorContext =
506                     new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
507                             mock(Configuration.class), dataStoreContext) {
508                         @Override
509                         protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
510                             return Futures.successful((Object) new PrimaryNotFoundException("not found"));
511                         }
512                     };
513
514
515             Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
516
517             try {
518                 Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
519                 fail("Expected PrimaryNotFoundException");
520             } catch(PrimaryNotFoundException e){
521
522             }
523
524             Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
525
526             assertNull(cached);
527     }
528
529     @Test
530     public void testFindPrimaryShardAsyncActorNotInitialized() throws Exception {
531
532             TestActorRef<MessageCollectorActor> shardManager =
533                     TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
534
535             DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
536                     shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
537
538             ActorContext actorContext =
539                     new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
540                             mock(Configuration.class), dataStoreContext) {
541                         @Override
542                         protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
543                             return Futures.successful((Object) new NotInitializedException("not iniislized"));
544                         }
545                     };
546
547
548             Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
549
550             try {
551                 Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
552                 fail("Expected NotInitializedException");
553             } catch(NotInitializedException e){
554
555             }
556
557             Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
558
559             assertNull(cached);
560     }
561
562     @Test
563     public void testBroadcast() {
564         new JavaTestKit(getSystem()) {{
565             ActorRef shardActorRef1 = getSystem().actorOf(Props.create(MessageCollectorActor.class));
566             ActorRef shardActorRef2 = getSystem().actorOf(Props.create(MessageCollectorActor.class));
567
568             TestActorRef<MockShardManager> shardManagerActorRef = TestActorRef.create(getSystem(), MockShardManager.props());
569             MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor();
570             shardManagerActor.addFindPrimaryResp("shard1", new RemotePrimaryShardFound(shardActorRef1.path().toString()));
571             shardManagerActor.addFindPrimaryResp("shard2", new RemotePrimaryShardFound(shardActorRef2.path().toString()));
572             shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found"));
573
574             Configuration mockConfig = mock(Configuration.class);
575             doReturn(Sets.newLinkedHashSet(Arrays.asList("shard1", "shard2", "shard3"))).
576                     when(mockConfig).getAllShardNames();
577
578             ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
579                     mock(ClusterWrapper.class), mockConfig,
580                     DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build());
581
582             actorContext.broadcast(new TestMessage());
583
584             expectFirstMatching(shardActorRef1, TestMessage.class);
585             expectFirstMatching(shardActorRef2, TestMessage.class);
586         }};
587     }
588
589     private <T> T expectFirstMatching(ActorRef actor, Class<T> clazz) {
590         int count = 5000 / 50;
591         for(int i = 0; i < count; i++) {
592             try {
593                 T message = (T) MessageCollectorActor.getFirstMatching(actor, clazz);
594                 if(message != null) {
595                     return message;
596                 }
597             } catch (Exception e) {}
598
599             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
600         }
601
602         Assert.fail("Did not receive message of type " + clazz);
603         return null;
604     }
605 }