+ reset(leaderMockListener);
+
+ candidateReg.close();
+ verify(leaderMockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY1, false, false, false));
+ reset(leaderMockListener);
+
+ // Restart follower1 and verify the entity ownership shard is re-instated by registering.
+ Cluster.get(leaderNode.kit().getSystem()).down(Cluster.get(follower1Node.kit().getSystem()).selfAddress());
+ follower1Node.cleanup();
+
+ follower1Node = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name ).
+ moduleShardsConfig(moduleShardsConfig).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false).
+ datastoreContextBuilder(followerDatastoreContextBuilder).build();
+ follower1EntityOwnershipService = newOwnershipService(follower1Node.configDataStore());
+
+ follower1EntityOwnershipService.registerCandidate(ENTITY1);
+ verify(leaderMockListener, timeout(20000)).ownershipChanged(ownershipChange(ENTITY1, false, false, true));
+
+ verifyRaftState(follower1Node.configDataStore(), ENTITY_OWNERSHIP_SHARD_NAME, new RaftStateVerifier() {
+ @Override
+ public void verify(OnDemandRaftState raftState) {
+ assertNull("Custom RaftPolicy class name", raftState.getCustomRaftPolicyClassName());
+ assertEquals("Peer count", 1, raftState.getPeerAddresses().keySet().size());
+ assertThat("Peer Id", Iterables.<String>getLast(raftState.getPeerAddresses().keySet()),
+ org.hamcrest.CoreMatchers.containsString("member-1"));
+ }
+ });