+ @Test
+ public void testLeaderCandidatesRemovedAfterShutdown() throws Exception {
+ followerDatastoreContextBuilder.shardElectionTimeoutFactor(5).
+ customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
+
+ String name = "test";
+ MemberNode leaderNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ).
+ moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false).
+ datastoreContextBuilder(leaderDatastoreContextBuilder).build();
+
+ MemberNode follower1Node = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name ).
+ moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false).
+ datastoreContextBuilder(followerDatastoreContextBuilder).build();
+
+ MemberNode follower2Node = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name ).
+ moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false).
+ datastoreContextBuilder(followerDatastoreContextBuilder).build();
+
+ DistributedDataStore leaderDistributedDataStore = leaderNode.configDataStore();
+
+ leaderDistributedDataStore.waitTillReady();
+ follower1Node.configDataStore().waitTillReady();
+ follower2Node.configDataStore().waitTillReady();
+
+ EntityOwnershipService leaderEntityOwnershipService = newOwnershipService(leaderDistributedDataStore);
+ EntityOwnershipService follower1EntityOwnershipService = newOwnershipService(follower1Node.configDataStore());
+ EntityOwnershipService follower2EntityOwnershipService = newOwnershipService(follower2Node.configDataStore());
+
+ leaderNode.kit().waitUntilLeader(leaderNode.configDataStore().getActorContext(), ENTITY_OWNERSHIP_SHARD_NAME);
+
+ // Register follower1 candidate for entity1 and verify it becomes owner
+
+ follower1EntityOwnershipService.registerCandidate(ENTITY1);
+ verifyOwner(leaderDistributedDataStore, ENTITY1, "member-2");
+
+ // Register leader candidate for entity1
+
+ leaderEntityOwnershipService.registerCandidate(ENTITY1);
+ verifyCandidates(leaderDistributedDataStore, ENTITY1, "member-2", "member-1");
+ verifyOwner(leaderDistributedDataStore, ENTITY1, "member-2");
+
+ // Register leader candidate for entity2 and verify it becomes owner
+
+ leaderEntityOwnershipService.registerCandidate(ENTITY2);
+ verifyOwner(leaderDistributedDataStore, ENTITY2, "member-1");
+
+ // Register follower2 candidate for entity2
+
+ follower2EntityOwnershipService.registerCandidate(ENTITY2);
+ verifyCandidates(leaderDistributedDataStore, ENTITY2, "member-1", "member-3");
+ verifyOwner(leaderDistributedDataStore, ENTITY2, "member-1");
+
+ // Shutdown the leader and verify its removed from the candidate list
+
+ leaderNode.cleanup();
+ follower1Node.waitForMemberDown("member-1");
+
+ // Re-enable elections on folower1 so it becomes the leader
+
+ ActorRef follower1Shard = IntegrationTestKit.findLocalShard(follower1Node.configDataStore().
+ getActorContext(), ENTITY_OWNERSHIP_SHARD_NAME);
+ follower1Shard.tell(DatastoreContext.newBuilderFrom(followerDatastoreContextBuilder.build()).
+ customRaftPolicyImplementation(null).build(), ActorRef.noSender());
+
+ MemberNode.verifyRaftState(follower1Node.configDataStore(), ENTITY_OWNERSHIP_SHARD_NAME, new RaftStateVerifier() {
+ @Override
+ public void verify(OnDemandRaftState raftState) {
+ assertEquals("Raft state", RaftState.Leader.toString(), raftState.getRaftState());
+ }
+ });
+
+ // Verify the prior leader's candidates are removed
+
+ verifyCandidates(follower1Node.configDataStore(), ENTITY1, "member-2");
+ verifyCandidates(follower1Node.configDataStore(), ENTITY2, "member-3");
+ verifyOwner(follower1Node.configDataStore(), ENTITY1, "member-2");
+ verifyOwner(follower1Node.configDataStore(), ENTITY2, "member-3");
+ }
+