Merge "Add documentation in akka.conf on how to specify location of persistent data"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardManagerTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertTrue;
6 import static org.mockito.Mockito.mock;
7 import static org.mockito.Mockito.never;
8 import static org.mockito.Mockito.times;
9 import static org.mockito.Mockito.verify;
10 import static org.mockito.Mockito.when;
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSystem;
13 import akka.actor.AddressFromURIString;
14 import akka.actor.Props;
15 import akka.cluster.Cluster;
16 import akka.cluster.ClusterEvent;
17 import akka.dispatch.Dispatchers;
18 import akka.japi.Creator;
19 import akka.pattern.Patterns;
20 import akka.persistence.RecoveryCompleted;
21 import akka.testkit.JavaTestKit;
22 import akka.testkit.TestActorRef;
23 import akka.util.Timeout;
24 import com.google.common.collect.ImmutableMap;
25 import com.google.common.collect.ImmutableSet;
26 import com.google.common.collect.Sets;
27 import com.google.common.util.concurrent.Uninterruptibles;
28 import com.typesafe.config.ConfigFactory;
29 import java.net.URI;
30 import java.util.Arrays;
31 import java.util.Collection;
32 import java.util.HashSet;
33 import java.util.List;
34 import java.util.Map;
35 import java.util.Set;
36 import java.util.concurrent.CountDownLatch;
37 import java.util.concurrent.TimeUnit;
38 import org.junit.After;
39 import org.junit.Before;
40 import org.junit.Test;
41 import org.mockito.Mock;
42 import org.mockito.MockitoAnnotations;
43 import org.opendaylight.controller.cluster.DataPersistenceProvider;
44 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
45 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
46 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
47 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
48 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
49 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
50 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
51 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
52 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
53 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
54 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
55 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
56 import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
57 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
58 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
59 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
60 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
61 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
62 import org.opendaylight.controller.cluster.raft.RaftState;
63 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
64 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
65 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
66 import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
67 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
68 import scala.concurrent.Await;
69 import scala.concurrent.Future;
70 import scala.concurrent.duration.FiniteDuration;
71
72 public class ShardManagerTest extends AbstractActorTest {
73     private static int ID_COUNTER = 1;
74
75     private final String shardMrgIDSuffix = "config" + ID_COUNTER++;
76     private final String shardMgrID = "shard-manager-" + shardMrgIDSuffix;
77
78     @Mock
79     private static CountDownLatch ready;
80
81     private static TestActorRef<MessageCollectorActor> mockShardActor;
82
83     private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder().
84             dataStoreType(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS);
85
86     private static ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) {
87         String name = new ShardIdentifier(shardName, memberName,"config").toString();
88         return TestActorRef.create(system, Props.create(MessageCollectorActor.class), name);
89     }
90
91     @Before
92     public void setUp() {
93         MockitoAnnotations.initMocks(this);
94
95         InMemoryJournal.clear();
96
97         if(mockShardActor == null) {
98             String name = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1", "config").toString();
99             mockShardActor = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), name);
100         }
101
102         mockShardActor.underlyingActor().clear();
103     }
104
105     @After
106     public void tearDown() {
107         InMemoryJournal.clear();
108     }
109
110     private Props newShardMgrProps() {
111         return ShardManager.props(new MockClusterWrapper(), new MockConfiguration(),
112                 datastoreContextBuilder.build(), ready);
113     }
114
115     private Props newPropsShardMgrWithMockShardActor() {
116         return newPropsShardMgrWithMockShardActor("shardManager", mockShardActor, new MockClusterWrapper(),
117                 new MockConfiguration());
118     }
119
120     private Props newPropsShardMgrWithMockShardActor(final String name, final ActorRef shardActor,
121             final ClusterWrapper clusterWrapper, final Configuration config) {
122         Creator<ShardManager> creator = new Creator<ShardManager>() {
123             private static final long serialVersionUID = 1L;
124             @Override
125             public ShardManager create() throws Exception {
126                 return new ForwardingShardManager(clusterWrapper, config, datastoreContextBuilder.build(),
127                         ready, name, shardActor);
128             }
129         };
130
131         return Props.create(new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId());
132     }
133
134     @Test
135     public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception {
136         new JavaTestKit(getSystem()) {{
137             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
138
139             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
140
141             shardManager.tell(new FindPrimary("non-existent", false), getRef());
142
143             expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
144         }};
145     }
146
147     @Test
148     public void testOnReceiveFindPrimaryForLocalLeaderShard() throws Exception {
149         new JavaTestKit(getSystem()) {{
150             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
151
152             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
153
154             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
155             shardManager.tell(new ActorInitialized(), mockShardActor);
156
157             shardManager.tell(new LeaderStateChanged(memberId, memberId), getRef());
158
159             MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class);
160             shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
161                     RaftState.Leader.name())), mockShardActor);
162
163             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
164
165             PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
166             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
167                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
168         }};
169     }
170
171     @Test
172     public void testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp() throws Exception {
173         new JavaTestKit(getSystem()) {{
174             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
175
176             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
177             shardManager.tell(new ActorInitialized(), mockShardActor);
178
179             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
180             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
181             shardManager.tell(new RoleChangeNotification(memberId1,
182                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
183             shardManager.tell(new LeaderStateChanged(memberId1, memberId2), mockShardActor);
184
185             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
186
187             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
188         }};
189     }
190
191     @Test
192     public void testOnReceiveFindPrimaryForNonLocalLeaderShard() throws Exception {
193         new JavaTestKit(getSystem()) {{
194             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
195
196             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
197             shardManager.tell(new ActorInitialized(), mockShardActor);
198
199             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
200             MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
201
202             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
203             shardManager.tell(new RoleChangeNotification(memberId1,
204                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
205             shardManager.tell(new LeaderStateChanged(memberId1, memberId2), mockShardActor);
206
207             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
208
209             PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
210             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
211                     primaryFound.getPrimaryPath().contains("member-2-shard-default"));
212         }};
213     }
214
215     @Test
216     public void testOnReceiveFindPrimaryForUninitializedShard() throws Exception {
217         new JavaTestKit(getSystem()) {{
218             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
219
220             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
221
222             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
223         }};
224     }
225
226     @Test
227     public void testOnReceiveFindPrimaryForInitializedShardWithNoRole() throws Exception {
228         new JavaTestKit(getSystem()) {{
229             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
230
231             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
232             shardManager.tell(new ActorInitialized(), mockShardActor);
233
234             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
235
236             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
237         }};
238     }
239
240     @Test
241     public void testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId() throws Exception {
242         new JavaTestKit(getSystem()) {{
243             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
244
245             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
246             shardManager.tell(new ActorInitialized(), mockShardActor);
247
248             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
249             shardManager.tell(new RoleChangeNotification(memberId,
250                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
251
252             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
253
254             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
255
256             shardManager.tell(new LeaderStateChanged(memberId, memberId), mockShardActor);
257
258             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
259
260             PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
261             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
262                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
263         }};
264     }
265
266     @Test
267     public void testOnReceiveFindPrimaryWaitForShardLeader() throws Exception {
268         new JavaTestKit(getSystem()) {{
269             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
270
271             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
272
273             // We're passing waitUntilInitialized = true to FindPrimary so the response should be
274             // delayed until we send ActorInitialized and RoleChangeNotification.
275             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
276
277             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
278
279             shardManager.tell(new ActorInitialized(), mockShardActor);
280
281             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
282
283             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
284             shardManager.tell(new RoleChangeNotification(memberId,
285                     RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor);
286
287             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
288
289             shardManager.tell(new LeaderStateChanged(memberId, memberId), mockShardActor);
290
291             PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
292             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
293                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
294
295             expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
296         }};
297     }
298
299     @Test
300     public void testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard() throws Exception {
301         new JavaTestKit(getSystem()) {{
302             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
303
304             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
305
306             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
307
308             expectMsgClass(duration("2 seconds"), NotInitializedException.class);
309
310             shardManager.tell(new ActorInitialized(), mockShardActor);
311
312             expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
313         }};
314     }
315
316     @Test
317     public void testOnReceiveFindPrimaryWaitForReadyWithCandidateShard() throws Exception {
318         new JavaTestKit(getSystem()) {{
319             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
320
321             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
322             shardManager.tell(new ActorInitialized(), mockShardActor);
323             shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
324                     null, RaftState.Candidate.name()), mockShardActor);
325
326             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
327
328             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
329         }};
330     }
331
332     @Test
333     public void testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard() throws Exception {
334         new JavaTestKit(getSystem()) {{
335             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
336
337             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
338             shardManager.tell(new ActorInitialized(), mockShardActor);
339
340             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
341
342             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
343         }};
344     }
345
346     @Test
347     public void testOnReceiveFindPrimaryForRemoteShard() throws Exception {
348         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
349
350         // Create an ActorSystem ShardManager actor for member-1.
351
352         final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
353         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
354
355         ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
356
357         final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
358                 newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
359                         new MockConfiguration()), shardManagerID);
360
361         // Create an ActorSystem ShardManager actor for member-2.
362
363         final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
364
365         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
366
367         final ActorRef mockShardActor2 = newMockShardActor(system2, "astronauts", "member-2");
368
369         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
370                 put("default", Arrays.asList("member-1", "member-2")).
371                 put("astronauts", Arrays.asList("member-2")).build());
372
373         final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
374                 newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
375                         mockConfig2), shardManagerID);
376
377         new JavaTestKit(system1) {{
378
379             shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
380             shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
381
382             shardManager2.tell(new ActorInitialized(), mockShardActor2);
383
384             String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
385             shardManager2.tell(new LeaderStateChanged(memberId2, memberId2), mockShardActor2);
386             shardManager2.tell(new RoleChangeNotification(memberId2,
387                     RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
388
389             shardManager1.underlyingActor().waitForMemberUp();
390
391             shardManager1.tell(new FindPrimary("astronauts", false), getRef());
392
393             PrimaryFound found = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
394             String path = found.getPrimaryPath();
395             assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config"));
396
397             shardManager2.underlyingActor().verifyFindPrimary();
398
399             Cluster.get(system2).down(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
400
401             shardManager1.underlyingActor().waitForMemberRemoved();
402
403             shardManager1.tell(new FindPrimary("astronauts", false), getRef());
404
405             expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
406         }};
407
408         JavaTestKit.shutdownActorSystem(system1);
409         JavaTestKit.shutdownActorSystem(system2);
410     }
411
412     @Test
413     public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
414         new JavaTestKit(getSystem()) {{
415             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
416
417             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
418
419             shardManager.tell(new FindLocalShard("non-existent", false), getRef());
420
421             LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
422
423             assertEquals("getShardName", "non-existent", notFound.getShardName());
424         }};
425     }
426
427     @Test
428     public void testOnReceiveFindLocalShardForExistentShard() throws Exception {
429         new JavaTestKit(getSystem()) {{
430             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
431
432             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
433             shardManager.tell(new ActorInitialized(), mockShardActor);
434
435             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
436
437             LocalShardFound found = expectMsgClass(duration("5 seconds"), LocalShardFound.class);
438
439             assertTrue("Found path contains " + found.getPath().path().toString(),
440                     found.getPath().path().toString().contains("member-1-shard-default-config"));
441         }};
442     }
443
444     @Test
445     public void testOnReceiveFindLocalShardForNotInitializedShard() throws Exception {
446         new JavaTestKit(getSystem()) {{
447             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
448
449             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
450
451             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
452         }};
453     }
454
455     @Test
456     public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
457         new JavaTestKit(getSystem()) {{
458             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
459
460             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
461
462             // We're passing waitUntilInitialized = true to FindLocalShard so the response should be
463             // delayed until we send ActorInitialized.
464             Future<Object> future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true),
465                     new Timeout(5, TimeUnit.SECONDS));
466
467             shardManager.tell(new ActorInitialized(), mockShardActor);
468
469             Object resp = Await.result(future, duration("5 seconds"));
470             assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound);
471         }};
472     }
473
474     @Test
475     public void testOnRecoveryJournalIsCleaned() {
476         InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
477                 ImmutableSet.of("foo")));
478         InMemoryJournal.addEntry(shardMgrID, 2L, new ShardManager.SchemaContextModules(
479                 ImmutableSet.of("bar")));
480         InMemoryJournal.addDeleteMessagesCompleteLatch(shardMgrID);
481
482         new JavaTestKit(getSystem()) {{
483             TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
484                     Props.create(new TestShardManagerCreator(shardMrgIDSuffix)));
485
486             shardManager.underlyingActor().waitForRecoveryComplete();
487             InMemoryJournal.waitForDeleteMessagesComplete(shardMgrID);
488
489             // Journal entries up to the last one should've been deleted
490             Map<Long, Object> journal = InMemoryJournal.get(shardMgrID);
491             synchronized (journal) {
492                 assertEquals("Journal size", 1, journal.size());
493                 assertEquals("Journal entry seq #", Long.valueOf(2), journal.keySet().iterator().next());
494             }
495         }};
496     }
497
498     @Test
499     public void testOnRecoveryPreviouslyKnownModulesAreDiscovered() throws Exception {
500         final ImmutableSet<String> persistedModules = ImmutableSet.of("foo", "bar");
501         InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
502                 persistedModules));
503         new JavaTestKit(getSystem()) {{
504             TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
505                     Props.create(new TestShardManagerCreator(shardMrgIDSuffix)));
506
507             shardManager.underlyingActor().waitForRecoveryComplete();
508
509             Collection<String> knownModules = shardManager.underlyingActor().getKnownModules();
510
511             assertEquals("getKnownModules", persistedModules, Sets.newHashSet(knownModules));
512         }};
513     }
514
515     @Test
516     public void testOnUpdateSchemaContextUpdateKnownModulesIfTheyContainASuperSetOfTheKnownModules()
517             throws Exception {
518         new JavaTestKit(getSystem()) {{
519             final TestActorRef<ShardManager> shardManager =
520                     TestActorRef.create(getSystem(), newShardMgrProps());
521
522             assertEquals("getKnownModules size", 0, shardManager.underlyingActor().getKnownModules().size());
523
524             ModuleIdentifier foo = mock(ModuleIdentifier.class);
525             when(foo.getNamespace()).thenReturn(new URI("foo"));
526
527             Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
528             moduleIdentifierSet.add(foo);
529
530             SchemaContext schemaContext = mock(SchemaContext.class);
531             when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
532
533             shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
534
535             assertEquals("getKnownModules", Sets.newHashSet("foo"),
536                     Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
537
538             ModuleIdentifier bar = mock(ModuleIdentifier.class);
539             when(bar.getNamespace()).thenReturn(new URI("bar"));
540
541             moduleIdentifierSet.add(bar);
542
543             shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
544
545             assertEquals("getKnownModules", Sets.newHashSet("foo", "bar"),
546                     Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
547         }};
548     }
549
550     @Test
551     public void testOnUpdateSchemaContextDoNotUpdateKnownModulesIfTheyDoNotContainASuperSetOfKnownModules()
552             throws Exception {
553         new JavaTestKit(getSystem()) {{
554             final TestActorRef<ShardManager> shardManager =
555                     TestActorRef.create(getSystem(), newShardMgrProps());
556
557             SchemaContext schemaContext = mock(SchemaContext.class);
558             Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
559
560             ModuleIdentifier foo = mock(ModuleIdentifier.class);
561             when(foo.getNamespace()).thenReturn(new URI("foo"));
562
563             moduleIdentifierSet.add(foo);
564
565             when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
566
567             shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
568
569             assertEquals("getKnownModules", Sets.newHashSet("foo"),
570                     Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
571
572             //Create a completely different SchemaContext with only the bar module in it
573             //schemaContext = mock(SchemaContext.class);
574             moduleIdentifierSet.clear();
575             ModuleIdentifier bar = mock(ModuleIdentifier.class);
576             when(bar.getNamespace()).thenReturn(new URI("bar"));
577
578             moduleIdentifierSet.add(bar);
579
580             shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
581
582             assertEquals("getKnownModules", Sets.newHashSet("foo"),
583                     Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
584
585         }};
586     }
587
588     @Test
589     public void testRecoveryApplicable(){
590         new JavaTestKit(getSystem()) {
591             {
592                 final Props persistentProps = ShardManager.props(
593                         new MockClusterWrapper(),
594                         new MockConfiguration(),
595                         DatastoreContext.newBuilder().persistent(true).build(), ready);
596                 final TestActorRef<ShardManager> persistentShardManager =
597                         TestActorRef.create(getSystem(), persistentProps);
598
599                 DataPersistenceProvider dataPersistenceProvider1 = persistentShardManager.underlyingActor().getDataPersistenceProvider();
600
601                 assertTrue("Recovery Applicable", dataPersistenceProvider1.isRecoveryApplicable());
602
603                 final Props nonPersistentProps = ShardManager.props(
604                         new MockClusterWrapper(),
605                         new MockConfiguration(),
606                         DatastoreContext.newBuilder().persistent(false).build(), ready);
607                 final TestActorRef<ShardManager> nonPersistentShardManager =
608                         TestActorRef.create(getSystem(), nonPersistentProps);
609
610                 DataPersistenceProvider dataPersistenceProvider2 = nonPersistentShardManager.underlyingActor().getDataPersistenceProvider();
611
612                 assertFalse("Recovery Not Applicable", dataPersistenceProvider2.isRecoveryApplicable());
613
614
615             }};
616
617     }
618
619     @Test
620     public void testOnUpdateSchemaContextUpdateKnownModulesCallsDataPersistenceProvider()
621             throws Exception {
622         final CountDownLatch persistLatch = new CountDownLatch(1);
623         final Creator<ShardManager> creator = new Creator<ShardManager>() {
624             private static final long serialVersionUID = 1L;
625             @Override
626             public ShardManager create() throws Exception {
627                 return new ShardManager(new MockClusterWrapper(), new MockConfiguration(), DatastoreContext.newBuilder().build(), ready) {
628                     @Override
629                     protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) {
630                         DataPersistenceProviderMonitor dataPersistenceProviderMonitor
631                                 = new DataPersistenceProviderMonitor();
632                         dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
633                         return dataPersistenceProviderMonitor;
634                     }
635                 };
636             }
637         };
638
639         new JavaTestKit(getSystem()) {{
640
641             final TestActorRef<ShardManager> shardManager =
642                     TestActorRef.create(getSystem(), Props.create(new DelegatingShardManagerCreator(creator)));
643
644             ModuleIdentifier foo = mock(ModuleIdentifier.class);
645             when(foo.getNamespace()).thenReturn(new URI("foo"));
646
647             Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
648             moduleIdentifierSet.add(foo);
649
650             SchemaContext schemaContext = mock(SchemaContext.class);
651             when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
652
653             shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
654
655             assertEquals("Persisted", true,
656                     Uninterruptibles.awaitUninterruptibly(persistLatch, 5, TimeUnit.SECONDS));
657
658         }};
659     }
660
661     @Test
662     public void testRoleChangeNotificationAndLeaderStateChangedReleaseReady() throws Exception {
663         new JavaTestKit(getSystem()) {
664             {
665                 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
666
667                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
668                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
669                         memberId, RaftState.Candidate.name(), RaftState.Leader.name()));
670
671                 verify(ready, never()).countDown();
672
673                 shardManager.underlyingActor().onReceiveCommand(new LeaderStateChanged(memberId, memberId));
674
675                 verify(ready, times(1)).countDown();
676
677             }};
678     }
679
680     @Test
681     public void testRoleChangeNotificationToFollowerWithLeaderStateChangedReleaseReady() throws Exception {
682         new JavaTestKit(getSystem()) {
683             {
684                 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
685
686                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
687                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
688                         memberId, null, RaftState.Follower.name()));
689
690                 verify(ready, never()).countDown();
691
692                 shardManager.underlyingActor().onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
693
694                 shardManager.underlyingActor().onReceiveCommand(new LeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix));
695
696                 verify(ready, times(1)).countDown();
697
698             }};
699     }
700
701     @Test
702     public void testReadyCountDownForMemberUpAfterLeaderStateChanged() throws Exception {
703         new JavaTestKit(getSystem()) {
704             {
705                 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
706
707                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
708                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
709                         memberId, null, RaftState.Follower.name()));
710
711                 verify(ready, never()).countDown();
712
713                 shardManager.underlyingActor().onReceiveCommand(new LeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix));
714
715                 shardManager.underlyingActor().onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
716
717                 verify(ready, times(1)).countDown();
718
719             }};
720     }
721
722     @Test
723     public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
724         new JavaTestKit(getSystem()) {
725             {
726                 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
727
728                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
729                         "unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
730
731                 verify(ready, never()).countDown();
732
733             }};
734     }
735
736
737     @Test
738     public void testByDefaultSyncStatusIsFalse() throws Exception{
739         final Props persistentProps = ShardManager.props(
740                 new MockClusterWrapper(),
741                 new MockConfiguration(),
742                 DatastoreContext.newBuilder().persistent(true).build(), ready);
743         final TestActorRef<ShardManager> shardManager =
744                 TestActorRef.create(getSystem(), persistentProps);
745
746         ShardManager shardManagerActor = shardManager.underlyingActor();
747
748         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
749     }
750
751     @Test
752     public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception{
753         final Props persistentProps = ShardManager.props(
754                 new MockClusterWrapper(),
755                 new MockConfiguration(),
756                 DatastoreContext.newBuilder().persistent(true).build(), ready);
757         final TestActorRef<ShardManager> shardManager =
758                 TestActorRef.create(getSystem(), persistentProps);
759
760         ShardManager shardManagerActor = shardManager.underlyingActor();
761         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
762                 RaftState.Follower.name(), RaftState.Leader.name()));
763
764         assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
765     }
766
767     @Test
768     public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception{
769         final Props persistentProps = ShardManager.props(
770                 new MockClusterWrapper(),
771                 new MockConfiguration(),
772                 DatastoreContext.newBuilder().persistent(true).build(), ready);
773         final TestActorRef<ShardManager> shardManager =
774                 TestActorRef.create(getSystem(), persistentProps);
775
776         ShardManager shardManagerActor = shardManager.underlyingActor();
777         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
778                 RaftState.Follower.name(), RaftState.Candidate.name()));
779
780         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
781
782         // Send a FollowerInitialSyncStatus with status = true for the replica whose current state is candidate
783         shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-default-unknown"));
784
785         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
786     }
787
788     @Test
789     public void testWhenShardIsFollowerSyncStatusDependsOnFollowerInitialSyncStatus() throws Exception{
790         final Props persistentProps = ShardManager.props(
791                 new MockClusterWrapper(),
792                 new MockConfiguration(),
793                 DatastoreContext.newBuilder().persistent(true).build(), ready);
794         final TestActorRef<ShardManager> shardManager =
795                 TestActorRef.create(getSystem(), persistentProps);
796
797         ShardManager shardManagerActor = shardManager.underlyingActor();
798         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
799                 RaftState.Candidate.name(), RaftState.Follower.name()));
800
801         // Initially will be false
802         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
803
804         // Send status true will make sync status true
805         shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-default-unknown"));
806
807         assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
808
809         // Send status false will make sync status false
810         shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-default-unknown"));
811
812         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
813
814     }
815
816     @Test
817     public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception{
818         final Props persistentProps = ShardManager.props(
819                 new MockClusterWrapper(),
820                 new MockConfiguration() {
821                     @Override
822                     public List<String> getMemberShardNames(String memberName) {
823                         return Arrays.asList("default", "astronauts");
824                     }
825                 },
826                 DatastoreContext.newBuilder().persistent(true).build(), ready);
827         final TestActorRef<ShardManager> shardManager =
828                 TestActorRef.create(getSystem(), persistentProps);
829
830         ShardManager shardManagerActor = shardManager.underlyingActor();
831
832         // Initially will be false
833         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
834
835         // Make default shard leader
836         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
837                 RaftState.Follower.name(), RaftState.Leader.name()));
838
839         // default = Leader, astronauts is unknown so sync status remains false
840         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
841
842         // Make astronauts shard leader as well
843         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-astronauts-unknown",
844                 RaftState.Follower.name(), RaftState.Leader.name()));
845
846         // Now sync status should be true
847         assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
848
849         // Make astronauts a Follower
850         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-astronauts-unknown",
851                 RaftState.Leader.name(), RaftState.Follower.name()));
852
853         // Sync status is not true
854         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
855
856         // Make the astronauts follower sync status true
857         shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-astronauts-unknown"));
858
859         // Sync status is now true
860         assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
861
862     }
863
864     private static class TestShardManager extends ShardManager {
865         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
866
867         TestShardManager(String shardMrgIDSuffix) {
868             super(new MockClusterWrapper(), new MockConfiguration(),
869                     DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build(), ready);
870         }
871
872         @Override
873         public void handleRecover(Object message) throws Exception {
874             try {
875                 super.handleRecover(message);
876             } finally {
877                 if(message instanceof RecoveryCompleted) {
878                     recoveryComplete.countDown();
879                 }
880             }
881         }
882
883         void waitForRecoveryComplete() {
884             assertEquals("Recovery complete", true,
885                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
886         }
887     }
888
889     @SuppressWarnings("serial")
890     static class TestShardManagerCreator implements Creator<TestShardManager> {
891         String shardMrgIDSuffix;
892
893         TestShardManagerCreator(String shardMrgIDSuffix) {
894             this.shardMrgIDSuffix = shardMrgIDSuffix;
895         }
896
897         @Override
898         public TestShardManager create() throws Exception {
899             return new TestShardManager(shardMrgIDSuffix);
900         }
901
902     }
903
904     private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
905         private static final long serialVersionUID = 1L;
906         private final Creator<ShardManager> delegate;
907
908         public DelegatingShardManagerCreator(Creator<ShardManager> delegate) {
909             this.delegate = delegate;
910         }
911
912         @Override
913         public ShardManager create() throws Exception {
914             return delegate.create();
915         }
916     }
917
918     private static class ForwardingShardManager extends ShardManager {
919         private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1);
920         private CountDownLatch memberUpReceived = new CountDownLatch(1);
921         private CountDownLatch memberRemovedReceived = new CountDownLatch(1);
922         private final ActorRef shardActor;
923         private final String name;
924
925         protected ForwardingShardManager(ClusterWrapper cluster, Configuration configuration,
926                 DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch, String name,
927                 ActorRef shardActor) {
928             super(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch);
929             this.shardActor = shardActor;
930             this.name = name;
931         }
932
933         @Override
934         public void handleCommand(Object message) throws Exception {
935             try{
936                 super.handleCommand(message);
937             } finally {
938                 if(message instanceof FindPrimary) {
939                     findPrimaryMessageReceived.countDown();
940                 } else if(message instanceof ClusterEvent.MemberUp) {
941                     String role = ((ClusterEvent.MemberUp)message).member().roles().head();
942                     if(!getCluster().getCurrentMemberName().equals(role)) {
943                         memberUpReceived.countDown();
944                     }
945                 } else if(message instanceof ClusterEvent.MemberRemoved) {
946                     String role = ((ClusterEvent.MemberRemoved)message).member().roles().head();
947                     if(!getCluster().getCurrentMemberName().equals(role)) {
948                         memberRemovedReceived.countDown();
949                     }
950                 }
951             }
952         }
953
954         @Override
955         public String persistenceId() {
956             return name;
957         }
958
959         @Override
960         protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
961             return shardActor;
962         }
963
964         void waitForMemberUp() {
965             assertEquals("MemberUp received", true,
966                     Uninterruptibles.awaitUninterruptibly(memberUpReceived, 5, TimeUnit.SECONDS));
967             memberUpReceived = new CountDownLatch(1);
968         }
969
970         void waitForMemberRemoved() {
971             assertEquals("MemberRemoved received", true,
972                     Uninterruptibles.awaitUninterruptibly(memberRemovedReceived, 5, TimeUnit.SECONDS));
973             memberRemovedReceived = new CountDownLatch(1);
974         }
975
976         void verifyFindPrimary() {
977             assertEquals("FindPrimary received", true,
978                     Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS));
979             findPrimaryMessageReceived = new CountDownLatch(1);
980         }
981     }
982 }