093f0fa6df046ba333858117e385a2a971e12fe6
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / utils / ActorContextTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.utils;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotEquals;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.junit.Assert.fail;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.mock;
20 import akka.actor.ActorRef;
21 import akka.actor.ActorSelection;
22 import akka.actor.ActorSystem;
23 import akka.actor.Address;
24 import akka.actor.Props;
25 import akka.actor.UntypedActor;
26 import akka.dispatch.Futures;
27 import akka.japi.Creator;
28 import akka.testkit.JavaTestKit;
29 import akka.testkit.TestActorRef;
30 import akka.util.Timeout;
31 import com.google.common.base.Optional;
32 import com.google.common.collect.Maps;
33 import com.google.common.collect.Sets;
34 import com.typesafe.config.ConfigFactory;
35 import java.util.Arrays;
36 import java.util.Map;
37 import java.util.concurrent.TimeUnit;
38 import org.junit.Assert;
39 import org.junit.Test;
40 import org.mockito.Mockito;
41 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
42 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
43 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
44 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
45 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
46 import org.opendaylight.controller.cluster.datastore.config.Configuration;
47 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
48 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
49 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
50 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
51 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
52 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
53 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
54 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
55 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
56 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
57 import org.opendaylight.controller.cluster.raft.utils.EchoActor;
58 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
59 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
60 import org.slf4j.Logger;
61 import org.slf4j.LoggerFactory;
62 import scala.concurrent.Await;
63 import scala.concurrent.Future;
64 import scala.concurrent.duration.Duration;
65 import scala.concurrent.duration.FiniteDuration;
66
67 public class ActorContextTest extends AbstractActorTest{
68
69     static final Logger log = LoggerFactory.getLogger(ActorContextTest.class);
70
71     private static class TestMessage {
72     }
73
74     private static class MockShardManager extends UntypedActor {
75
76         private final boolean found;
77         private final ActorRef actorRef;
78         private final Map<String,Object> findPrimaryResponses = Maps.newHashMap();
79
80         private MockShardManager(boolean found, ActorRef actorRef){
81
82             this.found = found;
83             this.actorRef = actorRef;
84         }
85
86         @Override public void onReceive(Object message) throws Exception {
87             if(message instanceof FindPrimary) {
88                 FindPrimary fp = (FindPrimary)message;
89                 Object resp = findPrimaryResponses.get(fp.getShardName());
90                 if(resp == null) {
91                     log.error("No expected FindPrimary response found for shard name {}", fp.getShardName());
92                 } else {
93                     getSender().tell(resp, getSelf());
94                 }
95
96                 return;
97             }
98
99             if(found){
100                 getSender().tell(new LocalShardFound(actorRef), getSelf());
101             } else {
102                 getSender().tell(new LocalShardNotFound(((FindLocalShard) message).getShardName()), getSelf());
103             }
104         }
105
106         void addFindPrimaryResp(String shardName, Object resp) {
107             findPrimaryResponses.put(shardName, resp);
108         }
109
110         private static Props props(final boolean found, final ActorRef actorRef){
111             return Props.create(new MockShardManagerCreator(found, actorRef) );
112         }
113
114         private static Props props(){
115             return Props.create(new MockShardManagerCreator() );
116         }
117
118         @SuppressWarnings("serial")
119         private static class MockShardManagerCreator implements Creator<MockShardManager> {
120             final boolean found;
121             final ActorRef actorRef;
122
123             MockShardManagerCreator() {
124                 this.found = false;
125                 this.actorRef = null;
126             }
127
128             MockShardManagerCreator(boolean found, ActorRef actorRef) {
129                 this.found = found;
130                 this.actorRef = actorRef;
131             }
132
133             @Override
134             public MockShardManager create() throws Exception {
135                 return new MockShardManager(found, actorRef);
136             }
137         }
138     }
139
140     @Test
141     public void testFindLocalShardWithShardFound(){
142         new JavaTestKit(getSystem()) {{
143
144             new Within(duration("1 seconds")) {
145                 @Override
146                 protected void run() {
147
148                     ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
149
150                     ActorRef shardManagerActorRef = getSystem()
151                         .actorOf(MockShardManager.props(true, shardActorRef));
152
153                     ActorContext actorContext =
154                         new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
155                             mock(Configuration.class));
156
157                     Optional<ActorRef> out = actorContext.findLocalShard("default");
158
159                     assertEquals(shardActorRef, out.get());
160
161
162                     expectNoMsg();
163                 }
164             };
165         }};
166
167     }
168
169     @Test
170     public void testFindLocalShardWithShardNotFound(){
171         new JavaTestKit(getSystem()) {{
172             ActorRef shardManagerActorRef = getSystem()
173                     .actorOf(MockShardManager.props(false, null));
174
175             ActorContext actorContext =
176                     new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
177                             mock(Configuration.class));
178
179             Optional<ActorRef> out = actorContext.findLocalShard("default");
180             assertTrue(!out.isPresent());
181         }};
182
183     }
184
185     @Test
186     public void testExecuteRemoteOperation() {
187         new JavaTestKit(getSystem()) {{
188             ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
189
190             ActorRef shardManagerActorRef = getSystem()
191                     .actorOf(MockShardManager.props(true, shardActorRef));
192
193             ActorContext actorContext =
194                     new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
195                             mock(Configuration.class));
196
197             ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
198
199             Object out = actorContext.executeOperation(actor, "hello");
200
201             assertEquals("hello", out);
202         }};
203     }
204
205     @Test
206     public void testExecuteRemoteOperationAsync() {
207         new JavaTestKit(getSystem()) {{
208             ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
209
210             ActorRef shardManagerActorRef = getSystem()
211                     .actorOf(MockShardManager.props(true, shardActorRef));
212
213             ActorContext actorContext =
214                     new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
215                             mock(Configuration.class));
216
217             ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
218
219             Future<Object> future = actorContext.executeOperationAsync(actor, "hello");
220
221             try {
222                 Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS));
223                 assertEquals("Result", "hello", result);
224             } catch(Exception e) {
225                 throw new AssertionError(e);
226             }
227         }};
228     }
229
230     @Test
231     public void testIsPathLocal() {
232         MockClusterWrapper clusterWrapper = new MockClusterWrapper();
233         ActorContext actorContext = null;
234
235         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
236         assertEquals(false, actorContext.isPathLocal(null));
237         assertEquals(false, actorContext.isPathLocal(""));
238
239         clusterWrapper.setSelfAddress(null);
240         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
241         assertEquals(false, actorContext.isPathLocal(""));
242
243         // even if the path is in local format, match the primary path (first 3 elements) and return true
244         clusterWrapper.setSelfAddress(new Address("akka", "test"));
245         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
246         assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
247
248         clusterWrapper.setSelfAddress(new Address("akka", "test"));
249         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
250         assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
251
252         clusterWrapper.setSelfAddress(new Address("akka", "test"));
253         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
254         assertEquals(true, actorContext.isPathLocal("akka://test/user/token2/token3/$a"));
255
256         // self address of remote format,but Tx path local format.
257         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
258         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
259         assertEquals(true, actorContext.isPathLocal(
260             "akka://system/user/shardmanager/shard/transaction"));
261
262         // self address of local format,but Tx path remote format.
263         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system"));
264         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
265         assertEquals(false, actorContext.isPathLocal(
266             "akka://system@127.0.0.1:2550/user/shardmanager/shard/transaction"));
267
268         //local path but not same
269         clusterWrapper.setSelfAddress(new Address("akka", "test"));
270         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
271         assertEquals(true, actorContext.isPathLocal("akka://test1/user/$a"));
272
273         //ip and port same
274         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
275         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
276         assertEquals(true, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550/"));
277
278         // forward-slash missing in address
279         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
280         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
281         assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550"));
282
283         //ips differ
284         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
285         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
286         assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.1.0.1:2550/"));
287
288         //ports differ
289         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
290         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
291         assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2551/"));
292     }
293
294     @Test
295     public void testResolvePathForRemoteActor() {
296         ActorContext actorContext =
297                 new ActorContext(getSystem(), mock(ActorRef.class), mock(
298                         ClusterWrapper.class),
299                         mock(Configuration.class));
300
301         String actual = actorContext.resolvePath(
302                 "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard",
303                 "akka://system/user/shardmanager/shard/transaction");
304
305         String expected = "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard/transaction";
306
307         assertEquals(expected, actual);
308     }
309
310     @Test
311     public void testResolvePathForLocalActor() {
312         ActorContext actorContext =
313                 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
314                         mock(Configuration.class));
315
316         String actual = actorContext.resolvePath(
317                 "akka://system/user/shardmanager/shard",
318                 "akka://system/user/shardmanager/shard/transaction");
319
320         String expected = "akka://system/user/shardmanager/shard/transaction";
321
322         assertEquals(expected, actual);
323     }
324
325     @Test
326     public void testResolvePathForRemoteActorWithProperRemoteAddress() {
327         ActorContext actorContext =
328                 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
329                         mock(Configuration.class));
330
331         String actual = actorContext.resolvePath(
332                 "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard",
333                 "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction");
334
335         String expected = "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction";
336
337         assertEquals(expected, actual);
338     }
339
340
341     @Test
342     public void testClientDispatcherIsGlobalDispatcher(){
343         ActorContext actorContext =
344                 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
345                         mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
346
347         assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
348
349     }
350
351     @Test
352     public void testClientDispatcherIsNotGlobalDispatcher(){
353         ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers", ConfigFactory.load("application-with-custom-dispatchers.conf"));
354
355         ActorContext actorContext =
356                 new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class),
357                         mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
358
359         assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
360
361         actorSystem.shutdown();
362
363     }
364
365     @Test
366     public void testSetDatastoreContext() {
367         new JavaTestKit(getSystem()) {{
368             ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class),
369                             mock(Configuration.class), DatastoreContext.newBuilder().
370                                 operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build(), new PrimaryShardInfoFutureCache());
371
372             assertEquals("getOperationDuration", 5, actorContext.getOperationDuration().toSeconds());
373             assertEquals("getTransactionCommitOperationTimeout", 7,
374                     actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
375
376             DatastoreContext newContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(6).
377                     shardTransactionCommitTimeoutInSeconds(8).build();
378
379             DatastoreContextFactory mockContextFactory = mock(DatastoreContextFactory.class);
380             Mockito.doReturn(newContext).when(mockContextFactory).getBaseDatastoreContext();
381
382             actorContext.setDatastoreContext(mockContextFactory);
383
384             expectMsgClass(duration("5 seconds"), DatastoreContextFactory.class);
385
386             Assert.assertSame("getDatastoreContext", newContext, actorContext.getDatastoreContext());
387
388             assertEquals("getOperationDuration", 6, actorContext.getOperationDuration().toSeconds());
389             assertEquals("getTransactionCommitOperationTimeout", 8,
390                     actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
391         }};
392     }
393
394     @Test
395     public void testFindPrimaryShardAsyncRemotePrimaryFound() throws Exception {
396
397             TestActorRef<MessageCollectorActor> shardManager =
398                     TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
399
400             DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
401                     shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
402
403             final String expPrimaryPath = "akka://test-system/find-primary-shard";
404             final short expPrimaryVersion = DataStoreVersions.CURRENT_VERSION;
405             ActorContext actorContext =
406                     new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
407                             mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
408                         @Override
409                         protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
410                             return Futures.successful((Object) new RemotePrimaryShardFound(expPrimaryPath, expPrimaryVersion));
411                         }
412                     };
413
414             Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
415             PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
416
417             assertNotNull(actual);
418             assertEquals("LocalShardDataTree present", false, actual.getLocalShardDataTree().isPresent());
419             assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
420                     expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
421             assertEquals("getPrimaryShardVersion", expPrimaryVersion, actual.getPrimaryShardVersion());
422
423             Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
424
425             PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
426
427             assertEquals(cachedInfo, actual);
428
429             actorContext.getPrimaryShardInfoCache().remove("foobar");
430
431             cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
432
433             assertNull(cached);
434     }
435
436     @Test
437     public void testFindPrimaryShardAsyncLocalPrimaryFound() throws Exception {
438
439             TestActorRef<MessageCollectorActor> shardManager =
440                     TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
441
442             DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
443                     shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
444
445             final DataTree mockDataTree = Mockito.mock(DataTree.class);
446             final String expPrimaryPath = "akka://test-system/find-primary-shard";
447             ActorContext actorContext =
448                     new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
449                             mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
450                         @Override
451                         protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
452                             return Futures.successful((Object) new LocalPrimaryShardFound(expPrimaryPath, mockDataTree));
453                         }
454                     };
455
456             Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
457             PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
458
459             assertNotNull(actual);
460             assertEquals("LocalShardDataTree present", true, actual.getLocalShardDataTree().isPresent());
461             assertSame("LocalShardDataTree", mockDataTree, actual.getLocalShardDataTree().get());
462             assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
463                     expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
464             assertEquals("getPrimaryShardVersion", DataStoreVersions.CURRENT_VERSION, actual.getPrimaryShardVersion());
465
466             Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
467
468             PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
469
470             assertEquals(cachedInfo, actual);
471
472             actorContext.getPrimaryShardInfoCache().remove("foobar");
473
474             cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
475
476             assertNull(cached);
477     }
478
479     @Test
480     public void testFindPrimaryShardAsyncPrimaryNotFound() throws Exception {
481         testFindPrimaryExceptions(new PrimaryNotFoundException("not found"));
482     }
483
484     @Test
485     public void testFindPrimaryShardAsyncActorNotInitialized() throws Exception {
486         testFindPrimaryExceptions(new NotInitializedException("not initialized"));
487     }
488
489     private void testFindPrimaryExceptions(final Object expectedException) throws Exception {
490         TestActorRef<MessageCollectorActor> shardManager =
491             TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
492
493         DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
494             shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
495
496         ActorContext actorContext =
497             new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
498                 mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
499                 @Override
500                 protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
501                     return Futures.successful(expectedException);
502                 }
503             };
504
505         Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
506
507         try {
508             Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
509             fail("Expected" + expectedException.getClass().toString());
510         } catch(Exception e){
511             if(!expectedException.getClass().isInstance(e)) {
512                 fail("Expected Exception of type " + expectedException.getClass().toString());
513             }
514         }
515
516         Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
517
518         assertNull(cached);
519     }
520
521     @Test
522     public void testBroadcast() {
523         new JavaTestKit(getSystem()) {{
524             ActorRef shardActorRef1 = getSystem().actorOf(Props.create(MessageCollectorActor.class));
525             ActorRef shardActorRef2 = getSystem().actorOf(Props.create(MessageCollectorActor.class));
526
527             TestActorRef<MockShardManager> shardManagerActorRef = TestActorRef.create(getSystem(), MockShardManager.props());
528             MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor();
529             shardManagerActor.addFindPrimaryResp("shard1", new RemotePrimaryShardFound(shardActorRef1.path().toString(),
530                     DataStoreVersions.CURRENT_VERSION));
531             shardManagerActor.addFindPrimaryResp("shard2", new RemotePrimaryShardFound(shardActorRef2.path().toString(),
532                     DataStoreVersions.CURRENT_VERSION));
533             shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found"));
534
535             Configuration mockConfig = mock(Configuration.class);
536             doReturn(Sets.newLinkedHashSet(Arrays.asList("shard1", "shard2", "shard3"))).
537                     when(mockConfig).getAllShardNames();
538
539             ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
540                     mock(ClusterWrapper.class), mockConfig,
541                     DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(), new PrimaryShardInfoFutureCache());
542
543             actorContext.broadcast(new TestMessage());
544
545             MessageCollectorActor.expectFirstMatching(shardActorRef1, TestMessage.class);
546             MessageCollectorActor.expectFirstMatching(shardActorRef2, TestMessage.class);
547         }};
548     }
549
550 }