+ /**
+ * Tests bootstrapping the entity-ownership shard when there's no shards initially configured for local
+ * member. The entity-ownership shard is initially created as inactive (ie remains a follower), requiring
+ * an AddShardReplica request to join it to an existing leader.
+ */
+ @Test
+ public void testEntityOwnershipShardBootstrapping() throws Exception {
+ String name = "testEntityOwnershipShardBootstrapping";
+ String moduleShardsConfig = MODULE_SHARDS_MEMBER_1_CONFIG;
+ MemberNode leaderNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
+ .moduleShardsConfig(moduleShardsConfig).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false)
+ .datastoreContextBuilder(leaderDatastoreContextBuilder).build();
+
+ AbstractDataStore leaderDistributedDataStore = leaderNode.configDataStore();
+ final DOMEntityOwnershipService leaderEntityOwnershipService = newOwnershipService(leaderDistributedDataStore);
+
+ leaderNode.kit().waitUntilLeader(leaderNode.configDataStore().getActorContext(), ENTITY_OWNERSHIP_SHARD_NAME);
+
+ MemberNode follower1Node = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
+ .moduleShardsConfig(moduleShardsConfig).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false)
+ .datastoreContextBuilder(followerDatastoreContextBuilder).build();
+
+ AbstractDataStore follower1DistributedDataStore = follower1Node.configDataStore();
+ follower1DistributedDataStore.waitTillReady();
+
+ leaderNode.waitForMembersUp("member-2");
+ follower1Node.waitForMembersUp("member-1");
+
+ DOMEntityOwnershipService follower1EntityOwnershipService = newOwnershipService(follower1DistributedDataStore);
+
+ leaderEntityOwnershipService.registerListener(ENTITY_TYPE1, leaderMockListener);
+
+ // Register a candidate for follower1 - should get queued since follower1 has no leader
+ final DOMEntityOwnershipCandidateRegistration candidateReg =
+ follower1EntityOwnershipService.registerCandidate(ENTITY1);
+ Uninterruptibles.sleepUninterruptibly(300, TimeUnit.MILLISECONDS);
+ verify(leaderMockListener, never()).ownershipChanged(ownershipChange(ENTITY1));
+
+ // Add replica in follower1
+ AddShardReplica addReplica = new AddShardReplica(ENTITY_OWNERSHIP_SHARD_NAME);
+ follower1DistributedDataStore.getActorContext().getShardManager().tell(addReplica,
+ follower1Node.kit().getRef());
+ Object reply = follower1Node.kit().expectMsgAnyClassOf(follower1Node.kit().duration("5 sec"),
+ Success.class, Failure.class);
+ if (reply instanceof Failure) {
+ throw new AssertionError("AddShardReplica failed", ((Failure)reply).cause());
+ }
+
+ // The queued candidate registration should proceed
+ verify(leaderMockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY1, false, false, true));
+ reset(leaderMockListener);
+
+ candidateReg.close();
+ verify(leaderMockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY1, false, false, false));
+ reset(leaderMockListener);
+
+ // Restart follower1 and verify the entity ownership shard is re-instated by registering.
+ Cluster.get(leaderNode.kit().getSystem()).down(Cluster.get(follower1Node.kit().getSystem()).selfAddress());
+ follower1Node.cleanup();
+
+ follower1Node = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
+ .moduleShardsConfig(moduleShardsConfig).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false)
+ .datastoreContextBuilder(followerDatastoreContextBuilder).build();
+ follower1EntityOwnershipService = newOwnershipService(follower1Node.configDataStore());
+
+ follower1EntityOwnershipService.registerCandidate(ENTITY1);
+ verify(leaderMockListener, timeout(20000)).ownershipChanged(ownershipChange(ENTITY1, false, false, true));
+
+ verifyRaftState(follower1Node.configDataStore(), ENTITY_OWNERSHIP_SHARD_NAME, raftState -> {
+ assertNull("Custom RaftPolicy class name", raftState.getCustomRaftPolicyClassName());
+ assertEquals("Peer count", 1, raftState.getPeerAddresses().keySet().size());
+ assertThat("Peer Id", Iterables.<String>getLast(raftState.getPeerAddresses().keySet()),
+ org.hamcrest.CoreMatchers.containsString("member-1"));
+ });
+ }
+
+ @Test
+ public void testOwnerSelectedOnRapidUnregisteringAndRegisteringOfCandidates() throws Exception {
+ String name = "testOwnerSelectedOnRapidUnregisteringAndRegisteringOfCandidates";
+ MemberNode leaderNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
+ .moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false)
+ .datastoreContextBuilder(leaderDatastoreContextBuilder).build();
+
+ MemberNode follower1Node = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
+ .moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false)
+ .datastoreContextBuilder(followerDatastoreContextBuilder).build();
+
+ MemberNode follower2Node = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
+ .moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false)
+ .datastoreContextBuilder(followerDatastoreContextBuilder).build();
+
+ AbstractDataStore leaderDistributedDataStore = leaderNode.configDataStore();
+
+ leaderDistributedDataStore.waitTillReady();
+ follower1Node.configDataStore().waitTillReady();
+ follower2Node.configDataStore().waitTillReady();
+
+ final DOMEntityOwnershipService leaderEntityOwnershipService = newOwnershipService(leaderDistributedDataStore);
+ final DOMEntityOwnershipService follower1EntityOwnershipService =
+ newOwnershipService(follower1Node.configDataStore());
+ newOwnershipService(follower2Node.configDataStore());
+
+ leaderNode.kit().waitUntilLeader(leaderNode.configDataStore().getActorContext(), ENTITY_OWNERSHIP_SHARD_NAME);
+
+ // Register leader candidate for entity1 and verify it becomes owner
+
+ DOMEntityOwnershipCandidateRegistration leaderEntity1Reg =
+ leaderEntityOwnershipService.registerCandidate(ENTITY1);
+
+ verifyCandidates(leaderDistributedDataStore, ENTITY1, "member-1");
+ verifyOwner(leaderDistributedDataStore, ENTITY1, "member-1");
+
+ leaderEntity1Reg.close();
+ follower1EntityOwnershipService.registerCandidate(ENTITY1);
+
+ verifyCandidates(leaderDistributedDataStore, ENTITY1, "member-2");
+ verifyOwner(leaderDistributedDataStore, ENTITY1, "member-2");
+ }
+
+ @Test
+ public void testOwnerSelectedOnRapidRegisteringAndUnregisteringOfCandidates() throws Exception {
+ String name = "testOwnerSelectedOnRapidRegisteringAndUnregisteringOfCandidates";
+ MemberNode leaderNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
+ .moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false)
+ .datastoreContextBuilder(leaderDatastoreContextBuilder).build();
+
+ MemberNode follower1Node = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
+ .moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false)
+ .datastoreContextBuilder(followerDatastoreContextBuilder).build();
+
+ MemberNode follower2Node = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
+ .moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false)
+ .datastoreContextBuilder(followerDatastoreContextBuilder).build();
+
+ AbstractDataStore leaderDistributedDataStore = leaderNode.configDataStore();
+
+ leaderDistributedDataStore.waitTillReady();
+ follower1Node.configDataStore().waitTillReady();
+ follower2Node.configDataStore().waitTillReady();
+
+ final DOMEntityOwnershipService leaderEntityOwnershipService = newOwnershipService(leaderDistributedDataStore);
+ final DOMEntityOwnershipService follower1EntityOwnershipService =
+ newOwnershipService(follower1Node.configDataStore());
+ newOwnershipService(follower2Node.configDataStore());
+
+ leaderNode.kit().waitUntilLeader(leaderNode.configDataStore().getActorContext(), ENTITY_OWNERSHIP_SHARD_NAME);
+
+ // Register leader candidate for entity1 and verify it becomes owner
+
+ final DOMEntityOwnershipCandidateRegistration leaderEntity1Reg =
+ leaderEntityOwnershipService.registerCandidate(ENTITY1);
+
+ verifyCandidates(leaderDistributedDataStore, ENTITY1, "member-1");
+ verifyOwner(leaderDistributedDataStore, ENTITY1, "member-1");
+
+ follower1EntityOwnershipService.registerCandidate(ENTITY1);
+ leaderEntity1Reg.close();
+
+ verifyCandidates(leaderDistributedDataStore, ENTITY1, "member-2");
+ verifyOwner(leaderDistributedDataStore, ENTITY1, "member-2");
+ }
+
+ @Test
+ public void testEntityOwnershipWithNonVotingMembers() throws Exception {
+ followerDatastoreContextBuilder.shardElectionTimeoutFactor(5)
+ .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
+
+ String name = "testEntityOwnershipWithNonVotingMembers";
+ final MemberNode member1LeaderNode = MemberNode.builder(memberNodes).akkaConfig("Member1")
+ .useAkkaArtery(false).testName(name)
+ .moduleShardsConfig(MODULE_SHARDS_5_NODE_CONFIG).schemaContext(SCHEMA_CONTEXT)
+ .createOperDatastore(false).datastoreContextBuilder(leaderDatastoreContextBuilder).build();
+
+ final MemberNode member2FollowerNode = MemberNode.builder(memberNodes).akkaConfig("Member2")
+ .useAkkaArtery(false).testName(name)
+ .moduleShardsConfig(MODULE_SHARDS_5_NODE_CONFIG).schemaContext(SCHEMA_CONTEXT)
+ .createOperDatastore(false).datastoreContextBuilder(followerDatastoreContextBuilder).build();
+
+ final MemberNode member3FollowerNode = MemberNode.builder(memberNodes).akkaConfig("Member3")
+ .useAkkaArtery(false).testName(name)
+ .moduleShardsConfig(MODULE_SHARDS_5_NODE_CONFIG).schemaContext(SCHEMA_CONTEXT)
+ .createOperDatastore(false).datastoreContextBuilder(followerDatastoreContextBuilder).build();
+
+ final MemberNode member4FollowerNode = MemberNode.builder(memberNodes).akkaConfig("Member4")
+ .useAkkaArtery(false).testName(name)
+ .moduleShardsConfig(MODULE_SHARDS_5_NODE_CONFIG).schemaContext(SCHEMA_CONTEXT)
+ .createOperDatastore(false).datastoreContextBuilder(followerDatastoreContextBuilder).build();
+
+ final MemberNode member5FollowerNode = MemberNode.builder(memberNodes).akkaConfig("Member5")
+ .useAkkaArtery(false).testName(name)
+ .moduleShardsConfig(MODULE_SHARDS_5_NODE_CONFIG).schemaContext(SCHEMA_CONTEXT)
+ .createOperDatastore(false).datastoreContextBuilder(followerDatastoreContextBuilder).build();
+
+ AbstractDataStore leaderDistributedDataStore = member1LeaderNode.configDataStore();
+
+ leaderDistributedDataStore.waitTillReady();
+ member2FollowerNode.configDataStore().waitTillReady();
+ member3FollowerNode.configDataStore().waitTillReady();
+ member4FollowerNode.configDataStore().waitTillReady();
+ member5FollowerNode.configDataStore().waitTillReady();
+
+ member1LeaderNode.waitForMembersUp("member-2", "member-3", "member-4", "member-5");
+
+ final DOMEntityOwnershipService member3EntityOwnershipService =
+ newOwnershipService(member3FollowerNode.configDataStore());
+ final DOMEntityOwnershipService member4EntityOwnershipService =
+ newOwnershipService(member4FollowerNode.configDataStore());
+ final DOMEntityOwnershipService member5EntityOwnershipService =
+ newOwnershipService(member5FollowerNode.configDataStore());
+
+ newOwnershipService(member1LeaderNode.configDataStore());
+ member1LeaderNode.kit().waitUntilLeader(member1LeaderNode.configDataStore().getActorContext(),
+ ENTITY_OWNERSHIP_SHARD_NAME);
+
+ // Make member4 and member5 non-voting
+
+ Future<Object> future = Patterns.ask(leaderDistributedDataStore.getActorContext().getShardManager(),
+ new ChangeShardMembersVotingStatus(ENTITY_OWNERSHIP_SHARD_NAME,
+ ImmutableMap.of("member-4", false, "member-5", false)), new Timeout(10, TimeUnit.SECONDS));
+ Object response = Await.result(future, FiniteDuration.apply(10, TimeUnit.SECONDS));
+ if (response instanceof Throwable) {
+ throw new AssertionError("ChangeShardMembersVotingStatus failed", (Throwable)response);
+ }
+
+ assertNull("Expected null Success response. Actual " + response, response);
+
+ // Register member4 candidate for entity1 - it should not become owner since it's non-voting
+
+ member4EntityOwnershipService.registerCandidate(ENTITY1);
+ verifyCandidates(leaderDistributedDataStore, ENTITY1, "member-4");
+
+ // Register member5 candidate for entity2 - it should not become owner since it's non-voting
+
+ member5EntityOwnershipService.registerCandidate(ENTITY2);
+ verifyCandidates(leaderDistributedDataStore, ENTITY2, "member-5");
+
+ Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+ verifyOwner(leaderDistributedDataStore, ENTITY1, "");
+ verifyOwner(leaderDistributedDataStore, ENTITY2, "");
+
+ // Register member3 candidate for entity1 - it should become owner since it's voting
+
+ member3EntityOwnershipService.registerCandidate(ENTITY1);
+ verifyCandidates(leaderDistributedDataStore, ENTITY1, "member-4", "member-3");
+ verifyOwner(leaderDistributedDataStore, ENTITY1, "member-3");
+
+ // Switch member4 and member5 back to voting and member3 non-voting. This should result in member4 and member5
+ // to become entity owners.
+
+ future = Patterns.ask(leaderDistributedDataStore.getActorContext().getShardManager(),
+ new ChangeShardMembersVotingStatus(ENTITY_OWNERSHIP_SHARD_NAME,
+ ImmutableMap.of("member-3", false, "member-4", true, "member-5", true)),
+ new Timeout(10, TimeUnit.SECONDS));
+ response = Await.result(future, FiniteDuration.apply(10, TimeUnit.SECONDS));
+ if (response instanceof Throwable) {
+ throw new AssertionError("ChangeShardMembersVotingStatus failed", (Throwable)response);
+ }
+
+ assertNull("Expected null Success response. Actual " + response, response);
+
+ verifyOwner(leaderDistributedDataStore, ENTITY1, "member-4");
+ verifyOwner(leaderDistributedDataStore, ENTITY2, "member-5");
+ }
+
+ private static void verifyGetOwnershipState(final DOMEntityOwnershipService service, final DOMEntity entity,
+ final EntityOwnershipState expState) {