X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fentityownership%2FEntityOwnershipShard.java;fp=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fentityownership%2FEntityOwnershipShard.java;h=db71da63e438a2e1fdb46837ae62654c1906d2b4;hp=99ef94ef5039ba9e39a14afe7bf59f73af59b608;hb=0fab6c716548e89938c1a8493dc25991c006aa10;hpb=7ea5e81beb0b5d265713e01a14cfa2562ea28c6c diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java index 99ef94ef50..db71da63e4 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java @@ -24,10 +24,15 @@ import static org.opendaylight.controller.cluster.datastore.entityownership.Enti import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.Cancellable; +import akka.cluster.Cluster; +import akka.cluster.Member; +import akka.cluster.MemberStatus; +import akka.cluster.ClusterEvent.CurrentClusterState; import akka.pattern.Patterns; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Strings; +import com.google.common.collect.ImmutableSet; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -48,7 +53,6 @@ import org.opendaylight.controller.cluster.datastore.entityownership.messages.Un import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterListenerLocal; import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.EntityOwnerSelectionStrategy; import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.EntityOwnerSelectionStrategyConfig; -import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; import org.opendaylight.controller.cluster.datastore.messages.PeerDown; import org.opendaylight.controller.cluster.datastore.messages.PeerUp; @@ -78,7 +82,6 @@ class EntityOwnershipShard extends Shard { private final EntityOwnershipShardCommitCoordinator commitCoordinator; private final EntityOwnershipListenerSupport listenerSupport; private final Set downPeerMemberNames = new HashSet<>(); - private final Map peerIdToMemberNames = new HashMap<>(); private final EntityOwnerSelectionStrategyConfig strategyConfig; private final Map entityToScheduledOwnershipTask = new HashMap<>(); private final EntityOwnershipStatistics entityOwnershipStatistics; @@ -96,11 +99,6 @@ class EntityOwnershipShard extends Shard { this.strategyConfig = builder.ownerSelectionStrategyConfig; this.entityOwnershipStatistics = new EntityOwnershipStatistics(); this.entityOwnershipStatistics.init(getDataStore()); - - for(String peerId: getRaftActorContext().getPeerIds()) { - ShardIdentifier shardId = ShardIdentifier.fromShardIdString(peerId); - peerIdToMemberNames.put(peerId, shardId.getMemberName()); - } } @Override @@ -347,17 +345,26 @@ class EntityOwnershipShard extends Shard { if (isLeader) { + // Re-initialize the downPeerMemberNames from the current akka Cluster state. The previous leader, if any, + // is most likely down however it's possible we haven't received the PeerDown message yet. + initializeDownPeerMemberNamesFromClusterState(); + // Clear all existing strategies so that they get re-created when we call createStrategy again // This allows the strategies to be re-initialized with existing statistics maintained by // EntityOwnershipStatistics strategyConfig.clearStrategies(); // Re-assign owners for all members that are known to be down. In a cluster which has greater than - // 3 nodes it is possible for a some node beside the leader being down when the leadership transitions - // it makes sense to use this event to re-assign owners for those downed nodes + // 3 nodes it is possible for some node beside the leader being down when the leadership transitions + // it makes sense to use this event to re-assign owners for those downed nodes. + Set ownedBy = new HashSet<>(downPeerMemberNames.size() + 1); for (MemberName downPeerName : downPeerMemberNames) { - selectNewOwnerForEntitiesOwnedBy(downPeerName); + ownedBy.add(downPeerName.getName()); } + + // Also try to assign owners for entities that have no current owner. See explanation in onPeerUp. + ownedBy.add(""); + selectNewOwnerForEntitiesOwnedBy(ownedBy); } else { // The leader changed - notify the coordinator to check if pending modifications need to be sent. // While onStateChanged also does this, this method handles the case where the shard hears from a @@ -368,6 +375,33 @@ class EntityOwnershipShard extends Shard { super.onLeaderChanged(oldLeader, newLeader); } + private void initializeDownPeerMemberNamesFromClusterState() { + java.util.Optional cluster = getRaftActorContext().getCluster(); + if(!cluster.isPresent()) { + return; + } + + CurrentClusterState state = cluster.get().state(); + Set unreachable = state.getUnreachable(); + + LOG.debug("{}: initializeDownPeerMemberNamesFromClusterState - current downPeerMemberNames: {}, unreachable: {}", + persistenceId(), downPeerMemberNames, unreachable); + + downPeerMemberNames.clear(); + for(Member m: unreachable) { + downPeerMemberNames.add(MemberName.forName(m.getRoles().iterator().next())); + } + + for(Member m: state.getMembers()) { + if(m.status() != MemberStatus.up() && m.status() != MemberStatus.weaklyUp()) { + LOG.debug("{}: Adding down member with status {}", persistenceId(), m.status()); + downPeerMemberNames.add(MemberName.forName(m.getRoles().iterator().next())); + } + } + + LOG.debug("{}: new downPeerMemberNames: {}", persistenceId(), downPeerMemberNames); + } + private void onCandidateRemoved(CandidateRemoved message) { LOG.debug("{}: onCandidateRemoved: {}", persistenceId(), message); @@ -399,7 +433,7 @@ class EntityOwnershipShard extends Shard { // Available members is all the known peers - the number of peers that are down + self // So if there are 2 peers and 1 is down then availableMembers will be 2 - final int availableMembers = peerIdToMemberNames.size() - downPeerMemberNames.size() + 1; + final int availableMembers = getRaftActorContext().getPeerIds().size() - downPeerMemberNames.size() + 1; LOG.debug("{}: Using strategy {} to select owner, currentOwner = {}", persistenceId(), strategy, currentOwner); @@ -429,13 +463,13 @@ class EntityOwnershipShard extends Shard { // it will first remove all its candidates on startup. If another candidate is registered during the time // the peer is down, the new candidate will be selected as the new owner. - selectNewOwnerForEntitiesOwnedBy(downMemberName); + selectNewOwnerForEntitiesOwnedBy(ImmutableSet.of(downMemberName.getName())); } } - private void selectNewOwnerForEntitiesOwnedBy(MemberName downMemberName) { + private void selectNewOwnerForEntitiesOwnedBy(Set ownedBy) { final BatchedModifications modifications = commitCoordinator.newBatchedModifications(); - searchForEntitiesOwnedBy(downMemberName.getName(), (entityTypeNode, entityNode) -> { + searchForEntitiesOwnedBy(ownedBy, (entityTypeNode, entityNode) -> { YangInstanceIdentifier entityPath = YangInstanceIdentifier.builder(ENTITY_TYPES_PATH). node(entityTypeNode.getIdentifier()).node(ENTITY_NODE_ID).node(entityNode.getIdentifier()). node(ENTITY_OWNER_NODE_ID).build(); @@ -460,13 +494,23 @@ class EntityOwnershipShard extends Shard { private void onPeerUp(PeerUp peerUp) { LOG.debug("{}: onPeerUp: {}", persistenceId(), peerUp); - peerIdToMemberNames.put(peerUp.getPeerId(), peerUp.getMemberName()); downPeerMemberNames.remove(peerUp.getMemberName()); // Notify the coordinator to check if pending modifications need to be sent. We do this here // to handle the case where the leader's peer address isn't known yet when a prior state or // leader change occurred. commitCoordinator.onStateChanged(this, isLeader()); + + if(isLeader()) { + // Try to assign owners for entities that have no current owner. It's possible the peer that is now up + // had previously registered as a candidate and was the only candidate but the owner write tx couldn't be + // committed due to a leader change. Eg, the leader is able to successfully commit the candidate add tx but + // becomes isolated before it can commit the owner change and switches to follower. The majority partition + // with a new leader has the candidate but the entity has no owner. When the partition is healed and the + // previously isolated leader reconnects, we'll receive onPeerUp and, if there's still no owner, the + // previous leader will gain ownership. + selectNewOwnerForEntitiesOwnedBy(ImmutableSet.of("")); + } } private Collection getCandidateNames(MapEntryNode entity) { @@ -479,13 +523,14 @@ class EntityOwnershipShard extends Shard { return candidateNames; } - private void searchForEntitiesOwnedBy(final String owner, final EntityWalker walker) { - LOG.debug("{}: Searching for entities owned by {}", persistenceId(), owner); + private void searchForEntitiesOwnedBy(Set ownedBy, EntityWalker walker) { + LOG.debug("{}: Searching for entities owned by {}", persistenceId(), ownedBy); searchForEntities((entityTypeNode, entityNode) -> { Optional> possibleOwner = entityNode.getChild(ENTITY_OWNER_NODE_ID); - if(possibleOwner.isPresent() && owner.equals(possibleOwner.get().getValue().toString())) { + String currentOwner = possibleOwner.isPresent() ? possibleOwner.get().getValue().toString() : ""; + if(ownedBy.contains(currentOwner)) { walker.onEntity(entityTypeNode, entityNode); } });