+ @Test
+ public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+
+ // We're passing waitUntilInitialized = true to FindLocalShard so the response should be
+ // delayed until we send ActorInitialized.
+ Future<Object> future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true),
+ new Timeout(5, TimeUnit.SECONDS));
+
+ shardManager.tell(new ActorInitialized(), mockShardActor);
+
+ Object resp = Await.result(future, duration("5 seconds"));
+ assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound);
+ }};
+ }
+
+ @Test
+ public void testOnReceiveMemberUp() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
+
+ MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
+
+ shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
+
+ PrimaryFound found = PrimaryFound.fromSerializable(expectMsgClass(duration("5 seconds"),
+ PrimaryFound.SERIALIZABLE_CLASS));
+ String path = found.getPrimaryPath();
+ assertTrue("Found path contains " + path, path.contains("member-2-shard-astronauts-config"));
+ }};
+ }
+
+ @Test
+ public void testOnReceiveMemberDown() throws Exception {
+
+ new JavaTestKit(getSystem()) {{
+ final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
+
+ MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
+
+ shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
+
+ expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
+
+ MockClusterWrapper.sendMemberRemoved(shardManager, "member-2", getRef().path().toString());
+
+ shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
+
+ expectMsgClass(duration("5 seconds"), PrimaryNotFound.SERIALIZABLE_CLASS);
+ }};
+ }
+
+ @Test
+ public void testOnRecoveryJournalIsCleaned() {
+ InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
+ ImmutableSet.of("foo")));
+ InMemoryJournal.addEntry(shardMgrID, 2L, new ShardManager.SchemaContextModules(
+ ImmutableSet.of("bar")));
+ InMemoryJournal.addDeleteMessagesCompleteLatch(shardMgrID);
+
+ new JavaTestKit(getSystem()) {{
+ TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
+ Props.create(new TestShardManagerCreator(shardMrgIDSuffix)));
+
+ shardManager.underlyingActor().waitForRecoveryComplete();
+ InMemoryJournal.waitForDeleteMessagesComplete(shardMgrID);
+
+ // Journal entries up to the last one should've been deleted
+ Map<Long, Object> journal = InMemoryJournal.get(shardMgrID);
+ synchronized (journal) {
+ assertEquals("Journal size", 1, journal.size());
+ assertEquals("Journal entry seq #", Long.valueOf(2), journal.keySet().iterator().next());
+ }
+ }};
+ }
+
+ @Test
+ public void testOnRecoveryPreviouslyKnownModulesAreDiscovered() throws Exception {
+ final ImmutableSet<String> persistedModules = ImmutableSet.of("foo", "bar");
+ InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
+ persistedModules));
+ new JavaTestKit(getSystem()) {{
+ TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
+ Props.create(new TestShardManagerCreator(shardMrgIDSuffix)));
+
+ shardManager.underlyingActor().waitForRecoveryComplete();
+
+ Collection<String> knownModules = shardManager.underlyingActor().getKnownModules();
+
+ assertEquals("getKnownModules", persistedModules, Sets.newHashSet(knownModules));
+ }};
+ }
+
+ @Test
+ public void testOnUpdateSchemaContextUpdateKnownModulesIfTheyContainASuperSetOfTheKnownModules()
+ throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final TestActorRef<ShardManager> shardManager =
+ TestActorRef.create(getSystem(), newShardMgrProps());
+
+ assertEquals("getKnownModules size", 0, shardManager.underlyingActor().getKnownModules().size());