BUG-1014: clean up DatastoreContext.dataStoreType()
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / utils / ActorContextTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.utils;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotEquals;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.junit.Assert.fail;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.mock;
20 import akka.actor.ActorRef;
21 import akka.actor.ActorSelection;
22 import akka.actor.ActorSystem;
23 import akka.actor.Address;
24 import akka.actor.Props;
25 import akka.actor.UntypedActor;
26 import akka.dispatch.Futures;
27 import akka.japi.Creator;
28 import akka.testkit.JavaTestKit;
29 import akka.testkit.TestActorRef;
30 import akka.util.Timeout;
31 import com.google.common.base.Optional;
32 import com.google.common.collect.Maps;
33 import com.google.common.collect.Sets;
34 import com.typesafe.config.ConfigFactory;
35 import java.util.Arrays;
36 import java.util.Map;
37 import java.util.concurrent.TimeUnit;
38 import org.junit.Assert;
39 import org.junit.Test;
40 import org.mockito.Mockito;
41 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
42 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
43 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
44 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
45 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
46 import org.opendaylight.controller.cluster.datastore.config.Configuration;
47 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
48 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
49 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
50 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
51 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
52 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
53 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
54 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
55 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
56 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
57 import org.opendaylight.controller.cluster.raft.utils.EchoActor;
58 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
59 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
60 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
61 import org.slf4j.Logger;
62 import org.slf4j.LoggerFactory;
63 import scala.concurrent.Await;
64 import scala.concurrent.Future;
65 import scala.concurrent.duration.Duration;
66 import scala.concurrent.duration.FiniteDuration;
67
68 public class ActorContextTest extends AbstractActorTest{
69
70     static final Logger log = LoggerFactory.getLogger(ActorContextTest.class);
71
72     private static class TestMessage {
73     }
74
75     private static class MockShardManager extends UntypedActor {
76
77         private final boolean found;
78         private final ActorRef actorRef;
79         private final Map<String,Object> findPrimaryResponses = Maps.newHashMap();
80
81         private MockShardManager(boolean found, ActorRef actorRef){
82
83             this.found = found;
84             this.actorRef = actorRef;
85         }
86
87         @Override public void onReceive(Object message) throws Exception {
88             if(message instanceof FindPrimary) {
89                 FindPrimary fp = (FindPrimary)message;
90                 Object resp = findPrimaryResponses.get(fp.getShardName());
91                 if(resp == null) {
92                     log.error("No expected FindPrimary response found for shard name {}", fp.getShardName());
93                 } else {
94                     getSender().tell(resp, getSelf());
95                 }
96
97                 return;
98             }
99
100             if(found){
101                 getSender().tell(new LocalShardFound(actorRef), getSelf());
102             } else {
103                 getSender().tell(new LocalShardNotFound(((FindLocalShard) message).getShardName()), getSelf());
104             }
105         }
106
107         void addFindPrimaryResp(String shardName, Object resp) {
108             findPrimaryResponses.put(shardName, resp);
109         }
110
111         private static Props props(final boolean found, final ActorRef actorRef){
112             return Props.create(new MockShardManagerCreator(found, actorRef) );
113         }
114
115         private static Props props(){
116             return Props.create(new MockShardManagerCreator() );
117         }
118
119         @SuppressWarnings("serial")
120         private static class MockShardManagerCreator implements Creator<MockShardManager> {
121             final boolean found;
122             final ActorRef actorRef;
123
124             MockShardManagerCreator() {
125                 this.found = false;
126                 this.actorRef = null;
127             }
128
129             MockShardManagerCreator(boolean found, ActorRef actorRef) {
130                 this.found = found;
131                 this.actorRef = actorRef;
132             }
133
134             @Override
135             public MockShardManager create() throws Exception {
136                 return new MockShardManager(found, actorRef);
137             }
138         }
139     }
140
141     @Test
142     public void testFindLocalShardWithShardFound(){
143         new JavaTestKit(getSystem()) {{
144
145             new Within(duration("1 seconds")) {
146                 @Override
147                 protected void run() {
148
149                     ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
150
151                     ActorRef shardManagerActorRef = getSystem()
152                         .actorOf(MockShardManager.props(true, shardActorRef));
153
154                     ActorContext actorContext =
155                         new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
156                             mock(Configuration.class));
157
158                     Optional<ActorRef> out = actorContext.findLocalShard("default");
159
160                     assertEquals(shardActorRef, out.get());
161
162
163                     expectNoMsg();
164                 }
165             };
166         }};
167
168     }
169
170     @Test
171     public void testFindLocalShardWithShardNotFound(){
172         new JavaTestKit(getSystem()) {{
173             ActorRef shardManagerActorRef = getSystem()
174                     .actorOf(MockShardManager.props(false, null));
175
176             ActorContext actorContext =
177                     new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
178                             mock(Configuration.class));
179
180             Optional<ActorRef> out = actorContext.findLocalShard("default");
181             assertTrue(!out.isPresent());
182         }};
183
184     }
185
186     @Test
187     public void testExecuteRemoteOperation() {
188         new JavaTestKit(getSystem()) {{
189             ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
190
191             ActorRef shardManagerActorRef = getSystem()
192                     .actorOf(MockShardManager.props(true, shardActorRef));
193
194             ActorContext actorContext =
195                     new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
196                             mock(Configuration.class));
197
198             ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
199
200             Object out = actorContext.executeOperation(actor, "hello");
201
202             assertEquals("hello", out);
203         }};
204     }
205
206     @Test
207     public void testExecuteRemoteOperationAsync() {
208         new JavaTestKit(getSystem()) {{
209             ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
210
211             ActorRef shardManagerActorRef = getSystem()
212                     .actorOf(MockShardManager.props(true, shardActorRef));
213
214             ActorContext actorContext =
215                     new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
216                             mock(Configuration.class));
217
218             ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
219
220             Future<Object> future = actorContext.executeOperationAsync(actor, "hello");
221
222             try {
223                 Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS));
224                 assertEquals("Result", "hello", result);
225             } catch(Exception e) {
226                 throw new AssertionError(e);
227             }
228         }};
229     }
230
231     @Test
232     public void testIsPathLocal() {
233         MockClusterWrapper clusterWrapper = new MockClusterWrapper();
234         ActorContext actorContext = null;
235
236         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
237         assertEquals(false, actorContext.isPathLocal(null));
238         assertEquals(false, actorContext.isPathLocal(""));
239
240         clusterWrapper.setSelfAddress(null);
241         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
242         assertEquals(false, actorContext.isPathLocal(""));
243
244         // even if the path is in local format, match the primary path (first 3 elements) and return true
245         clusterWrapper.setSelfAddress(new Address("akka", "test"));
246         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
247         assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
248
249         clusterWrapper.setSelfAddress(new Address("akka", "test"));
250         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
251         assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
252
253         clusterWrapper.setSelfAddress(new Address("akka", "test"));
254         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
255         assertEquals(true, actorContext.isPathLocal("akka://test/user/token2/token3/$a"));
256
257         // self address of remote format,but Tx path local format.
258         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
259         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
260         assertEquals(true, actorContext.isPathLocal(
261             "akka://system/user/shardmanager/shard/transaction"));
262
263         // self address of local format,but Tx path remote format.
264         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system"));
265         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
266         assertEquals(false, actorContext.isPathLocal(
267             "akka://system@127.0.0.1:2550/user/shardmanager/shard/transaction"));
268
269         //local path but not same
270         clusterWrapper.setSelfAddress(new Address("akka", "test"));
271         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
272         assertEquals(true, actorContext.isPathLocal("akka://test1/user/$a"));
273
274         //ip and port same
275         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
276         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
277         assertEquals(true, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550/"));
278
279         // forward-slash missing in address
280         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
281         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
282         assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550"));
283
284         //ips differ
285         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
286         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
287         assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.1.0.1:2550/"));
288
289         //ports differ
290         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
291         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
292         assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2551/"));
293     }
294
295     @Test
296     public void testResolvePathForRemoteActor() {
297         ActorContext actorContext =
298                 new ActorContext(getSystem(), mock(ActorRef.class), mock(
299                         ClusterWrapper.class),
300                         mock(Configuration.class));
301
302         String actual = actorContext.resolvePath(
303                 "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard",
304                 "akka://system/user/shardmanager/shard/transaction");
305
306         String expected = "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard/transaction";
307
308         assertEquals(expected, actual);
309     }
310
311     @Test
312     public void testResolvePathForLocalActor() {
313         ActorContext actorContext =
314                 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
315                         mock(Configuration.class));
316
317         String actual = actorContext.resolvePath(
318                 "akka://system/user/shardmanager/shard",
319                 "akka://system/user/shardmanager/shard/transaction");
320
321         String expected = "akka://system/user/shardmanager/shard/transaction";
322
323         assertEquals(expected, actual);
324     }
325
326     @Test
327     public void testResolvePathForRemoteActorWithProperRemoteAddress() {
328         ActorContext actorContext =
329                 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
330                         mock(Configuration.class));
331
332         String actual = actorContext.resolvePath(
333                 "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard",
334                 "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction");
335
336         String expected = "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction";
337
338         assertEquals(expected, actual);
339     }
340
341
342     @Test
343     public void testClientDispatcherIsGlobalDispatcher(){
344         ActorContext actorContext =
345                 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
346                         mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
347
348         assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
349
350     }
351
352     @Test
353     public void testClientDispatcherIsNotGlobalDispatcher(){
354         ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers", ConfigFactory.load("application-with-custom-dispatchers.conf"));
355
356         ActorContext actorContext =
357                 new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class),
358                         mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
359
360         assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
361
362         actorSystem.shutdown();
363
364     }
365
366     @Test
367     public void testSetDatastoreContext() {
368         new JavaTestKit(getSystem()) {{
369             ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class),
370                             mock(Configuration.class), DatastoreContext.newBuilder().
371                                 operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build(), new PrimaryShardInfoFutureCache());
372
373             assertEquals("getOperationDuration", 5, actorContext.getOperationDuration().toSeconds());
374             assertEquals("getTransactionCommitOperationTimeout", 7,
375                     actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
376
377             DatastoreContext newContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(6).
378                     shardTransactionCommitTimeoutInSeconds(8).build();
379
380             DatastoreContextFactory mockContextFactory = mock(DatastoreContextFactory.class);
381             Mockito.doReturn(newContext).when(mockContextFactory).getBaseDatastoreContext();
382
383             actorContext.setDatastoreContext(mockContextFactory);
384
385             expectMsgClass(duration("5 seconds"), DatastoreContextFactory.class);
386
387             Assert.assertSame("getDatastoreContext", newContext, actorContext.getDatastoreContext());
388
389             assertEquals("getOperationDuration", 6, actorContext.getOperationDuration().toSeconds());
390             assertEquals("getTransactionCommitOperationTimeout", 8,
391                     actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
392         }};
393     }
394
395     @Test
396     public void testFindPrimaryShardAsyncRemotePrimaryFound() throws Exception {
397
398             TestActorRef<MessageCollectorActor> shardManager =
399                     TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
400
401             DatastoreContext dataStoreContext = DatastoreContext.newBuilder().
402                     logicalStoreType(LogicalDatastoreType.CONFIGURATION).
403                     shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
404
405             final String expPrimaryPath = "akka://test-system/find-primary-shard";
406             final short expPrimaryVersion = DataStoreVersions.CURRENT_VERSION;
407             ActorContext actorContext =
408                     new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
409                             mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
410                         @Override
411                         protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
412                             return Futures.successful((Object) new RemotePrimaryShardFound(expPrimaryPath, expPrimaryVersion));
413                         }
414                     };
415
416             Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
417             PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
418
419             assertNotNull(actual);
420             assertEquals("LocalShardDataTree present", false, actual.getLocalShardDataTree().isPresent());
421             assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
422                     expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
423             assertEquals("getPrimaryShardVersion", expPrimaryVersion, actual.getPrimaryShardVersion());
424
425             Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
426
427             PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
428
429             assertEquals(cachedInfo, actual);
430
431             actorContext.getPrimaryShardInfoCache().remove("foobar");
432
433             cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
434
435             assertNull(cached);
436     }
437
438     @Test
439     public void testFindPrimaryShardAsyncLocalPrimaryFound() throws Exception {
440
441             TestActorRef<MessageCollectorActor> shardManager =
442                     TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
443
444             DatastoreContext dataStoreContext = DatastoreContext.newBuilder().
445                     logicalStoreType(LogicalDatastoreType.CONFIGURATION).
446                     shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
447
448             final DataTree mockDataTree = Mockito.mock(DataTree.class);
449             final String expPrimaryPath = "akka://test-system/find-primary-shard";
450             ActorContext actorContext =
451                     new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
452                             mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
453                         @Override
454                         protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
455                             return Futures.successful((Object) new LocalPrimaryShardFound(expPrimaryPath, mockDataTree));
456                         }
457                     };
458
459             Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
460             PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
461
462             assertNotNull(actual);
463             assertEquals("LocalShardDataTree present", true, actual.getLocalShardDataTree().isPresent());
464             assertSame("LocalShardDataTree", mockDataTree, actual.getLocalShardDataTree().get());
465             assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
466                     expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
467             assertEquals("getPrimaryShardVersion", DataStoreVersions.CURRENT_VERSION, actual.getPrimaryShardVersion());
468
469             Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
470
471             PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
472
473             assertEquals(cachedInfo, actual);
474
475             actorContext.getPrimaryShardInfoCache().remove("foobar");
476
477             cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
478
479             assertNull(cached);
480     }
481
482     @Test
483     public void testFindPrimaryShardAsyncPrimaryNotFound() throws Exception {
484         testFindPrimaryExceptions(new PrimaryNotFoundException("not found"));
485     }
486
487     @Test
488     public void testFindPrimaryShardAsyncActorNotInitialized() throws Exception {
489         testFindPrimaryExceptions(new NotInitializedException("not initialized"));
490     }
491
492     private void testFindPrimaryExceptions(final Object expectedException) throws Exception {
493         TestActorRef<MessageCollectorActor> shardManager =
494             TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
495
496         DatastoreContext dataStoreContext = DatastoreContext.newBuilder().
497             logicalStoreType(LogicalDatastoreType.CONFIGURATION).
498             shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
499
500         ActorContext actorContext =
501             new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
502                 mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
503                 @Override
504                 protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
505                     return Futures.successful(expectedException);
506                 }
507             };
508
509         Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
510
511         try {
512             Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
513             fail("Expected" + expectedException.getClass().toString());
514         } catch(Exception e){
515             if(!expectedException.getClass().isInstance(e)) {
516                 fail("Expected Exception of type " + expectedException.getClass().toString());
517             }
518         }
519
520         Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
521
522         assertNull(cached);
523     }
524
525     @Test
526     public void testBroadcast() {
527         new JavaTestKit(getSystem()) {{
528             ActorRef shardActorRef1 = getSystem().actorOf(Props.create(MessageCollectorActor.class));
529             ActorRef shardActorRef2 = getSystem().actorOf(Props.create(MessageCollectorActor.class));
530
531             TestActorRef<MockShardManager> shardManagerActorRef = TestActorRef.create(getSystem(), MockShardManager.props());
532             MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor();
533             shardManagerActor.addFindPrimaryResp("shard1", new RemotePrimaryShardFound(shardActorRef1.path().toString(),
534                     DataStoreVersions.CURRENT_VERSION));
535             shardManagerActor.addFindPrimaryResp("shard2", new RemotePrimaryShardFound(shardActorRef2.path().toString(),
536                     DataStoreVersions.CURRENT_VERSION));
537             shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found"));
538
539             Configuration mockConfig = mock(Configuration.class);
540             doReturn(Sets.newLinkedHashSet(Arrays.asList("shard1", "shard2", "shard3"))).
541                     when(mockConfig).getAllShardNames();
542
543             ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
544                     mock(ClusterWrapper.class), mockConfig,
545                     DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(), new PrimaryShardInfoFutureCache());
546
547             actorContext.broadcast(new TestMessage());
548
549             MessageCollectorActor.expectFirstMatching(shardActorRef1, TestMessage.class);
550             MessageCollectorActor.expectFirstMatching(shardActorRef2, TestMessage.class);
551         }};
552     }
553
554 }