+ protected void onLeaderChanged(final String oldLeader, final String newLeader) {
+ boolean isLeader = isLeader();
+ LOG.debug("{}: onLeaderChanged: oldLeader: {}, newLeader: {}, isLeader: {}", persistenceId(), oldLeader,
+ newLeader, isLeader);
+
+ if (isLeader) {
+
+ // Re-initialize the downPeerMemberNames from the current akka Cluster state. The previous leader, if any,
+ // is most likely down however it's possible we haven't received the PeerDown message yet.
+ initializeDownPeerMemberNamesFromClusterState();
+
+ // Clear all existing strategies so that they get re-created when we call createStrategy again
+ // This allows the strategies to be re-initialized with existing statistics maintained by
+ // EntityOwnershipStatistics
+ strategyConfig.clearStrategies();
+
+ // Re-assign owners for all members that are known to be down. In a cluster which has greater than
+ // 3 nodes it is possible for some node beside the leader being down when the leadership transitions
+ // it makes sense to use this event to re-assign owners for those downed nodes.
+ Set<String> ownedBy = new HashSet<>(downPeerMemberNames.size() + 1);
+ for (MemberName downPeerName : downPeerMemberNames) {
+ ownedBy.add(downPeerName.getName());
+ }
+
+ // Also try to assign owners for entities that have no current owner. See explanation in onPeerUp.
+ ownedBy.add("");
+ selectNewOwnerForEntitiesOwnedBy(ownedBy);
+ } else {
+ // The leader changed - notify the coordinator to check if pending modifications need to be sent.
+ // While onStateChanged also does this, this method handles the case where the shard hears from a
+ // leader and stays in the follower state. In that case no behavior state change occurs.
+ commitCoordinator.onStateChanged(this, isLeader);
+ }
+