Bug 4105: Remove EntityOwnershipCandidate
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / entityownership / EntityOwnershipShard.java
index a71c86dd9f3f165b8e254e3918a0f9dfacdda811..1641b668c325804e1680cd3089f95a83e91c2d7b 100644 (file)
@@ -9,12 +9,19 @@ package org.opendaylight.controller.cluster.datastore.entityownership;
 
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.CANDIDATE_NAME_NODE_ID;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.CANDIDATE_NODE_ID;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_ID_NODE_ID;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_ID_QNAME;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_NODE_ID;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNERS_PATH;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_NODE_ID;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_QNAME;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_TYPES_PATH;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_TYPE_NODE_ID;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_TYPE_QNAME;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.candidateMapEntry;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.candidateNodeKey;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.candidatePath;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.createEntity;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityOwnersWithCandidate;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
@@ -34,7 +41,9 @@ import org.opendaylight.controller.cluster.datastore.Shard;
 import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateAdded;
 import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateRemoved;
 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterListenerLocal;
 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterListenerLocal;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
@@ -76,7 +85,7 @@ class EntityOwnershipShard extends Shard {
         super(name, peerAddresses, noPersistenceDatastoreContext(datastoreContext), schemaContext);
         this.localMemberName = localMemberName;
         this.commitCoordinator = new EntityOwnershipShardCommitCoordinator(localMemberName, LOG);
-        this.listenerSupport = new EntityOwnershipListenerSupport(getContext());
+        this.listenerSupport = new EntityOwnershipListenerSupport(getContext(), persistenceId());
 
         for(String peerId: peerAddresses.keySet()) {
             ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(peerId).build();
@@ -93,7 +102,7 @@ class EntityOwnershipShard extends Shard {
     protected void onRecoveryComplete() {
         super.onRecoveryComplete();
 
-        new CandidateListChangeListener(getSelf()).init(getDataStore());
+        new CandidateListChangeListener(getSelf(), persistenceId()).init(getDataStore());
         new EntityOwnerChangeListener(localMemberName, listenerSupport).init(getDataStore());
     }
 
@@ -111,6 +120,10 @@ class EntityOwnershipShard extends Shard {
             onPeerDown((PeerDown) message);
         } else if(message instanceof PeerUp) {
             onPeerUp((PeerUp) message);
+        } if(message instanceof RegisterListenerLocal) {
+            onRegisterListenerLocal((RegisterListenerLocal)message);
+        } if(message instanceof UnregisterListenerLocal) {
+            onUnregisterListenerLocal((UnregisterListenerLocal)message);
         } else if(!commitCoordinator.handleMessage(message, this)) {
             super.onReceiveCommand(message);
         }
@@ -119,7 +132,7 @@ class EntityOwnershipShard extends Shard {
     private void onRegisterCandidateLocal(RegisterCandidateLocal registerCandidate) {
         LOG.debug("{}: onRegisterCandidateLocal: {}", persistenceId(), registerCandidate);
 
-        listenerSupport.addEntityOwnershipListener(registerCandidate.getEntity(), registerCandidate.getCandidate());
+        listenerSupport.setHasCandidateForEntity(registerCandidate.getEntity());
 
         NormalizedNode<?, ?> entityOwners = entityOwnersWithCandidate(registerCandidate.getEntity().getType(),
                 registerCandidate.getEntity().getId(), localMemberName);
@@ -132,7 +145,7 @@ class EntityOwnershipShard extends Shard {
         LOG.debug("{}: onUnregisterCandidateLocal: {}", persistenceId(), unregisterCandidate);
 
         Entity entity = unregisterCandidate.getEntity();
-        listenerSupport.removeEntityOwnershipListener(entity, unregisterCandidate.getCandidate());
+        listenerSupport.unsetHasCandidateForEntity(entity);
 
         YangInstanceIdentifier candidatePath = candidatePath(entity.getType(), entity.getId(), localMemberName);
         commitCoordinator.commitModification(new DeleteModification(candidatePath), this);
@@ -140,6 +153,36 @@ class EntityOwnershipShard extends Shard {
         getSender().tell(SuccessReply.INSTANCE, getSelf());
     }
 
+    private void onRegisterListenerLocal(final RegisterListenerLocal registerListener) {
+        LOG.debug("{}: onRegisterListenerLocal: {}", persistenceId(), registerListener);
+
+        listenerSupport.addEntityOwnershipListener(registerListener.getEntityType(), registerListener.getListener());
+
+        getSender().tell(SuccessReply.INSTANCE, getSelf());
+
+        searchForEntitiesOwnedBy(localMemberName, new EntityWalker() {
+            @Override
+            public void onEntity(MapEntryNode entityTypeNode, MapEntryNode entityNode) {
+                Optional<DataContainerChild<? extends PathArgument, ?>> possibleType =
+                        entityTypeNode.getChild(ENTITY_TYPE_NODE_ID);
+                String entityType = possibleType.isPresent() ? possibleType.get().getValue().toString() : null;
+                if(registerListener.getEntityType().equals(entityType)) {
+                    Entity entity = new Entity(entityType,
+                            (YangInstanceIdentifier) entityNode.getChild(ENTITY_ID_NODE_ID).get().getValue());
+                    listenerSupport.notifyEntityOwnershipListener(entity, false, true, true, registerListener.getListener());
+                }
+            }
+        });
+    }
+
+    private void onUnregisterListenerLocal(UnregisterListenerLocal unregisterListener) {
+        LOG.debug("{}: onUnregisterListenerLocal: {}", persistenceId(), unregisterListener);
+
+        listenerSupport.removeEntityOwnershipListener(unregisterListener.getEntityType(), unregisterListener.getListener());
+
+        getSender().tell(SuccessReply.INSTANCE, getSelf());
+    }
+
     void tryCommitModifications(final BatchedModifications modifications) {
         if(isLeader()) {
             LOG.debug("{}: Committing BatchedModifications {} locally", persistenceId(), modifications.getTransactionID());
@@ -198,15 +241,28 @@ class EntityOwnershipShard extends Shard {
     }
 
     private void onCandidateRemoved(CandidateRemoved message) {
-        if(!isLeader()) {
-            return;
-        }
-
         LOG.debug("{}: onCandidateRemoved: {}", persistenceId(), message);
 
-        String currentOwner = getCurrentOwner(message.getEntityPath());
-        if(message.getRemovedCandidate().equals(currentOwner)){
-            writeNewOwner(message.getEntityPath(), newOwner(message.getRemainingCandidates()));
+        if(isLeader()) {
+            String currentOwner = getCurrentOwner(message.getEntityPath());
+            if(message.getRemovedCandidate().equals(currentOwner)){
+                writeNewOwner(message.getEntityPath(), newOwner(message.getRemainingCandidates()));
+            }
+        } else {
+            // We're not the leader. If the removed candidate is our local member then check if we actually
+            // have a local candidate registered. If we do then we must have been partitioned from the leader
+            // and the leader removed our candidate since the leader can't tell the difference between a
+            // temporary network partition and a node's process actually restarted. So, in that case, re-add
+            // our candidate.
+            if(localMemberName.equals(message.getRemovedCandidate()) &&
+                    listenerSupport.hasCandidateForEntity(createEntity(message.getEntityPath()))) {
+                LOG.debug("Local candidate member was removed but a local candidate is registered for {}" +
+                    " - adding back local candidate", message.getEntityPath());
+
+                commitCoordinator.commitModification(new MergeModification(
+                        candidatePath(message.getEntityPath(), localMemberName),
+                        candidateMapEntry(localMemberName)), this);
+            }
         }
     }
 
@@ -217,6 +273,10 @@ class EntityOwnershipShard extends Shard {
 
         LOG.debug("{}: onCandidateAdded: {}", persistenceId(), message);
 
+        // Since a node's candidate member is only added by the node itself, we can assume the node is up so
+        // remove it from the downPeerMemberNames.
+        downPeerMemberNames.remove(message.getNewCandidate());
+
         String currentOwner = getCurrentOwner(message.getEntityPath());
         if(Strings.isNullOrEmpty(currentOwner)){
             writeNewOwner(message.getEntityPath(), newOwner(message.getAllCandidates()));
@@ -224,12 +284,12 @@ class EntityOwnershipShard extends Shard {
     }
 
     private void onPeerDown(PeerDown peerDown) {
-        LOG.debug("{}: onPeerDown: {}", persistenceId(), peerDown);
+        LOG.info("{}: onPeerDown: {}", persistenceId(), peerDown);
 
         String downMemberName = peerDown.getMemberName();
         if(downPeerMemberNames.add(downMemberName) && isLeader()) {
-            // Select new owners for entities owned by the down peer.
-            selectNewOwnerForEntitiesOwnedBy(downMemberName);
+            // Remove the down peer as a candidate from all entities.
+            removeCandidateFromEntities(downMemberName);
         }
     }
 
@@ -237,16 +297,57 @@ class EntityOwnershipShard extends Shard {
         LOG.debug("{}: onPeerUp: {}", persistenceId(), peerUp);
 
         peerIdToMemberNames.put(peerUp.getPeerId(), peerUp.getMemberName());
-
-        if(downPeerMemberNames.remove(peerUp.getMemberName()) && isLeader()) {
-            // This peer was previously down - for its previously owned entities, if there were no other
-            // candidates, the owner would have been cleared so handle that here by trying to re-assign
-            // ownership for entities whose owner is cleared.
-            selectNewOwnerForEntitiesOwnedBy("");
-        }
+        downPeerMemberNames.remove(peerUp.getMemberName());
     }
 
     private void selectNewOwnerForEntitiesOwnedBy(String owner) {
+        final BatchedModifications modifications = commitCoordinator.newBatchedModifications();
+        searchForEntitiesOwnedBy(owner, new EntityWalker() {
+            @Override
+            public void onEntity(MapEntryNode entityTypeNode, MapEntryNode entityNode) {
+                Object newOwner = newOwner(getCandidateNames(entityNode));
+                YangInstanceIdentifier entityPath = YangInstanceIdentifier.builder(ENTITY_TYPES_PATH).
+                        node(entityTypeNode.getIdentifier()).node(ENTITY_NODE_ID).node(entityNode.getIdentifier()).
+                        node(ENTITY_OWNER_NODE_ID).build();
+
+                LOG.debug("{}: Found entity {}, writing new owner {}", persistenceId(), entityPath, newOwner);
+
+                modifications.addModification(new WriteModification(entityPath,
+                        ImmutableNodes.leafNode(ENTITY_OWNER_NODE_ID, newOwner)));
+            }
+        });
+
+        commitCoordinator.commitModifications(modifications, this);
+    }
+
+    private void removeCandidateFromEntities(final String owner) {
+        final BatchedModifications modifications = commitCoordinator.newBatchedModifications();
+        searchForEntities(new EntityWalker() {
+            @Override
+            public void onEntity(MapEntryNode entityTypeNode, MapEntryNode entityNode) {
+                if(hasCandidate(entityNode, owner)) {
+                    YangInstanceIdentifier entityId =
+                            (YangInstanceIdentifier)entityNode.getIdentifier().getKeyValues().get(ENTITY_ID_QNAME);
+                    YangInstanceIdentifier candidatePath = candidatePath(
+                            entityTypeNode.getIdentifier().getKeyValues().get(ENTITY_TYPE_QNAME).toString(),
+                            entityId, owner);
+
+                    LOG.info("{}: Found entity {}, removing candidate {}, path {}", persistenceId(), entityId,
+                            owner, candidatePath);
+
+                    modifications.addModification(new DeleteModification(candidatePath));
+                }
+            }
+        });
+
+        commitCoordinator.commitModifications(modifications, this);
+    }
+
+    private boolean hasCandidate(MapEntryNode entity, String candidateName) {
+        return ((MapNode)entity.getChild(CANDIDATE_NODE_ID).get()).getChild(candidateNodeKey(candidateName)).isPresent();
+    }
+
+    private void searchForEntitiesOwnedBy(final String owner, final EntityWalker walker) {
         DataTreeSnapshot snapshot = getDataStore().getDataTree().takeSnapshot();
         Optional<NormalizedNode<?, ?>> possibleEntityTypes = snapshot.readNode(ENTITY_TYPES_PATH);
         if(!possibleEntityTypes.isPresent()) {
@@ -255,7 +356,25 @@ class EntityOwnershipShard extends Shard {
 
         LOG.debug("{}: Searching for entities owned by {}", persistenceId(), owner);
 
-        BatchedModifications modifications = commitCoordinator.newBatchedModifications();
+        searchForEntities(new EntityWalker() {
+            @Override
+            public void onEntity(MapEntryNode entityTypeNode, MapEntryNode entityNode) {
+                Optional<DataContainerChild<? extends PathArgument, ?>> possibleOwner =
+                        entityNode.getChild(ENTITY_OWNER_NODE_ID);
+                if(possibleOwner.isPresent() && owner.equals(possibleOwner.get().getValue().toString())) {
+                    walker.onEntity(entityTypeNode, entityNode);
+                }
+            }
+        });
+    }
+
+    private void searchForEntities(EntityWalker walker) {
+        DataTreeSnapshot snapshot = getDataStore().getDataTree().takeSnapshot();
+        Optional<NormalizedNode<?, ?>> possibleEntityTypes = snapshot.readNode(ENTITY_TYPES_PATH);
+        if(!possibleEntityTypes.isPresent()) {
+            return;
+        }
+
         for(MapEntryNode entityType:  ((MapNode) possibleEntityTypes.get()).getValue()) {
             Optional<DataContainerChild<? extends PathArgument, ?>> possibleEntities =
                     entityType.getChild(ENTITY_NODE_ID);
@@ -264,23 +383,9 @@ class EntityOwnershipShard extends Shard {
             }
 
             for(MapEntryNode entity:  ((MapNode) possibleEntities.get()).getValue()) {
-                Optional<DataContainerChild<? extends PathArgument, ?>> possibleOwner =
-                        entity.getChild(ENTITY_OWNER_NODE_ID);
-                if(possibleOwner.isPresent() && owner.equals(possibleOwner.get().getValue().toString())) {
-                    Object newOwner = newOwner(getCandidateNames(entity));
-                    YangInstanceIdentifier entityPath = YangInstanceIdentifier.builder(ENTITY_TYPES_PATH).
-                            node(entityType.getIdentifier()).node(ENTITY_NODE_ID).node(entity.getIdentifier()).
-                                    node(ENTITY_OWNER_NODE_ID).build();
-
-                    LOG.debug("{}: Found entity {}, writing new owner {}", persistenceId(), entityPath, newOwner);
-
-                    modifications.addModification(new WriteModification(entityPath,
-                            ImmutableNodes.leafNode(ENTITY_OWNER_NODE_ID, newOwner)));
-                }
+                walker.onEntity(entityType, entity);
             }
         }
-
-        commitCoordinator.commitModifications(modifications, this);
     }
 
     private Collection<String> getCandidateNames(MapEntryNode entity) {
@@ -341,4 +446,8 @@ class EntityOwnershipShard extends Shard {
             return new EntityOwnershipShard(name, peerAddresses, datastoreContext, schemaContext, localMemberName);
         }
     }
+
+    private static interface EntityWalker {
+        void onEntity(MapEntryNode entityTypeNode, MapEntryNode entityNode);
+    }
 }