Bug 4105: Remove candidates on PeerDown 08/26808/1
authorTom Pantelis <tpanteli@brocade.com>
Tue, 18 Aug 2015 06:59:47 +0000 (02:59 -0400)
committerTom Pantelis <tpanteli@brocade.com>
Thu, 10 Sep 2015 19:21:17 +0000 (15:21 -0400)
Currently on PeerDown, the EntityOwnershipShard selects a new owner for
the entities owned by the down node and leaves the down node as a
candidate. If the down node is the only candidate, the owner is cleared.
On PeerUp, it selects a new owner for those entities whose owner is clear.
This was done to handle network partition so a node's candidates remain
registered and are re-assigned when the partition is healed.

Howver this has potential issues when a node is actually
stopped/restarted. It's possible, on restart, that the node doesn't
register a candidate for an entity that it had previously registered for.
So it may get ownership of an entity for which it has no registered
candidate.

To alleviate this, I changed it to remove all the down node's candidates
on PeerDown. If the node was stopped/restarted, then it will
re-register candidates based on local client requests. This case will be
the norm. To handle network partition, when healed, the follower node
will get the replicated commits for its candidate removals from the
leader. So on Candidate removed, it re-adds its removed candidate if it
has a registered EntityOwnershipCandidate.

I realized that one can register a DOMDataTreeChangeListener for a leaf
node. So I simplified EntityOwnerChangeListener to listen for the owner
leaf instead of the entity path. This avoids the extra notifications
when candidayes are added/removed. I actually did this originally b/c I
thought there was a bug when listening at the entity level which turned
out there wasn't but I left it in as an improvement.

I also added the shard's logId to the listener and support classes for
better debugging of unit tests.

Change-Id: I75d2567ce54b9129eee052ba521c8a71777289b6
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/CandidateListChangeListener.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnerChangeListener.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnersModel.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipListenerSupport.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/AbstractEntityOwnershipTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/CandidateListChangeListenerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnerChangeListenerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipListenerSupportTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java

index 82f926a..bd41ebe 100644 (file)
@@ -47,11 +47,13 @@ import org.slf4j.LoggerFactory;
 class CandidateListChangeListener implements DOMDataTreeChangeListener {
     private static final Logger LOG = LoggerFactory.getLogger(CandidateListChangeListener.class);
 
+    private final String logId;
     private final ActorRef shard;
     private final Map<YangInstanceIdentifier, Collection<String>> currentCandidates = new HashMap<>();
 
-    CandidateListChangeListener(ActorRef shard) {
+    CandidateListChangeListener(ActorRef shard, String logId) {
         this.shard = Preconditions.checkNotNull(shard, "shard should not be null");
+        this.logId = logId;
     }
 
     void init(ShardDataTree shardDataTree) {
@@ -65,7 +67,7 @@ class CandidateListChangeListener implements DOMDataTreeChangeListener {
         for(DataTreeCandidate change: changes) {
             DataTreeCandidateNode changeRoot = change.getRootNode();
 
-            LOG.debug("Candidate node changed: {}, {}", changeRoot.getModificationType(), change.getRootPath());
+            LOG.debug("{}: Candidate node changed: {}, {}", logId, changeRoot.getModificationType(), change.getRootPath());
 
             NodeIdentifierWithPredicates candidateKey =
                     (NodeIdentifierWithPredicates) change.getRootPath().getLastPathArgument();
@@ -74,12 +76,12 @@ class CandidateListChangeListener implements DOMDataTreeChangeListener {
             YangInstanceIdentifier entityId = extractEntityPath(change.getRootPath());
 
             if(changeRoot.getModificationType() == ModificationType.WRITE) {
-                LOG.debug("Candidate {} was added for entity {}", candidate, entityId);
+                LOG.debug("{}: Candidate {} was added for entity {}", logId, candidate, entityId);
 
                 Collection<String> currentCandidates = addToCurrentCandidates(entityId, candidate);
                 shard.tell(new CandidateAdded(entityId, candidate, new ArrayList<>(currentCandidates)), shard);
             } else if(changeRoot.getModificationType() == ModificationType.DELETE) {
-                LOG.debug("Candidate {} was removed for entity {}", candidate, entityId);
+                LOG.debug("{}: Candidate {} was removed for entity {}", logId, candidate, entityId);
 
                 Collection<String> currentCandidates = removeFromCurrentCandidates(entityId, candidate);
                 shard.tell(new CandidateRemoved(entityId, candidate, new ArrayList<>(currentCandidates)), shard);
index 9c551f2..cf9df18 100644 (file)
@@ -7,25 +7,18 @@
  */
 package org.opendaylight.controller.cluster.datastore.entityownership;
 
-import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_ID_QNAME;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNERS_PATH;
-import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_NODE_ID;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_QNAME;
-import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_TYPE_QNAME;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.createEntity;
 import com.google.common.base.Objects;
 import com.google.common.base.Optional;
 import java.util.Collection;
-import java.util.Map.Entry;
 import org.opendaylight.controller.cluster.datastore.ShardDataTree;
 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.entity.owners.EntityType;
-import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
-import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
-import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
@@ -50,27 +43,26 @@ class EntityOwnerChangeListener implements DOMDataTreeChangeListener {
 
     void init(ShardDataTree shardDataTree) {
         shardDataTree.registerTreeChangeListener(YangInstanceIdentifier.builder(ENTITY_OWNERS_PATH).
-                node(EntityType.QNAME).node(EntityType.QNAME).node(ENTITY_QNAME).node(ENTITY_QNAME).build(), this);
+                node(EntityType.QNAME).node(EntityType.QNAME).node(ENTITY_QNAME).node(ENTITY_QNAME).node(EntityOwnersModel.ENTITY_OWNER_QNAME).build(), this);
     }
 
     @Override
     public void onDataTreeChanged(Collection<DataTreeCandidate> changes) {
         for(DataTreeCandidate change: changes) {
             DataTreeCandidateNode changeRoot = change.getRootNode();
-            MapEntryNode entityNode = (MapEntryNode) changeRoot.getDataAfter().get();
+            LeafNode<?> ownerLeaf = (LeafNode<?>) changeRoot.getDataAfter().get();
 
-            LOG.debug("Entity node changed: {}, {}", changeRoot.getModificationType(), change.getRootPath());
+            LOG.debug("{}: Entity node changed: {}, {}", logId(), changeRoot.getModificationType(), change.getRootPath());
 
-            String newOwner = extractOwner(entityNode);
+            String newOwner = extractOwner(ownerLeaf);
 
             String origOwner = null;
             Optional<NormalizedNode<?, ?>> dataBefore = changeRoot.getDataBefore();
             if(dataBefore.isPresent()) {
-                MapEntryNode origEntityNode = (MapEntryNode) changeRoot.getDataBefore().get();
-                origOwner = extractOwner(origEntityNode);
+                origOwner = extractOwner((LeafNode<?>) changeRoot.getDataBefore().get());
             }
 
-            LOG.debug("New owner: {}, Original owner: {}", newOwner, origOwner);
+            LOG.debug("{}: New owner: {}, Original owner: {}", logId(), newOwner, origOwner);
 
             if(!Objects.equal(origOwner, newOwner)) {
                 boolean isOwner = Objects.equal(localMemberName, newOwner);
@@ -78,8 +70,8 @@ class EntityOwnerChangeListener implements DOMDataTreeChangeListener {
                 if(isOwner || wasOwner) {
                     Entity entity = createEntity(change.getRootPath());
 
-                    LOG.debug("Calling notifyEntityOwnershipListeners: entity: {}, wasOwner: {}, isOwner: {}",
-                            entity, wasOwner, isOwner);
+                    LOG.debug("{}: Calling notifyEntityOwnershipListeners: entity: {}, wasOwner: {}, isOwner: {}",
+                            logId(), entity, wasOwner, isOwner);
 
                     listenerSupport.notifyEntityOwnershipListeners(entity, wasOwner, isOwner);
                 }
@@ -87,26 +79,12 @@ class EntityOwnerChangeListener implements DOMDataTreeChangeListener {
         }
     }
 
-    private Entity createEntity(YangInstanceIdentifier entityPath) {
-        String entityType = null;
-        YangInstanceIdentifier entityId = null;
-        for(PathArgument pathArg: entityPath.getPathArguments()) {
-            if(pathArg instanceof NodeIdentifierWithPredicates) {
-                NodeIdentifierWithPredicates nodeKey = (NodeIdentifierWithPredicates) pathArg;
-                Entry<QName, Object> key = nodeKey.getKeyValues().entrySet().iterator().next();
-                if(ENTITY_TYPE_QNAME.equals(key.getKey())) {
-                    entityType = key.getValue().toString();
-                } else if(ENTITY_ID_QNAME.equals(key.getKey())) {
-                    entityId = (YangInstanceIdentifier) key.getValue();
-                }
-            }
-        }
-
-        return new Entity(entityType, entityId);
+    private String extractOwner(LeafNode<?> ownerLeaf) {
+        Object value = ownerLeaf.getValue();
+        return value != null ? value.toString() : null;
     }
 
-    private String extractOwner(MapEntryNode entityNode) {
-        Optional<DataContainerChild<? extends PathArgument, ?>> ownerNode = entityNode.getChild(ENTITY_OWNER_NODE_ID);
-        return ownerNode.isPresent() ? (String) ownerNode.get().getValue() : null;
+    private String logId() {
+        return listenerSupport.getLogId();
     }
 }
index 385bb70..1c97b48 100644 (file)
@@ -7,12 +7,16 @@
  */
 package org.opendaylight.controller.cluster.datastore.entityownership;
 
+import java.util.Map.Entry;
+import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.EntityOwners;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.entity.owners.EntityType;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.entity.owners.entity.type.entity.Candidate;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
@@ -61,6 +65,15 @@ final class EntityOwnersModel {
 
     }
 
+    static YangInstanceIdentifier candidatePath(YangInstanceIdentifier entityPath, String candidateName) {
+        return YangInstanceIdentifier.builder(entityPath).node(Candidate.QNAME).nodeWithKey(
+                Candidate.QNAME, CANDIDATE_NAME_QNAME, candidateName).build();
+    }
+
+    static NodeIdentifierWithPredicates candidateNodeKey(String candidateName) {
+        return new NodeIdentifierWithPredicates(Candidate.QNAME, CANDIDATE_NAME_QNAME, candidateName);
+    }
+
     static NormalizedNode<?, ?> entityOwnersWithCandidate(String entityType, YangInstanceIdentifier entityId,
             String candidateName) {
         return entityOwnersWithEntityTypeEntry(entityTypeEntryWithEntityEntry(entityType,
@@ -86,11 +99,33 @@ final class EntityOwnersModel {
 
     static MapNode candidateEntry(String candidateName) {
         return ImmutableOrderedMapNodeBuilder.create().withNodeIdentifier(new NodeIdentifier(Candidate.QNAME)).
-                addChild(ImmutableNodes.mapEntry(Candidate.QNAME, CANDIDATE_NAME_QNAME, candidateName)).build();
+                addChild(candidateMapEntry(candidateName)).build();
+    }
+
+    static MapEntryNode candidateMapEntry(String candidateName) {
+        return ImmutableNodes.mapEntry(Candidate.QNAME, CANDIDATE_NAME_QNAME, candidateName);
     }
 
     static NormalizedNode<?, ?> entityEntryWithOwner(YangInstanceIdentifier entityId, String owner) {
         return ImmutableNodes.mapEntryBuilder(ENTITY_QNAME, ENTITY_ID_QNAME, entityId).addChild(
                 ImmutableNodes.leafNode(ENTITY_OWNER_QNAME, owner)).build();
     }
+
+    static Entity createEntity(YangInstanceIdentifier entityPath) {
+        String entityType = null;
+        YangInstanceIdentifier entityId = null;
+        for(PathArgument pathArg: entityPath.getPathArguments()) {
+            if(pathArg instanceof NodeIdentifierWithPredicates) {
+                NodeIdentifierWithPredicates nodeKey = (NodeIdentifierWithPredicates) pathArg;
+                Entry<QName, Object> key = nodeKey.getKeyValues().entrySet().iterator().next();
+                if(ENTITY_TYPE_QNAME.equals(key.getKey())) {
+                    entityType = key.getValue().toString();
+                } else if(ENTITY_ID_QNAME.equals(key.getKey())) {
+                    entityId = (YangInstanceIdentifier) key.getValue();
+                }
+            }
+        }
+
+        return new Entity(entityType, entityId);
+    }
 }
index 7941bc0..ed4a004 100644 (file)
@@ -18,6 +18,7 @@ import java.util.IdentityHashMap;
 import java.util.Map;
 import org.opendaylight.controller.cluster.datastore.entityownership.messages.EntityOwnershipChanged;
 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate;
 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -30,35 +31,51 @@ import org.slf4j.LoggerFactory;
 class EntityOwnershipListenerSupport {
     private static final Logger LOG = LoggerFactory.getLogger(EntityOwnershipListenerSupport.class);
 
+    private final String logId;
     private final ActorContext actorContext;
     private final Map<EntityOwnershipListener, ListenerActorRefEntry> listenerActorMap = new IdentityHashMap<>();
     private final Multimap<Entity, EntityOwnershipListener> entityListenerMap = HashMultimap.create();
     private final Multimap<String, EntityOwnershipListener> entityTypeListenerMap = HashMultimap.create();
 
-    EntityOwnershipListenerSupport(ActorContext actorContext) {
+    EntityOwnershipListenerSupport(ActorContext actorContext, String logId) {
         this.actorContext = actorContext;
+        this.logId = logId;
+    }
+
+    String getLogId() {
+        return logId;
+    }
+
+    boolean hasCandidateForEntity(Entity entity) {
+        for(EntityOwnershipListener listener: entityListenerMap.get(entity)) {
+            if(listener instanceof EntityOwnershipCandidate) {
+                return true;
+            }
+        }
+
+        return false;
     }
 
     void addEntityOwnershipListener(Entity entity, EntityOwnershipListener listener) {
-        LOG.debug("Adding EntityOwnershipListener {} for {}", listener, entity);
+        LOG.debug("{}: Adding EntityOwnershipListener {} for {}", logId, listener, entity);
 
         addListener(listener, entity, entityListenerMap);
     }
 
     void addEntityOwnershipListener(String entityType, EntityOwnershipListener listener) {
-        LOG.debug("Adding EntityOwnershipListener {} for entity type {}", listener, entityType);
+        LOG.debug("{}: Adding EntityOwnershipListener {} for entity type {}", logId, listener, entityType);
 
         addListener(listener, entityType, entityTypeListenerMap);
     }
 
     void removeEntityOwnershipListener(Entity entity, EntityOwnershipListener listener) {
-        LOG.debug("Removing EntityOwnershipListener {} for {}", listener, entity);
+        LOG.debug("{}: Removing EntityOwnershipListener {} for {}", logId, listener, entity);
 
         removeListener(listener, entity, entityListenerMap);
     }
 
     void removeEntityOwnershipListener(String entityType, EntityOwnershipListener listener) {
-        LOG.debug("Removing EntityOwnershipListener {} for entity type {}", listener, entityType);
+        LOG.debug("{}: Removing EntityOwnershipListener {} for entity type {}", logId, listener, entityType);
 
         removeListener(listener, entityType, entityTypeListenerMap);
     }
@@ -87,7 +104,7 @@ class EntityOwnershipListenerSupport {
         for(EntityOwnershipListener listener: listeners) {
             ActorRef listenerActor = listenerActorFor(listener);
 
-            LOG.debug("Notifying EntityOwnershipListenerActor {} with {}", listenerActor, changed);
+            LOG.debug("{}: Notifying EntityOwnershipListenerActor {} with {}", logId, listenerActor, changed);
 
             listenerActor.tell(changed, ActorRef.noSender());
         }
@@ -110,7 +127,7 @@ class EntityOwnershipListenerSupport {
         if(fromListenerMap.remove(mapKey, listener)) {
             ListenerActorRefEntry listenerEntry = listenerActorMap.get(listener);
 
-            LOG.debug("Found {}", listenerEntry);
+            LOG.debug("{}: Found {}", logId, listenerEntry);
 
             listenerEntry.referenceCount--;
             if(listenerEntry.referenceCount <= 0) {
@@ -136,7 +153,7 @@ class EntityOwnershipListenerSupport {
             if(actorRef == null) {
                 actorRef = actorContext.actorOf(EntityOwnershipListenerActor.props(listener));
 
-                LOG.debug("Created EntityOwnershipListenerActor {} for listener {}", actorRef, listener);
+                LOG.debug("{}: Created EntityOwnershipListenerActor {} for listener {}", logId, actorRef, listener);
             }
 
             return actorRef;
index 4dfbc87..58a7a03 100644 (file)
@@ -10,13 +10,18 @@ package org.opendaylight.controller.cluster.datastore.entityownership;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.CANDIDATE_NAME_NODE_ID;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.CANDIDATE_NODE_ID;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_ID_NODE_ID;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_ID_QNAME;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_NODE_ID;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNERS_PATH;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_NODE_ID;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_QNAME;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_TYPES_PATH;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_TYPE_NODE_ID;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_TYPE_QNAME;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.candidateMapEntry;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.candidateNodeKey;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.candidatePath;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.createEntity;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityOwnersWithCandidate;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
@@ -80,7 +85,7 @@ class EntityOwnershipShard extends Shard {
         super(name, peerAddresses, noPersistenceDatastoreContext(datastoreContext), schemaContext);
         this.localMemberName = localMemberName;
         this.commitCoordinator = new EntityOwnershipShardCommitCoordinator(localMemberName, LOG);
-        this.listenerSupport = new EntityOwnershipListenerSupport(getContext());
+        this.listenerSupport = new EntityOwnershipListenerSupport(getContext(), persistenceId());
 
         for(String peerId: peerAddresses.keySet()) {
             ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(peerId).build();
@@ -97,7 +102,7 @@ class EntityOwnershipShard extends Shard {
     protected void onRecoveryComplete() {
         super.onRecoveryComplete();
 
-        new CandidateListChangeListener(getSelf()).init(getDataStore());
+        new CandidateListChangeListener(getSelf(), persistenceId()).init(getDataStore());
         new EntityOwnerChangeListener(localMemberName, listenerSupport).init(getDataStore());
     }
 
@@ -236,15 +241,28 @@ class EntityOwnershipShard extends Shard {
     }
 
     private void onCandidateRemoved(CandidateRemoved message) {
-        if(!isLeader()) {
-            return;
-        }
-
         LOG.debug("{}: onCandidateRemoved: {}", persistenceId(), message);
 
-        String currentOwner = getCurrentOwner(message.getEntityPath());
-        if(message.getRemovedCandidate().equals(currentOwner)){
-            writeNewOwner(message.getEntityPath(), newOwner(message.getRemainingCandidates()));
+        if(isLeader()) {
+            String currentOwner = getCurrentOwner(message.getEntityPath());
+            if(message.getRemovedCandidate().equals(currentOwner)){
+                writeNewOwner(message.getEntityPath(), newOwner(message.getRemainingCandidates()));
+            }
+        } else {
+            // We're not the leader. If the removed candidate is our local member then check if we actually
+            // have a local candidate registered. If we do then we must have been partitioned from the leader
+            // and the leader removed our candidate since the leader can't tell the difference between a
+            // temporary network partition and a node's process actually restarted. So, in that case, re-add
+            // our candidate.
+            if(localMemberName.equals(message.getRemovedCandidate()) &&
+                    listenerSupport.hasCandidateForEntity(createEntity(message.getEntityPath()))) {
+                LOG.debug("Local candidate member was removed but a local candidate is registered for {}" +
+                    " - adding back local candidate", message.getEntityPath());
+
+                commitCoordinator.commitModification(new MergeModification(
+                        candidatePath(message.getEntityPath(), localMemberName),
+                        candidateMapEntry(localMemberName)), this);
+            }
         }
     }
 
@@ -255,6 +273,10 @@ class EntityOwnershipShard extends Shard {
 
         LOG.debug("{}: onCandidateAdded: {}", persistenceId(), message);
 
+        // Since a node's candidate member is only added by the node itself, we can assume the node is up so
+        // remove it from the downPeerMemberNames.
+        downPeerMemberNames.remove(message.getNewCandidate());
+
         String currentOwner = getCurrentOwner(message.getEntityPath());
         if(Strings.isNullOrEmpty(currentOwner)){
             writeNewOwner(message.getEntityPath(), newOwner(message.getAllCandidates()));
@@ -262,12 +284,12 @@ class EntityOwnershipShard extends Shard {
     }
 
     private void onPeerDown(PeerDown peerDown) {
-        LOG.debug("{}: onPeerDown: {}", persistenceId(), peerDown);
+        LOG.info("{}: onPeerDown: {}", persistenceId(), peerDown);
 
         String downMemberName = peerDown.getMemberName();
         if(downPeerMemberNames.add(downMemberName) && isLeader()) {
-            // Select new owners for entities owned by the down peer.
-            selectNewOwnerForEntitiesOwnedBy(downMemberName);
+            // Remove the down peer as a candidate from all entities.
+            removeCandidateFromEntities(downMemberName);
         }
     }
 
@@ -275,13 +297,7 @@ class EntityOwnershipShard extends Shard {
         LOG.debug("{}: onPeerUp: {}", persistenceId(), peerUp);
 
         peerIdToMemberNames.put(peerUp.getPeerId(), peerUp.getMemberName());
-
-        if(downPeerMemberNames.remove(peerUp.getMemberName()) && isLeader()) {
-            // This peer was previously down - for its previously owned entities, if there were no other
-            // candidates, the owner would have been cleared so handle that here by trying to re-assign
-            // ownership for entities whose owner is cleared.
-            selectNewOwnerForEntitiesOwnedBy("");
-        }
+        downPeerMemberNames.remove(peerUp.getMemberName());
     }
 
     private void selectNewOwnerForEntitiesOwnedBy(String owner) {
@@ -304,7 +320,34 @@ class EntityOwnershipShard extends Shard {
         commitCoordinator.commitModifications(modifications, this);
     }
 
-    private void searchForEntitiesOwnedBy(String owner, EntityWalker walker) {
+    private void removeCandidateFromEntities(final String owner) {
+        final BatchedModifications modifications = commitCoordinator.newBatchedModifications();
+        searchForEntities(new EntityWalker() {
+            @Override
+            public void onEntity(MapEntryNode entityTypeNode, MapEntryNode entityNode) {
+                if(hasCandidate(entityNode, owner)) {
+                    YangInstanceIdentifier entityId =
+                            (YangInstanceIdentifier)entityNode.getIdentifier().getKeyValues().get(ENTITY_ID_QNAME);
+                    YangInstanceIdentifier candidatePath = candidatePath(
+                            entityTypeNode.getIdentifier().getKeyValues().get(ENTITY_TYPE_QNAME).toString(),
+                            entityId, owner);
+
+                    LOG.info("{}: Found entity {}, removing candidate {}, path {}", persistenceId(), entityId,
+                            owner, candidatePath);
+
+                    modifications.addModification(new DeleteModification(candidatePath));
+                }
+            }
+        });
+
+        commitCoordinator.commitModifications(modifications, this);
+    }
+
+    private boolean hasCandidate(MapEntryNode entity, String candidateName) {
+        return ((MapNode)entity.getChild(CANDIDATE_NODE_ID).get()).getChild(candidateNodeKey(candidateName)).isPresent();
+    }
+
+    private void searchForEntitiesOwnedBy(final String owner, final EntityWalker walker) {
         DataTreeSnapshot snapshot = getDataStore().getDataTree().takeSnapshot();
         Optional<NormalizedNode<?, ?>> possibleEntityTypes = snapshot.readNode(ENTITY_TYPES_PATH);
         if(!possibleEntityTypes.isPresent()) {
@@ -313,6 +356,25 @@ class EntityOwnershipShard extends Shard {
 
         LOG.debug("{}: Searching for entities owned by {}", persistenceId(), owner);
 
+        searchForEntities(new EntityWalker() {
+            @Override
+            public void onEntity(MapEntryNode entityTypeNode, MapEntryNode entityNode) {
+                Optional<DataContainerChild<? extends PathArgument, ?>> possibleOwner =
+                        entityNode.getChild(ENTITY_OWNER_NODE_ID);
+                if(possibleOwner.isPresent() && owner.equals(possibleOwner.get().getValue().toString())) {
+                    walker.onEntity(entityTypeNode, entityNode);
+                }
+            }
+        });
+    }
+
+    private void searchForEntities(EntityWalker walker) {
+        DataTreeSnapshot snapshot = getDataStore().getDataTree().takeSnapshot();
+        Optional<NormalizedNode<?, ?>> possibleEntityTypes = snapshot.readNode(ENTITY_TYPES_PATH);
+        if(!possibleEntityTypes.isPresent()) {
+            return;
+        }
+
         for(MapEntryNode entityType:  ((MapNode) possibleEntityTypes.get()).getValue()) {
             Optional<DataContainerChild<? extends PathArgument, ?>> possibleEntities =
                     entityType.getChild(ENTITY_NODE_ID);
@@ -321,11 +383,7 @@ class EntityOwnershipShard extends Shard {
             }
 
             for(MapEntryNode entity:  ((MapNode) possibleEntities.get()).getValue()) {
-                Optional<DataContainerChild<? extends PathArgument, ?>> possibleOwner =
-                        entity.getChild(ENTITY_OWNER_NODE_ID);
-                if(possibleOwner.isPresent() && owner.equals(possibleOwner.get().getValue().toString())) {
-                    walker.onEntity(entityType, entity);
-                }
+                walker.onEntity(entityType, entity);
             }
         }
     }
index 09cae67..3134a6e 100644 (file)
@@ -51,7 +51,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailed
  */
 public class AbstractEntityOwnershipTest extends AbstractActorTest {
     protected void verifyEntityCandidate(NormalizedNode<?, ?> node, String entityType,
-            YangInstanceIdentifier entityId, String candidateName) {
+            YangInstanceIdentifier entityId, String candidateName, boolean expectPresent) {
         try {
             assertNotNull("Missing " + EntityOwners.QNAME.toString(), node);
             assertTrue(node instanceof ContainerNode);
@@ -59,24 +59,25 @@ public class AbstractEntityOwnershipTest extends AbstractActorTest {
             ContainerNode entityOwnersNode = (ContainerNode) node;
 
             MapEntryNode entityTypeEntry = getMapEntryNodeChild(entityOwnersNode, EntityType.QNAME,
-                    ENTITY_TYPE_QNAME, entityType);
+                    ENTITY_TYPE_QNAME, entityType, true);
 
-            MapEntryNode entityEntry = getMapEntryNodeChild(entityTypeEntry, ENTITY_QNAME, ENTITY_ID_QNAME, entityId);
+            MapEntryNode entityEntry = getMapEntryNodeChild(entityTypeEntry, ENTITY_QNAME, ENTITY_ID_QNAME,
+                    entityId, true);
 
-            getMapEntryNodeChild(entityEntry, Candidate.QNAME, CANDIDATE_NAME_QNAME, candidateName);
+            getMapEntryNodeChild(entityEntry, Candidate.QNAME, CANDIDATE_NAME_QNAME, candidateName, expectPresent);
         } catch(AssertionError e) {
             throw new AssertionError("Verification of entity candidate failed - returned data was: " + node, e);
         }
     }
 
     protected void verifyEntityCandidate(String entityType, YangInstanceIdentifier entityId, String candidateName,
-            Function<YangInstanceIdentifier,NormalizedNode<?,?>> reader) {
+            Function<YangInstanceIdentifier,NormalizedNode<?,?>> reader, boolean expectPresent) {
         AssertionError lastError = null;
         Stopwatch sw = Stopwatch.createStarted();
         while(sw.elapsed(TimeUnit.MILLISECONDS) <= 5000) {
             NormalizedNode<?, ?> node = reader.apply(ENTITY_OWNERS_PATH);
             try {
-                verifyEntityCandidate(node, entityType, entityId, candidateName);
+                verifyEntityCandidate(node, entityType, entityId, candidateName, expectPresent);
                 return;
             } catch (AssertionError e) {
                 lastError = e;
@@ -87,8 +88,13 @@ public class AbstractEntityOwnershipTest extends AbstractActorTest {
         throw lastError;
     }
 
+    protected void verifyEntityCandidate(String entityType, YangInstanceIdentifier entityId, String candidateName,
+            Function<YangInstanceIdentifier,NormalizedNode<?,?>> reader) {
+        verifyEntityCandidate(entityType, entityId, candidateName, reader, true);
+    }
+
     protected MapEntryNode getMapEntryNodeChild(DataContainerNode<? extends PathArgument> parent, QName childMap,
-            QName child, Object key) {
+            QName child, Object key, boolean expectPresent) {
         Optional<DataContainerChild<? extends PathArgument, ?>> childNode =
                 parent.getChild(new NodeIdentifier(childMap));
         assertEquals("Missing " + childMap.toString(), true, childNode.isPresent());
@@ -96,10 +102,13 @@ public class AbstractEntityOwnershipTest extends AbstractActorTest {
         MapNode entityTypeMapNode = (MapNode) childNode.get();
         Optional<MapEntryNode> entityTypeEntry = entityTypeMapNode.getChild(new NodeIdentifierWithPredicates(
                 childMap, child, key));
-        if(!entityTypeEntry.isPresent()) {
+        if(expectPresent && !entityTypeEntry.isPresent()) {
             fail("Missing " + childMap.toString() + " entry for " + key + ". Actual: " + entityTypeMapNode.getValue());
+        } else if(!expectPresent && entityTypeEntry.isPresent()) {
+            fail("Found unexpected " + childMap.toString() + " entry for " + key);
         }
-        return entityTypeEntry.get();
+
+        return entityTypeEntry.isPresent() ? entityTypeEntry.get() : null;
     }
 
     static void verifyOwner(String expected, String entityType, YangInstanceIdentifier entityId,
index aa96fc2..7516242 100644 (file)
@@ -45,7 +45,7 @@ public class CandidateListChangeListenerTest extends AbstractActorTest {
     public void testOnDataTreeChanged() throws Exception {
         JavaTestKit kit = new JavaTestKit(getSystem());
 
-        new CandidateListChangeListener(kit.getRef()).init(shardDataTree);
+        new CandidateListChangeListener(kit.getRef(), "test").init(shardDataTree);
 
         String memberName1 = "member-1";
         writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, memberName1));
index 9bd0ccf..8b80d6b 100644 (file)
@@ -26,6 +26,7 @@ import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
+;
 
 /**
  * Unit tests for EntityOwnerChangeListener.
@@ -85,9 +86,21 @@ public class EntityOwnerChangeListenerTest {
         reset(mockListenerSupport);
         writeNode(entityPath(ENTITY_TYPE, ENTITY_ID2), entityEntryWithOwner(ENTITY_ID2, LOCAL_MEMBER_NAME));
         verify(mockListenerSupport).notifyEntityOwnershipListeners(ENTITY2, false, true);
+
+        reset(mockListenerSupport);
+        writeNode(entityPath(ENTITY_TYPE, ENTITY_ID2), entityEntryWithOwner(ENTITY_ID2, LOCAL_MEMBER_NAME));
+        verify(mockListenerSupport, never()).notifyEntityOwnershipListeners(any(Entity.class), anyBoolean(), anyBoolean());
+
+        reset(mockListenerSupport);
+        writeNode(entityPath(ENTITY_TYPE, ENTITY_ID2), entityEntryWithOwner(ENTITY_ID2, null));
+        verify(mockListenerSupport).notifyEntityOwnershipListeners(ENTITY2, true, false);
     }
 
     private void writeNode(YangInstanceIdentifier path, NormalizedNode<?, ?> node) throws DataValidationFailedException {
         AbstractEntityOwnershipTest.writeNode(path, node, shardDataTree);
     }
+
+    private void deleteNode(YangInstanceIdentifier path) throws DataValidationFailedException {
+        AbstractEntityOwnershipTest.deleteNode(path, shardDataTree);
+    }
 }
index 8ddc0b4..0044875 100644 (file)
@@ -15,9 +15,9 @@ import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
+import akka.actor.ActorContext;
 import akka.actor.ActorRef;
 import akka.actor.Props;
-import akka.actor.UntypedActorContext;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
 import com.google.common.util.concurrent.Uninterruptibles;
@@ -25,11 +25,13 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
 import org.opendaylight.controller.cluster.raft.TestActorFactory;
 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate;
 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -43,6 +45,15 @@ import scala.collection.immutable.Iterable;
  */
 public class EntityOwnershipListenerSupportTest extends AbstractActorTest {
     private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
+    private ActorContext actorContext;
+
+    @Before
+    public void setup() {
+        TestActorRef<DoNothingActor> actor = actorFactory.createTestActor(
+                Props.create(DoNothingActor.class), actorFactory.generateActorId("test"));
+
+        actorContext = actor.underlyingActor().getContext();
+    }
 
     @After
     public void tearDown() {
@@ -51,11 +62,7 @@ public class EntityOwnershipListenerSupportTest extends AbstractActorTest {
 
     @Test
     public void testNotifyEntityOwnershipListeners() {
-        TestActorRef<DoNothingActor> actor = actorFactory.createTestActor(
-                Props.create(DoNothingActor.class), actorFactory.generateActorId("test"));
-
-        UntypedActorContext actorContext = actor.underlyingActor().getContext();
-        EntityOwnershipListenerSupport support = new EntityOwnershipListenerSupport(actorContext);
+        EntityOwnershipListenerSupport support = new EntityOwnershipListenerSupport(actorContext, "test");
 
         EntityOwnershipListener mockListener1 = mock(EntityOwnershipListener.class, "EntityOwnershipListener1");
         EntityOwnershipListener mockListener2 = mock(EntityOwnershipListener.class, "EntityOwnershipListener2");
@@ -184,4 +191,22 @@ public class EntityOwnershipListenerSupportTest extends AbstractActorTest {
         support.addEntityOwnershipListener(entity1, mockListener2);
         support.removeEntityOwnershipListener(entity1, mockListener2);
     }
+
+    @Test
+    public void testHasCandidateForEntity() {
+        EntityOwnershipListenerSupport support = new EntityOwnershipListenerSupport(actorContext, "test");
+        Entity entity = new Entity("type", YangInstanceIdentifier.of(QName.create("test", "id")));
+
+        assertEquals("hasCandidateForEntity", false, support.hasCandidateForEntity(entity));
+
+        support.addEntityOwnershipListener(entity, mock(EntityOwnershipListener.class));
+        assertEquals("hasCandidateForEntity", false, support.hasCandidateForEntity(entity));
+
+        EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
+        support.addEntityOwnershipListener(entity, candidate);
+        assertEquals("hasCandidateForEntity", true, support.hasCandidateForEntity(entity));
+
+        support.removeEntityOwnershipListener(entity, candidate);
+        assertEquals("hasCandidateForEntity", false, support.hasCandidateForEntity(entity));
+    }
 }
index 146916a..2dc7544 100644 (file)
@@ -93,6 +93,8 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
             YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity3"));
     private static final YangInstanceIdentifier ENTITY_ID4 =
             YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity4"));
+    private static final YangInstanceIdentifier ENTITY_ID5 =
+            YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity5"));
     private static final SchemaContext SCHEMA_CONTEXT = SchemaContextHelper.entityOwners();
     private static final AtomicInteger NEXT_SHARD_NUM = new AtomicInteger();
     private static final String LOCAL_MEMBER_NAME = "member-1";
@@ -480,10 +482,6 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
 
         commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID2, peerMemberName1), kit);
         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName1);
-
-        leader.tell(new RegisterCandidateLocal(candidate, new Entity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
-        kit.expectMsgClass(SuccessReply.class);
-        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, LOCAL_MEMBER_NAME);
         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
 
         // Add candidates for entity3 with peerMember2 as the owner.
@@ -505,6 +503,12 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, peerMemberName2);
         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, peerMemberName2);
 
+        // Add only candidate peerMember1 for entity5.
+
+        commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID5, peerMemberName1), kit);
+        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID5, peerMemberName1);
+        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID5, peerMemberName1);
+
         // Kill peerMember2 and send PeerDown - the entities (2, 3, 4) owned by peerMember2 should get a new
         // owner selected
 
@@ -523,12 +527,12 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName1);
         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
 
-        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, peerMemberName2);
-        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
-        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, peerMemberName2);
-        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, peerMemberName2);
+        verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, peerMemberName2);
+        verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
+        verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, peerMemberName2);
+        verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, peerMemberName2);
 
-        // Reinstate peerMember2 - should become owner again for entity 4
+        // Reinstate peerMember2 - no owners should change
 
         peer2 = actorFactory.createTestActor(newShardProps(peerId2,
                 ImmutableMap.<String, String>builder().put(leaderId.toString(), ""). put(peerId1.toString(), "").build(),
@@ -538,10 +542,22 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
         leader.tell(new PeerUp(peerMemberName2, peerId2.toString()), ActorRef.noSender());
         peer1.tell(new PeerUp(peerMemberName2, peerId2.toString()), ActorRef.noSender());
 
-        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, peerMemberName2);
         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName1);
         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
+        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, "");
+
+        // Add back candidate peerMember2 for entities 1, 2, & 3.
+
+        commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, peerMemberName2), kit);
+        commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID2, peerMemberName2), kit);
+        commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID3, peerMemberName2), kit);
+        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, peerMemberName2);
+        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
+        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, peerMemberName2);
+        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
+        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName1);
+        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
 
         // Kill peerMember1 and send PeerDown - entity 2 should get a new owner selected
 
@@ -552,7 +568,7 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
 
         // Verify the reinstated peerMember2 is fully synced.
 
-        verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, peerMemberName2);
+        verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, "");
         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
@@ -564,14 +580,14 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
                         peerMemberName1).withDispatcher(Dispatchers.DefaultDispatcherId()), peerId1.toString());
         leader.tell(new PeerUp(peerMemberName1, peerId1.toString()), ActorRef.noSender());
 
-        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, peerMemberName2);
+        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, "");
         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
 
         // Verify the reinstated peerMember1 is fully synced.
 
-        verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID4, peerMemberName2);
+        verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID4, "");
         verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
         verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
         verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
@@ -589,12 +605,67 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
 
         kit.waitUntilLeader(peer2);
 
-        verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, peerMemberName2);
+        verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, "");
         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, peerMemberName2);
         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, peerMemberName2);
     }
 
+    @Test
+    public void testLocalCandidateRemovedWithCandidateRegistered() throws Exception {
+        ShardTestKit kit = new ShardTestKit(getSystem());
+
+        dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(10000);
+        ShardIdentifier leaderId = newShardId("leader");
+        ShardIdentifier localId = newShardId(LOCAL_MEMBER_NAME);
+
+        TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(Props.create(
+                TestEntityOwnershipShard.class, localId,
+                ImmutableMap.<String, String>builder().put(leaderId.toString(), "".toString()).build(),
+                dataStoreContextBuilder.build()).withDispatcher(Dispatchers.DefaultDispatcherId()));
+
+        TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(newShardProps(leaderId,
+                ImmutableMap.<String, String>builder().put(localId.toString(), shard.path().toString()).build(),
+                    LOCAL_MEMBER_NAME).withDispatcher(Dispatchers.DefaultDispatcherId()), leaderId.toString());
+        leader.tell(new ElectionTimeout(), leader);
+
+        kit.waitUntilLeader(leader);
+
+        shard.tell(new PeerAddressResolved(leaderId.toString(), leader.path().toString()), ActorRef.noSender());
+
+        Entity entity = new Entity(ENTITY_TYPE, ENTITY_ID1);
+        EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
+
+        // Register local candidate
+
+        shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
+        verifyCommittedEntityCandidate(shard, entity.getType(), entity.getId(), LOCAL_MEMBER_NAME);
+        verify(candidate, timeout(5000)).ownershipChanged(entity, false, true);
+        reset(candidate);
+
+        // Simulate a replicated commit from the leader to remove the local candidate that would occur after a
+        // network partition is healed.
+
+        leader.tell(new PeerDown(LOCAL_MEMBER_NAME, localId.toString()), ActorRef.noSender());
+
+        verify(candidate, timeout(5000)).ownershipChanged(entity, true, false);
+
+        // Since the the shard has a local candidate registered, it should re-add its candidate to the entity.
+
+        verifyCommittedEntityCandidate(shard, entity.getType(), entity.getId(), LOCAL_MEMBER_NAME);
+        verify(candidate, timeout(5000)).ownershipChanged(entity, false, true);
+
+        // Unregister the local candidate and verify it's removed and no re-added.
+
+        shard.tell(new UnregisterCandidateLocal(candidate, entity), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
+
+        verifyNoEntityCandidate(shard, entity.getType(), entity.getId(), LOCAL_MEMBER_NAME);
+        Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+        verifyNoEntityCandidate(shard, entity.getType(), entity.getId(), LOCAL_MEMBER_NAME);
+    }
+
     @Test
     public void testListenerRegistration() throws Exception {
         ShardTestKit kit = new ShardTestKit(getSystem());
@@ -681,16 +752,21 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
 
     private void commitModification(TestActorRef<EntityOwnershipShard> shard, NormalizedNode<?, ?> node,
             JavaTestKit sender) {
-        BatchedModifications modifications = new BatchedModifications("tnx", DataStoreVersions.CURRENT_VERSION, "");
-        modifications.setDoCommitOnReady(true);
-        modifications.setReady(true);
-        modifications.setTotalMessagesSent(1);
+        BatchedModifications modifications = newBatchedModifications();
         modifications.addModification(new MergeModification(ENTITY_OWNERS_PATH, node));
 
         shard.tell(modifications, sender.getRef());
         sender.expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
     }
 
+    private BatchedModifications newBatchedModifications() {
+        BatchedModifications modifications = new BatchedModifications("tnx", DataStoreVersions.CURRENT_VERSION, "");
+        modifications.setDoCommitOnReady(true);
+        modifications.setReady(true);
+        modifications.setTotalMessagesSent(1);
+        return modifications;
+    }
+
     private void verifyEntityCandidateRemoved(final TestActorRef<EntityOwnershipShard> shard, String entityType,
             YangInstanceIdentifier entityId, String candidateName) {
         verifyNodeRemoved(candidatePath(entityType, entityId, candidateName),
@@ -720,6 +796,20 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
         });
     }
 
+    private void verifyNoEntityCandidate(final TestActorRef<EntityOwnershipShard> shard, String entityType,
+            YangInstanceIdentifier entityId, String candidateName) {
+        verifyEntityCandidate(entityType, entityId, candidateName, new Function<YangInstanceIdentifier, NormalizedNode<?,?>>() {
+            @Override
+            public NormalizedNode<?, ?> apply(YangInstanceIdentifier path) {
+                try {
+                    return AbstractShardTest.readStore(shard, path);
+                } catch(Exception e) {
+                    throw new AssertionError("Failed to read " + path, e);
+                }
+            }
+        }, false);
+    }
+
     private void verifyBatchedEntityCandidate(List<Modification> mods, String entityType,
             YangInstanceIdentifier entityId, String candidateName) throws Exception {
         assertEquals("BatchedModifications size", 1, mods.size());
@@ -730,7 +820,7 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
             YangInstanceIdentifier entityId, String candidateName) throws Exception {
         assertEquals("Modification type", MergeModification.class, mod.getClass());
         verifyEntityCandidate(((MergeModification)mod).getData(), entityType,
-                entityId, candidateName);
+                entityId, candidateName, true);
     }
 
     private void verifyOwner(final TestActorRef<EntityOwnershipShard> shard, String entityType, YangInstanceIdentifier entityId,