From: Tom Pantelis Date: Tue, 18 Aug 2015 06:59:47 +0000 (-0400) Subject: Bug 4105: Remove candidates on PeerDown X-Git-Tag: release/beryllium~296 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=a4d9810d7211097f2803174d0c23a27b53dbc9d2 Bug 4105: Remove candidates on PeerDown Currently on PeerDown, the EntityOwnershipShard selects a new owner for the entities owned by the down node and leaves the down node as a candidate. If the down node is the only candidate, the owner is cleared. On PeerUp, it selects a new owner for those entities whose owner is clear. This was done to handle network partition so a node's candidates remain registered and are re-assigned when the partition is healed. Howver this has potential issues when a node is actually stopped/restarted. It's possible, on restart, that the node doesn't register a candidate for an entity that it had previously registered for. So it may get ownership of an entity for which it has no registered candidate. To alleviate this, I changed it to remove all the down node's candidates on PeerDown. If the node was stopped/restarted, then it will re-register candidates based on local client requests. This case will be the norm. To handle network partition, when healed, the follower node will get the replicated commits for its candidate removals from the leader. So on Candidate removed, it re-adds its removed candidate if it has a registered EntityOwnershipCandidate. I realized that one can register a DOMDataTreeChangeListener for a leaf node. So I simplified EntityOwnerChangeListener to listen for the owner leaf instead of the entity path. This avoids the extra notifications when candidayes are added/removed. I actually did this originally b/c I thought there was a bug when listening at the entity level which turned out there wasn't but I left it in as an improvement. I also added the shard's logId to the listener and support classes for better debugging of unit tests. Change-Id: I75d2567ce54b9129eee052ba521c8a71777289b6 Signed-off-by: Tom Pantelis --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/CandidateListChangeListener.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/CandidateListChangeListener.java index 82f926aea9..bd41ebe942 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/CandidateListChangeListener.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/CandidateListChangeListener.java @@ -47,11 +47,13 @@ import org.slf4j.LoggerFactory; class CandidateListChangeListener implements DOMDataTreeChangeListener { private static final Logger LOG = LoggerFactory.getLogger(CandidateListChangeListener.class); + private final String logId; private final ActorRef shard; private final Map> currentCandidates = new HashMap<>(); - CandidateListChangeListener(ActorRef shard) { + CandidateListChangeListener(ActorRef shard, String logId) { this.shard = Preconditions.checkNotNull(shard, "shard should not be null"); + this.logId = logId; } void init(ShardDataTree shardDataTree) { @@ -65,7 +67,7 @@ class CandidateListChangeListener implements DOMDataTreeChangeListener { for(DataTreeCandidate change: changes) { DataTreeCandidateNode changeRoot = change.getRootNode(); - LOG.debug("Candidate node changed: {}, {}", changeRoot.getModificationType(), change.getRootPath()); + LOG.debug("{}: Candidate node changed: {}, {}", logId, changeRoot.getModificationType(), change.getRootPath()); NodeIdentifierWithPredicates candidateKey = (NodeIdentifierWithPredicates) change.getRootPath().getLastPathArgument(); @@ -74,12 +76,12 @@ class CandidateListChangeListener implements DOMDataTreeChangeListener { YangInstanceIdentifier entityId = extractEntityPath(change.getRootPath()); if(changeRoot.getModificationType() == ModificationType.WRITE) { - LOG.debug("Candidate {} was added for entity {}", candidate, entityId); + LOG.debug("{}: Candidate {} was added for entity {}", logId, candidate, entityId); Collection currentCandidates = addToCurrentCandidates(entityId, candidate); shard.tell(new CandidateAdded(entityId, candidate, new ArrayList<>(currentCandidates)), shard); } else if(changeRoot.getModificationType() == ModificationType.DELETE) { - LOG.debug("Candidate {} was removed for entity {}", candidate, entityId); + LOG.debug("{}: Candidate {} was removed for entity {}", logId, candidate, entityId); Collection currentCandidates = removeFromCurrentCandidates(entityId, candidate); shard.tell(new CandidateRemoved(entityId, candidate, new ArrayList<>(currentCandidates)), shard); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnerChangeListener.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnerChangeListener.java index 9c551f2576..cf9df1821a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnerChangeListener.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnerChangeListener.java @@ -7,25 +7,18 @@ */ package org.opendaylight.controller.cluster.datastore.entityownership; -import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_ID_QNAME; import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNERS_PATH; -import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_NODE_ID; import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_QNAME; -import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_TYPE_QNAME; +import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.createEntity; import com.google.common.base.Objects; import com.google.common.base.Optional; import java.util.Collection; -import java.util.Map.Entry; import org.opendaylight.controller.cluster.datastore.ShardDataTree; import org.opendaylight.controller.md.sal.common.api.clustering.Entity; import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.entity.owners.EntityType; -import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; -import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild; -import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; +import org.opendaylight.yangtools.yang.data.api.schema.LeafNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode; @@ -50,27 +43,26 @@ class EntityOwnerChangeListener implements DOMDataTreeChangeListener { void init(ShardDataTree shardDataTree) { shardDataTree.registerTreeChangeListener(YangInstanceIdentifier.builder(ENTITY_OWNERS_PATH). - node(EntityType.QNAME).node(EntityType.QNAME).node(ENTITY_QNAME).node(ENTITY_QNAME).build(), this); + node(EntityType.QNAME).node(EntityType.QNAME).node(ENTITY_QNAME).node(ENTITY_QNAME).node(EntityOwnersModel.ENTITY_OWNER_QNAME).build(), this); } @Override public void onDataTreeChanged(Collection changes) { for(DataTreeCandidate change: changes) { DataTreeCandidateNode changeRoot = change.getRootNode(); - MapEntryNode entityNode = (MapEntryNode) changeRoot.getDataAfter().get(); + LeafNode ownerLeaf = (LeafNode) changeRoot.getDataAfter().get(); - LOG.debug("Entity node changed: {}, {}", changeRoot.getModificationType(), change.getRootPath()); + LOG.debug("{}: Entity node changed: {}, {}", logId(), changeRoot.getModificationType(), change.getRootPath()); - String newOwner = extractOwner(entityNode); + String newOwner = extractOwner(ownerLeaf); String origOwner = null; Optional> dataBefore = changeRoot.getDataBefore(); if(dataBefore.isPresent()) { - MapEntryNode origEntityNode = (MapEntryNode) changeRoot.getDataBefore().get(); - origOwner = extractOwner(origEntityNode); + origOwner = extractOwner((LeafNode) changeRoot.getDataBefore().get()); } - LOG.debug("New owner: {}, Original owner: {}", newOwner, origOwner); + LOG.debug("{}: New owner: {}, Original owner: {}", logId(), newOwner, origOwner); if(!Objects.equal(origOwner, newOwner)) { boolean isOwner = Objects.equal(localMemberName, newOwner); @@ -78,8 +70,8 @@ class EntityOwnerChangeListener implements DOMDataTreeChangeListener { if(isOwner || wasOwner) { Entity entity = createEntity(change.getRootPath()); - LOG.debug("Calling notifyEntityOwnershipListeners: entity: {}, wasOwner: {}, isOwner: {}", - entity, wasOwner, isOwner); + LOG.debug("{}: Calling notifyEntityOwnershipListeners: entity: {}, wasOwner: {}, isOwner: {}", + logId(), entity, wasOwner, isOwner); listenerSupport.notifyEntityOwnershipListeners(entity, wasOwner, isOwner); } @@ -87,26 +79,12 @@ class EntityOwnerChangeListener implements DOMDataTreeChangeListener { } } - private Entity createEntity(YangInstanceIdentifier entityPath) { - String entityType = null; - YangInstanceIdentifier entityId = null; - for(PathArgument pathArg: entityPath.getPathArguments()) { - if(pathArg instanceof NodeIdentifierWithPredicates) { - NodeIdentifierWithPredicates nodeKey = (NodeIdentifierWithPredicates) pathArg; - Entry key = nodeKey.getKeyValues().entrySet().iterator().next(); - if(ENTITY_TYPE_QNAME.equals(key.getKey())) { - entityType = key.getValue().toString(); - } else if(ENTITY_ID_QNAME.equals(key.getKey())) { - entityId = (YangInstanceIdentifier) key.getValue(); - } - } - } - - return new Entity(entityType, entityId); + private String extractOwner(LeafNode ownerLeaf) { + Object value = ownerLeaf.getValue(); + return value != null ? value.toString() : null; } - private String extractOwner(MapEntryNode entityNode) { - Optional> ownerNode = entityNode.getChild(ENTITY_OWNER_NODE_ID); - return ownerNode.isPresent() ? (String) ownerNode.get().getValue() : null; + private String logId() { + return listenerSupport.getLogId(); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnersModel.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnersModel.java index 385bb70649..1c97b4818b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnersModel.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnersModel.java @@ -7,12 +7,16 @@ */ package org.opendaylight.controller.cluster.datastore.entityownership; +import java.util.Map.Entry; +import org.opendaylight.controller.md.sal.common.api.clustering.Entity; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.EntityOwners; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.entity.owners.EntityType; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.entity.owners.entity.type.entity.Candidate; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; import org.opendaylight.yangtools.yang.data.api.schema.MapNode; @@ -61,6 +65,15 @@ final class EntityOwnersModel { } + static YangInstanceIdentifier candidatePath(YangInstanceIdentifier entityPath, String candidateName) { + return YangInstanceIdentifier.builder(entityPath).node(Candidate.QNAME).nodeWithKey( + Candidate.QNAME, CANDIDATE_NAME_QNAME, candidateName).build(); + } + + static NodeIdentifierWithPredicates candidateNodeKey(String candidateName) { + return new NodeIdentifierWithPredicates(Candidate.QNAME, CANDIDATE_NAME_QNAME, candidateName); + } + static NormalizedNode entityOwnersWithCandidate(String entityType, YangInstanceIdentifier entityId, String candidateName) { return entityOwnersWithEntityTypeEntry(entityTypeEntryWithEntityEntry(entityType, @@ -86,11 +99,33 @@ final class EntityOwnersModel { static MapNode candidateEntry(String candidateName) { return ImmutableOrderedMapNodeBuilder.create().withNodeIdentifier(new NodeIdentifier(Candidate.QNAME)). - addChild(ImmutableNodes.mapEntry(Candidate.QNAME, CANDIDATE_NAME_QNAME, candidateName)).build(); + addChild(candidateMapEntry(candidateName)).build(); + } + + static MapEntryNode candidateMapEntry(String candidateName) { + return ImmutableNodes.mapEntry(Candidate.QNAME, CANDIDATE_NAME_QNAME, candidateName); } static NormalizedNode entityEntryWithOwner(YangInstanceIdentifier entityId, String owner) { return ImmutableNodes.mapEntryBuilder(ENTITY_QNAME, ENTITY_ID_QNAME, entityId).addChild( ImmutableNodes.leafNode(ENTITY_OWNER_QNAME, owner)).build(); } + + static Entity createEntity(YangInstanceIdentifier entityPath) { + String entityType = null; + YangInstanceIdentifier entityId = null; + for(PathArgument pathArg: entityPath.getPathArguments()) { + if(pathArg instanceof NodeIdentifierWithPredicates) { + NodeIdentifierWithPredicates nodeKey = (NodeIdentifierWithPredicates) pathArg; + Entry key = nodeKey.getKeyValues().entrySet().iterator().next(); + if(ENTITY_TYPE_QNAME.equals(key.getKey())) { + entityType = key.getValue().toString(); + } else if(ENTITY_ID_QNAME.equals(key.getKey())) { + entityId = (YangInstanceIdentifier) key.getValue(); + } + } + } + + return new Entity(entityType, entityId); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipListenerSupport.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipListenerSupport.java index 7941bc088c..ed4a004ed0 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipListenerSupport.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipListenerSupport.java @@ -18,6 +18,7 @@ import java.util.IdentityHashMap; import java.util.Map; import org.opendaylight.controller.cluster.datastore.entityownership.messages.EntityOwnershipChanged; import org.opendaylight.controller.md.sal.common.api.clustering.Entity; +import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate; import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,35 +31,51 @@ import org.slf4j.LoggerFactory; class EntityOwnershipListenerSupport { private static final Logger LOG = LoggerFactory.getLogger(EntityOwnershipListenerSupport.class); + private final String logId; private final ActorContext actorContext; private final Map listenerActorMap = new IdentityHashMap<>(); private final Multimap entityListenerMap = HashMultimap.create(); private final Multimap entityTypeListenerMap = HashMultimap.create(); - EntityOwnershipListenerSupport(ActorContext actorContext) { + EntityOwnershipListenerSupport(ActorContext actorContext, String logId) { this.actorContext = actorContext; + this.logId = logId; + } + + String getLogId() { + return logId; + } + + boolean hasCandidateForEntity(Entity entity) { + for(EntityOwnershipListener listener: entityListenerMap.get(entity)) { + if(listener instanceof EntityOwnershipCandidate) { + return true; + } + } + + return false; } void addEntityOwnershipListener(Entity entity, EntityOwnershipListener listener) { - LOG.debug("Adding EntityOwnershipListener {} for {}", listener, entity); + LOG.debug("{}: Adding EntityOwnershipListener {} for {}", logId, listener, entity); addListener(listener, entity, entityListenerMap); } void addEntityOwnershipListener(String entityType, EntityOwnershipListener listener) { - LOG.debug("Adding EntityOwnershipListener {} for entity type {}", listener, entityType); + LOG.debug("{}: Adding EntityOwnershipListener {} for entity type {}", logId, listener, entityType); addListener(listener, entityType, entityTypeListenerMap); } void removeEntityOwnershipListener(Entity entity, EntityOwnershipListener listener) { - LOG.debug("Removing EntityOwnershipListener {} for {}", listener, entity); + LOG.debug("{}: Removing EntityOwnershipListener {} for {}", logId, listener, entity); removeListener(listener, entity, entityListenerMap); } void removeEntityOwnershipListener(String entityType, EntityOwnershipListener listener) { - LOG.debug("Removing EntityOwnershipListener {} for entity type {}", listener, entityType); + LOG.debug("{}: Removing EntityOwnershipListener {} for entity type {}", logId, listener, entityType); removeListener(listener, entityType, entityTypeListenerMap); } @@ -87,7 +104,7 @@ class EntityOwnershipListenerSupport { for(EntityOwnershipListener listener: listeners) { ActorRef listenerActor = listenerActorFor(listener); - LOG.debug("Notifying EntityOwnershipListenerActor {} with {}", listenerActor, changed); + LOG.debug("{}: Notifying EntityOwnershipListenerActor {} with {}", logId, listenerActor, changed); listenerActor.tell(changed, ActorRef.noSender()); } @@ -110,7 +127,7 @@ class EntityOwnershipListenerSupport { if(fromListenerMap.remove(mapKey, listener)) { ListenerActorRefEntry listenerEntry = listenerActorMap.get(listener); - LOG.debug("Found {}", listenerEntry); + LOG.debug("{}: Found {}", logId, listenerEntry); listenerEntry.referenceCount--; if(listenerEntry.referenceCount <= 0) { @@ -136,7 +153,7 @@ class EntityOwnershipListenerSupport { if(actorRef == null) { actorRef = actorContext.actorOf(EntityOwnershipListenerActor.props(listener)); - LOG.debug("Created EntityOwnershipListenerActor {} for listener {}", actorRef, listener); + LOG.debug("{}: Created EntityOwnershipListenerActor {} for listener {}", logId, actorRef, listener); } return actorRef; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java index 4dfbc87eb9..58a7a03675 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java @@ -10,13 +10,18 @@ package org.opendaylight.controller.cluster.datastore.entityownership; import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.CANDIDATE_NAME_NODE_ID; import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.CANDIDATE_NODE_ID; import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_ID_NODE_ID; +import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_ID_QNAME; import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_NODE_ID; import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNERS_PATH; import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_NODE_ID; import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_QNAME; import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_TYPES_PATH; import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_TYPE_NODE_ID; +import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_TYPE_QNAME; +import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.candidateMapEntry; +import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.candidateNodeKey; import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.candidatePath; +import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.createEntity; import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityOwnersWithCandidate; import akka.actor.ActorRef; import akka.actor.ActorSelection; @@ -80,7 +85,7 @@ class EntityOwnershipShard extends Shard { super(name, peerAddresses, noPersistenceDatastoreContext(datastoreContext), schemaContext); this.localMemberName = localMemberName; this.commitCoordinator = new EntityOwnershipShardCommitCoordinator(localMemberName, LOG); - this.listenerSupport = new EntityOwnershipListenerSupport(getContext()); + this.listenerSupport = new EntityOwnershipListenerSupport(getContext(), persistenceId()); for(String peerId: peerAddresses.keySet()) { ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(peerId).build(); @@ -97,7 +102,7 @@ class EntityOwnershipShard extends Shard { protected void onRecoveryComplete() { super.onRecoveryComplete(); - new CandidateListChangeListener(getSelf()).init(getDataStore()); + new CandidateListChangeListener(getSelf(), persistenceId()).init(getDataStore()); new EntityOwnerChangeListener(localMemberName, listenerSupport).init(getDataStore()); } @@ -236,15 +241,28 @@ class EntityOwnershipShard extends Shard { } private void onCandidateRemoved(CandidateRemoved message) { - if(!isLeader()) { - return; - } - LOG.debug("{}: onCandidateRemoved: {}", persistenceId(), message); - String currentOwner = getCurrentOwner(message.getEntityPath()); - if(message.getRemovedCandidate().equals(currentOwner)){ - writeNewOwner(message.getEntityPath(), newOwner(message.getRemainingCandidates())); + if(isLeader()) { + String currentOwner = getCurrentOwner(message.getEntityPath()); + if(message.getRemovedCandidate().equals(currentOwner)){ + writeNewOwner(message.getEntityPath(), newOwner(message.getRemainingCandidates())); + } + } else { + // We're not the leader. If the removed candidate is our local member then check if we actually + // have a local candidate registered. If we do then we must have been partitioned from the leader + // and the leader removed our candidate since the leader can't tell the difference between a + // temporary network partition and a node's process actually restarted. So, in that case, re-add + // our candidate. + if(localMemberName.equals(message.getRemovedCandidate()) && + listenerSupport.hasCandidateForEntity(createEntity(message.getEntityPath()))) { + LOG.debug("Local candidate member was removed but a local candidate is registered for {}" + + " - adding back local candidate", message.getEntityPath()); + + commitCoordinator.commitModification(new MergeModification( + candidatePath(message.getEntityPath(), localMemberName), + candidateMapEntry(localMemberName)), this); + } } } @@ -255,6 +273,10 @@ class EntityOwnershipShard extends Shard { LOG.debug("{}: onCandidateAdded: {}", persistenceId(), message); + // Since a node's candidate member is only added by the node itself, we can assume the node is up so + // remove it from the downPeerMemberNames. + downPeerMemberNames.remove(message.getNewCandidate()); + String currentOwner = getCurrentOwner(message.getEntityPath()); if(Strings.isNullOrEmpty(currentOwner)){ writeNewOwner(message.getEntityPath(), newOwner(message.getAllCandidates())); @@ -262,12 +284,12 @@ class EntityOwnershipShard extends Shard { } private void onPeerDown(PeerDown peerDown) { - LOG.debug("{}: onPeerDown: {}", persistenceId(), peerDown); + LOG.info("{}: onPeerDown: {}", persistenceId(), peerDown); String downMemberName = peerDown.getMemberName(); if(downPeerMemberNames.add(downMemberName) && isLeader()) { - // Select new owners for entities owned by the down peer. - selectNewOwnerForEntitiesOwnedBy(downMemberName); + // Remove the down peer as a candidate from all entities. + removeCandidateFromEntities(downMemberName); } } @@ -275,13 +297,7 @@ class EntityOwnershipShard extends Shard { LOG.debug("{}: onPeerUp: {}", persistenceId(), peerUp); peerIdToMemberNames.put(peerUp.getPeerId(), peerUp.getMemberName()); - - if(downPeerMemberNames.remove(peerUp.getMemberName()) && isLeader()) { - // This peer was previously down - for its previously owned entities, if there were no other - // candidates, the owner would have been cleared so handle that here by trying to re-assign - // ownership for entities whose owner is cleared. - selectNewOwnerForEntitiesOwnedBy(""); - } + downPeerMemberNames.remove(peerUp.getMemberName()); } private void selectNewOwnerForEntitiesOwnedBy(String owner) { @@ -304,7 +320,34 @@ class EntityOwnershipShard extends Shard { commitCoordinator.commitModifications(modifications, this); } - private void searchForEntitiesOwnedBy(String owner, EntityWalker walker) { + private void removeCandidateFromEntities(final String owner) { + final BatchedModifications modifications = commitCoordinator.newBatchedModifications(); + searchForEntities(new EntityWalker() { + @Override + public void onEntity(MapEntryNode entityTypeNode, MapEntryNode entityNode) { + if(hasCandidate(entityNode, owner)) { + YangInstanceIdentifier entityId = + (YangInstanceIdentifier)entityNode.getIdentifier().getKeyValues().get(ENTITY_ID_QNAME); + YangInstanceIdentifier candidatePath = candidatePath( + entityTypeNode.getIdentifier().getKeyValues().get(ENTITY_TYPE_QNAME).toString(), + entityId, owner); + + LOG.info("{}: Found entity {}, removing candidate {}, path {}", persistenceId(), entityId, + owner, candidatePath); + + modifications.addModification(new DeleteModification(candidatePath)); + } + } + }); + + commitCoordinator.commitModifications(modifications, this); + } + + private boolean hasCandidate(MapEntryNode entity, String candidateName) { + return ((MapNode)entity.getChild(CANDIDATE_NODE_ID).get()).getChild(candidateNodeKey(candidateName)).isPresent(); + } + + private void searchForEntitiesOwnedBy(final String owner, final EntityWalker walker) { DataTreeSnapshot snapshot = getDataStore().getDataTree().takeSnapshot(); Optional> possibleEntityTypes = snapshot.readNode(ENTITY_TYPES_PATH); if(!possibleEntityTypes.isPresent()) { @@ -313,6 +356,25 @@ class EntityOwnershipShard extends Shard { LOG.debug("{}: Searching for entities owned by {}", persistenceId(), owner); + searchForEntities(new EntityWalker() { + @Override + public void onEntity(MapEntryNode entityTypeNode, MapEntryNode entityNode) { + Optional> possibleOwner = + entityNode.getChild(ENTITY_OWNER_NODE_ID); + if(possibleOwner.isPresent() && owner.equals(possibleOwner.get().getValue().toString())) { + walker.onEntity(entityTypeNode, entityNode); + } + } + }); + } + + private void searchForEntities(EntityWalker walker) { + DataTreeSnapshot snapshot = getDataStore().getDataTree().takeSnapshot(); + Optional> possibleEntityTypes = snapshot.readNode(ENTITY_TYPES_PATH); + if(!possibleEntityTypes.isPresent()) { + return; + } + for(MapEntryNode entityType: ((MapNode) possibleEntityTypes.get()).getValue()) { Optional> possibleEntities = entityType.getChild(ENTITY_NODE_ID); @@ -321,11 +383,7 @@ class EntityOwnershipShard extends Shard { } for(MapEntryNode entity: ((MapNode) possibleEntities.get()).getValue()) { - Optional> possibleOwner = - entity.getChild(ENTITY_OWNER_NODE_ID); - if(possibleOwner.isPresent() && owner.equals(possibleOwner.get().getValue().toString())) { - walker.onEntity(entityType, entity); - } + walker.onEntity(entityType, entity); } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/AbstractEntityOwnershipTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/AbstractEntityOwnershipTest.java index 09cae67e7d..3134a6eedd 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/AbstractEntityOwnershipTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/AbstractEntityOwnershipTest.java @@ -51,7 +51,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailed */ public class AbstractEntityOwnershipTest extends AbstractActorTest { protected void verifyEntityCandidate(NormalizedNode node, String entityType, - YangInstanceIdentifier entityId, String candidateName) { + YangInstanceIdentifier entityId, String candidateName, boolean expectPresent) { try { assertNotNull("Missing " + EntityOwners.QNAME.toString(), node); assertTrue(node instanceof ContainerNode); @@ -59,24 +59,25 @@ public class AbstractEntityOwnershipTest extends AbstractActorTest { ContainerNode entityOwnersNode = (ContainerNode) node; MapEntryNode entityTypeEntry = getMapEntryNodeChild(entityOwnersNode, EntityType.QNAME, - ENTITY_TYPE_QNAME, entityType); + ENTITY_TYPE_QNAME, entityType, true); - MapEntryNode entityEntry = getMapEntryNodeChild(entityTypeEntry, ENTITY_QNAME, ENTITY_ID_QNAME, entityId); + MapEntryNode entityEntry = getMapEntryNodeChild(entityTypeEntry, ENTITY_QNAME, ENTITY_ID_QNAME, + entityId, true); - getMapEntryNodeChild(entityEntry, Candidate.QNAME, CANDIDATE_NAME_QNAME, candidateName); + getMapEntryNodeChild(entityEntry, Candidate.QNAME, CANDIDATE_NAME_QNAME, candidateName, expectPresent); } catch(AssertionError e) { throw new AssertionError("Verification of entity candidate failed - returned data was: " + node, e); } } protected void verifyEntityCandidate(String entityType, YangInstanceIdentifier entityId, String candidateName, - Function> reader) { + Function> reader, boolean expectPresent) { AssertionError lastError = null; Stopwatch sw = Stopwatch.createStarted(); while(sw.elapsed(TimeUnit.MILLISECONDS) <= 5000) { NormalizedNode node = reader.apply(ENTITY_OWNERS_PATH); try { - verifyEntityCandidate(node, entityType, entityId, candidateName); + verifyEntityCandidate(node, entityType, entityId, candidateName, expectPresent); return; } catch (AssertionError e) { lastError = e; @@ -87,8 +88,13 @@ public class AbstractEntityOwnershipTest extends AbstractActorTest { throw lastError; } + protected void verifyEntityCandidate(String entityType, YangInstanceIdentifier entityId, String candidateName, + Function> reader) { + verifyEntityCandidate(entityType, entityId, candidateName, reader, true); + } + protected MapEntryNode getMapEntryNodeChild(DataContainerNode parent, QName childMap, - QName child, Object key) { + QName child, Object key, boolean expectPresent) { Optional> childNode = parent.getChild(new NodeIdentifier(childMap)); assertEquals("Missing " + childMap.toString(), true, childNode.isPresent()); @@ -96,10 +102,13 @@ public class AbstractEntityOwnershipTest extends AbstractActorTest { MapNode entityTypeMapNode = (MapNode) childNode.get(); Optional entityTypeEntry = entityTypeMapNode.getChild(new NodeIdentifierWithPredicates( childMap, child, key)); - if(!entityTypeEntry.isPresent()) { + if(expectPresent && !entityTypeEntry.isPresent()) { fail("Missing " + childMap.toString() + " entry for " + key + ". Actual: " + entityTypeMapNode.getValue()); + } else if(!expectPresent && entityTypeEntry.isPresent()) { + fail("Found unexpected " + childMap.toString() + " entry for " + key); } - return entityTypeEntry.get(); + + return entityTypeEntry.isPresent() ? entityTypeEntry.get() : null; } static void verifyOwner(String expected, String entityType, YangInstanceIdentifier entityId, diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/CandidateListChangeListenerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/CandidateListChangeListenerTest.java index aa96fc20d8..7516242223 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/CandidateListChangeListenerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/CandidateListChangeListenerTest.java @@ -45,7 +45,7 @@ public class CandidateListChangeListenerTest extends AbstractActorTest { public void testOnDataTreeChanged() throws Exception { JavaTestKit kit = new JavaTestKit(getSystem()); - new CandidateListChangeListener(kit.getRef()).init(shardDataTree); + new CandidateListChangeListener(kit.getRef(), "test").init(shardDataTree); String memberName1 = "member-1"; writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, memberName1)); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnerChangeListenerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnerChangeListenerTest.java index 9bd0ccf2c0..8b80d6b9e7 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnerChangeListenerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnerChangeListenerTest.java @@ -26,6 +26,7 @@ import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; +; /** * Unit tests for EntityOwnerChangeListener. @@ -85,9 +86,21 @@ public class EntityOwnerChangeListenerTest { reset(mockListenerSupport); writeNode(entityPath(ENTITY_TYPE, ENTITY_ID2), entityEntryWithOwner(ENTITY_ID2, LOCAL_MEMBER_NAME)); verify(mockListenerSupport).notifyEntityOwnershipListeners(ENTITY2, false, true); + + reset(mockListenerSupport); + writeNode(entityPath(ENTITY_TYPE, ENTITY_ID2), entityEntryWithOwner(ENTITY_ID2, LOCAL_MEMBER_NAME)); + verify(mockListenerSupport, never()).notifyEntityOwnershipListeners(any(Entity.class), anyBoolean(), anyBoolean()); + + reset(mockListenerSupport); + writeNode(entityPath(ENTITY_TYPE, ENTITY_ID2), entityEntryWithOwner(ENTITY_ID2, null)); + verify(mockListenerSupport).notifyEntityOwnershipListeners(ENTITY2, true, false); } private void writeNode(YangInstanceIdentifier path, NormalizedNode node) throws DataValidationFailedException { AbstractEntityOwnershipTest.writeNode(path, node, shardDataTree); } + + private void deleteNode(YangInstanceIdentifier path) throws DataValidationFailedException { + AbstractEntityOwnershipTest.deleteNode(path, shardDataTree); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipListenerSupportTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipListenerSupportTest.java index 8ddc0b473e..00448751da 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipListenerSupportTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipListenerSupportTest.java @@ -15,9 +15,9 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.reset; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; +import akka.actor.ActorContext; import akka.actor.ActorRef; import akka.actor.Props; -import akka.actor.UntypedActorContext; import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; import com.google.common.util.concurrent.Uninterruptibles; @@ -25,11 +25,13 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; import org.junit.After; +import org.junit.Before; import org.junit.Test; import org.opendaylight.controller.cluster.datastore.AbstractActorTest; import org.opendaylight.controller.cluster.raft.TestActorFactory; import org.opendaylight.controller.cluster.raft.utils.DoNothingActor; import org.opendaylight.controller.md.sal.common.api.clustering.Entity; +import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate; import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; @@ -43,6 +45,15 @@ import scala.collection.immutable.Iterable; */ public class EntityOwnershipListenerSupportTest extends AbstractActorTest { private final TestActorFactory actorFactory = new TestActorFactory(getSystem()); + private ActorContext actorContext; + + @Before + public void setup() { + TestActorRef actor = actorFactory.createTestActor( + Props.create(DoNothingActor.class), actorFactory.generateActorId("test")); + + actorContext = actor.underlyingActor().getContext(); + } @After public void tearDown() { @@ -51,11 +62,7 @@ public class EntityOwnershipListenerSupportTest extends AbstractActorTest { @Test public void testNotifyEntityOwnershipListeners() { - TestActorRef actor = actorFactory.createTestActor( - Props.create(DoNothingActor.class), actorFactory.generateActorId("test")); - - UntypedActorContext actorContext = actor.underlyingActor().getContext(); - EntityOwnershipListenerSupport support = new EntityOwnershipListenerSupport(actorContext); + EntityOwnershipListenerSupport support = new EntityOwnershipListenerSupport(actorContext, "test"); EntityOwnershipListener mockListener1 = mock(EntityOwnershipListener.class, "EntityOwnershipListener1"); EntityOwnershipListener mockListener2 = mock(EntityOwnershipListener.class, "EntityOwnershipListener2"); @@ -184,4 +191,22 @@ public class EntityOwnershipListenerSupportTest extends AbstractActorTest { support.addEntityOwnershipListener(entity1, mockListener2); support.removeEntityOwnershipListener(entity1, mockListener2); } + + @Test + public void testHasCandidateForEntity() { + EntityOwnershipListenerSupport support = new EntityOwnershipListenerSupport(actorContext, "test"); + Entity entity = new Entity("type", YangInstanceIdentifier.of(QName.create("test", "id"))); + + assertEquals("hasCandidateForEntity", false, support.hasCandidateForEntity(entity)); + + support.addEntityOwnershipListener(entity, mock(EntityOwnershipListener.class)); + assertEquals("hasCandidateForEntity", false, support.hasCandidateForEntity(entity)); + + EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class); + support.addEntityOwnershipListener(entity, candidate); + assertEquals("hasCandidateForEntity", true, support.hasCandidateForEntity(entity)); + + support.removeEntityOwnershipListener(entity, candidate); + assertEquals("hasCandidateForEntity", false, support.hasCandidateForEntity(entity)); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java index 146916a962..2dc754472e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java @@ -93,6 +93,8 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity3")); private static final YangInstanceIdentifier ENTITY_ID4 = YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity4")); + private static final YangInstanceIdentifier ENTITY_ID5 = + YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity5")); private static final SchemaContext SCHEMA_CONTEXT = SchemaContextHelper.entityOwners(); private static final AtomicInteger NEXT_SHARD_NUM = new AtomicInteger(); private static final String LOCAL_MEMBER_NAME = "member-1"; @@ -480,10 +482,6 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID2, peerMemberName1), kit); verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName1); - - leader.tell(new RegisterCandidateLocal(candidate, new Entity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef()); - kit.expectMsgClass(SuccessReply.class); - verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, LOCAL_MEMBER_NAME); verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2); // Add candidates for entity3 with peerMember2 as the owner. @@ -505,6 +503,12 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, peerMemberName2); verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, peerMemberName2); + // Add only candidate peerMember1 for entity5. + + commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID5, peerMemberName1), kit); + verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID5, peerMemberName1); + verifyOwner(leader, ENTITY_TYPE, ENTITY_ID5, peerMemberName1); + // Kill peerMember2 and send PeerDown - the entities (2, 3, 4) owned by peerMember2 should get a new // owner selected @@ -523,12 +527,12 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName1); verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME); - verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, peerMemberName2); - verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2); - verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, peerMemberName2); - verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, peerMemberName2); + verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, peerMemberName2); + verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2); + verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, peerMemberName2); + verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, peerMemberName2); - // Reinstate peerMember2 - should become owner again for entity 4 + // Reinstate peerMember2 - no owners should change peer2 = actorFactory.createTestActor(newShardProps(peerId2, ImmutableMap.builder().put(leaderId.toString(), ""). put(peerId1.toString(), "").build(), @@ -538,10 +542,22 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { leader.tell(new PeerUp(peerMemberName2, peerId2.toString()), ActorRef.noSender()); peer1.tell(new PeerUp(peerMemberName2, peerId2.toString()), ActorRef.noSender()); - verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, peerMemberName2); verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME); verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName1); verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME); + verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, ""); + + // Add back candidate peerMember2 for entities 1, 2, & 3. + + commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, peerMemberName2), kit); + commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID2, peerMemberName2), kit); + commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID3, peerMemberName2), kit); + verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, peerMemberName2); + verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2); + verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, peerMemberName2); + verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME); + verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName1); + verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME); // Kill peerMember1 and send PeerDown - entity 2 should get a new owner selected @@ -552,7 +568,7 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { // Verify the reinstated peerMember2 is fully synced. - verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, peerMemberName2); + verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, ""); verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME); verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, peerMemberName2); verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME); @@ -564,14 +580,14 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { peerMemberName1).withDispatcher(Dispatchers.DefaultDispatcherId()), peerId1.toString()); leader.tell(new PeerUp(peerMemberName1, peerId1.toString()), ActorRef.noSender()); - verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, peerMemberName2); + verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, ""); verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME); verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2); verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME); // Verify the reinstated peerMember1 is fully synced. - verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID4, peerMemberName2); + verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID4, ""); verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME); verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID2, peerMemberName2); verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME); @@ -589,12 +605,67 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { kit.waitUntilLeader(peer2); - verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, peerMemberName2); + verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, ""); verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, peerMemberName2); verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, peerMemberName2); verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, peerMemberName2); } + @Test + public void testLocalCandidateRemovedWithCandidateRegistered() throws Exception { + ShardTestKit kit = new ShardTestKit(getSystem()); + + dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(10000); + ShardIdentifier leaderId = newShardId("leader"); + ShardIdentifier localId = newShardId(LOCAL_MEMBER_NAME); + + TestActorRef shard = actorFactory.createTestActor(Props.create( + TestEntityOwnershipShard.class, localId, + ImmutableMap.builder().put(leaderId.toString(), "".toString()).build(), + dataStoreContextBuilder.build()).withDispatcher(Dispatchers.DefaultDispatcherId())); + + TestActorRef leader = actorFactory.createTestActor(newShardProps(leaderId, + ImmutableMap.builder().put(localId.toString(), shard.path().toString()).build(), + LOCAL_MEMBER_NAME).withDispatcher(Dispatchers.DefaultDispatcherId()), leaderId.toString()); + leader.tell(new ElectionTimeout(), leader); + + kit.waitUntilLeader(leader); + + shard.tell(new PeerAddressResolved(leaderId.toString(), leader.path().toString()), ActorRef.noSender()); + + Entity entity = new Entity(ENTITY_TYPE, ENTITY_ID1); + EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class); + + // Register local candidate + + shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + verifyCommittedEntityCandidate(shard, entity.getType(), entity.getId(), LOCAL_MEMBER_NAME); + verify(candidate, timeout(5000)).ownershipChanged(entity, false, true); + reset(candidate); + + // Simulate a replicated commit from the leader to remove the local candidate that would occur after a + // network partition is healed. + + leader.tell(new PeerDown(LOCAL_MEMBER_NAME, localId.toString()), ActorRef.noSender()); + + verify(candidate, timeout(5000)).ownershipChanged(entity, true, false); + + // Since the the shard has a local candidate registered, it should re-add its candidate to the entity. + + verifyCommittedEntityCandidate(shard, entity.getType(), entity.getId(), LOCAL_MEMBER_NAME); + verify(candidate, timeout(5000)).ownershipChanged(entity, false, true); + + // Unregister the local candidate and verify it's removed and no re-added. + + shard.tell(new UnregisterCandidateLocal(candidate, entity), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + + verifyNoEntityCandidate(shard, entity.getType(), entity.getId(), LOCAL_MEMBER_NAME); + Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); + verifyNoEntityCandidate(shard, entity.getType(), entity.getId(), LOCAL_MEMBER_NAME); + } + @Test public void testListenerRegistration() throws Exception { ShardTestKit kit = new ShardTestKit(getSystem()); @@ -681,16 +752,21 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { private void commitModification(TestActorRef shard, NormalizedNode node, JavaTestKit sender) { - BatchedModifications modifications = new BatchedModifications("tnx", DataStoreVersions.CURRENT_VERSION, ""); - modifications.setDoCommitOnReady(true); - modifications.setReady(true); - modifications.setTotalMessagesSent(1); + BatchedModifications modifications = newBatchedModifications(); modifications.addModification(new MergeModification(ENTITY_OWNERS_PATH, node)); shard.tell(modifications, sender.getRef()); sender.expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS); } + private BatchedModifications newBatchedModifications() { + BatchedModifications modifications = new BatchedModifications("tnx", DataStoreVersions.CURRENT_VERSION, ""); + modifications.setDoCommitOnReady(true); + modifications.setReady(true); + modifications.setTotalMessagesSent(1); + return modifications; + } + private void verifyEntityCandidateRemoved(final TestActorRef shard, String entityType, YangInstanceIdentifier entityId, String candidateName) { verifyNodeRemoved(candidatePath(entityType, entityId, candidateName), @@ -720,6 +796,20 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { }); } + private void verifyNoEntityCandidate(final TestActorRef shard, String entityType, + YangInstanceIdentifier entityId, String candidateName) { + verifyEntityCandidate(entityType, entityId, candidateName, new Function>() { + @Override + public NormalizedNode apply(YangInstanceIdentifier path) { + try { + return AbstractShardTest.readStore(shard, path); + } catch(Exception e) { + throw new AssertionError("Failed to read " + path, e); + } + } + }, false); + } + private void verifyBatchedEntityCandidate(List mods, String entityType, YangInstanceIdentifier entityId, String candidateName) throws Exception { assertEquals("BatchedModifications size", 1, mods.size()); @@ -730,7 +820,7 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { YangInstanceIdentifier entityId, String candidateName) throws Exception { assertEquals("Modification type", MergeModification.class, mod.getClass()); verifyEntityCandidate(((MergeModification)mod).getData(), entityType, - entityId, candidateName); + entityId, candidateName, true); } private void verifyOwner(final TestActorRef shard, String entityType, YangInstanceIdentifier entityId,