1 package org.opendaylight.controller.cluster.datastore;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertTrue;
6 import static org.mockito.Mockito.mock;
7 import static org.mockito.Mockito.never;
8 import static org.mockito.Mockito.times;
9 import static org.mockito.Mockito.verify;
10 import static org.mockito.Mockito.when;
11 import akka.actor.ActorRef;
12 import akka.actor.Props;
13 import akka.japi.Creator;
14 import akka.pattern.Patterns;
15 import akka.persistence.RecoveryCompleted;
16 import akka.testkit.JavaTestKit;
17 import akka.testkit.TestActorRef;
18 import akka.util.Timeout;
19 import com.google.common.collect.ImmutableSet;
20 import com.google.common.collect.Sets;
21 import com.google.common.util.concurrent.Uninterruptibles;
23 import java.util.Arrays;
24 import java.util.Collection;
25 import java.util.HashSet;
26 import java.util.List;
29 import java.util.concurrent.CountDownLatch;
30 import java.util.concurrent.TimeUnit;
31 import org.junit.After;
32 import org.junit.Before;
33 import org.junit.Test;
34 import org.mockito.Mock;
35 import org.mockito.MockitoAnnotations;
36 import org.opendaylight.controller.cluster.DataPersistenceProvider;
37 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
38 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
39 import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
40 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
41 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
42 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
43 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
44 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
45 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
46 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
47 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
48 import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
49 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
50 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
51 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
52 import org.opendaylight.controller.cluster.raft.RaftState;
53 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
54 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
55 import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
56 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
57 import scala.concurrent.Await;
58 import scala.concurrent.Future;
60 public class ShardManagerTest extends AbstractActorTest {
// Counter bumped once per test-class instance so each instance gets its own suffix.
61 private static int ID_COUNTER = 1;
// "config" plus the counter value; newShardMgrProps() passes it as the data store type.
63 private final String shardMrgIDSuffix = "config" + ID_COUNTER++;
// Key under which the recovery tests seed and inspect InMemoryJournal entries.
64 private final String shardMgrID = "shard-manager-" + shardMrgIDSuffix;
// Latch handed to every ShardManager built in this class. Tests call verify(...) on it, so it is
// presumably populated as a Mockito mock by initMocks(this) -- NOTE(review): no annotation is
// visible on this field in this view; confirm.
67 private static CountDownLatch ready;
// Shared DoNothingActor created lazily in the setup code; used as the sender of ActorInitialized.
69 private static ActorRef mockShardActor;
// Per-test initialisation statements. The enclosing method's signature line is not present in
// this view -- presumably the JUnit @Before method (org.junit.Before is imported); confirm.
// Process the Mockito annotations declared on this test instance.
73 MockitoAnnotations.initMocks(this);
// Start each test from an empty in-memory journal.
75 InMemoryJournal.clear();
// Create the shared stand-in shard actor only once; it is named with the ShardIdentifier string
// for the default shard on "member-1" / "config".
77 if(mockShardActor == null) {
78 String name = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1","config").toString();
79 mockShardActor = getSystem().actorOf(Props.create(DoNothingActor.class), name);
// Empties the shared InMemoryJournal. Presumably run after each test (org.junit.After is
// imported, but no annotation is visible in this view -- confirm).
84 public void tearDown() {
85 InMemoryJournal.clear();
// Builds the Props for a ShardManager wired with a MockClusterWrapper, a MockConfiguration, a
// DatastoreContext whose data store type is this instance's suffix, and the shared "ready" latch.
88 private Props newShardMgrProps() {
89 DatastoreContext.Builder builder = DatastoreContext.newBuilder();
// Using the per-instance suffix as the data store type keeps the instances distinct.
90 builder.dataStoreType(shardMrgIDSuffix);
91 return ShardManager.props(new MockClusterWrapper(), new MockConfiguration(),
92 builder.build(), ready);
// FindPrimary for the shard name "non-existent" must be answered with the serializable form of
// PrimaryNotFound carrying the same name.
96 public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception {
97 new JavaTestKit(getSystem()) {{
98 final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
// Give the manager a schema context before querying it.
100 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
// The boolean argument is false, i.e. do not wait for shard initialisation.
102 shardManager.tell(new FindPrimary("non-existent", false).toSerializable(), getRef());
// The reply must equal the expected message and arrive within the 5 second duration.
104 expectMsgEquals(duration("5 seconds"),
105 new PrimaryNotFound("non-existent").toSerializable());
// After a schema context update and an ActorInitialized sent on behalf of the mock shard actor,
// FindPrimary for the default shard must be answered with a serialized PrimaryFound.
110 public void testOnReceiveFindPrimaryForExistentShard() throws Exception {
111 new JavaTestKit(getSystem()) {{
112 final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
114 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
// mockShardActor is the sender, so the notification appears to come from the shard itself.
115 shardManager.tell(new ActorInitialized(), mockShardActor);
117 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
119 expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
// With neither UpdateSchemaContext nor ActorInitialized sent, FindPrimary for the default shard
// (not waiting for initialisation) must be answered with ActorNotInitialized.
124 public void testOnReceiveFindPrimaryForNotInitializedShard() throws Exception {
125 new JavaTestKit(getSystem()) {{
126 final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
128 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
130 expectMsgClass(duration("5 seconds"), ActorNotInitialized.class);
// FindPrimary sent with the wait flag set to true, before the shard reports ActorInitialized,
// must eventually be answered with PrimaryFound once ActorInitialized is delivered.
135 public void testOnReceiveFindPrimaryWaitForShardInitialized() throws Exception {
136 new JavaTestKit(getSystem()) {{
137 final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
139 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
141 // We're passing waitUntilInitialized = true to FindPrimary so the response should be
142 // delayed until we send ActorInitialized.
// Here the unserialized FindPrimary is sent via ask, so the reply is captured in a Future.
143 Future<Object> future = Patterns.ask(shardManager, new FindPrimary(Shard.DEFAULT_NAME, true),
144 new Timeout(5, TimeUnit.SECONDS));
146 shardManager.tell(new ActorInitialized(), mockShardActor);
// Block for the reply; the assertion checks for the PrimaryFound type itself.
148 Object resp = Await.result(future, duration("5 seconds"));
149 assertTrue("Expected: PrimaryFound, Actual: " + resp, resp instanceof PrimaryFound);
// FindLocalShard for the name "non-existent" must be answered with LocalShardNotFound that
// reports the same shard name.
154 public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
155 new JavaTestKit(getSystem()) {{
156 final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
158 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
160 shardManager.tell(new FindLocalShard("non-existent", false), getRef());
162 LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
// The reply must name the shard that was asked for.
164 assertEquals("getShardName", "non-existent", notFound.getShardName());
// After schema update and ActorInitialized, FindLocalShard for the default shard must return
// LocalShardFound whose actor path contains "member-1-shard-default-config".
169 public void testOnReceiveFindLocalShardForExistentShard() throws Exception {
170 new JavaTestKit(getSystem()) {{
171 final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
173 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
174 shardManager.tell(new ActorInitialized(), mockShardActor);
176 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
178 LocalShardFound found = expectMsgClass(duration("5 seconds"), LocalShardFound.class);
// The failure message includes the actual path to make a mismatch easy to diagnose.
180 assertTrue("Found path contains " + found.getPath().path().toString(),
181 found.getPath().path().toString().contains("member-1-shard-default-config"));
// With neither UpdateSchemaContext nor ActorInitialized sent, FindLocalShard for the default
// shard (not waiting for initialisation) must be answered with ActorNotInitialized.
186 public void testOnReceiveFindLocalShardForNotInitializedShard() throws Exception {
187 new JavaTestKit(getSystem()) {{
188 final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
190 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
192 expectMsgClass(duration("5 seconds"), ActorNotInitialized.class);
// FindLocalShard sent with the wait flag set to true, before ActorInitialized is delivered,
// must eventually be answered with LocalShardFound.
197 public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
198 new JavaTestKit(getSystem()) {{
199 final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
201 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
203 // We're passing waitUntilInitialized = true to FindLocalShard so the response should be
204 // delayed until we send ActorInitialized.
// The ask pattern captures the (possibly delayed) reply in a Future.
205 Future<Object> future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true),
206 new Timeout(5, TimeUnit.SECONDS));
208 shardManager.tell(new ActorInitialized(), mockShardActor);
210 Object resp = Await.result(future, duration("5 seconds"));
211 assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound);
// After a member-up event for "member-2", FindPrimary for the "astronauts" shard must return a
// PrimaryFound whose path refers to member-2's astronauts shard.
216 public void testOnReceiveMemberUp() throws Exception {
217 new JavaTestKit(getSystem()) {{
218 final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
// The test kit's own actor path is passed as the third argument for member-2.
220 MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
222 shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
// Convert the serialized reply back to PrimaryFound so the primary path can be inspected.
224 PrimaryFound found = PrimaryFound.fromSerializable(expectMsgClass(duration("5 seconds"),
225 PrimaryFound.SERIALIZABLE_CLASS));
226 String path = found.getPrimaryPath();
227 assertTrue("Found path contains " + path, path.contains("member-2-shard-astronauts-config"));
// A primary that was found after member-up must no longer be found after the same member is
// reported as removed.
232 public void testOnReceiveMemberDown() throws Exception {
234 new JavaTestKit(getSystem()) {{
235 final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
237 MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
239 shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
// Sanity check: the primary is reachable while member-2 is up.
241 expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
243 MockClusterWrapper.sendMemberRemoved(shardManager, "member-2", getRef().path().toString());
245 shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
// Same query after removal must now yield the serialized PrimaryNotFound.
247 expectMsgClass(duration("5 seconds"), PrimaryNotFound.SERIALIZABLE_CLASS);
// Seeds two SchemaContextModules journal entries, starts a TestShardManager, and asserts that
// after recovery only the entry with sequence number 2 remains in the journal.
// NOTE(review): java.util.Map is used below but is not among the imports visible in this view.
252 public void testOnRecoveryJournalIsCleaned() {
253 InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
254 ImmutableSet.of("foo")));
255 InMemoryJournal.addEntry(shardMgrID, 2L, new ShardManager.SchemaContextModules(
256 ImmutableSet.of("bar")));
// Register the latch before the actor starts so the delete completion can be awaited below.
257 InMemoryJournal.addDeleteMessagesCompleteLatch(shardMgrID);
259 new JavaTestKit(getSystem()) {{
260 TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
261 Props.create(new TestShardManagerCreator(shardMrgIDSuffix)));
// Wait for recovery to finish and for the journal delete to be reported complete.
263 shardManager.underlyingActor().waitForRecoveryComplete();
264 InMemoryJournal.waitForDeleteMessagesComplete(shardMgrID);
266 // Journal entries up to the last one should've been deleted
267 Map<Long, Object> journal = InMemoryJournal.get(shardMgrID);
// The map is inspected while holding its monitor.
268 synchronized (journal) {
269 assertEquals("Journal size", 1, journal.size());
270 assertEquals("Journal entry seq #", Long.valueOf(2), journal.keySet().iterator().next());
// After recovery, the TestShardManager's known modules must equal the module names that were
// persisted in the journal beforehand ("foo" and "bar").
276 public void testOnRecoveryPreviouslyKnownModulesAreDiscovered() throws Exception {
277 final ImmutableSet<String> persistedModules = ImmutableSet.of("foo", "bar");
// NOTE(review): the argument list of this addEntry call is not closed in this view; the
// continuation line (presumably passing persistedModules) appears to be missing -- confirm.
278 InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
280 new JavaTestKit(getSystem()) {{
281 TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
282 Props.create(new TestShardManagerCreator(shardMrgIDSuffix)));
284 shardManager.underlyingActor().waitForRecoveryComplete();
286 Collection<String> knownModules = shardManager.underlyingActor().getKnownModules();
// Copy into a HashSet so the comparison is set-to-set against the ImmutableSet.
288 assertEquals("getKnownModules", persistedModules, Sets.newHashSet(knownModules));
// Known modules start empty, become {"foo"} after the first UpdateSchemaContext, and grow to
// {"foo", "bar"} when a second update reports a superset of the modules.
// NOTE(review): the rest of the signature (throws clause / opening brace) is not present in this
// view, and java.net.URI / java.util.Set are not among the visible imports -- confirm.
293 public void testOnUpdateSchemaContextUpdateKnownModulesIfTheyContainASuperSetOfTheKnownModules()
295 new JavaTestKit(getSystem()) {{
296 final TestActorRef<ShardManager> shardManager =
297 TestActorRef.create(getSystem(), newShardMgrProps());
299 assertEquals("getKnownModules size", 0, shardManager.underlyingActor().getKnownModules().size());
// Mocked module whose namespace is the URI "foo".
301 ModuleIdentifier foo = mock(ModuleIdentifier.class);
302 when(foo.getNamespace()).thenReturn(new URI("foo"));
304 Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
305 moduleIdentifierSet.add(foo);
// The mocked schema context returns this same mutable set on every call.
307 SchemaContext schemaContext = mock(SchemaContext.class);
308 when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
// The command handler is invoked directly on the underlying actor rather than via tell().
310 shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
312 assertEquals("getKnownModules", Sets.newHashSet("foo"),
313 Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
315 ModuleIdentifier bar = mock(ModuleIdentifier.class);
316 when(bar.getNamespace()).thenReturn(new URI("bar"));
// Adding to the shared set makes the same mocked context now report foo and bar.
318 moduleIdentifierSet.add(bar);
320 shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
322 assertEquals("getKnownModules", Sets.newHashSet("foo", "bar"),
323 Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
// Known modules become {"foo"} after the first update and must stay {"foo"} when a second update
// reports only "bar", i.e. a module set that does not contain the already-known modules.
// NOTE(review): the rest of the signature (throws clause / opening brace) is not present in this
// view, and java.net.URI / java.util.Set are not among the visible imports -- confirm.
328 public void testOnUpdateSchemaContextDoNotUpdateKnownModulesIfTheyDoNotContainASuperSetOfKnownModules()
330 new JavaTestKit(getSystem()) {{
331 final TestActorRef<ShardManager> shardManager =
332 TestActorRef.create(getSystem(), newShardMgrProps());
334 SchemaContext schemaContext = mock(SchemaContext.class);
335 Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
337 ModuleIdentifier foo = mock(ModuleIdentifier.class);
338 when(foo.getNamespace()).thenReturn(new URI("foo"));
340 moduleIdentifierSet.add(foo);
342 when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
344 shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
346 assertEquals("getKnownModules", Sets.newHashSet("foo"),
347 Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
349 //Create a completely different SchemaContext with only the bar module in it
350 //schemaContext = mock(SchemaContext.class);
// The same mocked context is reused; only the set it returns is emptied and refilled with bar.
351 moduleIdentifierSet.clear();
352 ModuleIdentifier bar = mock(ModuleIdentifier.class);
353 when(bar.getNamespace()).thenReturn(new URI("bar"));
355 moduleIdentifierSet.add(bar);
357 shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
// The known modules are expected to be unchanged by the second update.
359 assertEquals("getKnownModules", Sets.newHashSet("foo"),
360 Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
// A ShardManager built with persistent(true) must expose a DataPersistenceProvider for which
// recovery is applicable; one built with persistent(false) must report it as not applicable.
366 public void testRecoveryApplicable(){
367 new JavaTestKit(getSystem()) {
// Persistent variant.
369 final Props persistentProps = ShardManager.props(
370 new MockClusterWrapper(),
371 new MockConfiguration(),
372 DatastoreContext.newBuilder().persistent(true).build(), ready);
373 final TestActorRef<ShardManager> persistentShardManager =
374 TestActorRef.create(getSystem(), persistentProps);
376 DataPersistenceProvider dataPersistenceProvider1 = persistentShardManager.underlyingActor().getDataPersistenceProvider();
378 assertTrue("Recovery Applicable", dataPersistenceProvider1.isRecoveryApplicable());
// Non-persistent variant.
380 final Props nonPersistentProps = ShardManager.props(
381 new MockClusterWrapper(),
382 new MockConfiguration(),
383 DatastoreContext.newBuilder().persistent(false).build(), ready);
384 final TestActorRef<ShardManager> nonPersistentShardManager =
385 TestActorRef.create(getSystem(), nonPersistentProps);
387 DataPersistenceProvider dataPersistenceProvider2 = nonPersistentShardManager.underlyingActor().getDataPersistenceProvider();
389 assertFalse("Recovery Not Applicable", dataPersistenceProvider2.isRecoveryApplicable());
// Installs a DataPersistenceProviderMonitor carrying a latch into a ShardManager, then checks
// that handling UpdateSchemaContext releases that latch within 5 seconds.
// NOTE(review): the rest of the signature (throws clause / opening brace) is not present in this
// view, and java.net.URI / java.util.Set are not among the visible imports -- confirm.
397 public void testOnUpdateSchemaContextUpdateKnownModulesCallsDataPersistenceProvider()
399 final CountDownLatch persistLatch = new CountDownLatch(1);
// Creator producing a ShardManager subclass that overrides the persistence-provider factory.
400 final Creator<ShardManager> creator = new Creator<ShardManager>() {
401 private static final long serialVersionUID = 1L;
403 public ShardManager create() throws Exception {
404 return new ShardManager(new MockClusterWrapper(), new MockConfiguration(), DatastoreContext.newBuilder().build(), ready) {
// Ignores the "persistent" argument and always returns the latch-carrying monitor.
406 protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) {
407 DataPersistenceProviderMonitor dataPersistenceProviderMonitor
408 = new DataPersistenceProviderMonitor();
409 dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
410 return dataPersistenceProviderMonitor;
416 new JavaTestKit(getSystem()) {{
// The anonymous creator is wrapped in the static DelegatingShardManagerCreator before use.
418 final TestActorRef<ShardManager> shardManager =
419 TestActorRef.create(getSystem(), Props.create(new DelegatingShardManagerCreator(creator)));
421 ModuleIdentifier foo = mock(ModuleIdentifier.class);
422 when(foo.getNamespace()).thenReturn(new URI("foo"));
424 Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
425 moduleIdentifierSet.add(foo);
427 SchemaContext schemaContext = mock(SchemaContext.class);
428 when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
430 shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
// awaitUninterruptibly returns true only if the latch reached zero before the timeout.
432 assertEquals("Persisted", true,
433 Uninterruptibles.awaitUninterruptibly(persistLatch, 5, TimeUnit.SECONDS));
// A RoleChangeNotification for "member-1-shard-default-unknown" going from Candidate to Leader
// must result in exactly one countDown() call on the "ready" latch.
439 public void testRoleChangeNotificationReleaseReady() throws Exception {
440 new JavaTestKit(getSystem()) {
442 final Props persistentProps = ShardManager.props(
443 new MockClusterWrapper(),
444 new MockConfiguration(),
445 DatastoreContext.newBuilder().persistent(true).build(), ready);
446 final TestActorRef<ShardManager> shardManager =
447 TestActorRef.create(getSystem(), persistentProps);
// The notification is passed straight to the underlying actor's command handler.
449 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
// Mockito verification, which requires "ready" to be a mock at this point.
451 verify(ready, times(1)).countDown();
// A RoleChangeNotification whose id is "unknown" must not trigger any countDown() call on the
// "ready" latch.
457 public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
458 new JavaTestKit(getSystem()) {
460 final Props persistentProps = ShardManager.props(
461 new MockClusterWrapper(),
462 new MockConfiguration(),
463 DatastoreContext.newBuilder().persistent(true).build(), ready);
464 final TestActorRef<ShardManager> shardManager =
465 TestActorRef.create(getSystem(), persistentProps);
467 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification("unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
// Mockito verification that countDown() was never invoked.
469 verify(ready, never()).countDown();
// A freshly created ShardManager that has received no notifications must report a sync status
// of false through its MBean.
476 public void testByDefaultSyncStatusIsFalse() throws Exception{
477 final Props persistentProps = ShardManager.props(
478 new MockClusterWrapper(),
479 new MockConfiguration(),
480 DatastoreContext.newBuilder().persistent(true).build(), ready);
481 final TestActorRef<ShardManager> shardManager =
482 TestActorRef.create(getSystem(), persistentProps);
484 ShardManager shardManagerActor = shardManager.underlyingActor();
486 assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
// After a RoleChangeNotification moving the default shard from Follower to Leader, the MBean
// must report a sync status of true.
490 public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception{
491 final Props persistentProps = ShardManager.props(
492 new MockClusterWrapper(),
493 new MockConfiguration(),
494 DatastoreContext.newBuilder().persistent(true).build(), ready);
495 final TestActorRef<ShardManager> shardManager =
496 TestActorRef.create(getSystem(), persistentProps);
498 ShardManager shardManagerActor = shardManager.underlyingActor();
// Arguments: shard id, then the Follower and Leader role names.
499 shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
500 RaftState.Follower.name(), RaftState.Leader.name()));
502 assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
// After the default shard is notified as moving from Follower to Candidate, sync status must be
// false, and must stay false even after FollowerInitialSyncUpStatus(true) for that shard.
506 public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception{
507 final Props persistentProps = ShardManager.props(
508 new MockClusterWrapper(),
509 new MockConfiguration(),
510 DatastoreContext.newBuilder().persistent(true).build(), ready);
511 final TestActorRef<ShardManager> shardManager =
512 TestActorRef.create(getSystem(), persistentProps);
514 ShardManager shardManagerActor = shardManager.underlyingActor();
515 shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
516 RaftState.Follower.name(), RaftState.Candidate.name()));
518 assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
520 // Send a FollowerInitialSyncStatus with status = true for the replica whose current state is candidate
521 shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-default-unknown"));
// The follower sync-up flag must not make a Candidate count as in sync.
523 assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
// Once the default shard is notified as moving from Candidate to Follower, the MBean's sync
// status must track the most recent FollowerInitialSyncUpStatus flag: false, then true, then false.
527 public void testWhenShardIsFollowerSyncStatusDependsOnFollowerInitialSyncStatus() throws Exception{
528 final Props persistentProps = ShardManager.props(
529 new MockClusterWrapper(),
530 new MockConfiguration(),
531 DatastoreContext.newBuilder().persistent(true).build(), ready);
532 final TestActorRef<ShardManager> shardManager =
533 TestActorRef.create(getSystem(), persistentProps);
535 ShardManager shardManagerActor = shardManager.underlyingActor();
536 shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
537 RaftState.Candidate.name(), RaftState.Follower.name()));
539 // Initially will be false
540 assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
542 // Send status true will make sync status true
543 shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-default-unknown"));
545 assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
547 // Send status false will make sync status false
548 shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-default-unknown"));
550 assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
// With a configuration that lists two shards ("default" and "astronauts"), the MBean's sync
// status must be true only when both shards are in a state that counts as in sync.
555 public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception{
556 final Props persistentProps = ShardManager.props(
557 new MockClusterWrapper(),
// Override the member's shard list so the manager has two shards regardless of memberName.
558 new MockConfiguration() {
560 public List<String> getMemberShardNames(String memberName) {
561 return Arrays.asList("default", "astronauts");
564 DatastoreContext.newBuilder().persistent(true).build(), ready);
565 final TestActorRef<ShardManager> shardManager =
566 TestActorRef.create(getSystem(), persistentProps);
568 ShardManager shardManagerActor = shardManager.underlyingActor();
570 // Initially will be false
571 assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
573 // Make default shard leader
574 shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
575 RaftState.Follower.name(), RaftState.Leader.name()));
577 // default = Leader, astronauts is unknown so sync status remains false
578 assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
580 // Make astronauts shard leader as well
581 shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-astronauts-unknown",
582 RaftState.Follower.name(), RaftState.Leader.name()));
584 // Now sync status should be true
585 assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
587 // Make astronauts a Follower
588 shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-astronauts-unknown",
589 RaftState.Leader.name(), RaftState.Follower.name()));
591 // Sync status is not true
592 assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
594 // Make the astronauts follower sync status true
595 shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-astronauts-unknown"));
597 // Sync status is now true
598 assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
// ShardManager subclass for the recovery tests: it counts down a latch when a RecoveryCompleted
// message passes through handleRecover, so a test can block until recovery has finished.
602 private static class TestShardManager extends ShardManager {
// Released once, when RecoveryCompleted has been handled.
603 private final CountDownLatch recoveryComplete = new CountDownLatch(1);
// Uses mock cluster/configuration objects and the given suffix as the data store type.
605 TestShardManager(String shardMrgIDSuffix) {
606 super(new MockClusterWrapper(), new MockConfiguration(),
607 DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build(), ready);
611 public void handleRecover(Object message) throws Exception {
// Delegate first so the latch is only released after the superclass has processed the message.
613 super.handleRecover(message);
615 if(message instanceof RecoveryCompleted) {
616 recoveryComplete.countDown();
// Fails the calling test if recovery has not completed within 5 seconds.
621 void waitForRecoveryComplete() {
622 assertEquals("Recovery complete", true,
623 Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
// Akka Creator that builds a TestShardManager for the suffix it was constructed with.
627 @SuppressWarnings("serial")
628 static class TestShardManagerCreator implements Creator<TestShardManager> {
// Suffix forwarded to the TestShardManager constructor.
629 String shardMrgIDSuffix;
631 TestShardManagerCreator(String shardMrgIDSuffix) {
632 this.shardMrgIDSuffix = shardMrgIDSuffix;
// Returns a new TestShardManager on every call.
636 public TestShardManager create() throws Exception {
637 return new TestShardManager(shardMrgIDSuffix);
// Static Creator wrapper that forwards create() to another Creator supplied at construction.
// The one use in this file wraps an anonymous Creator declared inside a test method.
642 private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
643 private static final long serialVersionUID = 1L;
// The wrapped creator; assigned once in the constructor.
644 private Creator<ShardManager> delegate;
646 public DelegatingShardManagerCreator(Creator<ShardManager> delegate) {
647 this.delegate = delegate;
// Returns whatever the wrapped creator produces.
651 public ShardManager create() throws Exception {
652 return delegate.create();