+ shardManager.tell(new LeaderStateChanged(memberId, memberId), getRef());
+
+ MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class);
+ shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
+ RaftState.Leader.name())), mockShardActor);
+
+ shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
+
+ PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
+ assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
+ primaryFound.getPrimaryPath().contains("member-1-shard-default"));
+ }};
+ }
+
+ @Test
+ public void testOnReceiveFindPrimaryForNonLocalLeaderShard() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+ shardManager.tell(new ActorInitialized(), mockShardActor);
+
+ String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
+ MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
+
+ String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
+ shardManager.tell(new RoleChangeNotification(memberId1,
+ RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
+ shardManager.tell(new LeaderStateChanged(memberId1, memberId2), mockShardActor);
+
+ shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
+
+ PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
+ assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
+ primaryFound.getPrimaryPath().contains("member-2-shard-default"));
+ }};
+ }
+
+ @Test
+ public void testOnReceiveFindPrimaryForUninitializedShard() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+
+ shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
+
+ expectMsgClass(duration("5 seconds"), NotInitializedException.class);
+ }};
+ }
+
+ @Test
+ public void testOnReceiveFindPrimaryForInitializedShardWithNoRole() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+ shardManager.tell(new ActorInitialized(), mockShardActor);
+
+ shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
+
+ expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
+ }};
+ }
+
+ @Test
+ public void testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+ shardManager.tell(new ActorInitialized(), mockShardActor);
+
+ String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
+ shardManager.tell(new RoleChangeNotification(memberId,
+ RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
+
+ shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
+
+ expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
+
+ shardManager.tell(new LeaderStateChanged(memberId, memberId), mockShardActor);
+
+ shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
+
+ PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
+ assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
+ primaryFound.getPrimaryPath().contains("member-1-shard-default"));
+ }};
+ }
+
+ @Test
+ public void testOnReceiveFindPrimaryWaitForShardLeader() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+
+ // We're passing waitUntilInitialized = true to FindPrimary so the response should be
+ // delayed until we send ActorInitialized and RoleChangeNotification.
+ shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
+
+ expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
+
+ shardManager.tell(new ActorInitialized(), mockShardActor);
+
+ expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
+
+ String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
+ shardManager.tell(new RoleChangeNotification(memberId,
+ RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor);
+
+ expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
+
+ shardManager.tell(new LeaderStateChanged(memberId, memberId), mockShardActor);
+
+ PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
+ assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
+ primaryFound.getPrimaryPath().contains("member-1-shard-default"));
+
+ expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
+ }};
+ }
+
+ @Test
+ public void testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+
+ shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
+
+ expectMsgClass(duration("2 seconds"), NotInitializedException.class);
+
+ shardManager.tell(new ActorInitialized(), mockShardActor);
+
+ expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
+ }};
+ }
+
+ @Test
+ public void testOnReceiveFindPrimaryWaitForReadyWithCandidateShard() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+ shardManager.tell(new ActorInitialized(), mockShardActor);
+ shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
+ null, RaftState.Candidate.name()), mockShardActor);
+
+ shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
+
+ expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
+ }};
+ }
+
+ @Test
+ public void testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+ shardManager.tell(new ActorInitialized(), mockShardActor);
+
+ shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
+
+ expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
+ }};
+ }
+
+ @Test
+ public void testOnReceiveFindPrimaryForRemoteShard() throws Exception {
+ String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
+
+ // Create an ActorSystem ShardManager actor for member-1.
+
+ final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
+ Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
+
+ ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
+
+ final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
+ newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
+ new MockConfiguration()), shardManagerID);
+
+ // Create an ActorSystem ShardManager actor for member-2.
+
+ final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
+
+ Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
+
+ final ActorRef mockShardActor2 = newMockShardActor(system2, "astronauts", "member-2");
+
+ MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
+ put("default", Arrays.asList("member-1", "member-2")).
+ put("astronauts", Arrays.asList("member-2")).build());
+
+ final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
+ newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
+ mockConfig2), shardManagerID);
+
+ new JavaTestKit(system1) {{
+
+ shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+ shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+
+ shardManager2.tell(new ActorInitialized(), mockShardActor2);
+
+ String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
+ shardManager2.tell(new LeaderStateChanged(memberId2, memberId2), mockShardActor2);
+ shardManager2.tell(new RoleChangeNotification(memberId2,
+ RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
+
+ shardManager1.underlyingActor().waitForMemberUp();
+
+ shardManager1.tell(new FindPrimary("astronauts", false), getRef());
+
+ PrimaryFound found = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
+ String path = found.getPrimaryPath();
+ assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config"));
+
+ shardManager2.underlyingActor().verifyFindPrimary();
+
+ Cluster.get(system2).down(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
+
+ shardManager1.underlyingActor().waitForMemberRemoved();
+
+ shardManager1.tell(new FindPrimary("astronauts", false), getRef());
+
+ expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
+ }};
+
+ JavaTestKit.shutdownActorSystem(system1);
+ JavaTestKit.shutdownActorSystem(system2);
+ }
+
+ @Test
+ public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+
+ shardManager.tell(new FindLocalShard("non-existent", false), getRef());
+
+ LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
+
+ assertEquals("getShardName", "non-existent", notFound.getShardName());
+ }};
+ }
+
+ @Test
+ public void testOnReceiveFindLocalShardForExistentShard() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+ shardManager.tell(new ActorInitialized(), mockShardActor);
+
+ shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
+
+ LocalShardFound found = expectMsgClass(duration("5 seconds"), LocalShardFound.class);
+
+ assertTrue("Found path contains " + found.getPath().path().toString(),
+ found.getPath().path().toString().contains("member-1-shard-default-config"));
+ }};
+ }
+
+ @Test
+ public void testOnReceiveFindLocalShardForNotInitializedShard() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+
+ shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
+
+ expectMsgClass(duration("5 seconds"), NotInitializedException.class);
+ }};
+ }
+
+ @Test
+ public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+
+ // We're passing waitUntilInitialized = true to FindLocalShard so the response should be
+ // delayed until we send ActorInitialized.
+ Future<Object> future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true),
+ new Timeout(5, TimeUnit.SECONDS));
+
+ shardManager.tell(new ActorInitialized(), mockShardActor);
+
+ Object resp = Await.result(future, duration("5 seconds"));
+ assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound);
+ }};
+ }
+
+ @Test
+ public void testOnRecoveryJournalIsCleaned() {
+ InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
+ ImmutableSet.of("foo")));
+ InMemoryJournal.addEntry(shardMgrID, 2L, new ShardManager.SchemaContextModules(
+ ImmutableSet.of("bar")));
+ InMemoryJournal.addDeleteMessagesCompleteLatch(shardMgrID);
+
+ new JavaTestKit(getSystem()) {{
+ TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
+ Props.create(new TestShardManagerCreator(shardMrgIDSuffix)));
+
+ shardManager.underlyingActor().waitForRecoveryComplete();
+ InMemoryJournal.waitForDeleteMessagesComplete(shardMgrID);
+
+ // Journal entries up to the last one should've been deleted
+ Map<Long, Object> journal = InMemoryJournal.get(shardMgrID);
+ synchronized (journal) {
+ assertEquals("Journal size", 1, journal.size());
+ assertEquals("Journal entry seq #", Long.valueOf(2), journal.keySet().iterator().next());
+ }
+ }};
+ }
+
+ @Test
+ public void testOnRecoveryPreviouslyKnownModulesAreDiscovered() throws Exception {
+ final ImmutableSet<String> persistedModules = ImmutableSet.of("foo", "bar");
+ InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
+ persistedModules));
+ new JavaTestKit(getSystem()) {{
+ TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
+ Props.create(new TestShardManagerCreator(shardMrgIDSuffix)));
+
+ shardManager.underlyingActor().waitForRecoveryComplete();
+
+ Collection<String> knownModules = shardManager.underlyingActor().getKnownModules();
+
+ assertEquals("getKnownModules", persistedModules, Sets.newHashSet(knownModules));
+ }};
+ }
+
+ @Test
+ public void testOnUpdateSchemaContextUpdateKnownModulesIfTheyContainASuperSetOfTheKnownModules()
+ throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final TestActorRef<ShardManager> shardManager =
+ TestActorRef.create(getSystem(), newShardMgrProps());
+
+ assertEquals("getKnownModules size", 0, shardManager.underlyingActor().getKnownModules().size());
+
+ ModuleIdentifier foo = mock(ModuleIdentifier.class);
+ when(foo.getNamespace()).thenReturn(new URI("foo"));
+
+ Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
+ moduleIdentifierSet.add(foo);
+
+ SchemaContext schemaContext = mock(SchemaContext.class);
+ when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
+
+ shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
+
+ assertEquals("getKnownModules", Sets.newHashSet("foo"),
+ Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
+
+ ModuleIdentifier bar = mock(ModuleIdentifier.class);
+ when(bar.getNamespace()).thenReturn(new URI("bar"));
+
+ moduleIdentifierSet.add(bar);
+
+ shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
+
+ assertEquals("getKnownModules", Sets.newHashSet("foo", "bar"),
+ Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
+ }};
+ }
+
+ @Test
+ public void testOnUpdateSchemaContextDoNotUpdateKnownModulesIfTheyDoNotContainASuperSetOfKnownModules()
+ throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final TestActorRef<ShardManager> shardManager =
+ TestActorRef.create(getSystem(), newShardMgrProps());
+
+ SchemaContext schemaContext = mock(SchemaContext.class);
+ Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
+
+ ModuleIdentifier foo = mock(ModuleIdentifier.class);
+ when(foo.getNamespace()).thenReturn(new URI("foo"));
+
+ moduleIdentifierSet.add(foo);
+
+ when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
+
+ shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
+
+ assertEquals("getKnownModules", Sets.newHashSet("foo"),
+ Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
+
+ //Create a completely different SchemaContext with only the bar module in it
+ //schemaContext = mock(SchemaContext.class);
+ moduleIdentifierSet.clear();
+ ModuleIdentifier bar = mock(ModuleIdentifier.class);
+ when(bar.getNamespace()).thenReturn(new URI("bar"));
+
+ moduleIdentifierSet.add(bar);
+
+ shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
+
+ assertEquals("getKnownModules", Sets.newHashSet("foo"),
+ Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));