+ boolean isLeader = isLeader();
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{}: onStateChanged: isLeader: {}, hasLeader: {}", persistenceId(), isLeader, hasLeader());
+ }
+
+ commitCoordinator.onStateChanged(this, isLeader);
+ }
+
+ @Override
+ protected void onLeaderChanged(String oldLeader, String newLeader) {
+ super.onLeaderChanged(oldLeader, newLeader);
+
+ boolean isLeader = isLeader();
+ LOG.debug("{}: onLeaderChanged: oldLeader: {}, newLeader: {}, isLeader: {}", persistenceId(), oldLeader,
+ newLeader, isLeader);
+
+ if(isLeader) {
+ // We were just elected leader. If the old leader is down, select new owners for the entities
+ // owned by the down leader.
+
+ String oldLeaderMemberName = peerIdToMemberNames.get(oldLeader);
+
+ LOG.debug("{}: oldLeaderMemberName: {}", persistenceId(), oldLeaderMemberName);
+
+ if(downPeerMemberNames.contains(oldLeaderMemberName)) {
+ removeCandidateFromEntities(oldLeaderMemberName);
+ }
+ } else {
+ // The leader changed - notify the coordinator to check if pending modifications need to be sent.
+ // While onStateChanged also does this, this method handles the case where the shard hears from a
+ // leader and stays in the follower state. In that case no behavior state change occurs.
+ commitCoordinator.onStateChanged(this, isLeader);
+ }
+ }
+
+ private void onCandidateRemoved(CandidateRemoved message) {
+ LOG.debug("{}: onCandidateRemoved: {}", persistenceId(), message);
+
+ if(isLeader()) {
+ String currentOwner = getCurrentOwner(message.getEntityPath());
+ if(message.getRemovedCandidate().equals(currentOwner) || message.getRemainingCandidates().size() == 0){
+ String entityType = EntityOwnersModel.entityTypeFromEntityPath(message.getEntityPath());
+ writeNewOwner(message.getEntityPath(),
+ newOwner(message.getRemainingCandidates(), entityOwnershipStatistics.byEntityType(entityType),
+ getEntityOwnerElectionStrategy(message.getEntityPath())));
+ }
+ } else {
+ // We're not the leader. If the removed candidate is our local member then check if we actually
+ // have a local candidate registered. If we do then we must have been partitioned from the leader
+ // and the leader removed our candidate since the leader can't tell the difference between a
+ // temporary network partition and a node's process actually restarted. So, in that case, re-add
+ // our candidate.
+ if(localMemberName.equals(message.getRemovedCandidate()) &&
+ listenerSupport.hasCandidateForEntity(createEntity(message.getEntityPath()))) {
+ LOG.debug("Local candidate member was removed but a local candidate is registered for {}" +
+ " - adding back local candidate", message.getEntityPath());
+
+ commitCoordinator.commitModification(new MergeModification(
+ candidatePath(message.getEntityPath(), localMemberName),
+ candidateMapEntry(localMemberName)), this);
+ }
+ }
+ }
+
+ private EntityOwnerSelectionStrategy getEntityOwnerElectionStrategy(YangInstanceIdentifier entityPath) {
+ final String entityType = EntityOwnersModel.entityTypeFromEntityPath(entityPath);
+ return strategyConfig.createStrategy(entityType);
+ }
+
+ private void onCandidateAdded(CandidateAdded message) {
+ if(!isLeader()){
+ return;
+ }
+
+ LOG.debug("{}: onCandidateAdded: {}", persistenceId(), message);
+
+ // Since a node's candidate member is only added by the node itself, we can assume the node is up so
+ // remove it from the downPeerMemberNames.
+ downPeerMemberNames.remove(message.getNewCandidate());
+
+ String currentOwner = getCurrentOwner(message.getEntityPath());
+ EntityOwnerSelectionStrategy strategy = getEntityOwnerElectionStrategy(message.getEntityPath());
+
+ LOG.debug("{}: Using strategy {} to select owner", persistenceId(), strategy);
+ if(Strings.isNullOrEmpty(currentOwner)){
+ if(strategy.getSelectionDelayInMillis() == 0L) {
+ String entityType = EntityOwnersModel.entityTypeFromEntityPath(message.getEntityPath());
+ writeNewOwner(message.getEntityPath(), newOwner(message.getAllCandidates(),
+ entityOwnershipStatistics.byEntityType(entityType), strategy));
+ } else {
+ scheduleOwnerSelection(message.getEntityPath(), message.getAllCandidates(), strategy);
+ }
+ }
+ }
+
+ private void onPeerDown(PeerDown peerDown) {
+ LOG.info("{}: onPeerDown: {}", persistenceId(), peerDown);
+
+ String downMemberName = peerDown.getMemberName();
+ if(downPeerMemberNames.add(downMemberName) && isLeader()) {
+ // Remove the down peer as a candidate from all entities.
+ removeCandidateFromEntities(downMemberName);
+ }
+ }
+
+ private void onPeerUp(PeerUp peerUp) {
+ LOG.debug("{}: onPeerUp: {}", persistenceId(), peerUp);
+
+ peerIdToMemberNames.put(peerUp.getPeerId(), peerUp.getMemberName());
+ downPeerMemberNames.remove(peerUp.getMemberName());
+
+ // Notify the coordinator to check if pending modifications need to be sent. We do this here
+ // to handle the case where the leader's peer address isn't now yet when a prior state or
+ // leader change occurred.