Bug 4105: Remove candidates on PeerDown 08/26808/1
authorTom Pantelis <tpanteli@brocade.com>
Tue, 18 Aug 2015 06:59:47 +0000 (02:59 -0400)
committerTom Pantelis <tpanteli@brocade.com>
Thu, 10 Sep 2015 19:21:17 +0000 (15:21 -0400)
Currently on PeerDown, the EntityOwnershipShard selects a new owner for
the entities owned by the down node and leaves the down node as a
candidate. If the down node is the only candidate, the owner is cleared.
On PeerUp, it selects a new owner for those entities whose owner is clear.
This was done to handle network partition so a node's candidates remain
registered and are re-assigned when the partition is healed.

Howver this has potential issues when a node is actually
stopped/restarted. It's possible, on restart, that the node doesn't
register a candidate for an entity that it had previously registered for.
So it may get ownership of an entity for which it has no registered
candidate.

To alleviate this, I changed it to remove all the down node's candidates
on PeerDown. If the node was stopped/restarted, then it will
re-register candidates based on local client requests. This case will be
the norm. To handle network partition, when healed, the follower node
will get the replicated commits for its candidate removals from the
leader. So on Candidate removed, it re-adds its removed candidate if it
has a registered EntityOwnershipCandidate.

I realized that one can register a DOMDataTreeChangeListener for a leaf
node. So I simplified EntityOwnerChangeListener to listen for the owner
leaf instead of the entity path. This avoids the extra notifications
when candidayes are added/removed. I actually did this originally b/c I
thought there was a bug when listening at the entity level which turned
out there wasn't but I left it in as an improvement.

I also added the shard's logId to the listener and support classes for
better debugging of unit tests.

Change-Id: I75d2567ce54b9129eee052ba521c8a71777289b6
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/CandidateListChangeListener.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnerChangeListener.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnersModel.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipListenerSupport.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/AbstractEntityOwnershipTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/CandidateListChangeListenerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnerChangeListenerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipListenerSupportTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java

index 82f926aea9795969073c9fa7f935d0062548aa0f..bd41ebe94292a00d13097fc86251f39edead582e 100644 (file)
@@ -47,11 +47,13 @@ import org.slf4j.LoggerFactory;
 class CandidateListChangeListener implements DOMDataTreeChangeListener {
     private static final Logger LOG = LoggerFactory.getLogger(CandidateListChangeListener.class);
 
 class CandidateListChangeListener implements DOMDataTreeChangeListener {
     private static final Logger LOG = LoggerFactory.getLogger(CandidateListChangeListener.class);
 
+    private final String logId;
     private final ActorRef shard;
     private final Map<YangInstanceIdentifier, Collection<String>> currentCandidates = new HashMap<>();
 
     private final ActorRef shard;
     private final Map<YangInstanceIdentifier, Collection<String>> currentCandidates = new HashMap<>();
 
-    CandidateListChangeListener(ActorRef shard) {
+    CandidateListChangeListener(ActorRef shard, String logId) {
         this.shard = Preconditions.checkNotNull(shard, "shard should not be null");
         this.shard = Preconditions.checkNotNull(shard, "shard should not be null");
+        this.logId = logId;
     }
 
     void init(ShardDataTree shardDataTree) {
     }
 
     void init(ShardDataTree shardDataTree) {
@@ -65,7 +67,7 @@ class CandidateListChangeListener implements DOMDataTreeChangeListener {
         for(DataTreeCandidate change: changes) {
             DataTreeCandidateNode changeRoot = change.getRootNode();
 
         for(DataTreeCandidate change: changes) {
             DataTreeCandidateNode changeRoot = change.getRootNode();
 
-            LOG.debug("Candidate node changed: {}, {}", changeRoot.getModificationType(), change.getRootPath());
+            LOG.debug("{}: Candidate node changed: {}, {}", logId, changeRoot.getModificationType(), change.getRootPath());
 
             NodeIdentifierWithPredicates candidateKey =
                     (NodeIdentifierWithPredicates) change.getRootPath().getLastPathArgument();
 
             NodeIdentifierWithPredicates candidateKey =
                     (NodeIdentifierWithPredicates) change.getRootPath().getLastPathArgument();
@@ -74,12 +76,12 @@ class CandidateListChangeListener implements DOMDataTreeChangeListener {
             YangInstanceIdentifier entityId = extractEntityPath(change.getRootPath());
 
             if(changeRoot.getModificationType() == ModificationType.WRITE) {
             YangInstanceIdentifier entityId = extractEntityPath(change.getRootPath());
 
             if(changeRoot.getModificationType() == ModificationType.WRITE) {
-                LOG.debug("Candidate {} was added for entity {}", candidate, entityId);
+                LOG.debug("{}: Candidate {} was added for entity {}", logId, candidate, entityId);
 
                 Collection<String> currentCandidates = addToCurrentCandidates(entityId, candidate);
                 shard.tell(new CandidateAdded(entityId, candidate, new ArrayList<>(currentCandidates)), shard);
             } else if(changeRoot.getModificationType() == ModificationType.DELETE) {
 
                 Collection<String> currentCandidates = addToCurrentCandidates(entityId, candidate);
                 shard.tell(new CandidateAdded(entityId, candidate, new ArrayList<>(currentCandidates)), shard);
             } else if(changeRoot.getModificationType() == ModificationType.DELETE) {
-                LOG.debug("Candidate {} was removed for entity {}", candidate, entityId);
+                LOG.debug("{}: Candidate {} was removed for entity {}", logId, candidate, entityId);
 
                 Collection<String> currentCandidates = removeFromCurrentCandidates(entityId, candidate);
                 shard.tell(new CandidateRemoved(entityId, candidate, new ArrayList<>(currentCandidates)), shard);
 
                 Collection<String> currentCandidates = removeFromCurrentCandidates(entityId, candidate);
                 shard.tell(new CandidateRemoved(entityId, candidate, new ArrayList<>(currentCandidates)), shard);
index 9c551f257657489055918505a86b5ca45ab90895..cf9df1821a93bab26787e10941c42e870888b17f 100644 (file)
@@ -7,25 +7,18 @@
  */
 package org.opendaylight.controller.cluster.datastore.entityownership;
 
  */
 package org.opendaylight.controller.cluster.datastore.entityownership;
 
-import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_ID_QNAME;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNERS_PATH;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNERS_PATH;
-import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_NODE_ID;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_QNAME;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_QNAME;
-import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_TYPE_QNAME;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.createEntity;
 import com.google.common.base.Objects;
 import com.google.common.base.Optional;
 import java.util.Collection;
 import com.google.common.base.Objects;
 import com.google.common.base.Optional;
 import java.util.Collection;
-import java.util.Map.Entry;
 import org.opendaylight.controller.cluster.datastore.ShardDataTree;
 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.entity.owners.EntityType;
 import org.opendaylight.controller.cluster.datastore.ShardDataTree;
 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.entity.owners.EntityType;
-import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
-import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
-import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
@@ -50,27 +43,26 @@ class EntityOwnerChangeListener implements DOMDataTreeChangeListener {
 
     void init(ShardDataTree shardDataTree) {
         shardDataTree.registerTreeChangeListener(YangInstanceIdentifier.builder(ENTITY_OWNERS_PATH).
 
     void init(ShardDataTree shardDataTree) {
         shardDataTree.registerTreeChangeListener(YangInstanceIdentifier.builder(ENTITY_OWNERS_PATH).
-                node(EntityType.QNAME).node(EntityType.QNAME).node(ENTITY_QNAME).node(ENTITY_QNAME).build(), this);
+                node(EntityType.QNAME).node(EntityType.QNAME).node(ENTITY_QNAME).node(ENTITY_QNAME).node(EntityOwnersModel.ENTITY_OWNER_QNAME).build(), this);
     }
 
     @Override
     public void onDataTreeChanged(Collection<DataTreeCandidate> changes) {
         for(DataTreeCandidate change: changes) {
             DataTreeCandidateNode changeRoot = change.getRootNode();
     }
 
     @Override
     public void onDataTreeChanged(Collection<DataTreeCandidate> changes) {
         for(DataTreeCandidate change: changes) {
             DataTreeCandidateNode changeRoot = change.getRootNode();
-            MapEntryNode entityNode = (MapEntryNode) changeRoot.getDataAfter().get();
+            LeafNode<?> ownerLeaf = (LeafNode<?>) changeRoot.getDataAfter().get();
 
 
-            LOG.debug("Entity node changed: {}, {}", changeRoot.getModificationType(), change.getRootPath());
+            LOG.debug("{}: Entity node changed: {}, {}", logId(), changeRoot.getModificationType(), change.getRootPath());
 
 
-            String newOwner = extractOwner(entityNode);
+            String newOwner = extractOwner(ownerLeaf);
 
             String origOwner = null;
             Optional<NormalizedNode<?, ?>> dataBefore = changeRoot.getDataBefore();
             if(dataBefore.isPresent()) {
 
             String origOwner = null;
             Optional<NormalizedNode<?, ?>> dataBefore = changeRoot.getDataBefore();
             if(dataBefore.isPresent()) {
-                MapEntryNode origEntityNode = (MapEntryNode) changeRoot.getDataBefore().get();
-                origOwner = extractOwner(origEntityNode);
+                origOwner = extractOwner((LeafNode<?>) changeRoot.getDataBefore().get());
             }
 
             }
 
-            LOG.debug("New owner: {}, Original owner: {}", newOwner, origOwner);
+            LOG.debug("{}: New owner: {}, Original owner: {}", logId(), newOwner, origOwner);
 
             if(!Objects.equal(origOwner, newOwner)) {
                 boolean isOwner = Objects.equal(localMemberName, newOwner);
 
             if(!Objects.equal(origOwner, newOwner)) {
                 boolean isOwner = Objects.equal(localMemberName, newOwner);
@@ -78,8 +70,8 @@ class EntityOwnerChangeListener implements DOMDataTreeChangeListener {
                 if(isOwner || wasOwner) {
                     Entity entity = createEntity(change.getRootPath());
 
                 if(isOwner || wasOwner) {
                     Entity entity = createEntity(change.getRootPath());
 
-                    LOG.debug("Calling notifyEntityOwnershipListeners: entity: {}, wasOwner: {}, isOwner: {}",
-                            entity, wasOwner, isOwner);
+                    LOG.debug("{}: Calling notifyEntityOwnershipListeners: entity: {}, wasOwner: {}, isOwner: {}",
+                            logId(), entity, wasOwner, isOwner);
 
                     listenerSupport.notifyEntityOwnershipListeners(entity, wasOwner, isOwner);
                 }
 
                     listenerSupport.notifyEntityOwnershipListeners(entity, wasOwner, isOwner);
                 }
@@ -87,26 +79,12 @@ class EntityOwnerChangeListener implements DOMDataTreeChangeListener {
         }
     }
 
         }
     }
 
-    private Entity createEntity(YangInstanceIdentifier entityPath) {
-        String entityType = null;
-        YangInstanceIdentifier entityId = null;
-        for(PathArgument pathArg: entityPath.getPathArguments()) {
-            if(pathArg instanceof NodeIdentifierWithPredicates) {
-                NodeIdentifierWithPredicates nodeKey = (NodeIdentifierWithPredicates) pathArg;
-                Entry<QName, Object> key = nodeKey.getKeyValues().entrySet().iterator().next();
-                if(ENTITY_TYPE_QNAME.equals(key.getKey())) {
-                    entityType = key.getValue().toString();
-                } else if(ENTITY_ID_QNAME.equals(key.getKey())) {
-                    entityId = (YangInstanceIdentifier) key.getValue();
-                }
-            }
-        }
-
-        return new Entity(entityType, entityId);
+    private String extractOwner(LeafNode<?> ownerLeaf) {
+        Object value = ownerLeaf.getValue();
+        return value != null ? value.toString() : null;
     }
 
     }
 
-    private String extractOwner(MapEntryNode entityNode) {
-        Optional<DataContainerChild<? extends PathArgument, ?>> ownerNode = entityNode.getChild(ENTITY_OWNER_NODE_ID);
-        return ownerNode.isPresent() ? (String) ownerNode.get().getValue() : null;
+    private String logId() {
+        return listenerSupport.getLogId();
     }
 }
     }
 }
index 385bb706491d838084b6fc7e04d4035f0e5449fd..1c97b4818b03bdce1b41e34aa21819d5ac461078 100644 (file)
@@ -7,12 +7,16 @@
  */
 package org.opendaylight.controller.cluster.datastore.entityownership;
 
  */
 package org.opendaylight.controller.cluster.datastore.entityownership;
 
+import java.util.Map.Entry;
+import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.EntityOwners;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.entity.owners.EntityType;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.entity.owners.entity.type.entity.Candidate;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.EntityOwners;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.entity.owners.EntityType;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.entity.owners.entity.type.entity.Candidate;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
@@ -61,6 +65,15 @@ final class EntityOwnersModel {
 
     }
 
 
     }
 
+    static YangInstanceIdentifier candidatePath(YangInstanceIdentifier entityPath, String candidateName) {
+        return YangInstanceIdentifier.builder(entityPath).node(Candidate.QNAME).nodeWithKey(
+                Candidate.QNAME, CANDIDATE_NAME_QNAME, candidateName).build();
+    }
+
+    static NodeIdentifierWithPredicates candidateNodeKey(String candidateName) {
+        return new NodeIdentifierWithPredicates(Candidate.QNAME, CANDIDATE_NAME_QNAME, candidateName);
+    }
+
     static NormalizedNode<?, ?> entityOwnersWithCandidate(String entityType, YangInstanceIdentifier entityId,
             String candidateName) {
         return entityOwnersWithEntityTypeEntry(entityTypeEntryWithEntityEntry(entityType,
     static NormalizedNode<?, ?> entityOwnersWithCandidate(String entityType, YangInstanceIdentifier entityId,
             String candidateName) {
         return entityOwnersWithEntityTypeEntry(entityTypeEntryWithEntityEntry(entityType,
@@ -86,11 +99,33 @@ final class EntityOwnersModel {
 
     static MapNode candidateEntry(String candidateName) {
         return ImmutableOrderedMapNodeBuilder.create().withNodeIdentifier(new NodeIdentifier(Candidate.QNAME)).
 
     static MapNode candidateEntry(String candidateName) {
         return ImmutableOrderedMapNodeBuilder.create().withNodeIdentifier(new NodeIdentifier(Candidate.QNAME)).
-                addChild(ImmutableNodes.mapEntry(Candidate.QNAME, CANDIDATE_NAME_QNAME, candidateName)).build();
+                addChild(candidateMapEntry(candidateName)).build();
+    }
+
+    static MapEntryNode candidateMapEntry(String candidateName) {
+        return ImmutableNodes.mapEntry(Candidate.QNAME, CANDIDATE_NAME_QNAME, candidateName);
     }
 
     static NormalizedNode<?, ?> entityEntryWithOwner(YangInstanceIdentifier entityId, String owner) {
         return ImmutableNodes.mapEntryBuilder(ENTITY_QNAME, ENTITY_ID_QNAME, entityId).addChild(
                 ImmutableNodes.leafNode(ENTITY_OWNER_QNAME, owner)).build();
     }
     }
 
     static NormalizedNode<?, ?> entityEntryWithOwner(YangInstanceIdentifier entityId, String owner) {
         return ImmutableNodes.mapEntryBuilder(ENTITY_QNAME, ENTITY_ID_QNAME, entityId).addChild(
                 ImmutableNodes.leafNode(ENTITY_OWNER_QNAME, owner)).build();
     }
+
+    static Entity createEntity(YangInstanceIdentifier entityPath) {
+        String entityType = null;
+        YangInstanceIdentifier entityId = null;
+        for(PathArgument pathArg: entityPath.getPathArguments()) {
+            if(pathArg instanceof NodeIdentifierWithPredicates) {
+                NodeIdentifierWithPredicates nodeKey = (NodeIdentifierWithPredicates) pathArg;
+                Entry<QName, Object> key = nodeKey.getKeyValues().entrySet().iterator().next();
+                if(ENTITY_TYPE_QNAME.equals(key.getKey())) {
+                    entityType = key.getValue().toString();
+                } else if(ENTITY_ID_QNAME.equals(key.getKey())) {
+                    entityId = (YangInstanceIdentifier) key.getValue();
+                }
+            }
+        }
+
+        return new Entity(entityType, entityId);
+    }
 }
 }
index 7941bc088c9d8a700e6d1ac904ada4d16433b2fb..ed4a004ed046b72fd83ef17a23dfc0552f8d1512 100644 (file)
@@ -18,6 +18,7 @@ import java.util.IdentityHashMap;
 import java.util.Map;
 import org.opendaylight.controller.cluster.datastore.entityownership.messages.EntityOwnershipChanged;
 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
 import java.util.Map;
 import org.opendaylight.controller.cluster.datastore.entityownership.messages.EntityOwnershipChanged;
 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate;
 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -30,35 +31,51 @@ import org.slf4j.LoggerFactory;
 class EntityOwnershipListenerSupport {
     private static final Logger LOG = LoggerFactory.getLogger(EntityOwnershipListenerSupport.class);
 
 class EntityOwnershipListenerSupport {
     private static final Logger LOG = LoggerFactory.getLogger(EntityOwnershipListenerSupport.class);
 
+    private final String logId;
     private final ActorContext actorContext;
     private final Map<EntityOwnershipListener, ListenerActorRefEntry> listenerActorMap = new IdentityHashMap<>();
     private final Multimap<Entity, EntityOwnershipListener> entityListenerMap = HashMultimap.create();
     private final Multimap<String, EntityOwnershipListener> entityTypeListenerMap = HashMultimap.create();
 
     private final ActorContext actorContext;
     private final Map<EntityOwnershipListener, ListenerActorRefEntry> listenerActorMap = new IdentityHashMap<>();
     private final Multimap<Entity, EntityOwnershipListener> entityListenerMap = HashMultimap.create();
     private final Multimap<String, EntityOwnershipListener> entityTypeListenerMap = HashMultimap.create();
 
-    EntityOwnershipListenerSupport(ActorContext actorContext) {
+    EntityOwnershipListenerSupport(ActorContext actorContext, String logId) {
         this.actorContext = actorContext;
         this.actorContext = actorContext;
+        this.logId = logId;
+    }
+
+    String getLogId() {
+        return logId;
+    }
+
+    boolean hasCandidateForEntity(Entity entity) {
+        for(EntityOwnershipListener listener: entityListenerMap.get(entity)) {
+            if(listener instanceof EntityOwnershipCandidate) {
+                return true;
+            }
+        }
+
+        return false;
     }
 
     void addEntityOwnershipListener(Entity entity, EntityOwnershipListener listener) {
     }
 
     void addEntityOwnershipListener(Entity entity, EntityOwnershipListener listener) {
-        LOG.debug("Adding EntityOwnershipListener {} for {}", listener, entity);
+        LOG.debug("{}: Adding EntityOwnershipListener {} for {}", logId, listener, entity);
 
         addListener(listener, entity, entityListenerMap);
     }
 
     void addEntityOwnershipListener(String entityType, EntityOwnershipListener listener) {
 
         addListener(listener, entity, entityListenerMap);
     }
 
     void addEntityOwnershipListener(String entityType, EntityOwnershipListener listener) {
-        LOG.debug("Adding EntityOwnershipListener {} for entity type {}", listener, entityType);
+        LOG.debug("{}: Adding EntityOwnershipListener {} for entity type {}", logId, listener, entityType);
 
         addListener(listener, entityType, entityTypeListenerMap);
     }
 
     void removeEntityOwnershipListener(Entity entity, EntityOwnershipListener listener) {
 
         addListener(listener, entityType, entityTypeListenerMap);
     }
 
     void removeEntityOwnershipListener(Entity entity, EntityOwnershipListener listener) {
-        LOG.debug("Removing EntityOwnershipListener {} for {}", listener, entity);
+        LOG.debug("{}: Removing EntityOwnershipListener {} for {}", logId, listener, entity);
 
         removeListener(listener, entity, entityListenerMap);
     }
 
     void removeEntityOwnershipListener(String entityType, EntityOwnershipListener listener) {
 
         removeListener(listener, entity, entityListenerMap);
     }
 
     void removeEntityOwnershipListener(String entityType, EntityOwnershipListener listener) {
-        LOG.debug("Removing EntityOwnershipListener {} for entity type {}", listener, entityType);
+        LOG.debug("{}: Removing EntityOwnershipListener {} for entity type {}", logId, listener, entityType);
 
         removeListener(listener, entityType, entityTypeListenerMap);
     }
 
         removeListener(listener, entityType, entityTypeListenerMap);
     }
@@ -87,7 +104,7 @@ class EntityOwnershipListenerSupport {
         for(EntityOwnershipListener listener: listeners) {
             ActorRef listenerActor = listenerActorFor(listener);
 
         for(EntityOwnershipListener listener: listeners) {
             ActorRef listenerActor = listenerActorFor(listener);
 
-            LOG.debug("Notifying EntityOwnershipListenerActor {} with {}", listenerActor, changed);
+            LOG.debug("{}: Notifying EntityOwnershipListenerActor {} with {}", logId, listenerActor, changed);
 
             listenerActor.tell(changed, ActorRef.noSender());
         }
 
             listenerActor.tell(changed, ActorRef.noSender());
         }
@@ -110,7 +127,7 @@ class EntityOwnershipListenerSupport {
         if(fromListenerMap.remove(mapKey, listener)) {
             ListenerActorRefEntry listenerEntry = listenerActorMap.get(listener);
 
         if(fromListenerMap.remove(mapKey, listener)) {
             ListenerActorRefEntry listenerEntry = listenerActorMap.get(listener);
 
-            LOG.debug("Found {}", listenerEntry);
+            LOG.debug("{}: Found {}", logId, listenerEntry);
 
             listenerEntry.referenceCount--;
             if(listenerEntry.referenceCount <= 0) {
 
             listenerEntry.referenceCount--;
             if(listenerEntry.referenceCount <= 0) {
@@ -136,7 +153,7 @@ class EntityOwnershipListenerSupport {
             if(actorRef == null) {
                 actorRef = actorContext.actorOf(EntityOwnershipListenerActor.props(listener));
 
             if(actorRef == null) {
                 actorRef = actorContext.actorOf(EntityOwnershipListenerActor.props(listener));
 
-                LOG.debug("Created EntityOwnershipListenerActor {} for listener {}", actorRef, listener);
+                LOG.debug("{}: Created EntityOwnershipListenerActor {} for listener {}", logId, actorRef, listener);
             }
 
             return actorRef;
             }
 
             return actorRef;
index 4dfbc87eb9a2b1b7bcb3545e1e67dc2c03304c3a..58a7a036759dc6fa9de884a22449563f53c14888 100644 (file)
@@ -10,13 +10,18 @@ package org.opendaylight.controller.cluster.datastore.entityownership;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.CANDIDATE_NAME_NODE_ID;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.CANDIDATE_NODE_ID;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_ID_NODE_ID;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.CANDIDATE_NAME_NODE_ID;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.CANDIDATE_NODE_ID;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_ID_NODE_ID;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_ID_QNAME;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_NODE_ID;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNERS_PATH;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_NODE_ID;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_QNAME;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_TYPES_PATH;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_TYPE_NODE_ID;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_NODE_ID;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNERS_PATH;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_NODE_ID;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_QNAME;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_TYPES_PATH;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_TYPE_NODE_ID;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_TYPE_QNAME;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.candidateMapEntry;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.candidateNodeKey;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.candidatePath;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.candidatePath;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.createEntity;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityOwnersWithCandidate;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityOwnersWithCandidate;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
@@ -80,7 +85,7 @@ class EntityOwnershipShard extends Shard {
         super(name, peerAddresses, noPersistenceDatastoreContext(datastoreContext), schemaContext);
         this.localMemberName = localMemberName;
         this.commitCoordinator = new EntityOwnershipShardCommitCoordinator(localMemberName, LOG);
         super(name, peerAddresses, noPersistenceDatastoreContext(datastoreContext), schemaContext);
         this.localMemberName = localMemberName;
         this.commitCoordinator = new EntityOwnershipShardCommitCoordinator(localMemberName, LOG);
-        this.listenerSupport = new EntityOwnershipListenerSupport(getContext());
+        this.listenerSupport = new EntityOwnershipListenerSupport(getContext(), persistenceId());
 
         for(String peerId: peerAddresses.keySet()) {
             ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(peerId).build();
 
         for(String peerId: peerAddresses.keySet()) {
             ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(peerId).build();
@@ -97,7 +102,7 @@ class EntityOwnershipShard extends Shard {
     protected void onRecoveryComplete() {
         super.onRecoveryComplete();
 
     protected void onRecoveryComplete() {
         super.onRecoveryComplete();
 
-        new CandidateListChangeListener(getSelf()).init(getDataStore());
+        new CandidateListChangeListener(getSelf(), persistenceId()).init(getDataStore());
         new EntityOwnerChangeListener(localMemberName, listenerSupport).init(getDataStore());
     }
 
         new EntityOwnerChangeListener(localMemberName, listenerSupport).init(getDataStore());
     }
 
@@ -236,15 +241,28 @@ class EntityOwnershipShard extends Shard {
     }
 
     private void onCandidateRemoved(CandidateRemoved message) {
     }
 
     private void onCandidateRemoved(CandidateRemoved message) {
-        if(!isLeader()) {
-            return;
-        }
-
         LOG.debug("{}: onCandidateRemoved: {}", persistenceId(), message);
 
         LOG.debug("{}: onCandidateRemoved: {}", persistenceId(), message);
 
-        String currentOwner = getCurrentOwner(message.getEntityPath());
-        if(message.getRemovedCandidate().equals(currentOwner)){
-            writeNewOwner(message.getEntityPath(), newOwner(message.getRemainingCandidates()));
+        if(isLeader()) {
+            String currentOwner = getCurrentOwner(message.getEntityPath());
+            if(message.getRemovedCandidate().equals(currentOwner)){
+                writeNewOwner(message.getEntityPath(), newOwner(message.getRemainingCandidates()));
+            }
+        } else {
+            // We're not the leader. If the removed candidate is our local member then check if we actually
+            // have a local candidate registered. If we do then we must have been partitioned from the leader
+            // and the leader removed our candidate since the leader can't tell the difference between a
+            // temporary network partition and a node's process actually restarted. So, in that case, re-add
+            // our candidate.
+            if(localMemberName.equals(message.getRemovedCandidate()) &&
+                    listenerSupport.hasCandidateForEntity(createEntity(message.getEntityPath()))) {
+                LOG.debug("Local candidate member was removed but a local candidate is registered for {}" +
+                    " - adding back local candidate", message.getEntityPath());
+
+                commitCoordinator.commitModification(new MergeModification(
+                        candidatePath(message.getEntityPath(), localMemberName),
+                        candidateMapEntry(localMemberName)), this);
+            }
         }
     }
 
         }
     }
 
@@ -255,6 +273,10 @@ class EntityOwnershipShard extends Shard {
 
         LOG.debug("{}: onCandidateAdded: {}", persistenceId(), message);
 
 
         LOG.debug("{}: onCandidateAdded: {}", persistenceId(), message);
 
+        // Since a node's candidate member is only added by the node itself, we can assume the node is up so
+        // remove it from the downPeerMemberNames.
+        downPeerMemberNames.remove(message.getNewCandidate());
+
         String currentOwner = getCurrentOwner(message.getEntityPath());
         if(Strings.isNullOrEmpty(currentOwner)){
             writeNewOwner(message.getEntityPath(), newOwner(message.getAllCandidates()));
         String currentOwner = getCurrentOwner(message.getEntityPath());
         if(Strings.isNullOrEmpty(currentOwner)){
             writeNewOwner(message.getEntityPath(), newOwner(message.getAllCandidates()));
@@ -262,12 +284,12 @@ class EntityOwnershipShard extends Shard {
     }
 
     private void onPeerDown(PeerDown peerDown) {
     }
 
     private void onPeerDown(PeerDown peerDown) {
-        LOG.debug("{}: onPeerDown: {}", persistenceId(), peerDown);
+        LOG.info("{}: onPeerDown: {}", persistenceId(), peerDown);
 
         String downMemberName = peerDown.getMemberName();
         if(downPeerMemberNames.add(downMemberName) && isLeader()) {
 
         String downMemberName = peerDown.getMemberName();
         if(downPeerMemberNames.add(downMemberName) && isLeader()) {
-            // Select new owners for entities owned by the down peer.
-            selectNewOwnerForEntitiesOwnedBy(downMemberName);
+            // Remove the down peer as a candidate from all entities.
+            removeCandidateFromEntities(downMemberName);
         }
     }
 
         }
     }
 
@@ -275,13 +297,7 @@ class EntityOwnershipShard extends Shard {
         LOG.debug("{}: onPeerUp: {}", persistenceId(), peerUp);
 
         peerIdToMemberNames.put(peerUp.getPeerId(), peerUp.getMemberName());
         LOG.debug("{}: onPeerUp: {}", persistenceId(), peerUp);
 
         peerIdToMemberNames.put(peerUp.getPeerId(), peerUp.getMemberName());
-
-        if(downPeerMemberNames.remove(peerUp.getMemberName()) && isLeader()) {
-            // This peer was previously down - for its previously owned entities, if there were no other
-            // candidates, the owner would have been cleared so handle that here by trying to re-assign
-            // ownership for entities whose owner is cleared.
-            selectNewOwnerForEntitiesOwnedBy("");
-        }
+        downPeerMemberNames.remove(peerUp.getMemberName());
     }
 
     private void selectNewOwnerForEntitiesOwnedBy(String owner) {
     }
 
     private void selectNewOwnerForEntitiesOwnedBy(String owner) {
@@ -304,7 +320,34 @@ class EntityOwnershipShard extends Shard {
         commitCoordinator.commitModifications(modifications, this);
     }
 
         commitCoordinator.commitModifications(modifications, this);
     }
 
-    private void searchForEntitiesOwnedBy(String owner, EntityWalker walker) {
+    private void removeCandidateFromEntities(final String owner) {
+        final BatchedModifications modifications = commitCoordinator.newBatchedModifications();
+        searchForEntities(new EntityWalker() {
+            @Override
+            public void onEntity(MapEntryNode entityTypeNode, MapEntryNode entityNode) {
+                if(hasCandidate(entityNode, owner)) {
+                    YangInstanceIdentifier entityId =
+                            (YangInstanceIdentifier)entityNode.getIdentifier().getKeyValues().get(ENTITY_ID_QNAME);
+                    YangInstanceIdentifier candidatePath = candidatePath(
+                            entityTypeNode.getIdentifier().getKeyValues().get(ENTITY_TYPE_QNAME).toString(),
+                            entityId, owner);
+
+                    LOG.info("{}: Found entity {}, removing candidate {}, path {}", persistenceId(), entityId,
+                            owner, candidatePath);
+
+                    modifications.addModification(new DeleteModification(candidatePath));
+                }
+            }
+        });
+
+        commitCoordinator.commitModifications(modifications, this);
+    }
+
+    private boolean hasCandidate(MapEntryNode entity, String candidateName) {
+        return ((MapNode)entity.getChild(CANDIDATE_NODE_ID).get()).getChild(candidateNodeKey(candidateName)).isPresent();
+    }
+
+    private void searchForEntitiesOwnedBy(final String owner, final EntityWalker walker) {
         DataTreeSnapshot snapshot = getDataStore().getDataTree().takeSnapshot();
         Optional<NormalizedNode<?, ?>> possibleEntityTypes = snapshot.readNode(ENTITY_TYPES_PATH);
         if(!possibleEntityTypes.isPresent()) {
         DataTreeSnapshot snapshot = getDataStore().getDataTree().takeSnapshot();
         Optional<NormalizedNode<?, ?>> possibleEntityTypes = snapshot.readNode(ENTITY_TYPES_PATH);
         if(!possibleEntityTypes.isPresent()) {
@@ -313,6 +356,25 @@ class EntityOwnershipShard extends Shard {
 
         LOG.debug("{}: Searching for entities owned by {}", persistenceId(), owner);
 
 
         LOG.debug("{}: Searching for entities owned by {}", persistenceId(), owner);
 
+        searchForEntities(new EntityWalker() {
+            @Override
+            public void onEntity(MapEntryNode entityTypeNode, MapEntryNode entityNode) {
+                Optional<DataContainerChild<? extends PathArgument, ?>> possibleOwner =
+                        entityNode.getChild(ENTITY_OWNER_NODE_ID);
+                if(possibleOwner.isPresent() && owner.equals(possibleOwner.get().getValue().toString())) {
+                    walker.onEntity(entityTypeNode, entityNode);
+                }
+            }
+        });
+    }
+
+    private void searchForEntities(EntityWalker walker) {
+        DataTreeSnapshot snapshot = getDataStore().getDataTree().takeSnapshot();
+        Optional<NormalizedNode<?, ?>> possibleEntityTypes = snapshot.readNode(ENTITY_TYPES_PATH);
+        if(!possibleEntityTypes.isPresent()) {
+            return;
+        }
+
         for(MapEntryNode entityType:  ((MapNode) possibleEntityTypes.get()).getValue()) {
             Optional<DataContainerChild<? extends PathArgument, ?>> possibleEntities =
                     entityType.getChild(ENTITY_NODE_ID);
         for(MapEntryNode entityType:  ((MapNode) possibleEntityTypes.get()).getValue()) {
             Optional<DataContainerChild<? extends PathArgument, ?>> possibleEntities =
                     entityType.getChild(ENTITY_NODE_ID);
@@ -321,11 +383,7 @@ class EntityOwnershipShard extends Shard {
             }
 
             for(MapEntryNode entity:  ((MapNode) possibleEntities.get()).getValue()) {
             }
 
             for(MapEntryNode entity:  ((MapNode) possibleEntities.get()).getValue()) {
-                Optional<DataContainerChild<? extends PathArgument, ?>> possibleOwner =
-                        entity.getChild(ENTITY_OWNER_NODE_ID);
-                if(possibleOwner.isPresent() && owner.equals(possibleOwner.get().getValue().toString())) {
-                    walker.onEntity(entityType, entity);
-                }
+                walker.onEntity(entityType, entity);
             }
         }
     }
             }
         }
     }
index 09cae67e7d0d00b99bbab89b4fad0f93fea36039..3134a6eedd4a540721cc8c03b88d93411585b092 100644 (file)
@@ -51,7 +51,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailed
  */
 public class AbstractEntityOwnershipTest extends AbstractActorTest {
     protected void verifyEntityCandidate(NormalizedNode<?, ?> node, String entityType,
  */
 public class AbstractEntityOwnershipTest extends AbstractActorTest {
     protected void verifyEntityCandidate(NormalizedNode<?, ?> node, String entityType,
-            YangInstanceIdentifier entityId, String candidateName) {
+            YangInstanceIdentifier entityId, String candidateName, boolean expectPresent) {
         try {
             assertNotNull("Missing " + EntityOwners.QNAME.toString(), node);
             assertTrue(node instanceof ContainerNode);
         try {
             assertNotNull("Missing " + EntityOwners.QNAME.toString(), node);
             assertTrue(node instanceof ContainerNode);
@@ -59,24 +59,25 @@ public class AbstractEntityOwnershipTest extends AbstractActorTest {
             ContainerNode entityOwnersNode = (ContainerNode) node;
 
             MapEntryNode entityTypeEntry = getMapEntryNodeChild(entityOwnersNode, EntityType.QNAME,
             ContainerNode entityOwnersNode = (ContainerNode) node;
 
             MapEntryNode entityTypeEntry = getMapEntryNodeChild(entityOwnersNode, EntityType.QNAME,
-                    ENTITY_TYPE_QNAME, entityType);
+                    ENTITY_TYPE_QNAME, entityType, true);
 
 
-            MapEntryNode entityEntry = getMapEntryNodeChild(entityTypeEntry, ENTITY_QNAME, ENTITY_ID_QNAME, entityId);
+            MapEntryNode entityEntry = getMapEntryNodeChild(entityTypeEntry, ENTITY_QNAME, ENTITY_ID_QNAME,
+                    entityId, true);
 
 
-            getMapEntryNodeChild(entityEntry, Candidate.QNAME, CANDIDATE_NAME_QNAME, candidateName);
+            getMapEntryNodeChild(entityEntry, Candidate.QNAME, CANDIDATE_NAME_QNAME, candidateName, expectPresent);
         } catch(AssertionError e) {
             throw new AssertionError("Verification of entity candidate failed - returned data was: " + node, e);
         }
     }
 
     protected void verifyEntityCandidate(String entityType, YangInstanceIdentifier entityId, String candidateName,
         } catch(AssertionError e) {
             throw new AssertionError("Verification of entity candidate failed - returned data was: " + node, e);
         }
     }
 
     protected void verifyEntityCandidate(String entityType, YangInstanceIdentifier entityId, String candidateName,
-            Function<YangInstanceIdentifier,NormalizedNode<?,?>> reader) {
+            Function<YangInstanceIdentifier,NormalizedNode<?,?>> reader, boolean expectPresent) {
         AssertionError lastError = null;
         Stopwatch sw = Stopwatch.createStarted();
         while(sw.elapsed(TimeUnit.MILLISECONDS) <= 5000) {
             NormalizedNode<?, ?> node = reader.apply(ENTITY_OWNERS_PATH);
             try {
         AssertionError lastError = null;
         Stopwatch sw = Stopwatch.createStarted();
         while(sw.elapsed(TimeUnit.MILLISECONDS) <= 5000) {
             NormalizedNode<?, ?> node = reader.apply(ENTITY_OWNERS_PATH);
             try {
-                verifyEntityCandidate(node, entityType, entityId, candidateName);
+                verifyEntityCandidate(node, entityType, entityId, candidateName, expectPresent);
                 return;
             } catch (AssertionError e) {
                 lastError = e;
                 return;
             } catch (AssertionError e) {
                 lastError = e;
@@ -87,8 +88,13 @@ public class AbstractEntityOwnershipTest extends AbstractActorTest {
         throw lastError;
     }
 
         throw lastError;
     }
 
+    protected void verifyEntityCandidate(String entityType, YangInstanceIdentifier entityId, String candidateName,
+            Function<YangInstanceIdentifier,NormalizedNode<?,?>> reader) {
+        verifyEntityCandidate(entityType, entityId, candidateName, reader, true);
+    }
+
     protected MapEntryNode getMapEntryNodeChild(DataContainerNode<? extends PathArgument> parent, QName childMap,
     protected MapEntryNode getMapEntryNodeChild(DataContainerNode<? extends PathArgument> parent, QName childMap,
-            QName child, Object key) {
+            QName child, Object key, boolean expectPresent) {
         Optional<DataContainerChild<? extends PathArgument, ?>> childNode =
                 parent.getChild(new NodeIdentifier(childMap));
         assertEquals("Missing " + childMap.toString(), true, childNode.isPresent());
         Optional<DataContainerChild<? extends PathArgument, ?>> childNode =
                 parent.getChild(new NodeIdentifier(childMap));
         assertEquals("Missing " + childMap.toString(), true, childNode.isPresent());
@@ -96,10 +102,13 @@ public class AbstractEntityOwnershipTest extends AbstractActorTest {
         MapNode entityTypeMapNode = (MapNode) childNode.get();
         Optional<MapEntryNode> entityTypeEntry = entityTypeMapNode.getChild(new NodeIdentifierWithPredicates(
                 childMap, child, key));
         MapNode entityTypeMapNode = (MapNode) childNode.get();
         Optional<MapEntryNode> entityTypeEntry = entityTypeMapNode.getChild(new NodeIdentifierWithPredicates(
                 childMap, child, key));
-        if(!entityTypeEntry.isPresent()) {
+        if(expectPresent && !entityTypeEntry.isPresent()) {
             fail("Missing " + childMap.toString() + " entry for " + key + ". Actual: " + entityTypeMapNode.getValue());
             fail("Missing " + childMap.toString() + " entry for " + key + ". Actual: " + entityTypeMapNode.getValue());
+        } else if(!expectPresent && entityTypeEntry.isPresent()) {
+            fail("Found unexpected " + childMap.toString() + " entry for " + key);
         }
         }
-        return entityTypeEntry.get();
+
+        return entityTypeEntry.isPresent() ? entityTypeEntry.get() : null;
     }
 
     static void verifyOwner(String expected, String entityType, YangInstanceIdentifier entityId,
     }
 
     static void verifyOwner(String expected, String entityType, YangInstanceIdentifier entityId,
index aa96fc20d8579a7cc57c11a59b9c440a7b8fcd07..75162422238555569b78465724bd4ce39da0ef8f 100644 (file)
@@ -45,7 +45,7 @@ public class CandidateListChangeListenerTest extends AbstractActorTest {
     public void testOnDataTreeChanged() throws Exception {
         JavaTestKit kit = new JavaTestKit(getSystem());
 
     public void testOnDataTreeChanged() throws Exception {
         JavaTestKit kit = new JavaTestKit(getSystem());
 
-        new CandidateListChangeListener(kit.getRef()).init(shardDataTree);
+        new CandidateListChangeListener(kit.getRef(), "test").init(shardDataTree);
 
         String memberName1 = "member-1";
         writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, memberName1));
 
         String memberName1 = "member-1";
         writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, memberName1));
index 9bd0ccf2c0e4715e8530038007e18a50adf7a27b..8b80d6b9e76ecceaec30993d7e42496466aae578 100644 (file)
@@ -26,6 +26,7 @@ import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
+;
 
 /**
  * Unit tests for EntityOwnerChangeListener.
 
 /**
  * Unit tests for EntityOwnerChangeListener.
@@ -85,9 +86,21 @@ public class EntityOwnerChangeListenerTest {
         reset(mockListenerSupport);
         writeNode(entityPath(ENTITY_TYPE, ENTITY_ID2), entityEntryWithOwner(ENTITY_ID2, LOCAL_MEMBER_NAME));
         verify(mockListenerSupport).notifyEntityOwnershipListeners(ENTITY2, false, true);
         reset(mockListenerSupport);
         writeNode(entityPath(ENTITY_TYPE, ENTITY_ID2), entityEntryWithOwner(ENTITY_ID2, LOCAL_MEMBER_NAME));
         verify(mockListenerSupport).notifyEntityOwnershipListeners(ENTITY2, false, true);
+
+        reset(mockListenerSupport);
+        writeNode(entityPath(ENTITY_TYPE, ENTITY_ID2), entityEntryWithOwner(ENTITY_ID2, LOCAL_MEMBER_NAME));
+        verify(mockListenerSupport, never()).notifyEntityOwnershipListeners(any(Entity.class), anyBoolean(), anyBoolean());
+
+        reset(mockListenerSupport);
+        writeNode(entityPath(ENTITY_TYPE, ENTITY_ID2), entityEntryWithOwner(ENTITY_ID2, null));
+        verify(mockListenerSupport).notifyEntityOwnershipListeners(ENTITY2, true, false);
     }
 
     private void writeNode(YangInstanceIdentifier path, NormalizedNode<?, ?> node) throws DataValidationFailedException {
         AbstractEntityOwnershipTest.writeNode(path, node, shardDataTree);
     }
     }
 
     private void writeNode(YangInstanceIdentifier path, NormalizedNode<?, ?> node) throws DataValidationFailedException {
         AbstractEntityOwnershipTest.writeNode(path, node, shardDataTree);
     }
+
+    private void deleteNode(YangInstanceIdentifier path) throws DataValidationFailedException {
+        AbstractEntityOwnershipTest.deleteNode(path, shardDataTree);
+    }
 }
 }
index 8ddc0b473ee2c0f8ddd4b1f47bdcc4af9f5685e8..00448751da91155514e424375612d3b11029251b 100644 (file)
@@ -15,9 +15,9 @@ import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
+import akka.actor.ActorContext;
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.actor.ActorRef;
 import akka.actor.Props;
-import akka.actor.UntypedActorContext;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
 import com.google.common.util.concurrent.Uninterruptibles;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
 import com.google.common.util.concurrent.Uninterruptibles;
@@ -25,11 +25,13 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import org.junit.After;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
 import org.opendaylight.controller.cluster.raft.TestActorFactory;
 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
 import org.opendaylight.controller.cluster.raft.TestActorFactory;
 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate;
 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -43,6 +45,15 @@ import scala.collection.immutable.Iterable;
  */
 public class EntityOwnershipListenerSupportTest extends AbstractActorTest {
     private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
  */
 public class EntityOwnershipListenerSupportTest extends AbstractActorTest {
     private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
+    private ActorContext actorContext;
+
+    @Before
+    public void setup() {
+        TestActorRef<DoNothingActor> actor = actorFactory.createTestActor(
+                Props.create(DoNothingActor.class), actorFactory.generateActorId("test"));
+
+        actorContext = actor.underlyingActor().getContext();
+    }
 
     @After
     public void tearDown() {
 
     @After
     public void tearDown() {
@@ -51,11 +62,7 @@ public class EntityOwnershipListenerSupportTest extends AbstractActorTest {
 
     @Test
     public void testNotifyEntityOwnershipListeners() {
 
     @Test
     public void testNotifyEntityOwnershipListeners() {
-        TestActorRef<DoNothingActor> actor = actorFactory.createTestActor(
-                Props.create(DoNothingActor.class), actorFactory.generateActorId("test"));
-
-        UntypedActorContext actorContext = actor.underlyingActor().getContext();
-        EntityOwnershipListenerSupport support = new EntityOwnershipListenerSupport(actorContext);
+        EntityOwnershipListenerSupport support = new EntityOwnershipListenerSupport(actorContext, "test");
 
         EntityOwnershipListener mockListener1 = mock(EntityOwnershipListener.class, "EntityOwnershipListener1");
         EntityOwnershipListener mockListener2 = mock(EntityOwnershipListener.class, "EntityOwnershipListener2");
 
         EntityOwnershipListener mockListener1 = mock(EntityOwnershipListener.class, "EntityOwnershipListener1");
         EntityOwnershipListener mockListener2 = mock(EntityOwnershipListener.class, "EntityOwnershipListener2");
@@ -184,4 +191,22 @@ public class EntityOwnershipListenerSupportTest extends AbstractActorTest {
         support.addEntityOwnershipListener(entity1, mockListener2);
         support.removeEntityOwnershipListener(entity1, mockListener2);
     }
         support.addEntityOwnershipListener(entity1, mockListener2);
         support.removeEntityOwnershipListener(entity1, mockListener2);
     }
+
+    @Test
+    public void testHasCandidateForEntity() {
+        EntityOwnershipListenerSupport support = new EntityOwnershipListenerSupport(actorContext, "test");
+        Entity entity = new Entity("type", YangInstanceIdentifier.of(QName.create("test", "id")));
+
+        assertEquals("hasCandidateForEntity", false, support.hasCandidateForEntity(entity));
+
+        support.addEntityOwnershipListener(entity, mock(EntityOwnershipListener.class));
+        assertEquals("hasCandidateForEntity", false, support.hasCandidateForEntity(entity));
+
+        EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
+        support.addEntityOwnershipListener(entity, candidate);
+        assertEquals("hasCandidateForEntity", true, support.hasCandidateForEntity(entity));
+
+        support.removeEntityOwnershipListener(entity, candidate);
+        assertEquals("hasCandidateForEntity", false, support.hasCandidateForEntity(entity));
+    }
 }
 }
index 146916a9621481846c05e832c29d440aa1b32b51..2dc754472ee9aa2ce490e618b35aab261d8d1f05 100644 (file)
@@ -93,6 +93,8 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
             YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity3"));
     private static final YangInstanceIdentifier ENTITY_ID4 =
             YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity4"));
             YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity3"));
     private static final YangInstanceIdentifier ENTITY_ID4 =
             YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity4"));
+    private static final YangInstanceIdentifier ENTITY_ID5 =
+            YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity5"));
     private static final SchemaContext SCHEMA_CONTEXT = SchemaContextHelper.entityOwners();
     private static final AtomicInteger NEXT_SHARD_NUM = new AtomicInteger();
     private static final String LOCAL_MEMBER_NAME = "member-1";
     private static final SchemaContext SCHEMA_CONTEXT = SchemaContextHelper.entityOwners();
     private static final AtomicInteger NEXT_SHARD_NUM = new AtomicInteger();
     private static final String LOCAL_MEMBER_NAME = "member-1";
@@ -480,10 +482,6 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
 
         commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID2, peerMemberName1), kit);
         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName1);
 
         commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID2, peerMemberName1), kit);
         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName1);
-
-        leader.tell(new RegisterCandidateLocal(candidate, new Entity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
-        kit.expectMsgClass(SuccessReply.class);
-        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, LOCAL_MEMBER_NAME);
         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
 
         // Add candidates for entity3 with peerMember2 as the owner.
         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
 
         // Add candidates for entity3 with peerMember2 as the owner.
@@ -505,6 +503,12 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, peerMemberName2);
         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, peerMemberName2);
 
         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, peerMemberName2);
         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, peerMemberName2);
 
+        // Add only candidate peerMember1 for entity5.
+
+        commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID5, peerMemberName1), kit);
+        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID5, peerMemberName1);
+        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID5, peerMemberName1);
+
         // Kill peerMember2 and send PeerDown - the entities (2, 3, 4) owned by peerMember2 should get a new
         // owner selected
 
         // Kill peerMember2 and send PeerDown - the entities (2, 3, 4) owned by peerMember2 should get a new
         // owner selected
 
@@ -523,12 +527,12 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName1);
         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
 
         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName1);
         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
 
-        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, peerMemberName2);
-        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
-        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, peerMemberName2);
-        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, peerMemberName2);
+        verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, peerMemberName2);
+        verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
+        verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, peerMemberName2);
+        verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, peerMemberName2);
 
 
-        // Reinstate peerMember2 - should become owner again for entity 4
+        // Reinstate peerMember2 - no owners should change
 
         peer2 = actorFactory.createTestActor(newShardProps(peerId2,
                 ImmutableMap.<String, String>builder().put(leaderId.toString(), ""). put(peerId1.toString(), "").build(),
 
         peer2 = actorFactory.createTestActor(newShardProps(peerId2,
                 ImmutableMap.<String, String>builder().put(leaderId.toString(), ""). put(peerId1.toString(), "").build(),
@@ -538,10 +542,22 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
         leader.tell(new PeerUp(peerMemberName2, peerId2.toString()), ActorRef.noSender());
         peer1.tell(new PeerUp(peerMemberName2, peerId2.toString()), ActorRef.noSender());
 
         leader.tell(new PeerUp(peerMemberName2, peerId2.toString()), ActorRef.noSender());
         peer1.tell(new PeerUp(peerMemberName2, peerId2.toString()), ActorRef.noSender());
 
-        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, peerMemberName2);
         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName1);
         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName1);
         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
+        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, "");
+
+        // Add back candidate peerMember2 for entities 1, 2, & 3.
+
+        commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, peerMemberName2), kit);
+        commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID2, peerMemberName2), kit);
+        commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID3, peerMemberName2), kit);
+        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, peerMemberName2);
+        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
+        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, peerMemberName2);
+        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
+        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName1);
+        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
 
         // Kill peerMember1 and send PeerDown - entity 2 should get a new owner selected
 
 
         // Kill peerMember1 and send PeerDown - entity 2 should get a new owner selected
 
@@ -552,7 +568,7 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
 
         // Verify the reinstated peerMember2 is fully synced.
 
 
         // Verify the reinstated peerMember2 is fully synced.
 
-        verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, peerMemberName2);
+        verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, "");
         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
@@ -564,14 +580,14 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
                         peerMemberName1).withDispatcher(Dispatchers.DefaultDispatcherId()), peerId1.toString());
         leader.tell(new PeerUp(peerMemberName1, peerId1.toString()), ActorRef.noSender());
 
                         peerMemberName1).withDispatcher(Dispatchers.DefaultDispatcherId()), peerId1.toString());
         leader.tell(new PeerUp(peerMemberName1, peerId1.toString()), ActorRef.noSender());
 
-        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, peerMemberName2);
+        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, "");
         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
 
         // Verify the reinstated peerMember1 is fully synced.
 
         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
 
         // Verify the reinstated peerMember1 is fully synced.
 
-        verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID4, peerMemberName2);
+        verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID4, "");
         verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
         verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
         verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
         verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
         verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
         verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
@@ -589,12 +605,67 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
 
         kit.waitUntilLeader(peer2);
 
 
         kit.waitUntilLeader(peer2);
 
-        verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, peerMemberName2);
+        verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, "");
         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, peerMemberName2);
         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, peerMemberName2);
     }
 
         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, peerMemberName2);
         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, peerMemberName2);
     }
 
+    @Test
+    public void testLocalCandidateRemovedWithCandidateRegistered() throws Exception {
+        ShardTestKit kit = new ShardTestKit(getSystem());
+
+        dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(10000);
+        ShardIdentifier leaderId = newShardId("leader");
+        ShardIdentifier localId = newShardId(LOCAL_MEMBER_NAME);
+
+        TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(Props.create(
+                TestEntityOwnershipShard.class, localId,
+                ImmutableMap.<String, String>builder().put(leaderId.toString(), "".toString()).build(),
+                dataStoreContextBuilder.build()).withDispatcher(Dispatchers.DefaultDispatcherId()));
+
+        TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(newShardProps(leaderId,
+                ImmutableMap.<String, String>builder().put(localId.toString(), shard.path().toString()).build(),
+                    LOCAL_MEMBER_NAME).withDispatcher(Dispatchers.DefaultDispatcherId()), leaderId.toString());
+        leader.tell(new ElectionTimeout(), leader);
+
+        kit.waitUntilLeader(leader);
+
+        shard.tell(new PeerAddressResolved(leaderId.toString(), leader.path().toString()), ActorRef.noSender());
+
+        Entity entity = new Entity(ENTITY_TYPE, ENTITY_ID1);
+        EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
+
+        // Register local candidate
+
+        shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
+        verifyCommittedEntityCandidate(shard, entity.getType(), entity.getId(), LOCAL_MEMBER_NAME);
+        verify(candidate, timeout(5000)).ownershipChanged(entity, false, true);
+        reset(candidate);
+
+        // Simulate a replicated commit from the leader to remove the local candidate that would occur after a
+        // network partition is healed.
+
+        leader.tell(new PeerDown(LOCAL_MEMBER_NAME, localId.toString()), ActorRef.noSender());
+
+        verify(candidate, timeout(5000)).ownershipChanged(entity, true, false);
+
+        // Since the the shard has a local candidate registered, it should re-add its candidate to the entity.
+
+        verifyCommittedEntityCandidate(shard, entity.getType(), entity.getId(), LOCAL_MEMBER_NAME);
+        verify(candidate, timeout(5000)).ownershipChanged(entity, false, true);
+
+        // Unregister the local candidate and verify it's removed and no re-added.
+
+        shard.tell(new UnregisterCandidateLocal(candidate, entity), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
+
+        verifyNoEntityCandidate(shard, entity.getType(), entity.getId(), LOCAL_MEMBER_NAME);
+        Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+        verifyNoEntityCandidate(shard, entity.getType(), entity.getId(), LOCAL_MEMBER_NAME);
+    }
+
     @Test
     public void testListenerRegistration() throws Exception {
         ShardTestKit kit = new ShardTestKit(getSystem());
     @Test
     public void testListenerRegistration() throws Exception {
         ShardTestKit kit = new ShardTestKit(getSystem());
@@ -681,16 +752,21 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
 
     private void commitModification(TestActorRef<EntityOwnershipShard> shard, NormalizedNode<?, ?> node,
             JavaTestKit sender) {
 
     private void commitModification(TestActorRef<EntityOwnershipShard> shard, NormalizedNode<?, ?> node,
             JavaTestKit sender) {
-        BatchedModifications modifications = new BatchedModifications("tnx", DataStoreVersions.CURRENT_VERSION, "");
-        modifications.setDoCommitOnReady(true);
-        modifications.setReady(true);
-        modifications.setTotalMessagesSent(1);
+        BatchedModifications modifications = newBatchedModifications();
         modifications.addModification(new MergeModification(ENTITY_OWNERS_PATH, node));
 
         shard.tell(modifications, sender.getRef());
         sender.expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
     }
 
         modifications.addModification(new MergeModification(ENTITY_OWNERS_PATH, node));
 
         shard.tell(modifications, sender.getRef());
         sender.expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
     }
 
+    private BatchedModifications newBatchedModifications() {
+        BatchedModifications modifications = new BatchedModifications("tnx", DataStoreVersions.CURRENT_VERSION, "");
+        modifications.setDoCommitOnReady(true);
+        modifications.setReady(true);
+        modifications.setTotalMessagesSent(1);
+        return modifications;
+    }
+
     private void verifyEntityCandidateRemoved(final TestActorRef<EntityOwnershipShard> shard, String entityType,
             YangInstanceIdentifier entityId, String candidateName) {
         verifyNodeRemoved(candidatePath(entityType, entityId, candidateName),
     private void verifyEntityCandidateRemoved(final TestActorRef<EntityOwnershipShard> shard, String entityType,
             YangInstanceIdentifier entityId, String candidateName) {
         verifyNodeRemoved(candidatePath(entityType, entityId, candidateName),
@@ -720,6 +796,20 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
         });
     }
 
         });
     }
 
+    private void verifyNoEntityCandidate(final TestActorRef<EntityOwnershipShard> shard, String entityType,
+            YangInstanceIdentifier entityId, String candidateName) {
+        verifyEntityCandidate(entityType, entityId, candidateName, new Function<YangInstanceIdentifier, NormalizedNode<?,?>>() {
+            @Override
+            public NormalizedNode<?, ?> apply(YangInstanceIdentifier path) {
+                try {
+                    return AbstractShardTest.readStore(shard, path);
+                } catch(Exception e) {
+                    throw new AssertionError("Failed to read " + path, e);
+                }
+            }
+        }, false);
+    }
+
     private void verifyBatchedEntityCandidate(List<Modification> mods, String entityType,
             YangInstanceIdentifier entityId, String candidateName) throws Exception {
         assertEquals("BatchedModifications size", 1, mods.size());
     private void verifyBatchedEntityCandidate(List<Modification> mods, String entityType,
             YangInstanceIdentifier entityId, String candidateName) throws Exception {
         assertEquals("BatchedModifications size", 1, mods.size());
@@ -730,7 +820,7 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
             YangInstanceIdentifier entityId, String candidateName) throws Exception {
         assertEquals("Modification type", MergeModification.class, mod.getClass());
         verifyEntityCandidate(((MergeModification)mod).getData(), entityType,
             YangInstanceIdentifier entityId, String candidateName) throws Exception {
         assertEquals("Modification type", MergeModification.class, mod.getClass());
         verifyEntityCandidate(((MergeModification)mod).getData(), entityType,
-                entityId, candidateName);
+                entityId, candidateName, true);
     }
 
     private void verifyOwner(final TestActorRef<EntityOwnershipShard> shard, String entityType, YangInstanceIdentifier entityId,
     }
 
     private void verifyOwner(final TestActorRef<EntityOwnershipShard> shard, String entityType, YangInstanceIdentifier entityId,