Bug 2194: Modify FindPrimary to check for leader
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardManagerTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertTrue;
6 import static org.mockito.Mockito.mock;
7 import static org.mockito.Mockito.never;
8 import static org.mockito.Mockito.times;
9 import static org.mockito.Mockito.verify;
10 import static org.mockito.Mockito.when;
11 import akka.actor.ActorRef;
12 import akka.actor.Props;
13 import akka.japi.Creator;
14 import akka.pattern.Patterns;
15 import akka.persistence.RecoveryCompleted;
16 import akka.testkit.JavaTestKit;
17 import akka.testkit.TestActorRef;
18 import akka.util.Timeout;
19 import com.google.common.collect.ImmutableSet;
20 import com.google.common.collect.Sets;
21 import com.google.common.util.concurrent.Uninterruptibles;
22 import java.net.URI;
23 import java.util.Arrays;
24 import java.util.Collection;
25 import java.util.HashSet;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Set;
29 import java.util.concurrent.CountDownLatch;
30 import java.util.concurrent.TimeUnit;
31 import org.junit.After;
32 import org.junit.Before;
33 import org.junit.Test;
34 import org.mockito.Mock;
35 import org.mockito.MockitoAnnotations;
36 import org.opendaylight.controller.cluster.DataPersistenceProvider;
37 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
38 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
39 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
40 import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
41 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
42 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
43 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
44 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
45 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
46 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
47 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
48 import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
49 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
50 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
51 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
52 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
53 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
54 import org.opendaylight.controller.cluster.raft.RaftState;
55 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
56 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
57 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
58 import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
59 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
60 import scala.concurrent.Await;
61 import scala.concurrent.Future;
62 import scala.concurrent.duration.FiniteDuration;
63
64 public class ShardManagerTest extends AbstractActorTest {
65     private static int ID_COUNTER = 1;
66
67     private final String shardMrgIDSuffix = "config" + ID_COUNTER++;
68     private final String shardMgrID = "shard-manager-" + shardMrgIDSuffix;
69
70     @Mock
71     private static CountDownLatch ready;
72
73     private static TestActorRef<MessageCollectorActor> mockShardActor;
74
75     private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder().
76             dataStoreType(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS);
77
78     @Before
79     public void setUp() {
80         MockitoAnnotations.initMocks(this);
81
82         InMemoryJournal.clear();
83
84         if(mockShardActor == null) {
85             String name = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1", "config").toString();
86             mockShardActor = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), name);
87         }
88
89         mockShardActor.underlyingActor().clear();
90     }
91
92     @After
93     public void tearDown() {
94         InMemoryJournal.clear();
95     }
96
97     private Props newShardMgrProps() {
98         return ShardManager.props(new MockClusterWrapper(), new MockConfiguration(),
99                 datastoreContextBuilder.build(), ready);
100     }
101
102     private Props newPropsShardMgrWithMockShardActor() {
103         Creator<ShardManager> creator = new Creator<ShardManager>() {
104             private static final long serialVersionUID = 1L;
105             @Override
106             public ShardManager create() throws Exception {
107                 return new ShardManager(new MockClusterWrapper(), new MockConfiguration(),
108                         datastoreContextBuilder.build(), ready) {
109                     @Override
110                     protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
111                         return mockShardActor;
112                     }
113                 };
114             }
115         };
116
117         return Props.create(new DelegatingShardManagerCreator(creator));
118     }
119
120     @Test
121     public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception {
122         new JavaTestKit(getSystem()) {{
123             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
124
125             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
126
127             shardManager.tell(new FindPrimary("non-existent", false).toSerializable(), getRef());
128
129             expectMsgEquals(duration("5 seconds"), new PrimaryNotFound("non-existent").toSerializable());
130         }};
131     }
132
133     @Test
134     public void testOnReceiveFindPrimaryForLocalLeaderShard() throws Exception {
135         new JavaTestKit(getSystem()) {{
136             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
137
138             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
139
140             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
141             shardManager.tell(new ActorInitialized(), mockShardActor);
142
143             shardManager.tell(new LeaderStateChanged(memberId, memberId), getRef());
144
145             MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class);
146             shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
147                     RaftState.Leader.name())), mockShardActor);
148
149             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
150
151             PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
152             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
153                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
154         }};
155     }
156
157     @Test
158     public void testOnReceiveFindPrimaryForNonLocalLeaderShard() throws Exception {
159         new JavaTestKit(getSystem()) {{
160             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
161
162             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
163             shardManager.tell(new ActorInitialized(), mockShardActor);
164
165             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
166             MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
167
168             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
169             shardManager.tell(new RoleChangeNotification(memberId1,
170                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
171             shardManager.tell(new LeaderStateChanged(memberId1, memberId2), mockShardActor);
172
173             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
174
175             PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
176             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
177                     primaryFound.getPrimaryPath().contains("member-2-shard-default"));
178         }};
179     }
180
181     @Test
182     public void testOnReceiveFindPrimaryForUninitializedShard() throws Exception {
183         new JavaTestKit(getSystem()) {{
184             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
185
186             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
187
188             expectMsgClass(duration("5 seconds"), ActorNotInitialized.class);
189         }};
190     }
191
192     @Test
193     public void testOnReceiveFindPrimaryForInitializedShardWithNoRole() throws Exception {
194         new JavaTestKit(getSystem()) {{
195             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
196
197             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
198             shardManager.tell(new ActorInitialized(), mockShardActor);
199
200             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
201
202             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
203         }};
204     }
205
206     @Test
207     public void testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId() throws Exception {
208         new JavaTestKit(getSystem()) {{
209             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
210
211             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
212             shardManager.tell(new ActorInitialized(), mockShardActor);
213
214             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
215             shardManager.tell(new RoleChangeNotification(memberId,
216                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
217
218             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
219
220             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
221
222             shardManager.tell(new LeaderStateChanged(memberId, memberId), mockShardActor);
223
224             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
225
226             PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
227             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
228                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
229         }};
230     }
231
232     @Test
233     public void testOnReceiveFindPrimaryWaitForShardLeader() throws Exception {
234         new JavaTestKit(getSystem()) {{
235             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
236
237             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
238
239             // We're passing waitUntilInitialized = true to FindPrimary so the response should be
240             // delayed until we send ActorInitialized and RoleChangeNotification.
241             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef());
242
243             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
244
245             shardManager.tell(new ActorInitialized(), mockShardActor);
246
247             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
248
249             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
250             shardManager.tell(new RoleChangeNotification(memberId,
251                     RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor);
252
253             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
254
255             shardManager.tell(new LeaderStateChanged(memberId, memberId), mockShardActor);
256
257             PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
258             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
259                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
260
261             expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
262         }};
263     }
264
265     @Test
266     public void testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard() throws Exception {
267         new JavaTestKit(getSystem()) {{
268             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
269
270             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
271
272             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef());
273
274             expectMsgClass(duration("2 seconds"), ActorNotInitialized.class);
275
276             shardManager.tell(new ActorInitialized(), mockShardActor);
277
278             expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
279         }};
280     }
281
282     @Test
283     public void testOnReceiveFindPrimaryWaitForReadyWithCandidateShard() throws Exception {
284         new JavaTestKit(getSystem()) {{
285             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
286
287             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
288             shardManager.tell(new ActorInitialized(), mockShardActor);
289             shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
290                     null, RaftState.Candidate.name()), mockShardActor);
291
292             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef());
293
294             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
295         }};
296     }
297
298     @Test
299     public void testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard() throws Exception {
300         new JavaTestKit(getSystem()) {{
301             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
302
303             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
304             shardManager.tell(new ActorInitialized(), mockShardActor);
305
306             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef());
307
308             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
309         }};
310     }
311
312     @Test
313     public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
314         new JavaTestKit(getSystem()) {{
315             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
316
317             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
318
319             shardManager.tell(new FindLocalShard("non-existent", false), getRef());
320
321             LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
322
323             assertEquals("getShardName", "non-existent", notFound.getShardName());
324         }};
325     }
326
327     @Test
328     public void testOnReceiveFindLocalShardForExistentShard() throws Exception {
329         new JavaTestKit(getSystem()) {{
330             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
331
332             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
333             shardManager.tell(new ActorInitialized(), mockShardActor);
334
335             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
336
337             LocalShardFound found = expectMsgClass(duration("5 seconds"), LocalShardFound.class);
338
339             assertTrue("Found path contains " + found.getPath().path().toString(),
340                     found.getPath().path().toString().contains("member-1-shard-default-config"));
341         }};
342     }
343
344     @Test
345     public void testOnReceiveFindLocalShardForNotInitializedShard() throws Exception {
346         new JavaTestKit(getSystem()) {{
347             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
348
349             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
350
351             expectMsgClass(duration("5 seconds"), ActorNotInitialized.class);
352         }};
353     }
354
355     @Test
356     public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
357         new JavaTestKit(getSystem()) {{
358             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
359
360             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
361
362             // We're passing waitUntilInitialized = true to FindLocalShard so the response should be
363             // delayed until we send ActorInitialized.
364             Future<Object> future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true),
365                     new Timeout(5, TimeUnit.SECONDS));
366
367             shardManager.tell(new ActorInitialized(), mockShardActor);
368
369             Object resp = Await.result(future, duration("5 seconds"));
370             assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound);
371         }};
372     }
373
374     @Test
375     public void testOnReceiveMemberUp() throws Exception {
376         new JavaTestKit(getSystem()) {{
377             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
378
379             MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
380
381             shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
382
383             PrimaryFound found = PrimaryFound.fromSerializable(expectMsgClass(duration("5 seconds"),
384                     PrimaryFound.SERIALIZABLE_CLASS));
385             String path = found.getPrimaryPath();
386             assertTrue("Found path contains " + path, path.contains("member-2-shard-astronauts-config"));
387         }};
388     }
389
390     @Test
391     public void testOnReceiveMemberDown() throws Exception {
392
393         new JavaTestKit(getSystem()) {{
394             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
395
396             MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
397
398             shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
399
400             expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
401
402             MockClusterWrapper.sendMemberRemoved(shardManager, "member-2", getRef().path().toString());
403
404             shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
405
406             expectMsgClass(duration("5 seconds"), PrimaryNotFound.SERIALIZABLE_CLASS);
407         }};
408     }
409
410     @Test
411     public void testOnRecoveryJournalIsCleaned() {
412         InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
413                 ImmutableSet.of("foo")));
414         InMemoryJournal.addEntry(shardMgrID, 2L, new ShardManager.SchemaContextModules(
415                 ImmutableSet.of("bar")));
416         InMemoryJournal.addDeleteMessagesCompleteLatch(shardMgrID);
417
418         new JavaTestKit(getSystem()) {{
419             TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
420                     Props.create(new TestShardManagerCreator(shardMrgIDSuffix)));
421
422             shardManager.underlyingActor().waitForRecoveryComplete();
423             InMemoryJournal.waitForDeleteMessagesComplete(shardMgrID);
424
425             // Journal entries up to the last one should've been deleted
426             Map<Long, Object> journal = InMemoryJournal.get(shardMgrID);
427             synchronized (journal) {
428                 assertEquals("Journal size", 1, journal.size());
429                 assertEquals("Journal entry seq #", Long.valueOf(2), journal.keySet().iterator().next());
430             }
431         }};
432     }
433
434     @Test
435     public void testOnRecoveryPreviouslyKnownModulesAreDiscovered() throws Exception {
436         final ImmutableSet<String> persistedModules = ImmutableSet.of("foo", "bar");
437         InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
438                 persistedModules));
439         new JavaTestKit(getSystem()) {{
440             TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
441                     Props.create(new TestShardManagerCreator(shardMrgIDSuffix)));
442
443             shardManager.underlyingActor().waitForRecoveryComplete();
444
445             Collection<String> knownModules = shardManager.underlyingActor().getKnownModules();
446
447             assertEquals("getKnownModules", persistedModules, Sets.newHashSet(knownModules));
448         }};
449     }
450
451     @Test
452     public void testOnUpdateSchemaContextUpdateKnownModulesIfTheyContainASuperSetOfTheKnownModules()
453             throws Exception {
454         new JavaTestKit(getSystem()) {{
455             final TestActorRef<ShardManager> shardManager =
456                     TestActorRef.create(getSystem(), newShardMgrProps());
457
458             assertEquals("getKnownModules size", 0, shardManager.underlyingActor().getKnownModules().size());
459
460             ModuleIdentifier foo = mock(ModuleIdentifier.class);
461             when(foo.getNamespace()).thenReturn(new URI("foo"));
462
463             Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
464             moduleIdentifierSet.add(foo);
465
466             SchemaContext schemaContext = mock(SchemaContext.class);
467             when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
468
469             shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
470
471             assertEquals("getKnownModules", Sets.newHashSet("foo"),
472                     Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
473
474             ModuleIdentifier bar = mock(ModuleIdentifier.class);
475             when(bar.getNamespace()).thenReturn(new URI("bar"));
476
477             moduleIdentifierSet.add(bar);
478
479             shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
480
481             assertEquals("getKnownModules", Sets.newHashSet("foo", "bar"),
482                     Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
483         }};
484     }
485
486     @Test
487     public void testOnUpdateSchemaContextDoNotUpdateKnownModulesIfTheyDoNotContainASuperSetOfKnownModules()
488             throws Exception {
489         new JavaTestKit(getSystem()) {{
490             final TestActorRef<ShardManager> shardManager =
491                     TestActorRef.create(getSystem(), newShardMgrProps());
492
493             SchemaContext schemaContext = mock(SchemaContext.class);
494             Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
495
496             ModuleIdentifier foo = mock(ModuleIdentifier.class);
497             when(foo.getNamespace()).thenReturn(new URI("foo"));
498
499             moduleIdentifierSet.add(foo);
500
501             when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
502
503             shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
504
505             assertEquals("getKnownModules", Sets.newHashSet("foo"),
506                     Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
507
508             //Create a completely different SchemaContext with only the bar module in it
509             //schemaContext = mock(SchemaContext.class);
510             moduleIdentifierSet.clear();
511             ModuleIdentifier bar = mock(ModuleIdentifier.class);
512             when(bar.getNamespace()).thenReturn(new URI("bar"));
513
514             moduleIdentifierSet.add(bar);
515
516             shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
517
518             assertEquals("getKnownModules", Sets.newHashSet("foo"),
519                     Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
520
521         }};
522     }
523
524     @Test
525     public void testRecoveryApplicable(){
526         new JavaTestKit(getSystem()) {
527             {
528                 final Props persistentProps = ShardManager.props(
529                         new MockClusterWrapper(),
530                         new MockConfiguration(),
531                         DatastoreContext.newBuilder().persistent(true).build(), ready);
532                 final TestActorRef<ShardManager> persistentShardManager =
533                         TestActorRef.create(getSystem(), persistentProps);
534
535                 DataPersistenceProvider dataPersistenceProvider1 = persistentShardManager.underlyingActor().getDataPersistenceProvider();
536
537                 assertTrue("Recovery Applicable", dataPersistenceProvider1.isRecoveryApplicable());
538
539                 final Props nonPersistentProps = ShardManager.props(
540                         new MockClusterWrapper(),
541                         new MockConfiguration(),
542                         DatastoreContext.newBuilder().persistent(false).build(), ready);
543                 final TestActorRef<ShardManager> nonPersistentShardManager =
544                         TestActorRef.create(getSystem(), nonPersistentProps);
545
546                 DataPersistenceProvider dataPersistenceProvider2 = nonPersistentShardManager.underlyingActor().getDataPersistenceProvider();
547
548                 assertFalse("Recovery Not Applicable", dataPersistenceProvider2.isRecoveryApplicable());
549
550
551             }};
552
553     }
554
555     @Test
556     public void testOnUpdateSchemaContextUpdateKnownModulesCallsDataPersistenceProvider()
557             throws Exception {
558         final CountDownLatch persistLatch = new CountDownLatch(1);
559         final Creator<ShardManager> creator = new Creator<ShardManager>() {
560             private static final long serialVersionUID = 1L;
561             @Override
562             public ShardManager create() throws Exception {
563                 return new ShardManager(new MockClusterWrapper(), new MockConfiguration(), DatastoreContext.newBuilder().build(), ready) {
564                     @Override
565                     protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) {
566                         DataPersistenceProviderMonitor dataPersistenceProviderMonitor
567                                 = new DataPersistenceProviderMonitor();
568                         dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
569                         return dataPersistenceProviderMonitor;
570                     }
571                 };
572             }
573         };
574
575         new JavaTestKit(getSystem()) {{
576
577             final TestActorRef<ShardManager> shardManager =
578                     TestActorRef.create(getSystem(), Props.create(new DelegatingShardManagerCreator(creator)));
579
580             ModuleIdentifier foo = mock(ModuleIdentifier.class);
581             when(foo.getNamespace()).thenReturn(new URI("foo"));
582
583             Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
584             moduleIdentifierSet.add(foo);
585
586             SchemaContext schemaContext = mock(SchemaContext.class);
587             when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
588
589             shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
590
591             assertEquals("Persisted", true,
592                     Uninterruptibles.awaitUninterruptibly(persistLatch, 5, TimeUnit.SECONDS));
593
594         }};
595     }
596
597     @Test
598     public void testRoleChangeNotificationReleaseReady() throws Exception {
599         new JavaTestKit(getSystem()) {
600             {
601                 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
602
603                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
604                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
605                         memberId, RaftState.Candidate.name(), RaftState.Leader.name()));
606
607                 verify(ready, times(1)).countDown();
608
609             }};
610     }
611
612     @Test
613     public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
614         new JavaTestKit(getSystem()) {
615             {
616                 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
617
618                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
619                         "unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
620
621                 verify(ready, never()).countDown();
622
623             }};
624     }
625
626
627     @Test
628     public void testByDefaultSyncStatusIsFalse() throws Exception{
629         final Props persistentProps = ShardManager.props(
630                 new MockClusterWrapper(),
631                 new MockConfiguration(),
632                 DatastoreContext.newBuilder().persistent(true).build(), ready);
633         final TestActorRef<ShardManager> shardManager =
634                 TestActorRef.create(getSystem(), persistentProps);
635
636         ShardManager shardManagerActor = shardManager.underlyingActor();
637
638         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
639     }
640
641     @Test
642     public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception{
643         final Props persistentProps = ShardManager.props(
644                 new MockClusterWrapper(),
645                 new MockConfiguration(),
646                 DatastoreContext.newBuilder().persistent(true).build(), ready);
647         final TestActorRef<ShardManager> shardManager =
648                 TestActorRef.create(getSystem(), persistentProps);
649
650         ShardManager shardManagerActor = shardManager.underlyingActor();
651         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
652                 RaftState.Follower.name(), RaftState.Leader.name()));
653
654         assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
655     }
656
657     @Test
658     public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception{
659         final Props persistentProps = ShardManager.props(
660                 new MockClusterWrapper(),
661                 new MockConfiguration(),
662                 DatastoreContext.newBuilder().persistent(true).build(), ready);
663         final TestActorRef<ShardManager> shardManager =
664                 TestActorRef.create(getSystem(), persistentProps);
665
666         ShardManager shardManagerActor = shardManager.underlyingActor();
667         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
668                 RaftState.Follower.name(), RaftState.Candidate.name()));
669
670         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
671
672         // Send a FollowerInitialSyncStatus with status = true for the replica whose current state is candidate
673         shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-default-unknown"));
674
675         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
676     }
677
678     @Test
679     public void testWhenShardIsFollowerSyncStatusDependsOnFollowerInitialSyncStatus() throws Exception{
680         final Props persistentProps = ShardManager.props(
681                 new MockClusterWrapper(),
682                 new MockConfiguration(),
683                 DatastoreContext.newBuilder().persistent(true).build(), ready);
684         final TestActorRef<ShardManager> shardManager =
685                 TestActorRef.create(getSystem(), persistentProps);
686
687         ShardManager shardManagerActor = shardManager.underlyingActor();
688         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
689                 RaftState.Candidate.name(), RaftState.Follower.name()));
690
691         // Initially will be false
692         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
693
694         // Send status true will make sync status true
695         shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-default-unknown"));
696
697         assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
698
699         // Send status false will make sync status false
700         shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-default-unknown"));
701
702         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
703
704     }
705
706     @Test
707     public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception{
708         final Props persistentProps = ShardManager.props(
709                 new MockClusterWrapper(),
710                 new MockConfiguration() {
711                     @Override
712                     public List<String> getMemberShardNames(String memberName) {
713                         return Arrays.asList("default", "astronauts");
714                     }
715                 },
716                 DatastoreContext.newBuilder().persistent(true).build(), ready);
717         final TestActorRef<ShardManager> shardManager =
718                 TestActorRef.create(getSystem(), persistentProps);
719
720         ShardManager shardManagerActor = shardManager.underlyingActor();
721
722         // Initially will be false
723         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
724
725         // Make default shard leader
726         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
727                 RaftState.Follower.name(), RaftState.Leader.name()));
728
729         // default = Leader, astronauts is unknown so sync status remains false
730         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
731
732         // Make astronauts shard leader as well
733         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-astronauts-unknown",
734                 RaftState.Follower.name(), RaftState.Leader.name()));
735
736         // Now sync status should be true
737         assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
738
739         // Make astronauts a Follower
740         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-astronauts-unknown",
741                 RaftState.Leader.name(), RaftState.Follower.name()));
742
743         // Sync status is not true
744         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
745
746         // Make the astronauts follower sync status true
747         shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-astronauts-unknown"));
748
749         // Sync status is now true
750         assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
751
752     }
753
754     private static class TestShardManager extends ShardManager {
755         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
756
757         TestShardManager(String shardMrgIDSuffix) {
758             super(new MockClusterWrapper(), new MockConfiguration(),
759                     DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build(), ready);
760         }
761
762         @Override
763         public void handleRecover(Object message) throws Exception {
764             try {
765                 super.handleRecover(message);
766             } finally {
767                 if(message instanceof RecoveryCompleted) {
768                     recoveryComplete.countDown();
769                 }
770             }
771         }
772
773         void waitForRecoveryComplete() {
774             assertEquals("Recovery complete", true,
775                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
776         }
777     }
778
779     @SuppressWarnings("serial")
780     static class TestShardManagerCreator implements Creator<TestShardManager> {
781         String shardMrgIDSuffix;
782
783         TestShardManagerCreator(String shardMrgIDSuffix) {
784             this.shardMrgIDSuffix = shardMrgIDSuffix;
785         }
786
787         @Override
788         public TestShardManager create() throws Exception {
789             return new TestShardManager(shardMrgIDSuffix);
790         }
791
792     }
793
794     private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
795         private static final long serialVersionUID = 1L;
796         private final Creator<ShardManager> delegate;
797
798         public DelegatingShardManagerCreator(Creator<ShardManager> delegate) {
799             this.delegate = delegate;
800         }
801
802         @Override
803         public ShardManager create() throws Exception {
804             return delegate.create();
805         }
806     }
807 }