Bug 4105: Remove candidates on PeerDown
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / entityownership / EntityOwnershipShard.java
index 4dfbc87eb9a2b1b7bcb3545e1e67dc2c03304c3a..58a7a036759dc6fa9de884a22449563f53c14888 100644 (file)
@@ -10,13 +10,18 @@ package org.opendaylight.controller.cluster.datastore.entityownership;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.CANDIDATE_NAME_NODE_ID;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.CANDIDATE_NODE_ID;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_ID_NODE_ID;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_ID_QNAME;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_NODE_ID;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNERS_PATH;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_NODE_ID;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_QNAME;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_TYPES_PATH;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_TYPE_NODE_ID;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_TYPE_QNAME;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.candidateMapEntry;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.candidateNodeKey;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.candidatePath;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.createEntity;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityOwnersWithCandidate;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
@@ -80,7 +85,7 @@ class EntityOwnershipShard extends Shard {
         super(name, peerAddresses, noPersistenceDatastoreContext(datastoreContext), schemaContext);
         this.localMemberName = localMemberName;
         this.commitCoordinator = new EntityOwnershipShardCommitCoordinator(localMemberName, LOG);
-        this.listenerSupport = new EntityOwnershipListenerSupport(getContext());
+        this.listenerSupport = new EntityOwnershipListenerSupport(getContext(), persistenceId());
 
         for(String peerId: peerAddresses.keySet()) {
             ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(peerId).build();
@@ -97,7 +102,7 @@ class EntityOwnershipShard extends Shard {
     protected void onRecoveryComplete() {
         super.onRecoveryComplete();
 
-        new CandidateListChangeListener(getSelf()).init(getDataStore());
+        new CandidateListChangeListener(getSelf(), persistenceId()).init(getDataStore());
         new EntityOwnerChangeListener(localMemberName, listenerSupport).init(getDataStore());
     }
 
@@ -236,15 +241,28 @@ class EntityOwnershipShard extends Shard {
     }
 
     private void onCandidateRemoved(CandidateRemoved message) {
-        if(!isLeader()) {
-            return;
-        }
-
         LOG.debug("{}: onCandidateRemoved: {}", persistenceId(), message);
 
-        String currentOwner = getCurrentOwner(message.getEntityPath());
-        if(message.getRemovedCandidate().equals(currentOwner)){
-            writeNewOwner(message.getEntityPath(), newOwner(message.getRemainingCandidates()));
+        if(isLeader()) {
+            String currentOwner = getCurrentOwner(message.getEntityPath());
+            if(message.getRemovedCandidate().equals(currentOwner)){
+                writeNewOwner(message.getEntityPath(), newOwner(message.getRemainingCandidates()));
+            }
+        } else {
+            // We're not the leader. If the removed candidate is our local member then check if we actually
+            // have a local candidate registered. If we do then we must have been partitioned from the leader
+            // and the leader removed our candidate since the leader can't tell the difference between a
+            // temporary network partition and a node's process actually restarted. So, in that case, re-add
+            // our candidate.
+            if(localMemberName.equals(message.getRemovedCandidate()) &&
+                    listenerSupport.hasCandidateForEntity(createEntity(message.getEntityPath()))) {
+                LOG.debug("Local candidate member was removed but a local candidate is registered for {}" +
+                    " - adding back local candidate", message.getEntityPath());
+
+                commitCoordinator.commitModification(new MergeModification(
+                        candidatePath(message.getEntityPath(), localMemberName),
+                        candidateMapEntry(localMemberName)), this);
+            }
         }
     }
 
@@ -255,6 +273,10 @@ class EntityOwnershipShard extends Shard {
 
         LOG.debug("{}: onCandidateAdded: {}", persistenceId(), message);
 
+        // Since a node's candidate member is only added by the node itself, we can assume the node is up so
+        // remove it from the downPeerMemberNames.
+        downPeerMemberNames.remove(message.getNewCandidate());
+
         String currentOwner = getCurrentOwner(message.getEntityPath());
         if(Strings.isNullOrEmpty(currentOwner)){
             writeNewOwner(message.getEntityPath(), newOwner(message.getAllCandidates()));
@@ -262,12 +284,12 @@ class EntityOwnershipShard extends Shard {
     }
 
     private void onPeerDown(PeerDown peerDown) {
-        LOG.debug("{}: onPeerDown: {}", persistenceId(), peerDown);
+        LOG.info("{}: onPeerDown: {}", persistenceId(), peerDown);
 
         String downMemberName = peerDown.getMemberName();
         if(downPeerMemberNames.add(downMemberName) && isLeader()) {
-            // Select new owners for entities owned by the down peer.
-            selectNewOwnerForEntitiesOwnedBy(downMemberName);
+            // Remove the down peer as a candidate from all entities.
+            removeCandidateFromEntities(downMemberName);
         }
     }
 
@@ -275,13 +297,7 @@ class EntityOwnershipShard extends Shard {
         LOG.debug("{}: onPeerUp: {}", persistenceId(), peerUp);
 
         peerIdToMemberNames.put(peerUp.getPeerId(), peerUp.getMemberName());
-
-        if(downPeerMemberNames.remove(peerUp.getMemberName()) && isLeader()) {
-            // This peer was previously down - for its previously owned entities, if there were no other
-            // candidates, the owner would have been cleared so handle that here by trying to re-assign
-            // ownership for entities whose owner is cleared.
-            selectNewOwnerForEntitiesOwnedBy("");
-        }
+        downPeerMemberNames.remove(peerUp.getMemberName());
     }
 
     private void selectNewOwnerForEntitiesOwnedBy(String owner) {
@@ -304,7 +320,34 @@ class EntityOwnershipShard extends Shard {
         commitCoordinator.commitModifications(modifications, this);
     }
 
-    private void searchForEntitiesOwnedBy(String owner, EntityWalker walker) {
+    private void removeCandidateFromEntities(final String owner) {
+        final BatchedModifications modifications = commitCoordinator.newBatchedModifications();
+        searchForEntities(new EntityWalker() {
+            @Override
+            public void onEntity(MapEntryNode entityTypeNode, MapEntryNode entityNode) {
+                if(hasCandidate(entityNode, owner)) {
+                    YangInstanceIdentifier entityId =
+                            (YangInstanceIdentifier)entityNode.getIdentifier().getKeyValues().get(ENTITY_ID_QNAME);
+                    YangInstanceIdentifier candidatePath = candidatePath(
+                            entityTypeNode.getIdentifier().getKeyValues().get(ENTITY_TYPE_QNAME).toString(),
+                            entityId, owner);
+
+                    LOG.info("{}: Found entity {}, removing candidate {}, path {}", persistenceId(), entityId,
+                            owner, candidatePath);
+
+                    modifications.addModification(new DeleteModification(candidatePath));
+                }
+            }
+        });
+
+        commitCoordinator.commitModifications(modifications, this);
+    }
+
+    private boolean hasCandidate(MapEntryNode entity, String candidateName) {
+        return ((MapNode)entity.getChild(CANDIDATE_NODE_ID).get()).getChild(candidateNodeKey(candidateName)).isPresent();
+    }
+
+    private void searchForEntitiesOwnedBy(final String owner, final EntityWalker walker) {
         DataTreeSnapshot snapshot = getDataStore().getDataTree().takeSnapshot();
         Optional<NormalizedNode<?, ?>> possibleEntityTypes = snapshot.readNode(ENTITY_TYPES_PATH);
         if(!possibleEntityTypes.isPresent()) {
@@ -313,6 +356,25 @@ class EntityOwnershipShard extends Shard {
 
         LOG.debug("{}: Searching for entities owned by {}", persistenceId(), owner);
 
+        searchForEntities(new EntityWalker() {
+            @Override
+            public void onEntity(MapEntryNode entityTypeNode, MapEntryNode entityNode) {
+                Optional<DataContainerChild<? extends PathArgument, ?>> possibleOwner =
+                        entityNode.getChild(ENTITY_OWNER_NODE_ID);
+                if(possibleOwner.isPresent() && owner.equals(possibleOwner.get().getValue().toString())) {
+                    walker.onEntity(entityTypeNode, entityNode);
+                }
+            }
+        });
+    }
+
+    private void searchForEntities(EntityWalker walker) {
+        DataTreeSnapshot snapshot = getDataStore().getDataTree().takeSnapshot();
+        Optional<NormalizedNode<?, ?>> possibleEntityTypes = snapshot.readNode(ENTITY_TYPES_PATH);
+        if(!possibleEntityTypes.isPresent()) {
+            return;
+        }
+
         for(MapEntryNode entityType:  ((MapNode) possibleEntityTypes.get()).getValue()) {
             Optional<DataContainerChild<? extends PathArgument, ?>> possibleEntities =
                     entityType.getChild(ENTITY_NODE_ID);
@@ -321,11 +383,7 @@ class EntityOwnershipShard extends Shard {
             }
 
             for(MapEntryNode entity:  ((MapNode) possibleEntities.get()).getValue()) {
-                Optional<DataContainerChild<? extends PathArgument, ?>> possibleOwner =
-                        entity.getChild(ENTITY_OWNER_NODE_ID);
-                if(possibleOwner.isPresent() && owner.equals(possibleOwner.get().getValue().toString())) {
-                    walker.onEntity(entityType, entity);
-                }
+                walker.onEntity(entityType, entity);
             }
         }
     }