33d5a2ed981efbf4fb3260440866e339fbf9c949
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardManagerTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNull;
6 import static org.junit.Assert.assertSame;
7 import static org.junit.Assert.assertTrue;
8 import static org.mockito.Mockito.mock;
9 import static org.mockito.Mockito.never;
10 import static org.mockito.Mockito.times;
11 import static org.mockito.Mockito.verify;
12 import static org.mockito.Mockito.when;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSystem;
15 import akka.actor.AddressFromURIString;
16 import akka.actor.Props;
17 import akka.cluster.Cluster;
18 import akka.cluster.ClusterEvent;
19 import akka.dispatch.Dispatchers;
20 import akka.japi.Creator;
21 import akka.pattern.Patterns;
22 import akka.persistence.RecoveryCompleted;
23 import akka.testkit.JavaTestKit;
24 import akka.testkit.TestActorRef;
25 import akka.util.Timeout;
26 import com.google.common.base.Optional;
27 import com.google.common.collect.ImmutableMap;
28 import com.google.common.collect.ImmutableSet;
29 import com.google.common.collect.Sets;
30 import com.google.common.util.concurrent.Uninterruptibles;
31 import com.typesafe.config.ConfigFactory;
32 import java.net.URI;
33 import java.util.Arrays;
34 import java.util.Collection;
35 import java.util.HashSet;
36 import java.util.List;
37 import java.util.Map;
38 import java.util.Set;
39 import java.util.concurrent.CountDownLatch;
40 import java.util.concurrent.TimeUnit;
41 import org.junit.After;
42 import org.junit.Before;
43 import org.junit.Test;
44 import org.mockito.Mock;
45 import org.mockito.MockitoAnnotations;
46 import org.opendaylight.controller.cluster.DataPersistenceProvider;
47 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
48 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
49 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
50 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
51 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
52 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
53 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
54 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
55 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
56 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
57 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
58 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
59 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
60 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
61 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
62 import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
63 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
64 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
65 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
66 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
67 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
68 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
69 import org.opendaylight.controller.cluster.raft.RaftState;
70 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
71 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
72 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
73 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
74 import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
75 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
76 import scala.concurrent.Await;
77 import scala.concurrent.Future;
78 import scala.concurrent.duration.FiniteDuration;
79
80 public class ShardManagerTest extends AbstractActorTest {
81     private static int ID_COUNTER = 1;
82
83     private final String shardMrgIDSuffix = "config" + ID_COUNTER++;
84     private final String shardMgrID = "shard-manager-" + shardMrgIDSuffix;
85
86     @Mock
87     private static CountDownLatch ready;
88
89     private static TestActorRef<MessageCollectorActor> mockShardActor;
90
91     private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder().
92             dataStoreType(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS);
93
94     private static ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) {
95         String name = new ShardIdentifier(shardName, memberName,"config").toString();
96         return TestActorRef.create(system, Props.create(MessageCollectorActor.class), name);
97     }
98
99     private final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
100
101     @Before
102     public void setUp() {
103         MockitoAnnotations.initMocks(this);
104
105         InMemoryJournal.clear();
106
107         if(mockShardActor == null) {
108             String name = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1", "config").toString();
109             mockShardActor = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), name);
110         }
111
112         mockShardActor.underlyingActor().clear();
113     }
114
115     @After
116     public void tearDown() {
117         InMemoryJournal.clear();
118     }
119
120     private Props newShardMgrProps(boolean persistent) {
121         return ShardManager.props(new MockClusterWrapper(), new MockConfiguration(),
122                 datastoreContextBuilder.persistent(persistent).build(), ready, primaryShardInfoCache);
123     }
124
125     private Props newPropsShardMgrWithMockShardActor() {
126         return newPropsShardMgrWithMockShardActor("shardManager", mockShardActor, new MockClusterWrapper(),
127                 new MockConfiguration());
128     }
129
130     private Props newPropsShardMgrWithMockShardActor(final String name, final ActorRef shardActor,
131             final ClusterWrapper clusterWrapper, final Configuration config) {
132         Creator<ShardManager> creator = new Creator<ShardManager>() {
133             private static final long serialVersionUID = 1L;
134             @Override
135             public ShardManager create() throws Exception {
136                 return new ForwardingShardManager(clusterWrapper, config, datastoreContextBuilder.build(),
137                         ready, name, shardActor, primaryShardInfoCache);
138             }
139         };
140
141         return Props.create(new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId());
142     }
143
144     @Test
145     public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception {
146         new JavaTestKit(getSystem()) {{
147             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
148
149             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
150
151             shardManager.tell(new FindPrimary("non-existent", false), getRef());
152
153             expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
154         }};
155     }
156
157     @Test
158     public void testOnReceiveFindPrimaryForLocalLeaderShard() throws Exception {
159         new JavaTestKit(getSystem()) {{
160             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
161
162             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
163
164             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
165             shardManager.tell(new ActorInitialized(), mockShardActor);
166
167             DataTree mockDataTree = mock(DataTree.class);
168             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree)), getRef());
169
170             MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class);
171             shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
172                     RaftState.Leader.name())), mockShardActor);
173
174             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
175
176             LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
177             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
178                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
179             assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
180         }};
181     }
182
183     @Test
184     public void testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp() throws Exception {
185         new JavaTestKit(getSystem()) {{
186             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
187
188             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
189             shardManager.tell(new ActorInitialized(), mockShardActor);
190
191             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
192             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
193             shardManager.tell(new RoleChangeNotification(memberId1,
194                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
195             shardManager.tell(new LeaderStateChanged(memberId1, memberId2), mockShardActor);
196
197             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
198
199             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
200         }};
201     }
202
203     @Test
204     public void testOnReceiveFindPrimaryForNonLocalLeaderShard() throws Exception {
205         new JavaTestKit(getSystem()) {{
206             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
207
208             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
209             shardManager.tell(new ActorInitialized(), mockShardActor);
210
211             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
212             MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
213
214             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
215             shardManager.tell(new RoleChangeNotification(memberId1,
216                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
217             shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, Optional.<DataTree>absent()), mockShardActor);
218
219             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
220
221             RemotePrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
222             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
223                     primaryFound.getPrimaryPath().contains("member-2-shard-default"));
224         }};
225     }
226
227     @Test
228     public void testOnReceiveFindPrimaryForUninitializedShard() throws Exception {
229         new JavaTestKit(getSystem()) {{
230             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
231
232             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
233
234             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
235         }};
236     }
237
238     @Test
239     public void testOnReceiveFindPrimaryForInitializedShardWithNoRole() throws Exception {
240         new JavaTestKit(getSystem()) {{
241             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
242
243             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
244             shardManager.tell(new ActorInitialized(), mockShardActor);
245
246             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
247
248             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
249         }};
250     }
251
252     @Test
253     public void testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId() throws Exception {
254         new JavaTestKit(getSystem()) {{
255             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
256
257             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
258             shardManager.tell(new ActorInitialized(), mockShardActor);
259
260             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
261             shardManager.tell(new RoleChangeNotification(memberId,
262                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
263
264             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
265
266             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
267
268             DataTree mockDataTree = mock(DataTree.class);
269             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree)), mockShardActor);
270
271             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
272
273             LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
274             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
275                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
276             assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
277         }};
278     }
279
280     @Test
281     public void testOnReceiveFindPrimaryWaitForShardLeader() throws Exception {
282         new JavaTestKit(getSystem()) {{
283             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
284
285             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
286
287             // We're passing waitUntilInitialized = true to FindPrimary so the response should be
288             // delayed until we send ActorInitialized and RoleChangeNotification.
289             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
290
291             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
292
293             shardManager.tell(new ActorInitialized(), mockShardActor);
294
295             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
296
297             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
298             shardManager.tell(new RoleChangeNotification(memberId,
299                     RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor);
300
301             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
302
303             DataTree mockDataTree = mock(DataTree.class);
304             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree)), mockShardActor);
305
306             LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
307             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
308                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
309             assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
310
311             expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
312         }};
313     }
314
315     @Test
316     public void testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard() throws Exception {
317         new JavaTestKit(getSystem()) {{
318             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
319
320             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
321
322             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
323
324             expectMsgClass(duration("2 seconds"), NotInitializedException.class);
325
326             shardManager.tell(new ActorInitialized(), mockShardActor);
327
328             expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
329         }};
330     }
331
332     @Test
333     public void testOnReceiveFindPrimaryWaitForReadyWithCandidateShard() throws Exception {
334         new JavaTestKit(getSystem()) {{
335             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
336
337             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
338             shardManager.tell(new ActorInitialized(), mockShardActor);
339             shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
340                     null, RaftState.Candidate.name()), mockShardActor);
341
342             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
343
344             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
345         }};
346     }
347
348     @Test
349     public void testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard() throws Exception {
350         new JavaTestKit(getSystem()) {{
351             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
352
353             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
354             shardManager.tell(new ActorInitialized(), mockShardActor);
355
356             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
357
358             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
359         }};
360     }
361
362     @Test
363     public void testOnReceiveFindPrimaryForRemoteShard() throws Exception {
364         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
365
366         // Create an ActorSystem ShardManager actor for member-1.
367
368         final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
369         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
370
371         ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
372
373         final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
374                 newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
375                         new MockConfiguration()), shardManagerID);
376
377         // Create an ActorSystem ShardManager actor for member-2.
378
379         final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
380
381         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
382
383         final ActorRef mockShardActor2 = newMockShardActor(system2, "astronauts", "member-2");
384
385         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
386                 put("default", Arrays.asList("member-1", "member-2")).
387                 put("astronauts", Arrays.asList("member-2")).build());
388
389         final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
390                 newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
391                         mockConfig2), shardManagerID);
392
393         new JavaTestKit(system1) {{
394
395             shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
396             shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
397
398             shardManager2.tell(new ActorInitialized(), mockShardActor2);
399
400             String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
401             shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2,
402                     Optional.of(mock(DataTree.class))), mockShardActor2);
403             shardManager2.tell(new RoleChangeNotification(memberId2,
404                     RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
405
406             shardManager1.underlyingActor().waitForMemberUp();
407
408             shardManager1.tell(new FindPrimary("astronauts", false), getRef());
409
410             RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
411             String path = found.getPrimaryPath();
412             assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config"));
413
414             shardManager2.underlyingActor().verifyFindPrimary();
415
416             Cluster.get(system2).down(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
417
418             shardManager1.underlyingActor().waitForMemberRemoved();
419
420             shardManager1.tell(new FindPrimary("astronauts", false), getRef());
421
422             expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
423         }};
424
425         JavaTestKit.shutdownActorSystem(system1);
426         JavaTestKit.shutdownActorSystem(system2);
427     }
428
429     @Test
430     public void testShardAvailabilityOnChangeOfMemberReachability() throws Exception {
431         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
432
433         // Create an ActorSystem ShardManager actor for member-1.
434
435         final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
436         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
437
438         final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
439
440         final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
441             newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
442                 new MockConfiguration()), shardManagerID);
443
444         // Create an ActorSystem ShardManager actor for member-2.
445
446         final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
447
448         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
449
450         final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
451
452         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
453             put("default", Arrays.asList("member-1", "member-2")).build());
454
455         final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
456             newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
457                 mockConfig2), shardManagerID);
458
459         new JavaTestKit(system1) {{
460
461             shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
462             shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
463             shardManager1.tell(new ActorInitialized(), mockShardActor1);
464             shardManager2.tell(new ActorInitialized(), mockShardActor2);
465
466             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
467             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
468             shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2,
469                 Optional.of(mock(DataTree.class))), mockShardActor1);
470             shardManager1.tell(new RoleChangeNotification(memberId1,
471                 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1);
472             shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, Optional.of(mock(DataTree.class))),
473                 mockShardActor2);
474             shardManager2.tell(new RoleChangeNotification(memberId2,
475                 RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
476             shardManager1.underlyingActor().waitForMemberUp();
477
478             shardManager1.tell(new FindPrimary("default", true), getRef());
479
480             RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
481             String path = found.getPrimaryPath();
482             assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
483
484             shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
485                 createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
486
487             shardManager1.underlyingActor().waitForUnreachableMember();
488
489             shardManager1.tell(new FindPrimary("default", true), getRef());
490
491             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
492
493             shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
494                 createReachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
495
496             shardManager1.underlyingActor().waitForReachableMember();
497
498             shardManager1.tell(new FindPrimary("default", true), getRef());
499
500             RemotePrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
501             String path1 = found1.getPrimaryPath();
502             assertTrue("Unexpected primary path " + path1, path1.contains("member-2-shard-default-config"));
503
504         }};
505
506         JavaTestKit.shutdownActorSystem(system1);
507         JavaTestKit.shutdownActorSystem(system2);
508     }
509
510     @Test
511     public void testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange() throws Exception {
512         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
513
514         // Create an ActorSystem ShardManager actor for member-1.
515
516         final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
517         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
518
519         final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
520
521         final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
522             newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
523                 new MockConfiguration()), shardManagerID);
524
525         // Create an ActorSystem ShardManager actor for member-2.
526
527         final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
528
529         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
530
531         final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
532
533         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
534             put("default", Arrays.asList("member-1", "member-2")).build());
535
536         final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
537             newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
538                 mockConfig2), shardManagerID);
539
540         new JavaTestKit(system1) {{
541
542             shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
543             shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
544             shardManager1.tell(new ActorInitialized(), mockShardActor1);
545             shardManager2.tell(new ActorInitialized(), mockShardActor2);
546
547             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
548             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
549             shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2,
550                 Optional.of(mock(DataTree.class))), mockShardActor1);
551             shardManager1.tell(new RoleChangeNotification(memberId1,
552                 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1);
553             shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, Optional.of(mock(DataTree.class))),
554                 mockShardActor2);
555             shardManager2.tell(new RoleChangeNotification(memberId2,
556                 RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
557             shardManager1.underlyingActor().waitForMemberUp();
558
559             shardManager1.tell(new FindPrimary("default", true), getRef());
560
561             RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
562             String path = found.getPrimaryPath();
563             assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
564
565             primaryShardInfoCache.putSuccessful("default", new PrimaryShardInfo(system1.actorSelection(
566                     mockShardActor1.path()), Optional.<DataTree>absent()));
567
568             shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
569                 createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
570
571             shardManager1.underlyingActor().waitForUnreachableMember();
572
573             shardManager1.tell(new FindPrimary("default", true), getRef());
574
575             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
576
577             assertNull("Expected primaryShardInfoCache entry removed", primaryShardInfoCache.getIfPresent("default"));
578
579             shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId1, Optional.of(mock(DataTree.class))),
580                 mockShardActor1);
581             shardManager1.tell(new RoleChangeNotification(memberId1,
582                 RaftState.Follower.name(), RaftState.Leader.name()), mockShardActor1);
583
584             shardManager1.tell(new FindPrimary("default", true), getRef());
585
586             LocalPrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
587             String path1 = found1.getPrimaryPath();
588             assertTrue("Unexpected primary path " + path1, path1.contains("member-1-shard-default-config"));
589
590         }};
591
592         JavaTestKit.shutdownActorSystem(system1);
593         JavaTestKit.shutdownActorSystem(system2);
594     }
595
596
597     @Test
598     public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
599         new JavaTestKit(getSystem()) {{
600             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
601
602             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
603
604             shardManager.tell(new FindLocalShard("non-existent", false), getRef());
605
606             LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
607
608             assertEquals("getShardName", "non-existent", notFound.getShardName());
609         }};
610     }
611
612     @Test
613     public void testOnReceiveFindLocalShardForExistentShard() throws Exception {
614         new JavaTestKit(getSystem()) {{
615             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
616
617             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
618             shardManager.tell(new ActorInitialized(), mockShardActor);
619
620             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
621
622             LocalShardFound found = expectMsgClass(duration("5 seconds"), LocalShardFound.class);
623
624             assertTrue("Found path contains " + found.getPath().path().toString(),
625                     found.getPath().path().toString().contains("member-1-shard-default-config"));
626         }};
627     }
628
629     @Test
630     public void testOnReceiveFindLocalShardForNotInitializedShard() throws Exception {
631         new JavaTestKit(getSystem()) {{
632             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
633
634             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
635
636             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
637         }};
638     }
639
640     @Test
641     public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
642         new JavaTestKit(getSystem()) {{
643             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
644
645             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
646
647             // We're passing waitUntilInitialized = true to FindLocalShard so the response should be
648             // delayed until we send ActorInitialized.
649             Future<Object> future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true),
650                     new Timeout(5, TimeUnit.SECONDS));
651
652             shardManager.tell(new ActorInitialized(), mockShardActor);
653
654             Object resp = Await.result(future, duration("5 seconds"));
655             assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound);
656         }};
657     }
658
659     @Test
660     public void testOnRecoveryJournalIsCleaned() {
661         InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
662                 ImmutableSet.of("foo")));
663         InMemoryJournal.addEntry(shardMgrID, 2L, new ShardManager.SchemaContextModules(
664                 ImmutableSet.of("bar")));
665         InMemoryJournal.addDeleteMessagesCompleteLatch(shardMgrID);
666
667         new JavaTestKit(getSystem()) {{
668             TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
669                     Props.create(new TestShardManagerCreator(shardMrgIDSuffix)));
670
671             shardManager.underlyingActor().waitForRecoveryComplete();
672             InMemoryJournal.waitForDeleteMessagesComplete(shardMgrID);
673
674             // Journal entries up to the last one should've been deleted
675             Map<Long, Object> journal = InMemoryJournal.get(shardMgrID);
676             synchronized (journal) {
677                 assertEquals("Journal size", 1, journal.size());
678                 assertEquals("Journal entry seq #", Long.valueOf(2), journal.keySet().iterator().next());
679             }
680         }};
681     }
682
683     @Test
684     public void testOnRecoveryPreviouslyKnownModulesAreDiscovered() throws Exception {
685         final ImmutableSet<String> persistedModules = ImmutableSet.of("foo", "bar");
686         InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
687                 persistedModules));
688         new JavaTestKit(getSystem()) {{
689             TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
690                     Props.create(new TestShardManagerCreator(shardMrgIDSuffix)));
691
692             shardManager.underlyingActor().waitForRecoveryComplete();
693
694             Collection<String> knownModules = shardManager.underlyingActor().getKnownModules();
695
696             assertEquals("getKnownModules", persistedModules, Sets.newHashSet(knownModules));
697         }};
698     }
699
700     @Test
701     public void testOnUpdateSchemaContextUpdateKnownModulesIfTheyContainASuperSetOfTheKnownModules()
702             throws Exception {
703         new JavaTestKit(getSystem()) {{
704             final TestActorRef<ShardManager> shardManager =
705                     TestActorRef.create(getSystem(), newShardMgrProps(true));
706
707             assertEquals("getKnownModules size", 0, shardManager.underlyingActor().getKnownModules().size());
708
709             ModuleIdentifier foo = mock(ModuleIdentifier.class);
710             when(foo.getNamespace()).thenReturn(new URI("foo"));
711
712             Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
713             moduleIdentifierSet.add(foo);
714
715             SchemaContext schemaContext = mock(SchemaContext.class);
716             when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
717
718             shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
719
720             assertEquals("getKnownModules", Sets.newHashSet("foo"),
721                     Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
722
723             ModuleIdentifier bar = mock(ModuleIdentifier.class);
724             when(bar.getNamespace()).thenReturn(new URI("bar"));
725
726             moduleIdentifierSet.add(bar);
727
728             shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
729
730             assertEquals("getKnownModules", Sets.newHashSet("foo", "bar"),
731                     Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
732         }};
733     }
734
735     @Test
736     public void testOnUpdateSchemaContextDoNotUpdateKnownModulesIfTheyDoNotContainASuperSetOfKnownModules()
737             throws Exception {
738         new JavaTestKit(getSystem()) {{
739             final TestActorRef<ShardManager> shardManager =
740                     TestActorRef.create(getSystem(), newShardMgrProps(true));
741
742             SchemaContext schemaContext = mock(SchemaContext.class);
743             Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
744
745             ModuleIdentifier foo = mock(ModuleIdentifier.class);
746             when(foo.getNamespace()).thenReturn(new URI("foo"));
747
748             moduleIdentifierSet.add(foo);
749
750             when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
751
752             shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
753
754             assertEquals("getKnownModules", Sets.newHashSet("foo"),
755                     Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
756
757             //Create a completely different SchemaContext with only the bar module in it
758             //schemaContext = mock(SchemaContext.class);
759             moduleIdentifierSet.clear();
760             ModuleIdentifier bar = mock(ModuleIdentifier.class);
761             when(bar.getNamespace()).thenReturn(new URI("bar"));
762
763             moduleIdentifierSet.add(bar);
764
765             shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
766
767             assertEquals("getKnownModules", Sets.newHashSet("foo"),
768                     Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
769
770         }};
771     }
772
773     @Test
774     public void testRecoveryApplicable(){
775         new JavaTestKit(getSystem()) {
776             {
777                 final Props persistentProps = newShardMgrProps(true);
778                 final TestActorRef<ShardManager> persistentShardManager =
779                         TestActorRef.create(getSystem(), persistentProps);
780
781                 DataPersistenceProvider dataPersistenceProvider1 = persistentShardManager.underlyingActor().getDataPersistenceProvider();
782
783                 assertTrue("Recovery Applicable", dataPersistenceProvider1.isRecoveryApplicable());
784
785                 final Props nonPersistentProps = newShardMgrProps(false);
786                 final TestActorRef<ShardManager> nonPersistentShardManager =
787                         TestActorRef.create(getSystem(), nonPersistentProps);
788
789                 DataPersistenceProvider dataPersistenceProvider2 = nonPersistentShardManager.underlyingActor().getDataPersistenceProvider();
790
791                 assertFalse("Recovery Not Applicable", dataPersistenceProvider2.isRecoveryApplicable());
792
793
794             }};
795
796     }
797
798     @Test
799     public void testOnUpdateSchemaContextUpdateKnownModulesCallsDataPersistenceProvider()
800             throws Exception {
801         final CountDownLatch persistLatch = new CountDownLatch(1);
802         final Creator<ShardManager> creator = new Creator<ShardManager>() {
803             private static final long serialVersionUID = 1L;
804             @Override
805             public ShardManager create() throws Exception {
806                 return new ShardManager(new MockClusterWrapper(), new MockConfiguration(), DatastoreContext.newBuilder().build(),
807                         ready, new PrimaryShardInfoFutureCache()) {
808                     @Override
809                     protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) {
810                         DataPersistenceProviderMonitor dataPersistenceProviderMonitor
811                                 = new DataPersistenceProviderMonitor();
812                         dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
813                         return dataPersistenceProviderMonitor;
814                     }
815                 };
816             }
817         };
818
819         new JavaTestKit(getSystem()) {{
820
821             final TestActorRef<ShardManager> shardManager =
822                     TestActorRef.create(getSystem(), Props.create(new DelegatingShardManagerCreator(creator)));
823
824             ModuleIdentifier foo = mock(ModuleIdentifier.class);
825             when(foo.getNamespace()).thenReturn(new URI("foo"));
826
827             Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
828             moduleIdentifierSet.add(foo);
829
830             SchemaContext schemaContext = mock(SchemaContext.class);
831             when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
832
833             shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
834
835             assertEquals("Persisted", true,
836                     Uninterruptibles.awaitUninterruptibly(persistLatch, 5, TimeUnit.SECONDS));
837
838         }};
839     }
840
841     @Test
842     public void testRoleChangeNotificationAndShardLeaderStateChangedReleaseReady() throws Exception {
843         new JavaTestKit(getSystem()) {
844             {
845                 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true));
846
847                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
848                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
849                         memberId, RaftState.Candidate.name(), RaftState.Leader.name()));
850
851                 verify(ready, never()).countDown();
852
853                 shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId, memberId,
854                         Optional.of(mock(DataTree.class))));
855
856                 verify(ready, times(1)).countDown();
857
858             }};
859     }
860
861     @Test
862     public void testRoleChangeNotificationToFollowerWithShardLeaderStateChangedReleaseReady() throws Exception {
863         new JavaTestKit(getSystem()) {
864             {
865                 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true));
866
867                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
868                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
869                         memberId, null, RaftState.Follower.name()));
870
871                 verify(ready, never()).countDown();
872
873                 shardManager.underlyingActor().onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
874
875                 shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId,
876                         "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class))));
877
878                 verify(ready, times(1)).countDown();
879
880             }};
881     }
882
883     @Test
884     public void testReadyCountDownForMemberUpAfterLeaderStateChanged() throws Exception {
885         new JavaTestKit(getSystem()) {
886             {
887                 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true));
888
889                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
890                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
891                         memberId, null, RaftState.Follower.name()));
892
893                 verify(ready, never()).countDown();
894
895                 shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId,
896                         "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class))));
897
898                 shardManager.underlyingActor().onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
899
900                 verify(ready, times(1)).countDown();
901
902             }};
903     }
904
905     @Test
906     public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
907         new JavaTestKit(getSystem()) {
908             {
909                 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true));
910
911                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
912                         "unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
913
914                 verify(ready, never()).countDown();
915
916             }};
917     }
918
919
920     @Test
921     public void testByDefaultSyncStatusIsFalse() throws Exception{
922         final Props persistentProps = newShardMgrProps(true);
923         final TestActorRef<ShardManager> shardManager =
924                 TestActorRef.create(getSystem(), persistentProps);
925
926         ShardManager shardManagerActor = shardManager.underlyingActor();
927
928         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
929     }
930
931     @Test
932     public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception{
933         final Props persistentProps = ShardManager.props(
934                 new MockClusterWrapper(),
935                 new MockConfiguration(),
936                 DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache);
937         final TestActorRef<ShardManager> shardManager =
938                 TestActorRef.create(getSystem(), persistentProps);
939
940         ShardManager shardManagerActor = shardManager.underlyingActor();
941         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
942                 RaftState.Follower.name(), RaftState.Leader.name()));
943
944         assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
945     }
946
947     @Test
948     public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception{
949         final Props persistentProps = newShardMgrProps(true);
950         final TestActorRef<ShardManager> shardManager =
951                 TestActorRef.create(getSystem(), persistentProps);
952
953         ShardManager shardManagerActor = shardManager.underlyingActor();
954         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
955                 RaftState.Follower.name(), RaftState.Candidate.name()));
956
957         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
958
959         // Send a FollowerInitialSyncStatus with status = true for the replica whose current state is candidate
960         shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-default-unknown"));
961
962         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
963     }
964
965     @Test
966     public void testWhenShardIsFollowerSyncStatusDependsOnFollowerInitialSyncStatus() throws Exception{
967         final Props persistentProps = ShardManager.props(
968                 new MockClusterWrapper(),
969                 new MockConfiguration(),
970                 DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache);
971         final TestActorRef<ShardManager> shardManager =
972                 TestActorRef.create(getSystem(), persistentProps);
973
974         ShardManager shardManagerActor = shardManager.underlyingActor();
975         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
976                 RaftState.Candidate.name(), RaftState.Follower.name()));
977
978         // Initially will be false
979         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
980
981         // Send status true will make sync status true
982         shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-default-unknown"));
983
984         assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
985
986         // Send status false will make sync status false
987         shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-default-unknown"));
988
989         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
990
991     }
992
993     @Test
994     public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception{
995         final Props persistentProps = ShardManager.props(
996                 new MockClusterWrapper(),
997                 new MockConfiguration() {
998                     @Override
999                     public List<String> getMemberShardNames(String memberName) {
1000                         return Arrays.asList("default", "astronauts");
1001                     }
1002                 },
1003                 DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache);
1004         final TestActorRef<ShardManager> shardManager =
1005                 TestActorRef.create(getSystem(), persistentProps);
1006
1007         ShardManager shardManagerActor = shardManager.underlyingActor();
1008
1009         // Initially will be false
1010         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
1011
1012         // Make default shard leader
1013         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
1014                 RaftState.Follower.name(), RaftState.Leader.name()));
1015
1016         // default = Leader, astronauts is unknown so sync status remains false
1017         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
1018
1019         // Make astronauts shard leader as well
1020         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-astronauts-unknown",
1021                 RaftState.Follower.name(), RaftState.Leader.name()));
1022
1023         // Now sync status should be true
1024         assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
1025
1026         // Make astronauts a Follower
1027         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-astronauts-unknown",
1028                 RaftState.Leader.name(), RaftState.Follower.name()));
1029
1030         // Sync status is not true
1031         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
1032
1033         // Make the astronauts follower sync status true
1034         shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-astronauts-unknown"));
1035
1036         // Sync status is now true
1037         assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
1038
1039     }
1040
1041     private static class TestShardManager extends ShardManager {
1042         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
1043
1044         TestShardManager(String shardMrgIDSuffix) {
1045             super(new MockClusterWrapper(), new MockConfiguration(),
1046                     DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build(), ready,
1047                     new PrimaryShardInfoFutureCache());
1048         }
1049
1050         @Override
1051         public void handleRecover(Object message) throws Exception {
1052             try {
1053                 super.handleRecover(message);
1054             } finally {
1055                 if(message instanceof RecoveryCompleted) {
1056                     recoveryComplete.countDown();
1057                 }
1058             }
1059         }
1060
1061         void waitForRecoveryComplete() {
1062             assertEquals("Recovery complete", true,
1063                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
1064         }
1065     }
1066
1067     @SuppressWarnings("serial")
1068     static class TestShardManagerCreator implements Creator<TestShardManager> {
1069         String shardMrgIDSuffix;
1070
1071         TestShardManagerCreator(String shardMrgIDSuffix) {
1072             this.shardMrgIDSuffix = shardMrgIDSuffix;
1073         }
1074
1075         @Override
1076         public TestShardManager create() throws Exception {
1077             return new TestShardManager(shardMrgIDSuffix);
1078         }
1079
1080     }
1081
1082     private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
1083         private static final long serialVersionUID = 1L;
1084         private final Creator<ShardManager> delegate;
1085
1086         public DelegatingShardManagerCreator(Creator<ShardManager> delegate) {
1087             this.delegate = delegate;
1088         }
1089
1090         @Override
1091         public ShardManager create() throws Exception {
1092             return delegate.create();
1093         }
1094     }
1095
1096     private static class ForwardingShardManager extends ShardManager {
1097         private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1);
1098         private CountDownLatch memberUpReceived = new CountDownLatch(1);
1099         private CountDownLatch memberRemovedReceived = new CountDownLatch(1);
1100         private CountDownLatch memberUnreachableReceived = new CountDownLatch(1);
1101         private CountDownLatch memberReachableReceived = new CountDownLatch(1);
1102         private final ActorRef shardActor;
1103         private final String name;
1104
1105         protected ForwardingShardManager(ClusterWrapper cluster, Configuration configuration,
1106                 DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch, String name,
1107                 ActorRef shardActor, PrimaryShardInfoFutureCache primaryShardInfoCache) {
1108             super(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch, primaryShardInfoCache);
1109             this.shardActor = shardActor;
1110             this.name = name;
1111         }
1112
1113         @Override
1114         public void handleCommand(Object message) throws Exception {
1115             try{
1116                 super.handleCommand(message);
1117             } finally {
1118                 if(message instanceof FindPrimary) {
1119                     findPrimaryMessageReceived.countDown();
1120                 } else if(message instanceof ClusterEvent.MemberUp) {
1121                     String role = ((ClusterEvent.MemberUp)message).member().roles().head();
1122                     if(!getCluster().getCurrentMemberName().equals(role)) {
1123                         memberUpReceived.countDown();
1124                     }
1125                 } else if(message instanceof ClusterEvent.MemberRemoved) {
1126                     String role = ((ClusterEvent.MemberRemoved)message).member().roles().head();
1127                     if(!getCluster().getCurrentMemberName().equals(role)) {
1128                         memberRemovedReceived.countDown();
1129                     }
1130                 } else if(message instanceof ClusterEvent.UnreachableMember) {
1131                     String role = ((ClusterEvent.UnreachableMember)message).member().roles().head();
1132                     if(!getCluster().getCurrentMemberName().equals(role)) {
1133                         memberUnreachableReceived.countDown();
1134                     }
1135                 } else if(message instanceof ClusterEvent.ReachableMember) {
1136                     String role = ((ClusterEvent.ReachableMember)message).member().roles().head();
1137                     if(!getCluster().getCurrentMemberName().equals(role)) {
1138                         memberReachableReceived.countDown();
1139                     }
1140                 }
1141             }
1142         }
1143
1144         @Override
1145         public String persistenceId() {
1146             return name;
1147         }
1148
1149         @Override
1150         protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
1151             return shardActor;
1152         }
1153
1154         void waitForMemberUp() {
1155             assertEquals("MemberUp received", true,
1156                     Uninterruptibles.awaitUninterruptibly(memberUpReceived, 5, TimeUnit.SECONDS));
1157             memberUpReceived = new CountDownLatch(1);
1158         }
1159
1160         void waitForMemberRemoved() {
1161             assertEquals("MemberRemoved received", true,
1162                     Uninterruptibles.awaitUninterruptibly(memberRemovedReceived, 5, TimeUnit.SECONDS));
1163             memberRemovedReceived = new CountDownLatch(1);
1164         }
1165
1166         void waitForUnreachableMember() {
1167             assertEquals("UnreachableMember received", true,
1168                 Uninterruptibles.awaitUninterruptibly(memberUnreachableReceived, 5, TimeUnit.SECONDS
1169                 ));
1170             memberUnreachableReceived = new CountDownLatch(1);
1171         }
1172
1173         void waitForReachableMember() {
1174             assertEquals("ReachableMember received", true,
1175                 Uninterruptibles.awaitUninterruptibly(memberReachableReceived, 5, TimeUnit.SECONDS));
1176             memberReachableReceived = new CountDownLatch(1);
1177         }
1178
1179         void verifyFindPrimary() {
1180             assertEquals("FindPrimary received", true,
1181                     Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS));
1182             findPrimaryMessageReceived = new CountDownLatch(1);
1183         }
1184     }
1185 }