Bug 6540: EOS - Rework behavior of onPeerDown
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / entityownership / EntityOwnershipShard.java
index 46743482928f92bc1e44f28c0df8c27923d8f721..4a6a37aabe77e43e7b77ba6ba7e7a117caf75328 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.controller.cluster.datastore.entityownership;
 
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.CANDIDATE_NAME_NODE_ID;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.CANDIDATE_NODE_ID;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_ID_NODE_ID;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_ID_QNAME;
@@ -17,10 +18,8 @@ import static org.opendaylight.controller.cluster.datastore.entityownership.Enti
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_TYPES_PATH;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_TYPE_NODE_ID;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_TYPE_QNAME;
-import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.candidateMapEntry;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.candidateNodeKey;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.candidatePath;
-import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.createEntity;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityOwnersWithCandidate;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
@@ -43,6 +42,7 @@ import org.opendaylight.controller.cluster.datastore.entityownership.messages.Ca
 import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateRemoved;
 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterListenerLocal;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.RemoveAllCandidates;
 import org.opendaylight.controller.cluster.datastore.entityownership.messages.SelectOwner;
 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterListenerLocal;
@@ -59,6 +59,7 @@ import org.opendaylight.controller.cluster.datastore.modification.WriteModificat
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
@@ -81,6 +82,7 @@ class EntityOwnershipShard extends Shard {
     private final EntityOwnerSelectionStrategyConfig strategyConfig;
     private final Map<YangInstanceIdentifier, Cancellable> entityToScheduledOwnershipTask = new HashMap<>();
     private final EntityOwnershipStatistics entityOwnershipStatistics;
+    private boolean removeAllInitialCandidates = true;
 
     private static DatastoreContext noPersistenceDatastoreContext(DatastoreContext datastoreContext) {
         return DatastoreContext.newBuilderFrom(datastoreContext).persistent(false).build();
@@ -134,19 +136,25 @@ class EntityOwnershipShard extends Shard {
             onUnregisterListenerLocal((UnregisterListenerLocal) message);
         } else if(message instanceof SelectOwner) {
             onSelectOwner((SelectOwner) message);
+        } else if(message instanceof RemoveAllCandidates) {
+            onRemoveAllCandidates((RemoveAllCandidates) message);
         } else if(!commitCoordinator.handleMessage(message, this)) {
             super.handleNonRaftCommand(message);
         }
     }
 
+    private void onRemoveAllCandidates(RemoveAllCandidates message) {
+        LOG.debug("{}: onRemoveAllCandidates: {}", persistenceId(), message);
+
+        removeCandidateFromEntities(message.getMemberName());
+    }
+
     private void onSelectOwner(SelectOwner selectOwner) {
         LOG.debug("{}: onSelectOwner: {}", persistenceId(), selectOwner);
 
         String currentOwner = getCurrentOwner(selectOwner.getEntityPath());
         if(Strings.isNullOrEmpty(currentOwner)) {
-            String entityType = EntityOwnersModel.entityTypeFromEntityPath(selectOwner.getEntityPath());
             writeNewOwner(selectOwner.getEntityPath(), newOwner(currentOwner, selectOwner.getAllCandidates(),
-                    entityOwnershipStatistics.byEntityType(entityType),
                     selectOwner.getOwnerSelectionStrategy()));
 
             Cancellable cancellable = entityToScheduledOwnershipTask.get(selectOwner.getEntityPath());
@@ -235,6 +243,8 @@ class EntityOwnershipShard extends Shard {
         } else {
             final ActorSelection leader = getLeader();
             if (leader != null) {
+                possiblyRemoveAllInitialCandidates(leader);
+
                 if(LOG.isDebugEnabled()) {
                     LOG.debug("{}: Sending BatchedModifications {} to leader {}", persistenceId(),
                             modifications.getTransactionID(), leader);
@@ -248,8 +258,26 @@ class EntityOwnershipShard extends Shard {
         }
     }
 
+    /**
+     * Sends a one-time {@link RemoveAllCandidates} message for the local member to the given leader.
+     *
+     * <p>This covers re-joining a cluster that has a remote leader. When a follower is detected as down, the
+     * leader re-assigns owners for the entities that member owned but keeps the member as a candidate, because
+     * the member may merely be isolated and still running. A restarted member therefore asks the leader to drop
+     * whatever candidates it registered in its previous incarnation, since clients may not register them again.
+     * The message has to go out before any pending candidate registrations are sent.
+     *
+     * @param leader the current leader, or {@code null} if none is known yet
+     */
+    void possiblyRemoveAllInitialCandidates(ActorSelection leader) {
+        // Nothing to do once the initial check has already run, or while no leader is known.
+        if(!removeAllInitialCandidates || leader == null) {
+            return;
+        }
+
+        // The flag is cleared on the first known leader, whether or not a message ends up being sent.
+        removeAllInitialCandidates = false;
+        if(isLeader()) {
+            // This shard is the leader itself, so there is no remote leader to notify.
+            return;
+        }
+
+        LOG.debug("{} - got new leader {} on startup - sending RemoveAllCandidates", persistenceId(), leader);
+
+        leader.tell(new RemoveAllCandidates(localMemberName), ActorRef.noSender());
+    }
+
     boolean hasLeader() {
-        return getLeader() != null && !isIsolatedLeader();
+        return getLeader() != null && (!isLeader() || isLeaderActive()); // a local leader must also be active
     }
 
     /**
@@ -326,11 +354,11 @@ class EntityOwnershipShard extends Shard {
             // EntityOwnershipStatistics
             strategyConfig.clearStrategies();
 
-            // Remove the candidates for all members that are known to be down. In a cluster which has greater than
+            // Re-assign owners for all members that are known to be down. In a cluster which has greater than
             // 3 nodes it is possible for a some node beside the leader being down when the leadership transitions
-            // it makes sense to use this event to remove all the candidates for those downed nodes
+            // it makes sense to use this event to re-assign owners for those downed nodes
             for (MemberName downPeerName : downPeerMemberNames) {
-                removeCandidateFromEntities(downPeerName);
+                selectNewOwnerForEntitiesOwnedBy(downPeerName);
             }
         } else {
             // The leader changed - notify the coordinator to check if pending modifications need to be sent.
@@ -345,26 +373,9 @@ class EntityOwnershipShard extends Shard {
 
         if(isLeader()) {
             String currentOwner = getCurrentOwner(message.getEntityPath());
-            if(message.getRemovedCandidate().equals(currentOwner) || message.getRemainingCandidates().size() == 0){
-                String entityType = EntityOwnersModel.entityTypeFromEntityPath(message.getEntityPath());
+            if(message.getRemovedCandidate().equals(currentOwner) || message.getRemainingCandidates().isEmpty()){
                 writeNewOwner(message.getEntityPath(),
-                        newOwner(currentOwner, message.getRemainingCandidates(), entityOwnershipStatistics.byEntityType(entityType),
-                                getEntityOwnerElectionStrategy(message.getEntityPath())));
-            }
-        } else {
-            // We're not the leader. If the removed candidate is our local member then check if we actually
-            // have a local candidate registered. If we do then we must have been partitioned from the leader
-            // and the leader removed our candidate since the leader can't tell the difference between a
-            // temporary network partition and a node's process actually restarted. So, in that case, re-add
-            // our candidate.
-            if(localMemberName.getName().equals(message.getRemovedCandidate()) &&
-                    listenerSupport.hasCandidateForEntity(createEntity(message.getEntityPath()))) {
-                LOG.debug("Local candidate member was removed but a local candidate is registered for {}" +
-                    " - adding back local candidate", message.getEntityPath());
-
-                commitCoordinator.commitModification(new MergeModification(
-                        candidatePath(message.getEntityPath(), localMemberName.getName()),
-                        candidateMapEntry(localMemberName.getName())), this);
+                        newOwner(currentOwner, message.getRemainingCandidates(), getEntityOwnerElectionStrategy(message.getEntityPath())));
             }
         }
     }
@@ -383,31 +394,28 @@ class EntityOwnershipShard extends Shard {
 
         // Since a node's candidate member is only added by the node itself, we can assume the node is up so
         // remove it from the downPeerMemberNames.
-        downPeerMemberNames.remove(message.getNewCandidate());
+        downPeerMemberNames.remove(MemberName.forName(message.getNewCandidate()));
 
         final String currentOwner = getCurrentOwner(message.getEntityPath());
         final EntityOwnerSelectionStrategy strategy = getEntityOwnerElectionStrategy(message.getEntityPath());
-        final String entityType = EntityOwnersModel.entityTypeFromEntityPath(message.getEntityPath());
 
         // Available members is all the known peers - the number of peers that are down + self
         // So if there are 2 peers and 1 is down then availableMembers will be 2
-        final int availableMembers = (peerIdToMemberNames.size() - downPeerMemberNames.size()) + 1;
+        final int availableMembers = peerIdToMemberNames.size() - downPeerMemberNames.size() + 1;
 
         LOG.debug("{}: Using strategy {} to select owner, currentOwner = {}", persistenceId(), strategy, currentOwner);
 
-        if(!message.getAllCandidates().contains(currentOwner)){
-            if(strategy.getSelectionDelayInMillis() == 0L) {
-                writeNewOwner(message.getEntityPath(), newOwner(currentOwner, message.getAllCandidates(),
-                        entityOwnershipStatistics.byEntityType(entityType), strategy));
-            } else if(message.getAllCandidates().size() == availableMembers) {
-                LOG.debug("{}: Received the maximum candidates requests : {} writing new owner",
-                        persistenceId(), availableMembers);
-                cancelOwnerSelectionTask(message.getEntityPath());
-                writeNewOwner(message.getEntityPath(), newOwner(currentOwner, message.getAllCandidates(),
-                        entityOwnershipStatistics.byEntityType(entityType), strategy));
-            } else {
-                scheduleOwnerSelection(message.getEntityPath(), message.getAllCandidates(), strategy);
-            }
+        if(strategy.getSelectionDelayInMillis() == 0L) {
+            writeNewOwner(message.getEntityPath(), newOwner(currentOwner, message.getAllCandidates(),
+                    strategy));
+        } else if(message.getAllCandidates().size() == availableMembers) {
+            LOG.debug("{}: Received the maximum candidates requests : {} writing new owner",
+                    persistenceId(), availableMembers);
+            cancelOwnerSelectionTask(message.getEntityPath());
+            writeNewOwner(message.getEntityPath(), newOwner(currentOwner, message.getAllCandidates(),
+                    strategy));
+        } else {
+            scheduleOwnerSelection(message.getEntityPath(), message.getAllCandidates(), strategy);
         }
     }
 
@@ -416,11 +424,41 @@ class EntityOwnershipShard extends Shard {
 
         MemberName downMemberName = peerDown.getMemberName();
         if(downPeerMemberNames.add(downMemberName) && isLeader()) {
-            // Remove the down peer as a candidate from all entities.
-            removeCandidateFromEntities(downMemberName);
+            // Select new owners for entities owned by the down peer and which have other candidates. For an entity for
+            // which the down peer is the only candidate, we leave it as the owner and don't clear it. This is done to
+            // handle the case where the peer member process is actually still running but the node is partitioned.
+            // When the partition is healed, the peer just remains as the owner. If the peer process actually restarted,
+            // it will first remove all its candidates on startup. If another candidate is registered during the time
+            // the peer is down, the new candidate will be selected as the new owner.
+
+            selectNewOwnerForEntitiesOwnedBy(downMemberName);
         }
     }
 
+    private void selectNewOwnerForEntitiesOwnedBy(MemberName downMemberName) {
+        final BatchedModifications modifications = commitCoordinator.newBatchedModifications();
+        searchForEntitiesOwnedBy(downMemberName.getName(), (entityTypeNode, entityNode) -> {
+            YangInstanceIdentifier entityPath = YangInstanceIdentifier.builder(ENTITY_TYPES_PATH).
+                    node(entityTypeNode.getIdentifier()).node(ENTITY_NODE_ID).node(entityNode.getIdentifier()).
+                    node(ENTITY_OWNER_NODE_ID).build();
+            String newOwner = newOwner(getCurrentOwner(entityPath), getCandidateNames(entityNode),
+                    getEntityOwnerElectionStrategy(entityPath));
+
+            if(!newOwner.isEmpty()) {
+                LOG.debug("{}: Found entity {}, writing new owner {}", persistenceId(), entityPath, newOwner);
+
+                modifications.addModification(new WriteModification(entityPath,
+                        ImmutableNodes.leafNode(ENTITY_OWNER_NODE_ID, newOwner)));
+
+            } else {
+                LOG.debug("{}: Found entity {} but no other candidates - not clearing owner", persistenceId(),
+                        entityPath, newOwner);
+            }
+        });
+
+        commitCoordinator.commitModifications(modifications, this);
+    }
+
     private void onPeerUp(PeerUp peerUp) {
         LOG.debug("{}: onPeerUp: {}", persistenceId(), peerUp);
 
@@ -428,23 +466,45 @@ class EntityOwnershipShard extends Shard {
         downPeerMemberNames.remove(peerUp.getMemberName());
 
         // Notify the coordinator to check if pending modifications need to be sent. We do this here
-        // to handle the case where the leader's peer address isn't now yet when a prior state or
+        // to handle the case where the leader's peer address isn't known yet when a prior state or
         // leader change occurred.
         commitCoordinator.onStateChanged(this, isLeader());
     }
 
-    private void removeCandidateFromEntities(final MemberName owner) {
+    /**
+     * Collects the names of all candidates listed under the given entity.
+     *
+     * @param entity the entity list entry to inspect
+     * @return a new collection with one name per candidate entry; empty if the entity has no candidate list
+     */
+    private Collection<String> getCandidateNames(MapEntryNode entity) {
+        Optional<DataContainerChild<? extends PathArgument, ?>> possibleCandidates =
+                entity.getChild(CANDIDATE_NODE_ID);
+        if(!possibleCandidates.isPresent()) {
+            // No candidate list on this entity - report no candidates rather than failing on Optional.get().
+            return new ArrayList<>();
+        }
+
+        Collection<MapEntryNode> candidates = ((MapNode)possibleCandidates.get()).getValue();
+        Collection<String> candidateNames = new ArrayList<>(candidates.size());
+        for(MapEntryNode candidate: candidates) {
+            candidateNames.add(candidate.getChild(CANDIDATE_NAME_NODE_ID).get().getValue().toString());
+        }
+
+        return candidateNames;
+    }
+
+    /**
+     * Walks the entities and forwards to {@code walker} only those whose owner leaf is present and equals
+     * the given owner name.
+     *
+     * @param owner the owner name to match against each entity's owner leaf
+     * @param walker the callback invoked for every matching entity
+     */
+    private void searchForEntitiesOwnedBy(final String owner, final EntityWalker walker) {
+        LOG.debug("{}: Searching for entities owned by {}", persistenceId(), owner);
+
+        searchForEntities((typeNode, candidateEntity) -> {
+            Optional<DataContainerChild<? extends PathArgument, ?>> ownerLeaf =
+                    candidateEntity.getChild(ENTITY_OWNER_NODE_ID);
+            if(!ownerLeaf.isPresent()) {
+                // Entities without an owner leaf can never match.
+                return;
+            }
+
+            String ownerName = ownerLeaf.get().getValue().toString();
+            if(owner.equals(ownerName)) {
+                walker.onEntity(typeNode, candidateEntity);
+            }
+        });
+    }
+
+    private void removeCandidateFromEntities(final MemberName member) {
         final BatchedModifications modifications = commitCoordinator.newBatchedModifications();
         searchForEntities((entityTypeNode, entityNode) -> {
-            if (hasCandidate(entityNode, owner)) {
+            if (hasCandidate(entityNode, member)) {
                 YangInstanceIdentifier entityId =
                         (YangInstanceIdentifier) entityNode.getIdentifier().getKeyValues().get(ENTITY_ID_QNAME);
                 YangInstanceIdentifier candidatePath = candidatePath(
                         entityTypeNode.getIdentifier().getKeyValues().get(ENTITY_TYPE_QNAME).toString(),
-                        entityId, owner.getName());
+                        entityId, member.getName());
 
                 LOG.info("{}: Found entity {}, removing candidate {}, path {}", persistenceId(), entityId,
-                        owner, candidatePath);
+                        member, candidatePath);
 
                 modifications.addModification(new DeleteModification(candidatePath));
             }
@@ -511,9 +571,9 @@ class EntityOwnershipShard extends Shard {
         }
     }
 
-    private String newOwner(String currentOwner, Collection<String> candidates, Map<String, Long> statistics, EntityOwnerSelectionStrategy ownerSelectionStrategy) {
+    private String newOwner(String currentOwner, Collection<String> candidates, EntityOwnerSelectionStrategy ownerSelectionStrategy) {
         Collection<String> viableCandidates = getViableCandidates(candidates);
-        if(viableCandidates.size() == 0){
+        if(viableCandidates.isEmpty()){
             return "";
         }
         return ownerSelectionStrategy.newOwner(currentOwner, viableCandidates);
@@ -523,7 +583,7 @@ class EntityOwnershipShard extends Shard {
         Collection<String> viableCandidates = new ArrayList<>();
 
         for (String candidate : candidates) {
-            if (!downPeerMemberNames.contains(candidate)) {
+            if (!downPeerMemberNames.contains(MemberName.forName(candidate))) {
                 viableCandidates.add(candidate);
             }
         }
@@ -538,6 +598,7 @@ class EntityOwnershipShard extends Shard {
         return null;
     }
 
+    @FunctionalInterface // single-method callback; call sites in this file supply it as a lambda
     private static interface EntityWalker {
         void onEntity(MapEntryNode entityTypeNode, MapEntryNode entityNode);
     }