Bug 6540: EOS - handle edge case with pruning pending owner change commits
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / entityownership / EntityOwnershipShard.java
index 99ef94ef5039ba9e39a14afe7bf59f73af59b608..db71da63e438a2e1fdb46837ae62654c1906d2b4 100644 (file)
@@ -24,10 +24,15 @@ import static org.opendaylight.controller.cluster.datastore.entityownership.Enti
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.Cancellable;
+import akka.cluster.Cluster;
+import akka.cluster.Member;
+import akka.cluster.MemberStatus;
+import akka.cluster.ClusterEvent.CurrentClusterState;
 import akka.pattern.Patterns;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableSet;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -48,7 +53,6 @@ import org.opendaylight.controller.cluster.datastore.entityownership.messages.Un
 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterListenerLocal;
 import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.EntityOwnerSelectionStrategy;
 import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.EntityOwnerSelectionStrategyConfig;
-import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
@@ -78,7 +82,6 @@ class EntityOwnershipShard extends Shard {
     private final EntityOwnershipShardCommitCoordinator commitCoordinator;
     private final EntityOwnershipListenerSupport listenerSupport;
     private final Set<MemberName> downPeerMemberNames = new HashSet<>();
-    private final Map<String, MemberName> peerIdToMemberNames = new HashMap<>();
     private final EntityOwnerSelectionStrategyConfig strategyConfig;
     private final Map<YangInstanceIdentifier, Cancellable> entityToScheduledOwnershipTask = new HashMap<>();
     private final EntityOwnershipStatistics entityOwnershipStatistics;
@@ -96,11 +99,6 @@ class EntityOwnershipShard extends Shard {
         this.strategyConfig = builder.ownerSelectionStrategyConfig;
         this.entityOwnershipStatistics = new EntityOwnershipStatistics();
         this.entityOwnershipStatistics.init(getDataStore());
-
-        for(String peerId: getRaftActorContext().getPeerIds()) {
-            ShardIdentifier shardId = ShardIdentifier.fromShardIdString(peerId);
-            peerIdToMemberNames.put(peerId, shardId.getMemberName());
-        }
     }
 
     @Override
@@ -347,17 +345,26 @@ class EntityOwnershipShard extends Shard {
 
         if (isLeader) {
 
+            // Re-initialize the downPeerMemberNames from the current akka Cluster state. The previous leader, if any,
+            // is most likely down however it's possible we haven't received the PeerDown message yet.
+            initializeDownPeerMemberNamesFromClusterState();
+
             // Clear all existing strategies so that they get re-created when we call createStrategy again
             // This allows the strategies to be re-initialized with existing statistics maintained by
             // EntityOwnershipStatistics
             strategyConfig.clearStrategies();
 
             // Re-assign owners for all members that are known to be down. In a cluster which has greater than
-            // 3 nodes it is possible for a some node beside the leader being down when the leadership transitions
-            // it makes sense to use this event to re-assign owners for those downed nodes
+            // 3 nodes it is possible for some node beside the leader being down when the leadership transitions
+            // it makes sense to use this event to re-assign owners for those downed nodes.
+            Set<String> ownedBy = new HashSet<>(downPeerMemberNames.size() + 1);
             for (MemberName downPeerName : downPeerMemberNames) {
-                selectNewOwnerForEntitiesOwnedBy(downPeerName);
+                ownedBy.add(downPeerName.getName());
             }
+
+            // Also try to assign owners for entities that have no current owner. See explanation in onPeerUp.
+            ownedBy.add("");
+            selectNewOwnerForEntitiesOwnedBy(ownedBy);
         } else {
             // The leader changed - notify the coordinator to check if pending modifications need to be sent.
             // While onStateChanged also does this, this method handles the case where the shard hears from a
@@ -368,6 +375,33 @@ class EntityOwnershipShard extends Shard {
         super.onLeaderChanged(oldLeader, newLeader);
     }
 
+    private void initializeDownPeerMemberNamesFromClusterState() {
+        java.util.Optional<Cluster> cluster = getRaftActorContext().getCluster();
+        if(!cluster.isPresent()) {
+            return;
+        }
+
+        CurrentClusterState state = cluster.get().state();
+        Set<Member> unreachable = state.getUnreachable();
+
+        LOG.debug("{}: initializeDownPeerMemberNamesFromClusterState - current downPeerMemberNames: {}, unreachable: {}",
+                persistenceId(), downPeerMemberNames, unreachable);
+
+        downPeerMemberNames.clear();
+        for(Member m: unreachable) {
+            downPeerMemberNames.add(MemberName.forName(m.getRoles().iterator().next()));
+        }
+
+        for(Member m: state.getMembers()) {
+            if(m.status() != MemberStatus.up() && m.status() != MemberStatus.weaklyUp()) {
+                LOG.debug("{}: Adding down member with status {}", persistenceId(), m.status());
+                downPeerMemberNames.add(MemberName.forName(m.getRoles().iterator().next()));
+            }
+        }
+
+        LOG.debug("{}: new downPeerMemberNames: {}", persistenceId(), downPeerMemberNames);
+    }
+
     private void onCandidateRemoved(CandidateRemoved message) {
         LOG.debug("{}: onCandidateRemoved: {}", persistenceId(), message);
 
@@ -399,7 +433,7 @@ class EntityOwnershipShard extends Shard {
 
         // Available members is all the known peers - the number of peers that are down + self
         // So if there are 2 peers and 1 is down then availableMembers will be 2
-        final int availableMembers = peerIdToMemberNames.size() - downPeerMemberNames.size() + 1;
+        final int availableMembers = getRaftActorContext().getPeerIds().size() - downPeerMemberNames.size() + 1;
 
         LOG.debug("{}: Using strategy {} to select owner, currentOwner = {}", persistenceId(), strategy, currentOwner);
 
@@ -429,13 +463,13 @@ class EntityOwnershipShard extends Shard {
             // it will first remove all its candidates on startup. If another candidate is registered during the time
             // the peer is down, the new candidate will be selected as the new owner.
 
-            selectNewOwnerForEntitiesOwnedBy(downMemberName);
+            selectNewOwnerForEntitiesOwnedBy(ImmutableSet.of(downMemberName.getName()));
         }
     }
 
-    private void selectNewOwnerForEntitiesOwnedBy(MemberName downMemberName) {
+    private void selectNewOwnerForEntitiesOwnedBy(Set<String> ownedBy) {
         final BatchedModifications modifications = commitCoordinator.newBatchedModifications();
-        searchForEntitiesOwnedBy(downMemberName.getName(), (entityTypeNode, entityNode) -> {
+        searchForEntitiesOwnedBy(ownedBy, (entityTypeNode, entityNode) -> {
             YangInstanceIdentifier entityPath = YangInstanceIdentifier.builder(ENTITY_TYPES_PATH).
                     node(entityTypeNode.getIdentifier()).node(ENTITY_NODE_ID).node(entityNode.getIdentifier()).
                     node(ENTITY_OWNER_NODE_ID).build();
@@ -460,13 +494,23 @@ class EntityOwnershipShard extends Shard {
     private void onPeerUp(PeerUp peerUp) {
         LOG.debug("{}: onPeerUp: {}", persistenceId(), peerUp);
 
-        peerIdToMemberNames.put(peerUp.getPeerId(), peerUp.getMemberName());
         downPeerMemberNames.remove(peerUp.getMemberName());
 
         // Notify the coordinator to check if pending modifications need to be sent. We do this here
         // to handle the case where the leader's peer address isn't known yet when a prior state or
         // leader change occurred.
         commitCoordinator.onStateChanged(this, isLeader());
+
+        if(isLeader()) {
+            // Try to assign owners for entities that have no current owner. It's possible the peer that is now up
+            // had previously registered as a candidate and was the only candidate but the owner write tx couldn't be
+            // committed due to a leader change. Eg, the leader is able to successfully commit the candidate add tx but
+            // becomes isolated before it can commit the owner change and switches to follower. The majority partition
+            // with a new leader has the candidate but the entity has no owner. When the partition is healed and the
+            // previously isolated leader reconnects, we'll receive onPeerUp and, if there's still no owner, the
+            // previous leader will gain ownership.
+            selectNewOwnerForEntitiesOwnedBy(ImmutableSet.of(""));
+        }
     }
 
     private Collection<String> getCandidateNames(MapEntryNode entity) {
@@ -479,13 +523,14 @@ class EntityOwnershipShard extends Shard {
         return candidateNames;
     }
 
-    private void searchForEntitiesOwnedBy(final String owner, final EntityWalker walker) {
-        LOG.debug("{}: Searching for entities owned by {}", persistenceId(), owner);
+    private void searchForEntitiesOwnedBy(Set<String> ownedBy, EntityWalker walker) {
+        LOG.debug("{}: Searching for entities owned by {}", persistenceId(), ownedBy);
 
         searchForEntities((entityTypeNode, entityNode) -> {
             Optional<DataContainerChild<? extends PathArgument, ?>> possibleOwner =
                     entityNode.getChild(ENTITY_OWNER_NODE_ID);
-            if(possibleOwner.isPresent() && owner.equals(possibleOwner.get().getValue().toString())) {
+            String currentOwner = possibleOwner.isPresent() ? possibleOwner.get().getValue().toString() : "";
+            if(ownedBy.contains(currentOwner)) {
                 walker.onEntity(entityTypeNode, entityNode);
             }
         });