+ }.get(); // this extracts the received message
+
+ assertTrue(out.path().toString(),
+ out.path().toString().contains("member-1-shard-default-config"));
+ }};
+ }
+
+ @Test
+ public void testOnReceiveMemberUp() throws Exception {
+
+ new JavaTestKit(system) {{
+ final Props props = ShardManager
+ .props("config", new MockClusterWrapper(),
+ new MockConfiguration(), new DatastoreContext());
+
+ final ActorRef subject = getSystem().actorOf(props);
+
+ MockClusterWrapper.sendMemberUp(subject, "member-2", getRef().path().toString());
+
+ subject.tell(new FindPrimary("astronauts").toSerializable(), getRef());
+
+ final String out = new ExpectMsg<String>(duration("3 seconds"), "primary found") {
+ // do not put code outside this method, will run afterwards
+ @Override
+ protected String match(Object in) {
+ if (in.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
+ PrimaryFound f = PrimaryFound.fromSerializable(in);
+ return f.getPrimaryPath();
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get(); // this extracts the received message
+
+ assertTrue(out, out.contains("member-2-shard-astronauts-config"));
+ }};
+ }
+
+ @Test
+ public void testOnReceiveMemberDown() throws Exception {
+
+ new JavaTestKit(system) {{
+ final Props props = ShardManager
+ .props("config", new MockClusterWrapper(),
+ new MockConfiguration(), new DatastoreContext());
+
+ final ActorRef subject = getSystem().actorOf(props);
+
+ MockClusterWrapper.sendMemberUp(subject, "member-2", getRef().path().toString());
+
+ subject.tell(new FindPrimary("astronauts").toSerializable(), getRef());
+
+ expectMsgClass(duration("3 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
+
+ MockClusterWrapper.sendMemberRemoved(subject, "member-2", getRef().path().toString());
+
+ subject.tell(new FindPrimary("astronauts").toSerializable(), getRef());
+
+ expectMsgClass(duration("1 seconds"), PrimaryNotFound.SERIALIZABLE_CLASS);
+ }};
+ }
+
+ @Test
+ public void testOnRecoveryJournalIsEmptied(){
+ MyJournal.addToJournal(1L, new ShardManager.SchemaContextModules(
+ ImmutableSet.of("foo")));
+
+ assertEquals(1, MyJournal.get().size());
+
+ new JavaTestKit(system) {{
+ final Props props = ShardManager
+ .props("config", new MockClusterWrapper(),
+ new MockConfiguration(), new DatastoreContext());
+
+ final ActorRef subject = getSystem().actorOf(props);
+
+ // Send message to check that ShardManager is ready
+ subject.tell(new FindPrimary("unknown").toSerializable(), getRef());
+
+ expectMsgClass(duration("3 seconds"), PrimaryNotFound.SERIALIZABLE_CLASS);
+
+ assertEquals(0, MyJournal.get().size());
+ }};
+ }
+
+ @Test
+ public void testOnRecoveryPreviouslyKnownModulesAreDiscovered() throws Exception {
+ new JavaTestKit(system) {{
+ final Props props = ShardManager
+ .props("config", new MockClusterWrapper(),
+ new MockConfiguration(), new DatastoreContext());
+ final TestActorRef<ShardManager> subject =
+ TestActorRef.create(system, props);
+
+ subject.underlyingActor().onReceiveRecover(new ShardManager.SchemaContextModules(ImmutableSet.of("foo")));
+
+ Collection<String> knownModules = subject.underlyingActor().getKnownModules();
+
+ assertTrue(knownModules.contains("foo"));
+ }};
+ }
+
+ @Test
+ public void testOnUpdateSchemaContextUpdateKnownModulesIfTheyContainASuperSetOfTheKnownModules()
+ throws Exception {
+ new JavaTestKit(system) {{
+ final Props props = ShardManager
+ .props("config", new MockClusterWrapper(),
+ new MockConfiguration(), new DatastoreContext());
+ final TestActorRef<ShardManager> subject =
+ TestActorRef.create(system, props);
+
+ Collection<String> knownModules = subject.underlyingActor().getKnownModules();
+
+ assertEquals(0, knownModules.size());
+
+ SchemaContext schemaContext = mock(SchemaContext.class);
+ Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
+
+ ModuleIdentifier foo = mock(ModuleIdentifier.class);
+ when(foo.getNamespace()).thenReturn(new URI("foo"));
+
+ moduleIdentifierSet.add(foo);
+
+ when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
+
+ subject.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
+
+ assertTrue(knownModules.contains("foo"));
+
+ assertEquals(1, knownModules.size());
+
+ ModuleIdentifier bar = mock(ModuleIdentifier.class);
+ when(bar.getNamespace()).thenReturn(new URI("bar"));
+
+ moduleIdentifierSet.add(bar);
+
+ subject.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
+
+ assertTrue(knownModules.contains("bar"));
+
+ assertEquals(2, knownModules.size());
+