Do not use ActorSystem.actorFor as it is deprecated
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardManagerTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertTrue;
6 import static org.mockito.Mockito.mock;
7 import static org.mockito.Mockito.never;
8 import static org.mockito.Mockito.times;
9 import static org.mockito.Mockito.verify;
10 import static org.mockito.Mockito.when;
11 import akka.actor.ActorRef;
12 import akka.actor.Props;
13 import akka.japi.Creator;
14 import akka.pattern.Patterns;
15 import akka.persistence.RecoveryCompleted;
16 import akka.testkit.JavaTestKit;
17 import akka.testkit.TestActorRef;
18 import akka.util.Timeout;
19 import com.google.common.collect.ImmutableSet;
20 import com.google.common.collect.Sets;
21 import com.google.common.util.concurrent.Uninterruptibles;
22 import java.net.URI;
23 import java.util.Arrays;
24 import java.util.Collection;
25 import java.util.HashSet;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Set;
29 import java.util.concurrent.CountDownLatch;
30 import java.util.concurrent.TimeUnit;
31 import org.junit.After;
32 import org.junit.Before;
33 import org.junit.Test;
34 import org.mockito.Mock;
35 import org.mockito.MockitoAnnotations;
36 import org.opendaylight.controller.cluster.DataPersistenceProvider;
37 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
38 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
39 import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
40 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
41 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
42 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
43 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
44 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
45 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
46 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
47 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
48 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
49 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
50 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
51 import org.opendaylight.controller.cluster.raft.RaftState;
52 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
53 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
54 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
55 import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
56 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
57 import scala.concurrent.Await;
58 import scala.concurrent.Future;
59
60 public class ShardManagerTest extends AbstractActorTest {
61     private static int ID_COUNTER = 1;
62
63     private final String shardMrgIDSuffix = "config" + ID_COUNTER++;
64     private final String shardMgrID = "shard-manager-" + shardMrgIDSuffix;
65
66     @Mock
67     private static CountDownLatch ready;
68
69     private static ActorRef mockShardActor;
70
71     @Before
72     public void setUp() {
73         MockitoAnnotations.initMocks(this);
74
75         InMemoryJournal.clear();
76
77         if(mockShardActor == null) {
78             String name = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1","config").toString();
79             mockShardActor = getSystem().actorOf(Props.create(DoNothingActor.class), name);
80         }
81     }
82
83     @After
84     public void tearDown() {
85         InMemoryJournal.clear();
86     }
87
88     private Props newShardMgrProps() {
89         DatastoreContext.Builder builder = DatastoreContext.newBuilder();
90         builder.dataStoreType(shardMrgIDSuffix);
91         return ShardManager.props(new MockClusterWrapper(), new MockConfiguration(),
92                 builder.build(), ready);
93     }
94
95     @Test
96     public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception {
97         new JavaTestKit(getSystem()) {{
98             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
99
100             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
101
102             shardManager.tell(new FindPrimary("non-existent", false).toSerializable(), getRef());
103
104             expectMsgEquals(duration("5 seconds"),
105                     new PrimaryNotFound("non-existent").toSerializable());
106         }};
107     }
108
109     @Test
110     public void testOnReceiveFindPrimaryForExistentShard() throws Exception {
111         new JavaTestKit(getSystem()) {{
112             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
113
114             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
115             shardManager.tell(new ActorInitialized(), mockShardActor);
116
117             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
118
119             expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
120         }};
121     }
122
123     @Test
124     public void testOnReceiveFindPrimaryForNotInitializedShard() throws Exception {
125         new JavaTestKit(getSystem()) {{
126             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
127
128             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
129
130             expectMsgClass(duration("5 seconds"), ActorNotInitialized.class);
131         }};
132     }
133
134     @Test
135     public void testOnReceiveFindPrimaryWaitForShardInitialized() throws Exception {
136         new JavaTestKit(getSystem()) {{
137             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
138
139             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
140
141             // We're passing waitUntilInitialized = true to FindPrimary so the response should be
142             // delayed until we send ActorInitialized.
143             Future<Object> future = Patterns.ask(shardManager, new FindPrimary(Shard.DEFAULT_NAME, true),
144                     new Timeout(5, TimeUnit.SECONDS));
145
146             shardManager.tell(new ActorInitialized(), mockShardActor);
147
148             Object resp = Await.result(future, duration("5 seconds"));
149             assertTrue("Expected: PrimaryFound, Actual: " + resp, resp instanceof PrimaryFound);
150         }};
151     }
152
153     @Test
154     public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
155         new JavaTestKit(getSystem()) {{
156             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
157
158             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
159
160             shardManager.tell(new FindLocalShard("non-existent", false), getRef());
161
162             LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
163
164             assertEquals("getShardName", "non-existent", notFound.getShardName());
165         }};
166     }
167
168     @Test
169     public void testOnReceiveFindLocalShardForExistentShard() throws Exception {
170         new JavaTestKit(getSystem()) {{
171             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
172
173             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
174             shardManager.tell(new ActorInitialized(), mockShardActor);
175
176             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
177
178             LocalShardFound found = expectMsgClass(duration("5 seconds"), LocalShardFound.class);
179
180             assertTrue("Found path contains " + found.getPath().path().toString(),
181                     found.getPath().path().toString().contains("member-1-shard-default-config"));
182         }};
183     }
184
185     @Test
186     public void testOnReceiveFindLocalShardForNotInitializedShard() throws Exception {
187         new JavaTestKit(getSystem()) {{
188             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
189
190             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
191
192             expectMsgClass(duration("5 seconds"), ActorNotInitialized.class);
193         }};
194     }
195
196     @Test
197     public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
198         new JavaTestKit(getSystem()) {{
199             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
200
201             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
202
203             // We're passing waitUntilInitialized = true to FindLocalShard so the response should be
204             // delayed until we send ActorInitialized.
205             Future<Object> future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true),
206                     new Timeout(5, TimeUnit.SECONDS));
207
208             shardManager.tell(new ActorInitialized(), mockShardActor);
209
210             Object resp = Await.result(future, duration("5 seconds"));
211             assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound);
212         }};
213     }
214
215     @Test
216     public void testOnReceiveMemberUp() throws Exception {
217         new JavaTestKit(getSystem()) {{
218             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
219
220             MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
221
222             shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
223
224             PrimaryFound found = PrimaryFound.fromSerializable(expectMsgClass(duration("5 seconds"),
225                     PrimaryFound.SERIALIZABLE_CLASS));
226             String path = found.getPrimaryPath();
227             assertTrue("Found path contains " + path, path.contains("member-2-shard-astronauts-config"));
228         }};
229     }
230
231     @Test
232     public void testOnReceiveMemberDown() throws Exception {
233
234         new JavaTestKit(getSystem()) {{
235             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
236
237             MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
238
239             shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
240
241             expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
242
243             MockClusterWrapper.sendMemberRemoved(shardManager, "member-2", getRef().path().toString());
244
245             shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
246
247             expectMsgClass(duration("5 seconds"), PrimaryNotFound.SERIALIZABLE_CLASS);
248         }};
249     }
250
251     @Test
252     public void testOnRecoveryJournalIsCleaned() {
253         InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
254                 ImmutableSet.of("foo")));
255         InMemoryJournal.addEntry(shardMgrID, 2L, new ShardManager.SchemaContextModules(
256                 ImmutableSet.of("bar")));
257         InMemoryJournal.addDeleteMessagesCompleteLatch(shardMgrID);
258
259         new JavaTestKit(getSystem()) {{
260             TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
261                     Props.create(new TestShardManagerCreator(shardMrgIDSuffix)));
262
263             shardManager.underlyingActor().waitForRecoveryComplete();
264             InMemoryJournal.waitForDeleteMessagesComplete(shardMgrID);
265
266             // Journal entries up to the last one should've been deleted
267             Map<Long, Object> journal = InMemoryJournal.get(shardMgrID);
268             synchronized (journal) {
269                 assertEquals("Journal size", 1, journal.size());
270                 assertEquals("Journal entry seq #", Long.valueOf(2), journal.keySet().iterator().next());
271             }
272         }};
273     }
274
275     @Test
276     public void testOnRecoveryPreviouslyKnownModulesAreDiscovered() throws Exception {
277         final ImmutableSet<String> persistedModules = ImmutableSet.of("foo", "bar");
278         InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
279                 persistedModules));
280         new JavaTestKit(getSystem()) {{
281             TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
282                     Props.create(new TestShardManagerCreator(shardMrgIDSuffix)));
283
284             shardManager.underlyingActor().waitForRecoveryComplete();
285
286             Collection<String> knownModules = shardManager.underlyingActor().getKnownModules();
287
288             assertEquals("getKnownModules", persistedModules, Sets.newHashSet(knownModules));
289         }};
290     }
291
292     @Test
293     public void testOnUpdateSchemaContextUpdateKnownModulesIfTheyContainASuperSetOfTheKnownModules()
294             throws Exception {
295         new JavaTestKit(getSystem()) {{
296             final TestActorRef<ShardManager> shardManager =
297                     TestActorRef.create(getSystem(), newShardMgrProps());
298
299             assertEquals("getKnownModules size", 0, shardManager.underlyingActor().getKnownModules().size());
300
301             ModuleIdentifier foo = mock(ModuleIdentifier.class);
302             when(foo.getNamespace()).thenReturn(new URI("foo"));
303
304             Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
305             moduleIdentifierSet.add(foo);
306
307             SchemaContext schemaContext = mock(SchemaContext.class);
308             when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
309
310             shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
311
312             assertEquals("getKnownModules", Sets.newHashSet("foo"),
313                     Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
314
315             ModuleIdentifier bar = mock(ModuleIdentifier.class);
316             when(bar.getNamespace()).thenReturn(new URI("bar"));
317
318             moduleIdentifierSet.add(bar);
319
320             shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
321
322             assertEquals("getKnownModules", Sets.newHashSet("foo", "bar"),
323                     Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
324         }};
325     }
326
327     @Test
328     public void testOnUpdateSchemaContextDoNotUpdateKnownModulesIfTheyDoNotContainASuperSetOfKnownModules()
329             throws Exception {
330         new JavaTestKit(getSystem()) {{
331             final TestActorRef<ShardManager> shardManager =
332                     TestActorRef.create(getSystem(), newShardMgrProps());
333
334             SchemaContext schemaContext = mock(SchemaContext.class);
335             Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
336
337             ModuleIdentifier foo = mock(ModuleIdentifier.class);
338             when(foo.getNamespace()).thenReturn(new URI("foo"));
339
340             moduleIdentifierSet.add(foo);
341
342             when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
343
344             shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
345
346             assertEquals("getKnownModules", Sets.newHashSet("foo"),
347                     Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
348
349             //Create a completely different SchemaContext with only the bar module in it
350             //schemaContext = mock(SchemaContext.class);
351             moduleIdentifierSet.clear();
352             ModuleIdentifier bar = mock(ModuleIdentifier.class);
353             when(bar.getNamespace()).thenReturn(new URI("bar"));
354
355             moduleIdentifierSet.add(bar);
356
357             shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
358
359             assertEquals("getKnownModules", Sets.newHashSet("foo"),
360                     Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
361
362         }};
363     }
364
365     @Test
366     public void testRecoveryApplicable(){
367         new JavaTestKit(getSystem()) {
368             {
369                 final Props persistentProps = ShardManager.props(
370                         new MockClusterWrapper(),
371                         new MockConfiguration(),
372                         DatastoreContext.newBuilder().persistent(true).build(), ready);
373                 final TestActorRef<ShardManager> persistentShardManager =
374                         TestActorRef.create(getSystem(), persistentProps);
375
376                 DataPersistenceProvider dataPersistenceProvider1 = persistentShardManager.underlyingActor().getDataPersistenceProvider();
377
378                 assertTrue("Recovery Applicable", dataPersistenceProvider1.isRecoveryApplicable());
379
380                 final Props nonPersistentProps = ShardManager.props(
381                         new MockClusterWrapper(),
382                         new MockConfiguration(),
383                         DatastoreContext.newBuilder().persistent(false).build(), ready);
384                 final TestActorRef<ShardManager> nonPersistentShardManager =
385                         TestActorRef.create(getSystem(), nonPersistentProps);
386
387                 DataPersistenceProvider dataPersistenceProvider2 = nonPersistentShardManager.underlyingActor().getDataPersistenceProvider();
388
389                 assertFalse("Recovery Not Applicable", dataPersistenceProvider2.isRecoveryApplicable());
390
391
392             }};
393
394     }
395
396     @Test
397     public void testOnUpdateSchemaContextUpdateKnownModulesCallsDataPersistenceProvider()
398             throws Exception {
399         final CountDownLatch persistLatch = new CountDownLatch(1);
400         final Creator<ShardManager> creator = new Creator<ShardManager>() {
401             private static final long serialVersionUID = 1L;
402             @Override
403             public ShardManager create() throws Exception {
404                 return new ShardManager(new MockClusterWrapper(), new MockConfiguration(), DatastoreContext.newBuilder().build(), ready) {
405                     @Override
406                     protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) {
407                         DataPersistenceProviderMonitor dataPersistenceProviderMonitor
408                                 = new DataPersistenceProviderMonitor();
409                         dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
410                         return dataPersistenceProviderMonitor;
411                     }
412                 };
413             }
414         };
415
416         new JavaTestKit(getSystem()) {{
417
418             final TestActorRef<ShardManager> shardManager =
419                     TestActorRef.create(getSystem(), Props.create(new DelegatingShardManagerCreator(creator)));
420
421             ModuleIdentifier foo = mock(ModuleIdentifier.class);
422             when(foo.getNamespace()).thenReturn(new URI("foo"));
423
424             Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
425             moduleIdentifierSet.add(foo);
426
427             SchemaContext schemaContext = mock(SchemaContext.class);
428             when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
429
430             shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
431
432             assertEquals("Persisted", true,
433                     Uninterruptibles.awaitUninterruptibly(persistLatch, 5, TimeUnit.SECONDS));
434
435         }};
436     }
437
438     @Test
439     public void testRoleChangeNotificationReleaseReady() throws Exception {
440         new JavaTestKit(getSystem()) {
441             {
442                 final Props persistentProps = ShardManager.props(
443                         new MockClusterWrapper(),
444                         new MockConfiguration(),
445                         DatastoreContext.newBuilder().persistent(true).build(), ready);
446                 final TestActorRef<ShardManager> shardManager =
447                         TestActorRef.create(getSystem(), persistentProps);
448
449                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
450
451                 verify(ready, times(1)).countDown();
452
453             }};
454     }
455
456     @Test
457     public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
458         new JavaTestKit(getSystem()) {
459             {
460                 final Props persistentProps = ShardManager.props(
461                         new MockClusterWrapper(),
462                         new MockConfiguration(),
463                         DatastoreContext.newBuilder().persistent(true).build(), ready);
464                 final TestActorRef<ShardManager> shardManager =
465                         TestActorRef.create(getSystem(), persistentProps);
466
467                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification("unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
468
469                 verify(ready, never()).countDown();
470
471             }};
472     }
473
474
475     @Test
476     public void testByDefaultSyncStatusIsFalse() throws Exception{
477         final Props persistentProps = ShardManager.props(
478                 new MockClusterWrapper(),
479                 new MockConfiguration(),
480                 DatastoreContext.newBuilder().persistent(true).build(), ready);
481         final TestActorRef<ShardManager> shardManager =
482                 TestActorRef.create(getSystem(), persistentProps);
483
484         ShardManager shardManagerActor = shardManager.underlyingActor();
485
486         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
487     }
488
489     @Test
490     public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception{
491         final Props persistentProps = ShardManager.props(
492                 new MockClusterWrapper(),
493                 new MockConfiguration(),
494                 DatastoreContext.newBuilder().persistent(true).build(), ready);
495         final TestActorRef<ShardManager> shardManager =
496                 TestActorRef.create(getSystem(), persistentProps);
497
498         ShardManager shardManagerActor = shardManager.underlyingActor();
499         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
500                 RaftState.Follower.name(), RaftState.Leader.name()));
501
502         assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
503     }
504
505     @Test
506     public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception{
507         final Props persistentProps = ShardManager.props(
508                 new MockClusterWrapper(),
509                 new MockConfiguration(),
510                 DatastoreContext.newBuilder().persistent(true).build(), ready);
511         final TestActorRef<ShardManager> shardManager =
512                 TestActorRef.create(getSystem(), persistentProps);
513
514         ShardManager shardManagerActor = shardManager.underlyingActor();
515         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
516                 RaftState.Follower.name(), RaftState.Candidate.name()));
517
518         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
519
520         // Send a FollowerInitialSyncStatus with status = true for the replica whose current state is candidate
521         shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-default-unknown"));
522
523         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
524     }
525
526     @Test
527     public void testWhenShardIsFollowerSyncStatusDependsOnFollowerInitialSyncStatus() throws Exception{
528         final Props persistentProps = ShardManager.props(
529                 new MockClusterWrapper(),
530                 new MockConfiguration(),
531                 DatastoreContext.newBuilder().persistent(true).build(), ready);
532         final TestActorRef<ShardManager> shardManager =
533                 TestActorRef.create(getSystem(), persistentProps);
534
535         ShardManager shardManagerActor = shardManager.underlyingActor();
536         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
537                 RaftState.Candidate.name(), RaftState.Follower.name()));
538
539         // Initially will be false
540         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
541
542         // Send status true will make sync status true
543         shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-default-unknown"));
544
545         assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
546
547         // Send status false will make sync status false
548         shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-default-unknown"));
549
550         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
551
552     }
553
554     @Test
555     public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception{
556         final Props persistentProps = ShardManager.props(
557                 new MockClusterWrapper(),
558                 new MockConfiguration() {
559                     @Override
560                     public List<String> getMemberShardNames(String memberName) {
561                         return Arrays.asList("default", "astronauts");
562                     }
563                 },
564                 DatastoreContext.newBuilder().persistent(true).build(), ready);
565         final TestActorRef<ShardManager> shardManager =
566                 TestActorRef.create(getSystem(), persistentProps);
567
568         ShardManager shardManagerActor = shardManager.underlyingActor();
569
570         // Initially will be false
571         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
572
573         // Make default shard leader
574         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
575                 RaftState.Follower.name(), RaftState.Leader.name()));
576
577         // default = Leader, astronauts is unknown so sync status remains false
578         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
579
580         // Make astronauts shard leader as well
581         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-astronauts-unknown",
582                 RaftState.Follower.name(), RaftState.Leader.name()));
583
584         // Now sync status should be true
585         assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
586
587         // Make astronauts a Follower
588         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-astronauts-unknown",
589                 RaftState.Leader.name(), RaftState.Follower.name()));
590
591         // Sync status is not true
592         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
593
594         // Make the astronauts follower sync status true
595         shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-astronauts-unknown"));
596
597         // Sync status is now true
598         assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
599
600     }
601
602     private static class TestShardManager extends ShardManager {
603         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
604
605         TestShardManager(String shardMrgIDSuffix) {
606             super(new MockClusterWrapper(), new MockConfiguration(),
607                     DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build(), ready);
608         }
609
610         @Override
611         public void handleRecover(Object message) throws Exception {
612             try {
613                 super.handleRecover(message);
614             } finally {
615                 if(message instanceof RecoveryCompleted) {
616                     recoveryComplete.countDown();
617                 }
618             }
619         }
620
621         void waitForRecoveryComplete() {
622             assertEquals("Recovery complete", true,
623                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
624         }
625     }
626
627     @SuppressWarnings("serial")
628     static class TestShardManagerCreator implements Creator<TestShardManager> {
629         String shardMrgIDSuffix;
630
631         TestShardManagerCreator(String shardMrgIDSuffix) {
632             this.shardMrgIDSuffix = shardMrgIDSuffix;
633         }
634
635         @Override
636         public TestShardManager create() throws Exception {
637             return new TestShardManager(shardMrgIDSuffix);
638         }
639
640     }
641
642     private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
643         private static final long serialVersionUID = 1L;
644         private final Creator<ShardManager> delegate;
645
646         public DelegatingShardManagerCreator(Creator<ShardManager> delegate) {
647             this.delegate = delegate;
648         }
649
650         @Override
651         public ShardManager create() throws Exception {
652             return delegate.create();
653         }
654     }
655 }