Bug 4105: Add CreateShard message in ShardManager
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardManagerTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertNull;
6 import static org.junit.Assert.assertSame;
7 import static org.junit.Assert.assertTrue;
8 import static org.mockito.Mockito.mock;
9 import static org.mockito.Mockito.never;
10 import static org.mockito.Mockito.times;
11 import static org.mockito.Mockito.verify;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSystem;
14 import akka.actor.AddressFromURIString;
15 import akka.actor.Props;
16 import akka.cluster.Cluster;
17 import akka.cluster.ClusterEvent;
18 import akka.dispatch.Dispatchers;
19 import akka.japi.Creator;
20 import akka.pattern.Patterns;
21 import akka.persistence.RecoveryCompleted;
22 import akka.testkit.JavaTestKit;
23 import akka.testkit.TestActorRef;
24 import akka.util.Timeout;
25 import com.google.common.base.Optional;
26 import com.google.common.collect.ImmutableMap;
27 import com.google.common.collect.ImmutableSet;
28 import com.google.common.collect.Sets;
29 import com.google.common.util.concurrent.Uninterruptibles;
30 import com.typesafe.config.ConfigFactory;
31 import java.util.Arrays;
32 import java.util.Collections;
33 import java.util.List;
34 import java.util.Map;
35 import java.util.concurrent.CountDownLatch;
36 import java.util.concurrent.TimeUnit;
37 import org.junit.After;
38 import org.junit.Before;
39 import org.junit.Test;
40 import org.mockito.Mock;
41 import org.mockito.MockitoAnnotations;
42 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
43 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
44 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
45 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
46 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
47 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
48 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
49 import org.opendaylight.controller.cluster.datastore.messages.CreateShardReply;
50 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
51 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
52 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
53 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
54 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
55 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
56 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
57 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
58 import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior;
59 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
60 import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
61 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
62 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
63 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
64 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
65 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
66 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
67 import org.opendaylight.controller.cluster.raft.RaftState;
68 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
69 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
70 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
71 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
72 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
73 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
74 import scala.concurrent.Await;
75 import scala.concurrent.Future;
76 import scala.concurrent.duration.FiniteDuration;
77
78 public class ShardManagerTest extends AbstractActorTest {
79     private static int ID_COUNTER = 1;
80
81     private final String shardMrgIDSuffix = "config" + ID_COUNTER++;
82     private final String shardMgrID = "shard-manager-" + shardMrgIDSuffix;
83
84     @Mock
85     private static CountDownLatch ready;
86
87     private static TestActorRef<MessageCollectorActor> mockShardActor;
88
89     private static String mockShardName;
90
91     private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder().
92             dataStoreType(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS)
93                    .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(6);
94
95     private static ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) {
96         String name = new ShardIdentifier(shardName, memberName,"config").toString();
97         return TestActorRef.create(system, Props.create(MessageCollectorActor.class), name);
98     }
99
100     private final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
101
102     @Before
103     public void setUp() {
104         MockitoAnnotations.initMocks(this);
105
106         InMemoryJournal.clear();
107
108         if(mockShardActor == null) {
109             mockShardName = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1", "config").toString();
110             mockShardActor = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), mockShardName);
111         }
112
113         mockShardActor.underlyingActor().clear();
114     }
115
116     @After
117     public void tearDown() {
118         InMemoryJournal.clear();
119     }
120
121     private Props newShardMgrProps(boolean persistent) {
122         return ShardManager.props(new MockClusterWrapper(), new MockConfiguration(),
123                 datastoreContextBuilder.persistent(persistent).build(), ready, primaryShardInfoCache);
124     }
125
126     private Props newPropsShardMgrWithMockShardActor() {
127         return newPropsShardMgrWithMockShardActor("shardManager", mockShardActor, new MockClusterWrapper(),
128                 new MockConfiguration());
129     }
130
131     private Props newPropsShardMgrWithMockShardActor(final String name, final ActorRef shardActor,
132             final ClusterWrapper clusterWrapper, final Configuration config) {
133         Creator<ShardManager> creator = new Creator<ShardManager>() {
134             private static final long serialVersionUID = 1L;
135             @Override
136             public ShardManager create() throws Exception {
137                 return new ForwardingShardManager(clusterWrapper, config, datastoreContextBuilder.build(),
138                         ready, name, shardActor, primaryShardInfoCache);
139             }
140         };
141
142         return Props.create(new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId());
143     }
144
145     @Test
146     public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception {
147         new JavaTestKit(getSystem()) {{
148             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
149
150             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
151
152             shardManager.tell(new FindPrimary("non-existent", false), getRef());
153
154             expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
155         }};
156     }
157
158     @Test
159     public void testOnReceiveFindPrimaryForLocalLeaderShard() throws Exception {
160         new JavaTestKit(getSystem()) {{
161             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
162
163             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
164
165             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
166             shardManager.tell(new ActorInitialized(), mockShardActor);
167
168             DataTree mockDataTree = mock(DataTree.class);
169             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree),
170                     DataStoreVersions.CURRENT_VERSION), getRef());
171
172             MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class);
173             shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
174                     RaftState.Leader.name())), mockShardActor);
175
176             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
177
178             LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
179             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
180                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
181             assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
182         }};
183     }
184
185     @Test
186     public void testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp() throws Exception {
187         new JavaTestKit(getSystem()) {{
188             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
189
190             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
191             shardManager.tell(new ActorInitialized(), mockShardActor);
192
193             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
194             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
195             shardManager.tell(new RoleChangeNotification(memberId1,
196                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
197             shardManager.tell(new LeaderStateChanged(memberId1, memberId2, DataStoreVersions.CURRENT_VERSION), mockShardActor);
198
199             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
200
201             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
202         }};
203     }
204
205     @Test
206     public void testOnReceiveFindPrimaryForNonLocalLeaderShard() throws Exception {
207         new JavaTestKit(getSystem()) {{
208             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
209
210             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
211             shardManager.tell(new ActorInitialized(), mockShardActor);
212
213             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
214             MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
215
216             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
217             shardManager.tell(new RoleChangeNotification(memberId1,
218                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
219             short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
220             shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, Optional.<DataTree>absent(),
221                     leaderVersion), mockShardActor);
222
223             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
224
225             RemotePrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
226             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
227                     primaryFound.getPrimaryPath().contains("member-2-shard-default"));
228             assertEquals("getPrimaryVersion", leaderVersion, primaryFound.getPrimaryVersion());
229         }};
230     }
231
232     @Test
233     public void testOnReceiveFindPrimaryForUninitializedShard() throws Exception {
234         new JavaTestKit(getSystem()) {{
235             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
236
237             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
238
239             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
240         }};
241     }
242
243     @Test
244     public void testOnReceiveFindPrimaryForInitializedShardWithNoRole() throws Exception {
245         new JavaTestKit(getSystem()) {{
246             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
247
248             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
249             shardManager.tell(new ActorInitialized(), mockShardActor);
250
251             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
252
253             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
254         }};
255     }
256
257     @Test
258     public void testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId() throws Exception {
259         new JavaTestKit(getSystem()) {{
260             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
261
262             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
263             shardManager.tell(new ActorInitialized(), mockShardActor);
264
265             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
266             shardManager.tell(new RoleChangeNotification(memberId,
267                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
268
269             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
270
271             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
272
273             DataTree mockDataTree = mock(DataTree.class);
274             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree),
275                     DataStoreVersions.CURRENT_VERSION), mockShardActor);
276
277             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
278
279             LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
280             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
281                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
282             assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
283         }};
284     }
285
286     @Test
287     public void testOnReceiveFindPrimaryWaitForShardLeader() throws Exception {
288         new JavaTestKit(getSystem()) {{
289             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
290
291             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
292
293             // We're passing waitUntilInitialized = true to FindPrimary so the response should be
294             // delayed until we send ActorInitialized and RoleChangeNotification.
295             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
296
297             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
298
299             shardManager.tell(new ActorInitialized(), mockShardActor);
300
301             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
302
303             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
304             shardManager.tell(new RoleChangeNotification(memberId,
305                     RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor);
306
307             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
308
309             DataTree mockDataTree = mock(DataTree.class);
310             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree),
311                     DataStoreVersions.CURRENT_VERSION), mockShardActor);
312
313             LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
314             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
315                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
316             assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
317
318             expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
319         }};
320     }
321
322     @Test
323     public void testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard() throws Exception {
324         new JavaTestKit(getSystem()) {{
325             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
326
327             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
328
329             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
330
331             expectMsgClass(duration("2 seconds"), NotInitializedException.class);
332
333             shardManager.tell(new ActorInitialized(), mockShardActor);
334
335             expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
336         }};
337     }
338
339     @Test
340     public void testOnReceiveFindPrimaryWaitForReadyWithCandidateShard() throws Exception {
341         new JavaTestKit(getSystem()) {{
342             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
343
344             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
345             shardManager.tell(new ActorInitialized(), mockShardActor);
346             shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
347                     null, RaftState.Candidate.name()), mockShardActor);
348
349             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
350
351             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
352         }};
353     }
354
355     @Test
356     public void testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard() throws Exception {
357         new JavaTestKit(getSystem()) {{
358             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
359
360             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
361             shardManager.tell(new ActorInitialized(), mockShardActor);
362             shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
363                     null, RaftState.IsolatedLeader.name()), mockShardActor);
364
365             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
366
367             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
368         }};
369     }
370
371     @Test
372     public void testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard() throws Exception {
373         new JavaTestKit(getSystem()) {{
374             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
375
376             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
377             shardManager.tell(new ActorInitialized(), mockShardActor);
378
379             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
380
381             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
382         }};
383     }
384
385     @Test
386     public void testOnReceiveFindPrimaryForRemoteShard() throws Exception {
387         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
388
389         // Create an ActorSystem ShardManager actor for member-1.
390
391         final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
392         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
393
394         ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
395
396         final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
397                 newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
398                         new MockConfiguration()), shardManagerID);
399
400         // Create an ActorSystem ShardManager actor for member-2.
401
402         final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
403
404         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
405
406         final ActorRef mockShardActor2 = newMockShardActor(system2, "astronauts", "member-2");
407
408         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
409                 put("default", Arrays.asList("member-1", "member-2")).
410                 put("astronauts", Arrays.asList("member-2")).build());
411
412         final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
413                 newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
414                         mockConfig2), shardManagerID);
415
416         new JavaTestKit(system1) {{
417
418             shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
419             shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
420
421             shardManager2.tell(new ActorInitialized(), mockShardActor2);
422
423             String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
424             short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
425             shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2,
426                     Optional.of(mock(DataTree.class)), leaderVersion), mockShardActor2);
427             shardManager2.tell(new RoleChangeNotification(memberId2,
428                     RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
429
430             shardManager1.underlyingActor().waitForMemberUp();
431
432             shardManager1.tell(new FindPrimary("astronauts", false), getRef());
433
434             RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
435             String path = found.getPrimaryPath();
436             assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config"));
437             assertEquals("getPrimaryVersion", leaderVersion, found.getPrimaryVersion());
438
439             shardManager2.underlyingActor().verifyFindPrimary();
440
441             Cluster.get(system2).down(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
442
443             shardManager1.underlyingActor().waitForMemberRemoved();
444
445             shardManager1.tell(new FindPrimary("astronauts", false), getRef());
446
447             expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
448         }};
449
450         JavaTestKit.shutdownActorSystem(system1);
451         JavaTestKit.shutdownActorSystem(system2);
452     }
453
454     @Test
455     public void testShardAvailabilityOnChangeOfMemberReachability() throws Exception {
456         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
457
458         // Create an ActorSystem ShardManager actor for member-1.
459
460         final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
461         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
462
463         final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
464
465         final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
466             newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
467                 new MockConfiguration()), shardManagerID);
468
469         // Create an ActorSystem ShardManager actor for member-2.
470
471         final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
472
473         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
474
475         final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
476
477         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
478             put("default", Arrays.asList("member-1", "member-2")).build());
479
480         final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
481             newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
482                 mockConfig2), shardManagerID);
483
484         new JavaTestKit(system1) {{
485
486             shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
487             shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
488             shardManager1.tell(new ActorInitialized(), mockShardActor1);
489             shardManager2.tell(new ActorInitialized(), mockShardActor2);
490
491             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
492             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
493             shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2,
494                 Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION), mockShardActor1);
495             shardManager1.tell(new RoleChangeNotification(memberId1,
496                 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1);
497             shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, Optional.of(mock(DataTree.class)),
498                     DataStoreVersions.CURRENT_VERSION),
499                 mockShardActor2);
500             shardManager2.tell(new RoleChangeNotification(memberId2,
501                 RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
502             shardManager1.underlyingActor().waitForMemberUp();
503
504             shardManager1.tell(new FindPrimary("default", true), getRef());
505
506             RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
507             String path = found.getPrimaryPath();
508             assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
509
510             shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
511                 createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
512
513             shardManager1.underlyingActor().waitForUnreachableMember();
514
515             shardManager1.tell(new FindPrimary("default", true), getRef());
516
517             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
518
519             shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
520                 createReachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
521
522             shardManager1.underlyingActor().waitForReachableMember();
523
524             shardManager1.tell(new FindPrimary("default", true), getRef());
525
526             RemotePrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
527             String path1 = found1.getPrimaryPath();
528             assertTrue("Unexpected primary path " + path1, path1.contains("member-2-shard-default-config"));
529
530         }};
531
532         JavaTestKit.shutdownActorSystem(system1);
533         JavaTestKit.shutdownActorSystem(system2);
534     }
535
536     @Test
537     public void testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange() throws Exception {
538         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
539
540         // Create an ActorSystem ShardManager actor for member-1.
541
542         final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
543         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
544
545         final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
546
547         final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
548             newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
549                 new MockConfiguration()), shardManagerID);
550
551         // Create an ActorSystem ShardManager actor for member-2.
552
553         final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
554
555         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
556
557         final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
558
559         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
560             put("default", Arrays.asList("member-1", "member-2")).build());
561
562         final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
563             newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
564                 mockConfig2), shardManagerID);
565
566         new JavaTestKit(system1) {{
567
568             shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
569             shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
570             shardManager1.tell(new ActorInitialized(), mockShardActor1);
571             shardManager2.tell(new ActorInitialized(), mockShardActor2);
572
573             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
574             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
575             shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2,
576                 Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION), mockShardActor1);
577             shardManager1.tell(new RoleChangeNotification(memberId1,
578                 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1);
579             shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, Optional.of(mock(DataTree.class)),
580                     DataStoreVersions.CURRENT_VERSION),
581                 mockShardActor2);
582             shardManager2.tell(new RoleChangeNotification(memberId2,
583                 RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
584             shardManager1.underlyingActor().waitForMemberUp();
585
586             shardManager1.tell(new FindPrimary("default", true), getRef());
587
588             RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
589             String path = found.getPrimaryPath();
590             assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
591
592             primaryShardInfoCache.putSuccessful("default", new PrimaryShardInfo(system1.actorSelection(
593                     mockShardActor1.path()), DataStoreVersions.CURRENT_VERSION, Optional.<DataTree>absent()));
594
595             shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
596                 createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
597
598             shardManager1.underlyingActor().waitForUnreachableMember();
599
600             shardManager1.tell(new FindPrimary("default", true), getRef());
601
602             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
603
604             assertNull("Expected primaryShardInfoCache entry removed", primaryShardInfoCache.getIfPresent("default"));
605
606             shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId1, Optional.of(mock(DataTree.class)),
607                     DataStoreVersions.CURRENT_VERSION), mockShardActor1);
608             shardManager1.tell(new RoleChangeNotification(memberId1,
609                 RaftState.Follower.name(), RaftState.Leader.name()), mockShardActor1);
610
611             shardManager1.tell(new FindPrimary("default", true), getRef());
612
613             LocalPrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
614             String path1 = found1.getPrimaryPath();
615             assertTrue("Unexpected primary path " + path1, path1.contains("member-1-shard-default-config"));
616
617         }};
618
619         JavaTestKit.shutdownActorSystem(system1);
620         JavaTestKit.shutdownActorSystem(system2);
621     }
622
623
624     @Test
625     public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
626         new JavaTestKit(getSystem()) {{
627             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
628
629             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
630
631             shardManager.tell(new FindLocalShard("non-existent", false), getRef());
632
633             LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
634
635             assertEquals("getShardName", "non-existent", notFound.getShardName());
636         }};
637     }
638
639     @Test
640     public void testOnReceiveFindLocalShardForExistentShard() throws Exception {
641         new JavaTestKit(getSystem()) {{
642             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
643
644             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
645             shardManager.tell(new ActorInitialized(), mockShardActor);
646
647             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
648
649             LocalShardFound found = expectMsgClass(duration("5 seconds"), LocalShardFound.class);
650
651             assertTrue("Found path contains " + found.getPath().path().toString(),
652                     found.getPath().path().toString().contains("member-1-shard-default-config"));
653         }};
654     }
655
656     @Test
657     public void testOnReceiveFindLocalShardForNotInitializedShard() throws Exception {
658         new JavaTestKit(getSystem()) {{
659             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
660
661             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
662
663             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
664         }};
665     }
666
667     @Test
668     public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
669         new JavaTestKit(getSystem()) {{
670             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
671
672             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
673
674             // We're passing waitUntilInitialized = true to FindLocalShard so the response should be
675             // delayed until we send ActorInitialized.
676             Future<Object> future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true),
677                     new Timeout(5, TimeUnit.SECONDS));
678
679             shardManager.tell(new ActorInitialized(), mockShardActor);
680
681             Object resp = Await.result(future, duration("5 seconds"));
682             assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound);
683         }};
684     }
685
686     @Test
687     public void testOnRecoveryJournalIsCleaned() {
688         InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
689                 ImmutableSet.of("foo")));
690         InMemoryJournal.addEntry(shardMgrID, 2L, new ShardManager.SchemaContextModules(
691                 ImmutableSet.of("bar")));
692         InMemoryJournal.addDeleteMessagesCompleteLatch(shardMgrID);
693
694         new JavaTestKit(getSystem()) {{
695             TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
696                     Props.create(new TestShardManagerCreator(shardMrgIDSuffix)));
697
698             shardManager.underlyingActor().waitForRecoveryComplete();
699             InMemoryJournal.waitForDeleteMessagesComplete(shardMgrID);
700
701             // Journal entries up to the last one should've been deleted
702             Map<Long, Object> journal = InMemoryJournal.get(shardMgrID);
703             synchronized (journal) {
704                 assertEquals("Journal size", 0, journal.size());
705             }
706         }};
707     }
708
709     @Test
710     public void testRoleChangeNotificationAndShardLeaderStateChangedReleaseReady() throws Exception {
711         new JavaTestKit(getSystem()) {
712             {
713                 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true));
714
715                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
716                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
717                         memberId, RaftState.Candidate.name(), RaftState.Leader.name()));
718
719                 verify(ready, never()).countDown();
720
721                 shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId, memberId,
722                         Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION));
723
724                 verify(ready, times(1)).countDown();
725
726             }};
727     }
728
729     @Test
730     public void testRoleChangeNotificationToFollowerWithShardLeaderStateChangedReleaseReady() throws Exception {
731         new JavaTestKit(getSystem()) {
732             {
733                 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true));
734
735                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
736                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
737                         memberId, null, RaftState.Follower.name()));
738
739                 verify(ready, never()).countDown();
740
741                 shardManager.underlyingActor().onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
742
743                 shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId,
744                         "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)),
745                         DataStoreVersions.CURRENT_VERSION));
746
747                 verify(ready, times(1)).countDown();
748
749             }};
750     }
751
752     @Test
753     public void testReadyCountDownForMemberUpAfterLeaderStateChanged() throws Exception {
754         new JavaTestKit(getSystem()) {
755             {
756                 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true));
757
758                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
759                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
760                         memberId, null, RaftState.Follower.name()));
761
762                 verify(ready, never()).countDown();
763
764                 shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId,
765                         "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)),
766                         DataStoreVersions.CURRENT_VERSION));
767
768                 shardManager.underlyingActor().onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
769
770                 verify(ready, times(1)).countDown();
771
772             }};
773     }
774
775     @Test
776     public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
777         new JavaTestKit(getSystem()) {
778             {
779                 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true));
780
781                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
782                         "unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
783
784                 verify(ready, never()).countDown();
785
786             }};
787     }
788
789
790     @Test
791     public void testByDefaultSyncStatusIsFalse() throws Exception{
792         final Props persistentProps = newShardMgrProps(true);
793         final TestActorRef<ShardManager> shardManager =
794                 TestActorRef.create(getSystem(), persistentProps);
795
796         ShardManager shardManagerActor = shardManager.underlyingActor();
797
798         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
799     }
800
801     @Test
802     public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception{
803         final Props persistentProps = ShardManager.props(
804                 new MockClusterWrapper(),
805                 new MockConfiguration(),
806                 DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache);
807         final TestActorRef<ShardManager> shardManager =
808                 TestActorRef.create(getSystem(), persistentProps);
809
810         ShardManager shardManagerActor = shardManager.underlyingActor();
811         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
812                 RaftState.Follower.name(), RaftState.Leader.name()));
813
814         assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
815     }
816
817     @Test
818     public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception{
819         final Props persistentProps = newShardMgrProps(true);
820         final TestActorRef<ShardManager> shardManager =
821                 TestActorRef.create(getSystem(), persistentProps);
822
823         ShardManager shardManagerActor = shardManager.underlyingActor();
824         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
825                 RaftState.Follower.name(), RaftState.Candidate.name()));
826
827         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
828
829         // Send a FollowerInitialSyncStatus with status = true for the replica whose current state is candidate
830         shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-default-unknown"));
831
832         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
833     }
834
835     @Test
836     public void testWhenShardIsFollowerSyncStatusDependsOnFollowerInitialSyncStatus() throws Exception{
837         final Props persistentProps = ShardManager.props(
838                 new MockClusterWrapper(),
839                 new MockConfiguration(),
840                 DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache);
841         final TestActorRef<ShardManager> shardManager =
842                 TestActorRef.create(getSystem(), persistentProps);
843
844         ShardManager shardManagerActor = shardManager.underlyingActor();
845         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
846                 RaftState.Candidate.name(), RaftState.Follower.name()));
847
848         // Initially will be false
849         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
850
851         // Send status true will make sync status true
852         shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-default-unknown"));
853
854         assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
855
856         // Send status false will make sync status false
857         shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-default-unknown"));
858
859         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
860
861     }
862
863     @Test
864     public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception{
865         final Props persistentProps = ShardManager.props(
866                 new MockClusterWrapper(),
867                 new MockConfiguration() {
868                     @Override
869                     public List<String> getMemberShardNames(String memberName) {
870                         return Arrays.asList("default", "astronauts");
871                     }
872                 },
873                 DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache);
874         final TestActorRef<ShardManager> shardManager =
875                 TestActorRef.create(getSystem(), persistentProps);
876
877         ShardManager shardManagerActor = shardManager.underlyingActor();
878
879         // Initially will be false
880         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
881
882         // Make default shard leader
883         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
884                 RaftState.Follower.name(), RaftState.Leader.name()));
885
886         // default = Leader, astronauts is unknown so sync status remains false
887         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
888
889         // Make astronauts shard leader as well
890         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-astronauts-unknown",
891                 RaftState.Follower.name(), RaftState.Leader.name()));
892
893         // Now sync status should be true
894         assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
895
896         // Make astronauts a Follower
897         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-astronauts-unknown",
898                 RaftState.Leader.name(), RaftState.Follower.name()));
899
900         // Sync status is not true
901         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
902
903         // Make the astronauts follower sync status true
904         shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-astronauts-unknown"));
905
906         // Sync status is now true
907         assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
908
909     }
910
911     @Test
912     public void testOnReceiveSwitchShardBehavior() throws Exception {
913         new JavaTestKit(getSystem()) {{
914             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
915
916             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
917             shardManager.tell(new ActorInitialized(), mockShardActor);
918
919             shardManager.tell(new SwitchShardBehavior(mockShardName, "Leader", 1000), getRef());
920
921             SwitchBehavior switchBehavior = MessageCollectorActor.expectFirstMatching(mockShardActor, SwitchBehavior.class);
922
923             assertEquals(RaftState.Leader, switchBehavior.getNewState());
924             assertEquals(1000, switchBehavior.getNewTerm());
925         }};
926     }
927
928     public void testOnReceiveCreateShard() {
929         new JavaTestKit(getSystem()) {{
930             datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
931
932             ActorRef shardManager = getSystem().actorOf(newShardMgrProps(false));
933
934             SchemaContext schemaContext = TestModel.createTestContext();
935             shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
936
937             DatastoreContext datastoreContext = DatastoreContext.newBuilder().shardElectionTimeoutFactor(100).
938                     persistent(false).build();
939             TestShardPropsCreator shardPropsCreator = new TestShardPropsCreator();
940
941             shardManager.tell(new CreateShard("foo", Arrays.asList("member-1", "member-5", "member-6"), shardPropsCreator,
942                     datastoreContext), getRef());
943
944             expectMsgClass(duration("5 seconds"), CreateShardReply.class);
945
946             shardManager.tell(new FindLocalShard("foo", true), getRef());
947
948             expectMsgClass(duration("5 seconds"), LocalShardFound.class);
949
950             assertEquals("isRecoveryApplicable", false, shardPropsCreator.datastoreContext.isPersistent());
951             assertEquals("peerMembers", Sets.newHashSet(new ShardIdentifier("foo", "member-5", shardMrgIDSuffix).toString(),
952                     new ShardIdentifier("foo", "member-6", shardMrgIDSuffix).toString()),
953                     shardPropsCreator.peerAddresses.keySet());
954             assertEquals("ShardIdentifier", new ShardIdentifier("foo", "member-1", shardMrgIDSuffix),
955                     shardPropsCreator.shardId);
956             assertSame("schemaContext", schemaContext, shardPropsCreator.schemaContext);
957
958             // Send CreateShard with same name - should fail.
959
960             shardManager.tell(new CreateShard("foo", Collections.<String>emptyList(), shardPropsCreator, null), getRef());
961
962             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
963         }};
964     }
965
966     @Test
967     public void testOnReceiveCreateShardWithNoInitialSchemaContext() {
968         new JavaTestKit(getSystem()) {{
969             ActorRef shardManager = getSystem().actorOf(newShardMgrProps(false));
970
971             TestShardPropsCreator shardPropsCreator = new TestShardPropsCreator();
972
973             shardManager.tell(new CreateShard("foo", Arrays.asList("member-1"), shardPropsCreator, null), getRef());
974
975             expectMsgClass(duration("5 seconds"), CreateShardReply.class);
976
977             SchemaContext schemaContext = TestModel.createTestContext();
978             shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
979
980             shardManager.tell(new FindLocalShard("foo", true), getRef());
981
982             expectMsgClass(duration("5 seconds"), LocalShardFound.class);
983
984             assertSame("schemaContext", schemaContext, shardPropsCreator.schemaContext);
985             assertNotNull("schemaContext is null", shardPropsCreator.datastoreContext);
986         }};
987     }
988
989     private static class TestShardPropsCreator implements ShardPropsCreator {
990         ShardIdentifier shardId;
991         Map<String, String> peerAddresses;
992         SchemaContext schemaContext;
993         DatastoreContext datastoreContext;
994
995         @Override
996         public Props newProps(ShardIdentifier shardId, Map<String, String> peerAddresses,
997                 DatastoreContext datastoreContext, SchemaContext schemaContext) {
998             this.shardId = shardId;
999             this.peerAddresses = peerAddresses;
1000             this.schemaContext = schemaContext;
1001             this.datastoreContext = datastoreContext;
1002             return Shard.props(shardId, peerAddresses, datastoreContext, schemaContext);
1003         }
1004
1005     }
1006
1007     private static class TestShardManager extends ShardManager {
1008         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
1009
1010         TestShardManager(String shardMrgIDSuffix) {
1011             super(new MockClusterWrapper(), new MockConfiguration(),
1012                     DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build(), ready,
1013                     new PrimaryShardInfoFutureCache());
1014         }
1015
1016         @Override
1017         public void handleRecover(Object message) throws Exception {
1018             try {
1019                 super.handleRecover(message);
1020             } finally {
1021                 if(message instanceof RecoveryCompleted) {
1022                     recoveryComplete.countDown();
1023                 }
1024             }
1025         }
1026
1027         void waitForRecoveryComplete() {
1028             assertEquals("Recovery complete", true,
1029                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
1030         }
1031     }
1032
1033     @SuppressWarnings("serial")
1034     static class TestShardManagerCreator implements Creator<TestShardManager> {
1035         String shardMrgIDSuffix;
1036
1037         TestShardManagerCreator(String shardMrgIDSuffix) {
1038             this.shardMrgIDSuffix = shardMrgIDSuffix;
1039         }
1040
1041         @Override
1042         public TestShardManager create() throws Exception {
1043             return new TestShardManager(shardMrgIDSuffix);
1044         }
1045
1046     }
1047
1048     private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
1049         private static final long serialVersionUID = 1L;
1050         private final Creator<ShardManager> delegate;
1051
1052         public DelegatingShardManagerCreator(Creator<ShardManager> delegate) {
1053             this.delegate = delegate;
1054         }
1055
1056         @Override
1057         public ShardManager create() throws Exception {
1058             return delegate.create();
1059         }
1060     }
1061
1062     private static class ForwardingShardManager extends ShardManager {
1063         private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1);
1064         private CountDownLatch memberUpReceived = new CountDownLatch(1);
1065         private CountDownLatch memberRemovedReceived = new CountDownLatch(1);
1066         private CountDownLatch memberUnreachableReceived = new CountDownLatch(1);
1067         private CountDownLatch memberReachableReceived = new CountDownLatch(1);
1068         private final ActorRef shardActor;
1069         private final String name;
1070
1071         protected ForwardingShardManager(ClusterWrapper cluster, Configuration configuration,
1072                 DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch, String name,
1073                 ActorRef shardActor, PrimaryShardInfoFutureCache primaryShardInfoCache) {
1074             super(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch, primaryShardInfoCache);
1075             this.shardActor = shardActor;
1076             this.name = name;
1077         }
1078
1079         @Override
1080         public void handleCommand(Object message) throws Exception {
1081             try{
1082                 super.handleCommand(message);
1083             } finally {
1084                 if(message instanceof FindPrimary) {
1085                     findPrimaryMessageReceived.countDown();
1086                 } else if(message instanceof ClusterEvent.MemberUp) {
1087                     String role = ((ClusterEvent.MemberUp)message).member().roles().head();
1088                     if(!getCluster().getCurrentMemberName().equals(role)) {
1089                         memberUpReceived.countDown();
1090                     }
1091                 } else if(message instanceof ClusterEvent.MemberRemoved) {
1092                     String role = ((ClusterEvent.MemberRemoved)message).member().roles().head();
1093                     if(!getCluster().getCurrentMemberName().equals(role)) {
1094                         memberRemovedReceived.countDown();
1095                     }
1096                 } else if(message instanceof ClusterEvent.UnreachableMember) {
1097                     String role = ((ClusterEvent.UnreachableMember)message).member().roles().head();
1098                     if(!getCluster().getCurrentMemberName().equals(role)) {
1099                         memberUnreachableReceived.countDown();
1100                     }
1101                 } else if(message instanceof ClusterEvent.ReachableMember) {
1102                     String role = ((ClusterEvent.ReachableMember)message).member().roles().head();
1103                     if(!getCluster().getCurrentMemberName().equals(role)) {
1104                         memberReachableReceived.countDown();
1105                     }
1106                 }
1107             }
1108         }
1109
1110         @Override
1111         public String persistenceId() {
1112             return name;
1113         }
1114
1115         @Override
1116         protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
1117             return shardActor;
1118         }
1119
1120         void waitForMemberUp() {
1121             assertEquals("MemberUp received", true,
1122                     Uninterruptibles.awaitUninterruptibly(memberUpReceived, 5, TimeUnit.SECONDS));
1123             memberUpReceived = new CountDownLatch(1);
1124         }
1125
1126         void waitForMemberRemoved() {
1127             assertEquals("MemberRemoved received", true,
1128                     Uninterruptibles.awaitUninterruptibly(memberRemovedReceived, 5, TimeUnit.SECONDS));
1129             memberRemovedReceived = new CountDownLatch(1);
1130         }
1131
1132         void waitForUnreachableMember() {
1133             assertEquals("UnreachableMember received", true,
1134                 Uninterruptibles.awaitUninterruptibly(memberUnreachableReceived, 5, TimeUnit.SECONDS
1135                 ));
1136             memberUnreachableReceived = new CountDownLatch(1);
1137         }
1138
1139         void waitForReachableMember() {
1140             assertEquals("ReachableMember received", true,
1141                 Uninterruptibles.awaitUninterruptibly(memberReachableReceived, 5, TimeUnit.SECONDS));
1142             memberReachableReceived = new CountDownLatch(1);
1143         }
1144
1145         void verifyFindPrimary() {
1146             assertEquals("FindPrimary received", true,
1147                     Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS));
1148             findPrimaryMessageReceived = new CountDownLatch(1);
1149         }
1150     }
1151 }