Bug 4105: Add dynamic module/shard config for entity-owners shard
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardManagerTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertSame;
15 import static org.junit.Assert.assertTrue;
16 import static org.mockito.Mockito.mock;
17 import static org.mockito.Mockito.never;
18 import static org.mockito.Mockito.times;
19 import static org.mockito.Mockito.verify;
20 import akka.actor.ActorRef;
21 import akka.actor.ActorSystem;
22 import akka.actor.AddressFromURIString;
23 import akka.actor.Props;
24 import akka.cluster.Cluster;
25 import akka.cluster.ClusterEvent;
26 import akka.dispatch.Dispatchers;
27 import akka.japi.Creator;
28 import akka.pattern.Patterns;
29 import akka.persistence.RecoveryCompleted;
30 import akka.testkit.JavaTestKit;
31 import akka.testkit.TestActorRef;
32 import akka.util.Timeout;
33 import com.google.common.base.Optional;
34 import com.google.common.collect.ImmutableMap;
35 import com.google.common.collect.ImmutableSet;
36 import com.google.common.collect.Sets;
37 import com.google.common.util.concurrent.Uninterruptibles;
38 import com.typesafe.config.ConfigFactory;
39 import java.util.Arrays;
40 import java.util.Collections;
41 import java.util.List;
42 import java.util.Map;
43 import java.util.concurrent.CountDownLatch;
44 import java.util.concurrent.TimeUnit;
45 import org.junit.After;
46 import org.junit.Before;
47 import org.junit.Test;
48 import org.mockito.Mock;
49 import org.mockito.MockitoAnnotations;
50 import org.opendaylight.controller.cluster.datastore.config.Configuration;
51 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
52 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
53 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
54 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
55 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
56 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
57 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
58 import org.opendaylight.controller.cluster.datastore.messages.CreateShardReply;
59 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
60 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
61 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
62 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
63 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
64 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
65 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
66 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
67 import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior;
68 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
69 import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
70 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
71 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
72 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
73 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
74 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
75 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
76 import org.opendaylight.controller.cluster.raft.RaftState;
77 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
78 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
79 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
80 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
81 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
82 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
83 import scala.concurrent.Await;
84 import scala.concurrent.Future;
85 import scala.concurrent.duration.FiniteDuration;
86
87 public class ShardManagerTest extends AbstractActorTest {
88     private static int ID_COUNTER = 1;
89
90     private final String shardMrgIDSuffix = "config" + ID_COUNTER++;
91     private final String shardMgrID = "shard-manager-" + shardMrgIDSuffix;
92
93     @Mock
94     private static CountDownLatch ready;
95
96     private static TestActorRef<MessageCollectorActor> mockShardActor;
97
98     private static String mockShardName;
99
100     private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder().
101             dataStoreType(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS)
102                    .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(6);
103
104     private static ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) {
105         String name = new ShardIdentifier(shardName, memberName,"config").toString();
106         return TestActorRef.create(system, Props.create(MessageCollectorActor.class), name);
107     }
108
109     private final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
110
111     @Before
112     public void setUp() {
113         MockitoAnnotations.initMocks(this);
114
115         InMemoryJournal.clear();
116
117         if(mockShardActor == null) {
118             mockShardName = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1", "config").toString();
119             mockShardActor = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), mockShardName);
120         }
121
122         mockShardActor.underlyingActor().clear();
123     }
124
125     @After
126     public void tearDown() {
127         InMemoryJournal.clear();
128     }
129
130     private Props newShardMgrProps(boolean persistent) {
131         return ShardManager.props(new MockClusterWrapper(), new MockConfiguration(),
132                 datastoreContextBuilder.persistent(persistent).build(), ready, primaryShardInfoCache);
133     }
134
135     private Props newPropsShardMgrWithMockShardActor() {
136         return newPropsShardMgrWithMockShardActor("shardManager", mockShardActor, new MockClusterWrapper(),
137                 new MockConfiguration());
138     }
139
140     private Props newPropsShardMgrWithMockShardActor(final String name, final ActorRef shardActor,
141             final ClusterWrapper clusterWrapper, final Configuration config) {
142         Creator<ShardManager> creator = new Creator<ShardManager>() {
143             private static final long serialVersionUID = 1L;
144             @Override
145             public ShardManager create() throws Exception {
146                 return new ForwardingShardManager(clusterWrapper, config, datastoreContextBuilder.build(),
147                         ready, name, shardActor, primaryShardInfoCache);
148             }
149         };
150
151         return Props.create(new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId());
152     }
153
154     @Test
155     public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception {
156         new JavaTestKit(getSystem()) {{
157             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
158
159             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
160
161             shardManager.tell(new FindPrimary("non-existent", false), getRef());
162
163             expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
164         }};
165     }
166
167     @Test
168     public void testOnReceiveFindPrimaryForLocalLeaderShard() throws Exception {
169         new JavaTestKit(getSystem()) {{
170             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
171
172             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
173
174             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
175             shardManager.tell(new ActorInitialized(), mockShardActor);
176
177             DataTree mockDataTree = mock(DataTree.class);
178             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree),
179                     DataStoreVersions.CURRENT_VERSION), getRef());
180
181             MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class);
182             shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
183                     RaftState.Leader.name())), mockShardActor);
184
185             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
186
187             LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
188             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
189                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
190             assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
191         }};
192     }
193
194     @Test
195     public void testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp() throws Exception {
196         new JavaTestKit(getSystem()) {{
197             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
198
199             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
200             shardManager.tell(new ActorInitialized(), mockShardActor);
201
202             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
203             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
204             shardManager.tell(new RoleChangeNotification(memberId1,
205                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
206             shardManager.tell(new LeaderStateChanged(memberId1, memberId2, DataStoreVersions.CURRENT_VERSION), mockShardActor);
207
208             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
209
210             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
211         }};
212     }
213
214     @Test
215     public void testOnReceiveFindPrimaryForNonLocalLeaderShard() throws Exception {
216         new JavaTestKit(getSystem()) {{
217             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
218
219             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
220             shardManager.tell(new ActorInitialized(), mockShardActor);
221
222             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
223             MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
224
225             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
226             shardManager.tell(new RoleChangeNotification(memberId1,
227                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
228             short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
229             shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, Optional.<DataTree>absent(),
230                     leaderVersion), mockShardActor);
231
232             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
233
234             RemotePrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
235             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
236                     primaryFound.getPrimaryPath().contains("member-2-shard-default"));
237             assertEquals("getPrimaryVersion", leaderVersion, primaryFound.getPrimaryVersion());
238         }};
239     }
240
241     @Test
242     public void testOnReceiveFindPrimaryForUninitializedShard() throws Exception {
243         new JavaTestKit(getSystem()) {{
244             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
245
246             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
247
248             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
249         }};
250     }
251
252     @Test
253     public void testOnReceiveFindPrimaryForInitializedShardWithNoRole() throws Exception {
254         new JavaTestKit(getSystem()) {{
255             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
256
257             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
258             shardManager.tell(new ActorInitialized(), mockShardActor);
259
260             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
261
262             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
263         }};
264     }
265
266     @Test
267     public void testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId() throws Exception {
268         new JavaTestKit(getSystem()) {{
269             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
270
271             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
272             shardManager.tell(new ActorInitialized(), mockShardActor);
273
274             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
275             shardManager.tell(new RoleChangeNotification(memberId,
276                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
277
278             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
279
280             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
281
282             DataTree mockDataTree = mock(DataTree.class);
283             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree),
284                     DataStoreVersions.CURRENT_VERSION), mockShardActor);
285
286             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
287
288             LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
289             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
290                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
291             assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
292         }};
293     }
294
295     @Test
296     public void testOnReceiveFindPrimaryWaitForShardLeader() throws Exception {
297         new JavaTestKit(getSystem()) {{
298             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
299
300             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
301
302             // We're passing waitUntilInitialized = true to FindPrimary so the response should be
303             // delayed until we send ActorInitialized and RoleChangeNotification.
304             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
305
306             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
307
308             shardManager.tell(new ActorInitialized(), mockShardActor);
309
310             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
311
312             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
313             shardManager.tell(new RoleChangeNotification(memberId,
314                     RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor);
315
316             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
317
318             DataTree mockDataTree = mock(DataTree.class);
319             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree),
320                     DataStoreVersions.CURRENT_VERSION), mockShardActor);
321
322             LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
323             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
324                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
325             assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
326
327             expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
328         }};
329     }
330
331     @Test
332     public void testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard() throws Exception {
333         new JavaTestKit(getSystem()) {{
334             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
335
336             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
337
338             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
339
340             expectMsgClass(duration("2 seconds"), NotInitializedException.class);
341
342             shardManager.tell(new ActorInitialized(), mockShardActor);
343
344             expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
345         }};
346     }
347
348     @Test
349     public void testOnReceiveFindPrimaryWaitForReadyWithCandidateShard() throws Exception {
350         new JavaTestKit(getSystem()) {{
351             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
352
353             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
354             shardManager.tell(new ActorInitialized(), mockShardActor);
355             shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
356                     null, RaftState.Candidate.name()), mockShardActor);
357
358             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
359
360             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
361         }};
362     }
363
364     @Test
365     public void testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard() throws Exception {
366         new JavaTestKit(getSystem()) {{
367             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
368
369             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
370             shardManager.tell(new ActorInitialized(), mockShardActor);
371             shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
372                     null, RaftState.IsolatedLeader.name()), mockShardActor);
373
374             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
375
376             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
377         }};
378     }
379
380     @Test
381     public void testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard() throws Exception {
382         new JavaTestKit(getSystem()) {{
383             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
384
385             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
386             shardManager.tell(new ActorInitialized(), mockShardActor);
387
388             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
389
390             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
391         }};
392     }
393
394     @Test
395     public void testOnReceiveFindPrimaryForRemoteShard() throws Exception {
396         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
397
398         // Create an ActorSystem ShardManager actor for member-1.
399
400         final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
401         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
402
403         ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
404
405         final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
406                 newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
407                         new MockConfiguration()), shardManagerID);
408
409         // Create an ActorSystem ShardManager actor for member-2.
410
411         final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
412
413         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
414
415         final ActorRef mockShardActor2 = newMockShardActor(system2, "astronauts", "member-2");
416
417         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
418                 put("default", Arrays.asList("member-1", "member-2")).
419                 put("astronauts", Arrays.asList("member-2")).build());
420
421         final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
422                 newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
423                         mockConfig2), shardManagerID);
424
425         new JavaTestKit(system1) {{
426
427             shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
428             shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
429
430             shardManager2.tell(new ActorInitialized(), mockShardActor2);
431
432             String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
433             short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
434             shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2,
435                     Optional.of(mock(DataTree.class)), leaderVersion), mockShardActor2);
436             shardManager2.tell(new RoleChangeNotification(memberId2,
437                     RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
438
439             shardManager1.underlyingActor().waitForMemberUp();
440
441             shardManager1.tell(new FindPrimary("astronauts", false), getRef());
442
443             RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
444             String path = found.getPrimaryPath();
445             assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config"));
446             assertEquals("getPrimaryVersion", leaderVersion, found.getPrimaryVersion());
447
448             shardManager2.underlyingActor().verifyFindPrimary();
449
450             Cluster.get(system2).down(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
451
452             shardManager1.underlyingActor().waitForMemberRemoved();
453
454             shardManager1.tell(new FindPrimary("astronauts", false), getRef());
455
456             expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
457         }};
458
459         JavaTestKit.shutdownActorSystem(system1);
460         JavaTestKit.shutdownActorSystem(system2);
461     }
462
463     @Test
464     public void testShardAvailabilityOnChangeOfMemberReachability() throws Exception {
465         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
466
467         // Create an ActorSystem ShardManager actor for member-1.
468
469         final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
470         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
471
472         final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
473
474         final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
475             newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
476                 new MockConfiguration()), shardManagerID);
477
478         // Create an ActorSystem ShardManager actor for member-2.
479
480         final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
481
482         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
483
484         final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
485
486         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
487             put("default", Arrays.asList("member-1", "member-2")).build());
488
489         final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
490             newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
491                 mockConfig2), shardManagerID);
492
493         new JavaTestKit(system1) {{
494
495             shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
496             shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
497             shardManager1.tell(new ActorInitialized(), mockShardActor1);
498             shardManager2.tell(new ActorInitialized(), mockShardActor2);
499
500             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
501             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
502             shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2,
503                 Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION), mockShardActor1);
504             shardManager1.tell(new RoleChangeNotification(memberId1,
505                 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1);
506             shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, Optional.of(mock(DataTree.class)),
507                     DataStoreVersions.CURRENT_VERSION),
508                 mockShardActor2);
509             shardManager2.tell(new RoleChangeNotification(memberId2,
510                 RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
511             shardManager1.underlyingActor().waitForMemberUp();
512
513             shardManager1.tell(new FindPrimary("default", true), getRef());
514
515             RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
516             String path = found.getPrimaryPath();
517             assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
518
519             shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
520                 createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
521
522             shardManager1.underlyingActor().waitForUnreachableMember();
523
524             shardManager1.tell(new FindPrimary("default", true), getRef());
525
526             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
527
528             shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
529                 createReachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
530
531             shardManager1.underlyingActor().waitForReachableMember();
532
533             shardManager1.tell(new FindPrimary("default", true), getRef());
534
535             RemotePrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
536             String path1 = found1.getPrimaryPath();
537             assertTrue("Unexpected primary path " + path1, path1.contains("member-2-shard-default-config"));
538
539         }};
540
541         JavaTestKit.shutdownActorSystem(system1);
542         JavaTestKit.shutdownActorSystem(system2);
543     }
544
545     @Test
546     public void testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange() throws Exception {
547         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
548
549         // Create an ActorSystem ShardManager actor for member-1.
550
551         final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
552         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
553
554         final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
555
556         final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
557             newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
558                 new MockConfiguration()), shardManagerID);
559
560         // Create an ActorSystem ShardManager actor for member-2.
561
562         final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
563
564         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
565
566         final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
567
568         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
569             put("default", Arrays.asList("member-1", "member-2")).build());
570
571         final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
572             newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
573                 mockConfig2), shardManagerID);
574
575         new JavaTestKit(system1) {{
576
577             shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
578             shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
579             shardManager1.tell(new ActorInitialized(), mockShardActor1);
580             shardManager2.tell(new ActorInitialized(), mockShardActor2);
581
582             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
583             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
584             shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2,
585                 Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION), mockShardActor1);
586             shardManager1.tell(new RoleChangeNotification(memberId1,
587                 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1);
588             shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, Optional.of(mock(DataTree.class)),
589                     DataStoreVersions.CURRENT_VERSION),
590                 mockShardActor2);
591             shardManager2.tell(new RoleChangeNotification(memberId2,
592                 RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
593             shardManager1.underlyingActor().waitForMemberUp();
594
595             shardManager1.tell(new FindPrimary("default", true), getRef());
596
597             RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
598             String path = found.getPrimaryPath();
599             assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
600
601             primaryShardInfoCache.putSuccessful("default", new PrimaryShardInfo(system1.actorSelection(
602                     mockShardActor1.path()), DataStoreVersions.CURRENT_VERSION, Optional.<DataTree>absent()));
603
604             shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
605                 createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
606
607             shardManager1.underlyingActor().waitForUnreachableMember();
608
609             shardManager1.tell(new FindPrimary("default", true), getRef());
610
611             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
612
613             assertNull("Expected primaryShardInfoCache entry removed", primaryShardInfoCache.getIfPresent("default"));
614
615             shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId1, Optional.of(mock(DataTree.class)),
616                     DataStoreVersions.CURRENT_VERSION), mockShardActor1);
617             shardManager1.tell(new RoleChangeNotification(memberId1,
618                 RaftState.Follower.name(), RaftState.Leader.name()), mockShardActor1);
619
620             shardManager1.tell(new FindPrimary("default", true), getRef());
621
622             LocalPrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
623             String path1 = found1.getPrimaryPath();
624             assertTrue("Unexpected primary path " + path1, path1.contains("member-1-shard-default-config"));
625
626         }};
627
628         JavaTestKit.shutdownActorSystem(system1);
629         JavaTestKit.shutdownActorSystem(system2);
630     }
631
632
633     @Test
634     public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
635         new JavaTestKit(getSystem()) {{
636             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
637
638             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
639
640             shardManager.tell(new FindLocalShard("non-existent", false), getRef());
641
642             LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
643
644             assertEquals("getShardName", "non-existent", notFound.getShardName());
645         }};
646     }
647
648     @Test
649     public void testOnReceiveFindLocalShardForExistentShard() throws Exception {
650         new JavaTestKit(getSystem()) {{
651             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
652
653             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
654             shardManager.tell(new ActorInitialized(), mockShardActor);
655
656             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
657
658             LocalShardFound found = expectMsgClass(duration("5 seconds"), LocalShardFound.class);
659
660             assertTrue("Found path contains " + found.getPath().path().toString(),
661                     found.getPath().path().toString().contains("member-1-shard-default-config"));
662         }};
663     }
664
665     @Test
666     public void testOnReceiveFindLocalShardForNotInitializedShard() throws Exception {
667         new JavaTestKit(getSystem()) {{
668             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
669
670             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
671
672             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
673         }};
674     }
675
676     @Test
677     public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
678         new JavaTestKit(getSystem()) {{
679             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
680
681             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
682
683             // We're passing waitUntilInitialized = true to FindLocalShard so the response should be
684             // delayed until we send ActorInitialized.
685             Future<Object> future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true),
686                     new Timeout(5, TimeUnit.SECONDS));
687
688             shardManager.tell(new ActorInitialized(), mockShardActor);
689
690             Object resp = Await.result(future, duration("5 seconds"));
691             assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound);
692         }};
693     }
694
695     @Test
696     public void testOnRecoveryJournalIsCleaned() {
697         InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
698                 ImmutableSet.of("foo")));
699         InMemoryJournal.addEntry(shardMgrID, 2L, new ShardManager.SchemaContextModules(
700                 ImmutableSet.of("bar")));
701         InMemoryJournal.addDeleteMessagesCompleteLatch(shardMgrID);
702
703         new JavaTestKit(getSystem()) {{
704             TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
705                     Props.create(new TestShardManagerCreator(shardMrgIDSuffix)));
706
707             shardManager.underlyingActor().waitForRecoveryComplete();
708             InMemoryJournal.waitForDeleteMessagesComplete(shardMgrID);
709
710             // Journal entries up to the last one should've been deleted
711             Map<Long, Object> journal = InMemoryJournal.get(shardMgrID);
712             synchronized (journal) {
713                 assertEquals("Journal size", 0, journal.size());
714             }
715         }};
716     }
717
718     @Test
719     public void testRoleChangeNotificationAndShardLeaderStateChangedReleaseReady() throws Exception {
720         new JavaTestKit(getSystem()) {
721             {
722                 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true));
723
724                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
725                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
726                         memberId, RaftState.Candidate.name(), RaftState.Leader.name()));
727
728                 verify(ready, never()).countDown();
729
730                 shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId, memberId,
731                         Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION));
732
733                 verify(ready, times(1)).countDown();
734
735             }};
736     }
737
738     @Test
739     public void testRoleChangeNotificationToFollowerWithShardLeaderStateChangedReleaseReady() throws Exception {
740         new JavaTestKit(getSystem()) {
741             {
742                 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true));
743
744                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
745                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
746                         memberId, null, RaftState.Follower.name()));
747
748                 verify(ready, never()).countDown();
749
750                 shardManager.underlyingActor().onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
751
752                 shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId,
753                         "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)),
754                         DataStoreVersions.CURRENT_VERSION));
755
756                 verify(ready, times(1)).countDown();
757
758             }};
759     }
760
761     @Test
762     public void testReadyCountDownForMemberUpAfterLeaderStateChanged() throws Exception {
763         new JavaTestKit(getSystem()) {
764             {
765                 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true));
766
767                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
768                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
769                         memberId, null, RaftState.Follower.name()));
770
771                 verify(ready, never()).countDown();
772
773                 shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId,
774                         "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)),
775                         DataStoreVersions.CURRENT_VERSION));
776
777                 shardManager.underlyingActor().onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
778
779                 verify(ready, times(1)).countDown();
780
781             }};
782     }
783
784     @Test
785     public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
786         new JavaTestKit(getSystem()) {
787             {
788                 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true));
789
790                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
791                         "unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
792
793                 verify(ready, never()).countDown();
794
795             }};
796     }
797
798
799     @Test
800     public void testByDefaultSyncStatusIsFalse() throws Exception{
801         final Props persistentProps = newShardMgrProps(true);
802         final TestActorRef<ShardManager> shardManager =
803                 TestActorRef.create(getSystem(), persistentProps);
804
805         ShardManager shardManagerActor = shardManager.underlyingActor();
806
807         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
808     }
809
810     @Test
811     public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception{
812         final Props persistentProps = ShardManager.props(
813                 new MockClusterWrapper(),
814                 new MockConfiguration(),
815                 DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache);
816         final TestActorRef<ShardManager> shardManager =
817                 TestActorRef.create(getSystem(), persistentProps);
818
819         ShardManager shardManagerActor = shardManager.underlyingActor();
820         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
821                 RaftState.Follower.name(), RaftState.Leader.name()));
822
823         assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
824     }
825
826     @Test
827     public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception{
828         final Props persistentProps = newShardMgrProps(true);
829         final TestActorRef<ShardManager> shardManager =
830                 TestActorRef.create(getSystem(), persistentProps);
831
832         ShardManager shardManagerActor = shardManager.underlyingActor();
833         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
834                 RaftState.Follower.name(), RaftState.Candidate.name()));
835
836         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
837
838         // Send a FollowerInitialSyncStatus with status = true for the replica whose current state is candidate
839         shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-default-unknown"));
840
841         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
842     }
843
844     @Test
845     public void testWhenShardIsFollowerSyncStatusDependsOnFollowerInitialSyncStatus() throws Exception{
846         final Props persistentProps = ShardManager.props(
847                 new MockClusterWrapper(),
848                 new MockConfiguration(),
849                 DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache);
850         final TestActorRef<ShardManager> shardManager =
851                 TestActorRef.create(getSystem(), persistentProps);
852
853         ShardManager shardManagerActor = shardManager.underlyingActor();
854         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
855                 RaftState.Candidate.name(), RaftState.Follower.name()));
856
857         // Initially will be false
858         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
859
860         // Send status true will make sync status true
861         shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-default-unknown"));
862
863         assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
864
865         // Send status false will make sync status false
866         shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-default-unknown"));
867
868         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
869
870     }
871
872     @Test
873     public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception{
874         final Props persistentProps = ShardManager.props(
875                 new MockClusterWrapper(),
876                 new MockConfiguration() {
877                     @Override
878                     public List<String> getMemberShardNames(String memberName) {
879                         return Arrays.asList("default", "astronauts");
880                     }
881                 },
882                 DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache);
883         final TestActorRef<ShardManager> shardManager =
884                 TestActorRef.create(getSystem(), persistentProps);
885
886         ShardManager shardManagerActor = shardManager.underlyingActor();
887
888         // Initially will be false
889         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
890
891         // Make default shard leader
892         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
893                 RaftState.Follower.name(), RaftState.Leader.name()));
894
895         // default = Leader, astronauts is unknown so sync status remains false
896         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
897
898         // Make astronauts shard leader as well
899         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-astronauts-unknown",
900                 RaftState.Follower.name(), RaftState.Leader.name()));
901
902         // Now sync status should be true
903         assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
904
905         // Make astronauts a Follower
906         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-astronauts-unknown",
907                 RaftState.Leader.name(), RaftState.Follower.name()));
908
909         // Sync status is not true
910         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
911
912         // Make the astronauts follower sync status true
913         shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-astronauts-unknown"));
914
915         // Sync status is now true
916         assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
917
918     }
919
920     @Test
921     public void testOnReceiveSwitchShardBehavior() throws Exception {
922         new JavaTestKit(getSystem()) {{
923             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
924
925             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
926             shardManager.tell(new ActorInitialized(), mockShardActor);
927
928             shardManager.tell(new SwitchShardBehavior(mockShardName, "Leader", 1000), getRef());
929
930             SwitchBehavior switchBehavior = MessageCollectorActor.expectFirstMatching(mockShardActor, SwitchBehavior.class);
931
932             assertEquals(RaftState.Leader, switchBehavior.getNewState());
933             assertEquals(1000, switchBehavior.getNewTerm());
934         }};
935     }
936
937     public void testOnReceiveCreateShard() {
938         new JavaTestKit(getSystem()) {{
939             datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
940
941             ActorRef shardManager = getSystem().actorOf(newShardMgrProps(false));
942
943             SchemaContext schemaContext = TestModel.createTestContext();
944             shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
945
946             DatastoreContext datastoreContext = DatastoreContext.newBuilder().shardElectionTimeoutFactor(100).
947                     persistent(false).build();
948             TestShardPropsCreator shardPropsCreator = new TestShardPropsCreator();
949
950             shardManager.tell(new CreateShard("foo", Arrays.asList("member-1", "member-5", "member-6"), shardPropsCreator,
951                     datastoreContext), getRef());
952
953             expectMsgClass(duration("5 seconds"), CreateShardReply.class);
954
955             shardManager.tell(new FindLocalShard("foo", true), getRef());
956
957             expectMsgClass(duration("5 seconds"), LocalShardFound.class);
958
959             assertEquals("isRecoveryApplicable", false, shardPropsCreator.datastoreContext.isPersistent());
960             assertEquals("peerMembers", Sets.newHashSet(new ShardIdentifier("foo", "member-5", shardMrgIDSuffix).toString(),
961                     new ShardIdentifier("foo", "member-6", shardMrgIDSuffix).toString()),
962                     shardPropsCreator.peerAddresses.keySet());
963             assertEquals("ShardIdentifier", new ShardIdentifier("foo", "member-1", shardMrgIDSuffix),
964                     shardPropsCreator.shardId);
965             assertSame("schemaContext", schemaContext, shardPropsCreator.schemaContext);
966
967             // Send CreateShard with same name - should fail.
968
969             shardManager.tell(new CreateShard("foo", Collections.<String>emptyList(), shardPropsCreator, null), getRef());
970
971             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
972         }};
973     }
974
975     @Test
976     public void testOnReceiveCreateShardWithNoInitialSchemaContext() {
977         new JavaTestKit(getSystem()) {{
978             ActorRef shardManager = getSystem().actorOf(newShardMgrProps(false));
979
980             TestShardPropsCreator shardPropsCreator = new TestShardPropsCreator();
981
982             shardManager.tell(new CreateShard("foo", Arrays.asList("member-1"), shardPropsCreator, null), getRef());
983
984             expectMsgClass(duration("5 seconds"), CreateShardReply.class);
985
986             SchemaContext schemaContext = TestModel.createTestContext();
987             shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
988
989             shardManager.tell(new FindLocalShard("foo", true), getRef());
990
991             expectMsgClass(duration("5 seconds"), LocalShardFound.class);
992
993             assertSame("schemaContext", schemaContext, shardPropsCreator.schemaContext);
994             assertNotNull("schemaContext is null", shardPropsCreator.datastoreContext);
995         }};
996     }
997
998     private static class TestShardPropsCreator implements ShardPropsCreator {
999         ShardIdentifier shardId;
1000         Map<String, String> peerAddresses;
1001         SchemaContext schemaContext;
1002         DatastoreContext datastoreContext;
1003
1004         @Override
1005         public Props newProps(ShardIdentifier shardId, Map<String, String> peerAddresses,
1006                 DatastoreContext datastoreContext, SchemaContext schemaContext) {
1007             this.shardId = shardId;
1008             this.peerAddresses = peerAddresses;
1009             this.schemaContext = schemaContext;
1010             this.datastoreContext = datastoreContext;
1011             return Shard.props(shardId, peerAddresses, datastoreContext, schemaContext);
1012         }
1013
1014     }
1015
1016     private static class TestShardManager extends ShardManager {
1017         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
1018
1019         TestShardManager(String shardMrgIDSuffix) {
1020             super(new MockClusterWrapper(), new MockConfiguration(),
1021                     DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build(), ready,
1022                     new PrimaryShardInfoFutureCache());
1023         }
1024
1025         @Override
1026         public void handleRecover(Object message) throws Exception {
1027             try {
1028                 super.handleRecover(message);
1029             } finally {
1030                 if(message instanceof RecoveryCompleted) {
1031                     recoveryComplete.countDown();
1032                 }
1033             }
1034         }
1035
1036         void waitForRecoveryComplete() {
1037             assertEquals("Recovery complete", true,
1038                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
1039         }
1040     }
1041
1042     @SuppressWarnings("serial")
1043     static class TestShardManagerCreator implements Creator<TestShardManager> {
1044         String shardMrgIDSuffix;
1045
1046         TestShardManagerCreator(String shardMrgIDSuffix) {
1047             this.shardMrgIDSuffix = shardMrgIDSuffix;
1048         }
1049
1050         @Override
1051         public TestShardManager create() throws Exception {
1052             return new TestShardManager(shardMrgIDSuffix);
1053         }
1054
1055     }
1056
1057     private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
1058         private static final long serialVersionUID = 1L;
1059         private final Creator<ShardManager> delegate;
1060
1061         public DelegatingShardManagerCreator(Creator<ShardManager> delegate) {
1062             this.delegate = delegate;
1063         }
1064
1065         @Override
1066         public ShardManager create() throws Exception {
1067             return delegate.create();
1068         }
1069     }
1070
1071     private static class ForwardingShardManager extends ShardManager {
1072         private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1);
1073         private CountDownLatch memberUpReceived = new CountDownLatch(1);
1074         private CountDownLatch memberRemovedReceived = new CountDownLatch(1);
1075         private CountDownLatch memberUnreachableReceived = new CountDownLatch(1);
1076         private CountDownLatch memberReachableReceived = new CountDownLatch(1);
1077         private final ActorRef shardActor;
1078         private final String name;
1079
1080         protected ForwardingShardManager(ClusterWrapper cluster, Configuration configuration,
1081                 DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch, String name,
1082                 ActorRef shardActor, PrimaryShardInfoFutureCache primaryShardInfoCache) {
1083             super(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch, primaryShardInfoCache);
1084             this.shardActor = shardActor;
1085             this.name = name;
1086         }
1087
1088         @Override
1089         public void handleCommand(Object message) throws Exception {
1090             try{
1091                 super.handleCommand(message);
1092             } finally {
1093                 if(message instanceof FindPrimary) {
1094                     findPrimaryMessageReceived.countDown();
1095                 } else if(message instanceof ClusterEvent.MemberUp) {
1096                     String role = ((ClusterEvent.MemberUp)message).member().roles().head();
1097                     if(!getCluster().getCurrentMemberName().equals(role)) {
1098                         memberUpReceived.countDown();
1099                     }
1100                 } else if(message instanceof ClusterEvent.MemberRemoved) {
1101                     String role = ((ClusterEvent.MemberRemoved)message).member().roles().head();
1102                     if(!getCluster().getCurrentMemberName().equals(role)) {
1103                         memberRemovedReceived.countDown();
1104                     }
1105                 } else if(message instanceof ClusterEvent.UnreachableMember) {
1106                     String role = ((ClusterEvent.UnreachableMember)message).member().roles().head();
1107                     if(!getCluster().getCurrentMemberName().equals(role)) {
1108                         memberUnreachableReceived.countDown();
1109                     }
1110                 } else if(message instanceof ClusterEvent.ReachableMember) {
1111                     String role = ((ClusterEvent.ReachableMember)message).member().roles().head();
1112                     if(!getCluster().getCurrentMemberName().equals(role)) {
1113                         memberReachableReceived.countDown();
1114                     }
1115                 }
1116             }
1117         }
1118
1119         @Override
1120         public String persistenceId() {
1121             return name;
1122         }
1123
1124         @Override
1125         protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
1126             return shardActor;
1127         }
1128
1129         void waitForMemberUp() {
1130             assertEquals("MemberUp received", true,
1131                     Uninterruptibles.awaitUninterruptibly(memberUpReceived, 5, TimeUnit.SECONDS));
1132             memberUpReceived = new CountDownLatch(1);
1133         }
1134
1135         void waitForMemberRemoved() {
1136             assertEquals("MemberRemoved received", true,
1137                     Uninterruptibles.awaitUninterruptibly(memberRemovedReceived, 5, TimeUnit.SECONDS));
1138             memberRemovedReceived = new CountDownLatch(1);
1139         }
1140
1141         void waitForUnreachableMember() {
1142             assertEquals("UnreachableMember received", true,
1143                 Uninterruptibles.awaitUninterruptibly(memberUnreachableReceived, 5, TimeUnit.SECONDS
1144                 ));
1145             memberUnreachableReceived = new CountDownLatch(1);
1146         }
1147
1148         void waitForReachableMember() {
1149             assertEquals("ReachableMember received", true,
1150                 Uninterruptibles.awaitUninterruptibly(memberReachableReceived, 5, TimeUnit.SECONDS));
1151             memberReachableReceived = new CountDownLatch(1);
1152         }
1153
1154         void verifyFindPrimary() {
1155             assertEquals("FindPrimary received", true,
1156                     Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS));
1157             findPrimaryMessageReceived = new CountDownLatch(1);
1158         }
1159     }
1160 }