+ private void onUnregisterCandidateLocal(UnregisterCandidateLocal unregisterCandidate) {
+ LOG.debug("onUnregisterCandidateLocal: {}", unregisterCandidate);
+
+ Entity entity = unregisterCandidate.getEntity();
+ listenerSupport.removeEntityOwnershipListener(entity, unregisterCandidate.getCandidate());
+
+ YangInstanceIdentifier candidatePath = candidatePath(entity.getType(), entity.getId(), localMemberName);
+ commitCoordinator.commitModification(new DeleteModification(candidatePath), this);
+
+ getSender().tell(SuccessReply.INSTANCE, getSelf());
+ }
+
+ void tryCommitModifications(final BatchedModifications modifications) {
+ if(isLeader()) {
+ LOG.debug("Committing BatchedModifications {} locally", modifications.getTransactionID());
+
+ // Note that it's possible the commit won't get consensus and will timeout and not be applied
+ // to the state. However we don't need to retry it in that case b/c it will be committed to
+ // the journal first and, once a majority of followers come back on line and it is replicated,
+ // it will be applied at that point.
+ handleBatchedModificationsLocal(modifications, self());
+ } else {
+ final ActorSelection leader = getLeader();
+ if (leader != null) {
+ LOG.debug("Sending BatchedModifications {} to leader {}", modifications.getTransactionID(), leader);
+
+ Future<Object> future = Patterns.ask(leader, modifications, TimeUnit.SECONDS.toMillis(
+ getDatastoreContext().getShardTransactionCommitTimeoutInSeconds()));
+
+ Patterns.pipe(future, getContext().dispatcher()).pipeTo(getSelf(), ActorRef.noSender());
+ }
+ }
+ }
+
+ boolean hasLeader() {
+ return getLeader() != null && !isIsolatedLeader();
+ }
+
+ @Override
+ protected void onStateChanged() {
+ super.onStateChanged();
+
+ commitCoordinator.onStateChanged(this, isLeader());
+ }
+
+ private void onCandidateRemoved(CandidateRemoved message) {
+ if(!isLeader()){
+ return;
+ }
+
+ LOG.debug("onCandidateRemoved: {}", message);
+
+ String currentOwner = getCurrentOwner(message.getEntityPath());
+ if(message.getRemovedCandidate().equals(currentOwner)){
+ writeNewOwner(message.getEntityPath(), newOwner(message.getRemainingCandidates()));
+ }
+ }
+
+ private void onCandidateAdded(CandidateAdded message) {
+ if(!isLeader()){
+ return;
+ }
+
+ LOG.debug("onCandidateAdded: {}", message);
+
+ String currentOwner = getCurrentOwner(message.getEntityPath());
+ if(Strings.isNullOrEmpty(currentOwner)){
+ writeNewOwner(message.getEntityPath(), newOwner(message.getAllCandidates()));
+ }
+ }
+
+ private void writeNewOwner(YangInstanceIdentifier entityPath, String newOwner) {
+ LOG.debug("Writing new owner {} for entity {}", newOwner, entityPath);
+
+ commitCoordinator.commitModification(new WriteModification(entityPath.node(ENTITY_OWNER_QNAME),
+ ImmutableNodes.leafNode(ENTITY_OWNER_NODE_ID, newOwner)), this);
+ }
+
+ private String newOwner(Collection<String> candidates) {
+ if(candidates.size() > 0){
+ return candidates.iterator().next();
+ }
+
+ return "";
+ }
+
+ private String getCurrentOwner(YangInstanceIdentifier entityId) {
+ DataTreeSnapshot snapshot = getDataStore().getDataTree().takeSnapshot();
+ Optional<NormalizedNode<?, ?>> optionalEntityOwner = snapshot.readNode(entityId.node(ENTITY_OWNER_QNAME));
+ if(optionalEntityOwner.isPresent()){
+ return optionalEntityOwner.get().getValue().toString();
+ }
+ return null;
+ }
+