Bug 4105: Choose Owner for an Entity based on first come first served basis 98/26798/1
authorMoiz Raja <moraja@cisco.com>
Wed, 19 Aug 2015 23:53:10 +0000 (16:53 -0700)
committerTom Pantelis <tpanteli@brocade.com>
Thu, 10 Sep 2015 19:14:39 +0000 (15:14 -0400)
Change-Id: If40e19cf40e832c9317611bde2950502f7f4897c
Signed-off-by: Moiz Raja <moraja@cisco.com>
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
12 files changed:
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/CandidateListChangeListener.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnerChangeListener.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnersModel.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/messages/CandidateAdded.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/messages/CandidateRemoved.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/AbstractEntityOwnershipTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/CandidateListChangeListenerTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipServiceTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnerChangeListenerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/CarsModel.java

diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/CandidateListChangeListener.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/CandidateListChangeListener.java
new file mode 100644 (file)
index 0000000..35100cb
--- /dev/null
@@ -0,0 +1,125 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.entityownership;
+
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.CANDIDATE_NAME_QNAME;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_ID_QNAME;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNERS_PATH;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_QNAME;
+import akka.actor.ActorRef;
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.opendaylight.controller.cluster.datastore.ShardDataTree;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateAdded;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateRemoved;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.entity.owners.EntityType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.entity.owners.entity.type.entity.Candidate;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Listens for candidate entries added/removed and notifies the EntityOwnershipShard appropriately.
+ *
+ * @author Moiz Raja
+ * @author Thomas Pantelis
+ */
+public class CandidateListChangeListener implements DOMDataTreeChangeListener {
+    private static final Logger LOG = LoggerFactory.getLogger(CandidateListChangeListener.class);
+
+    private final ActorRef shard;
+    private final Map<YangInstanceIdentifier, Collection<String>> currentCandidates = new HashMap<>();
+
+    public CandidateListChangeListener(ActorRef shard, ShardDataTree shardDataTree) {
+        this.shard = Preconditions.checkNotNull(shard, "shard should not be null");
+
+        shardDataTree.registerTreeChangeListener(YangInstanceIdentifier.builder(ENTITY_OWNERS_PATH).
+                node(EntityType.QNAME).node(EntityType.QNAME).node(ENTITY_QNAME).node(ENTITY_QNAME).
+                        node(Candidate.QNAME).node(Candidate.QNAME).build(), this);
+    }
+
+    @Override
+    public void onDataTreeChanged(Collection<DataTreeCandidate> changes) {
+        for(DataTreeCandidate change: changes) {
+            DataTreeCandidateNode changeRoot = change.getRootNode();
+
+            LOG.debug("Candidate node changed: {}, {}", changeRoot.getModificationType(), change.getRootPath());
+
+            NodeIdentifierWithPredicates candidateKey =
+                    (NodeIdentifierWithPredicates) change.getRootPath().getLastPathArgument();
+            String candidate = candidateKey.getKeyValues().get(CANDIDATE_NAME_QNAME).toString();
+
+            YangInstanceIdentifier entityId = extractEntityPath(change.getRootPath());
+
+            if(changeRoot.getModificationType() == ModificationType.WRITE) {
+                LOG.debug("Candidate {} was added for entity {}", candidate, entityId);
+
+                Collection<String> currentCandidates = addToCurrentCandidates(entityId, candidate);
+                shard.tell(new CandidateAdded(entityId, candidate, new ArrayList<>(currentCandidates)), shard);
+            } else if(changeRoot.getModificationType() == ModificationType.DELETE) {
+                LOG.debug("Candidate {} was removed for entity {}", candidate, entityId);
+
+                Collection<String> currentCandidates = removeFromCurrentCandidates(entityId, candidate);
+                shard.tell(new CandidateRemoved(entityId, candidate, new ArrayList<>(currentCandidates)), shard);
+            }
+        }
+    }
+
+    private Collection<String> addToCurrentCandidates(YangInstanceIdentifier entityId, String newCandidate) {
+        Collection<String> candidates = currentCandidates.get(entityId);
+        if(candidates == null) {
+            candidates = new LinkedHashSet<>();
+            currentCandidates.put(entityId, candidates);
+        }
+
+        candidates.add(newCandidate);
+        return candidates;
+    }
+
+    private Collection<String> removeFromCurrentCandidates(YangInstanceIdentifier entityId, String candidateToRemove) {
+        Collection<String> candidates = currentCandidates.get(entityId);
+        if(candidates != null) {
+            candidates.remove(candidateToRemove);
+            return candidates;
+        }
+
+        // Shouldn't happen
+        return Collections.emptyList();
+    }
+
+    private YangInstanceIdentifier extractEntityPath(YangInstanceIdentifier candidatePath) {
+        List<PathArgument> newPathArgs = new ArrayList<>();
+        for(PathArgument pathArg: candidatePath.getPathArguments()) {
+            newPathArgs.add(pathArg);
+            if(pathArg instanceof NodeIdentifierWithPredicates) {
+                NodeIdentifierWithPredicates nodeKey = (NodeIdentifierWithPredicates) pathArg;
+                Entry<QName, Object> key = nodeKey.getKeyValues().entrySet().iterator().next();
+                if(ENTITY_ID_QNAME.equals(key.getKey())) {
+                    break;
+                }
+            }
+        }
+
+        return YangInstanceIdentifier.create(newPathArgs);
+    }
+}
\ No newline at end of file
index bf26163..253761f 100644 (file)
@@ -32,8 +32,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * A DataChangeListener that listeners for entity owner changes and notifies the EntityOwnershipListenerSupport
- * appropriately.
+ * Listens for entity owner changes and notifies the EntityOwnershipListenerSupport appropriately.
  *
  * @author Thomas Pantelis
  */
index 5795007..46f3358 100644 (file)
@@ -44,6 +44,15 @@ final class EntityOwnersModel {
 
     }
 
+    static YangInstanceIdentifier candidatePath(String entityType, YangInstanceIdentifier entityId,
+            String candidateName) {
+        return YangInstanceIdentifier.builder(ENTITY_OWNERS_PATH).node(EntityType.QNAME).
+                nodeWithKey(EntityType.QNAME, ENTITY_TYPE_QNAME, entityType).node(ENTITY_QNAME).
+                        nodeWithKey(ENTITY_QNAME, ENTITY_ID_QNAME, entityId).node(Candidate.QNAME).
+                                nodeWithKey(Candidate.QNAME, CANDIDATE_NAME_QNAME, candidateName).build();
+
+    }
+
     static NormalizedNode<?, ?> entityOwnersWithCandidate(String entityType, YangInstanceIdentifier entityId,
             String candidateName) {
         return entityOwnersWithEntityTypeEntry(entityTypeEntryWithEntityEntry(entityType,
index c95ea62..629f938 100644 (file)
@@ -8,22 +8,32 @@
 package org.opendaylight.controller.cluster.datastore.entityownership;
 
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNERS_PATH;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_NODE_ID;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_QNAME;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityOwnersWithCandidate;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.Props;
 import akka.pattern.Patterns;
+import com.google.common.base.Optional;
+import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
 import org.opendaylight.controller.cluster.datastore.Shard;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateAdded;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateRemoved;
 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
 import org.opendaylight.controller.cluster.datastore.messages.SuccessReply;
 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
+import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.Future;
 
@@ -52,12 +62,23 @@ class EntityOwnershipShard extends Shard {
         super.onDatastoreContext(noPersistenceDatastoreContext(context));
     }
 
+    @Override
+    protected void onRecoveryComplete() {
+        super.onRecoveryComplete();
+
+        new CandidateListChangeListener(getSelf(), getDataStore());
+    }
+
     @Override
     public void onReceiveCommand(final Object message) throws Exception {
         if(message instanceof RegisterCandidateLocal) {
             onRegisterCandidateLocal((RegisterCandidateLocal)message);
         } else if(message instanceof UnregisterCandidateLocal) {
             onUnregisterCandidateLocal((UnregisterCandidateLocal)message);
+        } else if(message instanceof CandidateAdded){
+            onCandidateAdded((CandidateAdded) message);
+        } else if(message instanceof CandidateRemoved){
+            onCandidateRemoved((CandidateRemoved) message);
         } else if(!commitCoordinator.handleMessage(message, this)) {
             super.onReceiveCommand(message);
         }
@@ -113,6 +134,56 @@ class EntityOwnershipShard extends Shard {
         getSender().tell(SuccessReply.INSTANCE, getSelf());
     }
 
+    private void onCandidateRemoved(CandidateRemoved message) {
+        if(!isLeader()){
+            return;
+        }
+
+        LOG.debug("onCandidateRemoved: {}", message);
+
+        String currentOwner = getCurrentOwner(message.getEntityPath());
+        if(message.getRemovedCandidate().equals(currentOwner)){
+            writeNewOwner(message.getEntityPath(), newOwner(message.getRemainingCandidates()));
+        }
+    }
+
+    private void onCandidateAdded(CandidateAdded message) {
+        if(!isLeader()){
+            return;
+        }
+
+        LOG.debug("onCandidateAdded: {}", message);
+
+        String currentOwner = getCurrentOwner(message.getEntityPath());
+        if(currentOwner == null){
+            writeNewOwner(message.getEntityPath(), newOwner(message.getAllCandidates()));
+        }
+    }
+
+    private void writeNewOwner(YangInstanceIdentifier entityPath, String newOwner) {
+        LOG.debug("Writing new owner {} for entity {}", newOwner, entityPath);
+
+        commitCoordinator.commitModification(new WriteModification(entityPath.node(ENTITY_OWNER_QNAME),
+                ImmutableNodes.leafNode(ENTITY_OWNER_NODE_ID, newOwner)), this);
+    }
+
+    private String newOwner(Collection<String> candidates) {
+        if(candidates.size() > 0){
+            return candidates.iterator().next();
+        }
+
+        return "";
+    }
+
+    private String getCurrentOwner(YangInstanceIdentifier entityId) {
+        DataTreeSnapshot snapshot = getDataStore().getDataTree().takeSnapshot();
+        Optional<NormalizedNode<?, ?>> optionalEntityOwner = snapshot.readNode(entityId.node(ENTITY_OWNER_QNAME));
+        if(optionalEntityOwner.isPresent()){
+            return optionalEntityOwner.get().getValue().toString();
+        }
+        return null;
+    }
+
     public static Props props(final ShardIdentifier name, final Map<String, String> peerAddresses,
             final DatastoreContext datastoreContext, final SchemaContext schemaContext, final String localMemberName) {
         return Props.create(new Creator(name, peerAddresses, datastoreContext, schemaContext, localMemberName));
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/messages/CandidateAdded.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/messages/CandidateAdded.java
new file mode 100644 (file)
index 0000000..6340880
--- /dev/null
@@ -0,0 +1,48 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.entityownership.messages;
+
+import java.util.Collection;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+/**
+ * Message sent when a new candidate is added for an entity.
+ *
+ * @author Moiz Raja
+ * @author Thomas Pantelis
+ */
+public class CandidateAdded {
+    private final YangInstanceIdentifier entityPath;
+    private final Collection<String> allCandidates;
+    private final String newCandidate;
+
+    public CandidateAdded(YangInstanceIdentifier entityPath, String newCandidate, Collection<String> allCandidates) {
+        this.entityPath = entityPath;
+        this.newCandidate = newCandidate;
+        this.allCandidates = allCandidates;
+    }
+
+    public YangInstanceIdentifier getEntityPath() {
+        return entityPath;
+    }
+
+    public Collection<String> getAllCandidates() {
+        return allCandidates;
+    }
+
+    public String getNewCandidate() {
+        return newCandidate;
+    }
+
+    @Override
+    public String toString() {
+        return "CandidateAdded [entityPath=" + entityPath + ", newCandidate=" + newCandidate + ", allCandidates="
+                + allCandidates + "]";
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/messages/CandidateRemoved.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/messages/CandidateRemoved.java
new file mode 100644 (file)
index 0000000..8a96a76
--- /dev/null
@@ -0,0 +1,48 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.entityownership.messages;
+
+import java.util.Collection;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+/**
+ * Message sent when a candidate is removed for an entity.
+ *
+ * @author Moiz Raja
+ * @author Thomas Pantelis
+ */
+public class CandidateRemoved {
+    private final YangInstanceIdentifier entityPath;
+    private final String removedCandidate;
+    private final Collection<String> remainingCandidates;
+
+    public CandidateRemoved(YangInstanceIdentifier entityPath, String removedCandidate, Collection<String> remainingCandidates) {
+        this.entityPath = entityPath;
+        this.removedCandidate = removedCandidate;
+        this.remainingCandidates = remainingCandidates;
+    }
+
+    public YangInstanceIdentifier getEntityPath() {
+        return entityPath;
+    }
+
+    public String getRemovedCandidate() {
+        return removedCandidate;
+    }
+
+    public Collection<String> getRemainingCandidates() {
+        return remainingCandidates;
+    }
+
+    @Override
+    public String toString() {
+        return "CandidateRemoved [entityPath=" + entityPath + ", removedCandidate=" + removedCandidate
+                + ", remainingCandidates=" + remainingCandidates + "]";
+    }
+}
index c6ef275..0e282fb 100644 (file)
@@ -13,10 +13,18 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.CANDIDATE_NAME_QNAME;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_ID_QNAME;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_QNAME;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_QNAME;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_TYPE_QNAME;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityPath;
+import com.google.common.base.Function;
 import com.google.common.base.Optional;
+import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.concurrent.TimeUnit;
+import org.junit.Assert;
 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
+import org.opendaylight.controller.cluster.datastore.ShardDataTree;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.EntityOwners;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.entity.owners.EntityType;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.entity.owners.entity.type.entity.Candidate;
@@ -31,6 +39,9 @@ import org.opendaylight.yangtools.yang.data.api.schema.DataContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
 
 /**
  * Abstract base class providing utility methods.
@@ -53,7 +64,7 @@ public class AbstractEntityOwnershipTest extends AbstractActorTest {
 
             getMapEntryNodeChild(entityEntry, Candidate.QNAME, CANDIDATE_NAME_QNAME, candidateName);
         } catch(AssertionError e) {
-            throw new AssertionError("Verification of enitity candidate failed - returned data was: " + node, e);
+            throw new AssertionError("Verification of entity candidate failed - returned data was: " + node, e);
         }
     }
 
@@ -71,4 +82,45 @@ public class AbstractEntityOwnershipTest extends AbstractActorTest {
         }
         return entityTypeEntry.get();
     }
+
+    protected void verifyOwner(String expected, String entityType, YangInstanceIdentifier entityId,
+            Function<YangInstanceIdentifier,NormalizedNode<?,?>> reader) {
+        YangInstanceIdentifier entityPath = entityPath(entityType, entityId).node(ENTITY_OWNER_QNAME);
+        Stopwatch sw = Stopwatch.createStarted();
+        while(sw.elapsed(TimeUnit.MILLISECONDS) <= 5000) {
+            NormalizedNode<?, ?> node = reader.apply(entityPath);
+            if(node != null) {
+                Assert.assertEquals("Entity owner", expected, node.getValue().toString());
+                return;
+            } else {
+                Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+            }
+        }
+
+        fail("Owner was not set for entityId: " + entityId);
+    }
+
+    static void writeNode(YangInstanceIdentifier path, NormalizedNode<?, ?> node, ShardDataTree shardDataTree)
+            throws DataValidationFailedException {
+        DataTreeModification modification = shardDataTree.getDataTree().takeSnapshot().newModification();
+        modification.merge(path, node);
+        commit(shardDataTree, modification);
+    }
+
+    static void deleteNode(YangInstanceIdentifier path, ShardDataTree shardDataTree)
+            throws DataValidationFailedException {
+        DataTreeModification modification = shardDataTree.getDataTree().takeSnapshot().newModification();
+        modification.delete(path);
+        commit(shardDataTree, modification);
+    }
+
+    static void commit(ShardDataTree shardDataTree, DataTreeModification modification)
+            throws DataValidationFailedException {
+        modification.ready();
+
+        shardDataTree.getDataTree().validate(modification);
+        DataTreeCandidateTip candidate = shardDataTree.getDataTree().prepare(modification);
+        shardDataTree.getDataTree().commit(candidate);
+        shardDataTree.notifyListeners(candidate);
+    }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/CandidateListChangeListenerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/CandidateListChangeListenerTest.java
new file mode 100644 (file)
index 0000000..149d5f4
--- /dev/null
@@ -0,0 +1,103 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.entityownership;
+
+import static org.junit.Assert.assertEquals;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNERS_PATH;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.candidatePath;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityOwnersWithCandidate;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityPath;
+import akka.testkit.JavaTestKit;
+import com.google.common.collect.ImmutableSet;
+import java.util.concurrent.TimeUnit;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
+import org.opendaylight.controller.cluster.datastore.ShardDataTree;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateAdded;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateRemoved;
+import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Unit tests for CandidateListChangeListener.
+ *
+ * @author Thomas Pantelis
+ */
+public class CandidateListChangeListenerTest extends AbstractActorTest {
+    private static final String ENTITY_TYPE = "test";
+    private static final YangInstanceIdentifier ENTITY_ID1 =
+            YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity1"));
+    private static final YangInstanceIdentifier ENTITY_ID2 =
+            YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity2"));
+
+    private final ShardDataTree shardDataTree = new ShardDataTree(SchemaContextHelper.entityOwners());
+
+    @Test
+    public void testOnDataTreeChanged() throws Exception {
+        JavaTestKit kit = new JavaTestKit(getSystem());
+
+        CandidateListChangeListener listener = new CandidateListChangeListener(kit.getRef(), shardDataTree);
+
+        String memberName1 = "member-1";
+        writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, memberName1));
+
+        CandidateAdded candidateAdded = kit.expectMsgClass(CandidateAdded.class);
+        assertEquals("getEntityId", entityPath(ENTITY_TYPE, ENTITY_ID1), candidateAdded.getEntityPath());
+        assertEquals("getNewCandidate", memberName1, candidateAdded.getNewCandidate());
+        assertEquals("getAllCandidates", ImmutableSet.of(memberName1),
+                ImmutableSet.copyOf(candidateAdded.getAllCandidates()));
+
+        writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, memberName1));
+        kit.expectNoMsg(FiniteDuration.create(500, TimeUnit.MILLISECONDS));
+
+        String memberName2 = "member-2";
+        writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, memberName2));
+
+        candidateAdded = kit.expectMsgClass(CandidateAdded.class);
+        assertEquals("getEntityId", entityPath(ENTITY_TYPE, ENTITY_ID1), candidateAdded.getEntityPath());
+        assertEquals("getNewCandidate", memberName2, candidateAdded.getNewCandidate());
+        assertEquals("getAllCandidates", ImmutableSet.of(memberName1, memberName2),
+                ImmutableSet.copyOf(candidateAdded.getAllCandidates()));
+
+        writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID2, memberName1));
+
+        candidateAdded = kit.expectMsgClass(CandidateAdded.class);
+        assertEquals("getEntityId", entityPath(ENTITY_TYPE, ENTITY_ID2), candidateAdded.getEntityPath());
+        assertEquals("getNewCandidate", memberName1, candidateAdded.getNewCandidate());
+        assertEquals("getAllCandidates", ImmutableSet.of(memberName1),
+                ImmutableSet.copyOf(candidateAdded.getAllCandidates()));
+
+        deleteNode(candidatePath(ENTITY_TYPE, ENTITY_ID1, memberName1));
+
+        CandidateRemoved candidateRemoved = kit.expectMsgClass(CandidateRemoved.class);
+        assertEquals("getEntityId", entityPath(ENTITY_TYPE, ENTITY_ID1), candidateRemoved.getEntityPath());
+        assertEquals("getRemovedCandidate", memberName1, candidateRemoved.getRemovedCandidate());
+        assertEquals("getRemainingCandidates", ImmutableSet.of(memberName2),
+                ImmutableSet.copyOf(candidateRemoved.getRemainingCandidates()));
+
+        deleteNode(candidatePath(ENTITY_TYPE, ENTITY_ID1, memberName2));
+
+        candidateRemoved = kit.expectMsgClass(CandidateRemoved.class);
+        assertEquals("getEntityId", entityPath(ENTITY_TYPE, ENTITY_ID1), candidateRemoved.getEntityPath());
+        assertEquals("getRemovedCandidate", memberName2, candidateRemoved.getRemovedCandidate());
+        assertEquals("getRemainingCandidates", ImmutableSet.of(),
+                ImmutableSet.copyOf(candidateRemoved.getRemainingCandidates()));
+    }
+
+    private void writeNode(YangInstanceIdentifier path, NormalizedNode<?, ?> node) throws DataValidationFailedException {
+        AbstractEntityOwnershipTest.writeNode(path, node, shardDataTree);
+    }
+
+    private void deleteNode(YangInstanceIdentifier path) throws DataValidationFailedException {
+        AbstractEntityOwnershipTest.deleteNode(path, shardDataTree);
+    }
+}
index 3e9a51e..7e789ae 100644 (file)
@@ -213,6 +213,7 @@ public class DistributedEntityOwnershipServiceTest extends AbstractEntityOwnersh
     }
 
     private NormalizedNode<?, ?> readEntityOwners(ActorRef shard) throws Exception {
+        Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
         Stopwatch sw = Stopwatch.createStarted();
         while(sw.elapsed(TimeUnit.MILLISECONDS) <= 5000) {
             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
index 24913d2..e87b406 100644 (file)
@@ -25,8 +25,6 @@ import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
 
 /**
@@ -94,13 +92,6 @@ public class EntityOwnerChangeListenerTest {
     }
 
     private void writeNode(YangInstanceIdentifier path, NormalizedNode<?, ?> node) throws DataValidationFailedException {
-        DataTreeModification modification = shardDataTree.getDataTree().takeSnapshot().newModification();
-        modification.merge(path, node);
-        modification.ready();
-
-        shardDataTree.getDataTree().validate(modification);
-        DataTreeCandidateTip candidate = shardDataTree.getDataTree().prepare(modification);
-        shardDataTree.getDataTree().commit(candidate);
-        shardDataTree.notifyListeners(candidate);
+        AbstractEntityOwnershipTest.writeNode(path, node, shardDataTree);
     }
 }
index d4c59cc..a6bf30c 100644 (file)
@@ -15,6 +15,7 @@ import akka.actor.Props;
 import akka.actor.UntypedActor;
 import akka.dispatch.Dispatchers;
 import akka.testkit.TestActorRef;
+import com.google.common.base.Function;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.Uninterruptibles;
@@ -95,6 +96,8 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
         kit.expectMsgClass(SuccessReply.class);
 
         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
+
+        verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
     }
 
     @Test
@@ -122,6 +125,8 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
         peer.underlyingActor().grantVote = true;
 
         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
+
+        verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
     }
 
     @Test
@@ -160,7 +165,10 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
         // Resume AppendEntries - the follower should ack the commit which should then result in the candidate
         // write being applied to the state.
         follower.dropAppendEntries = false;
+
         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
+
+        verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
     }
 
     @Test
@@ -197,6 +205,8 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
         // Resume AppendEntries - the candidate write should now be committed.
         follower.dropAppendEntries = false;
         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
+
+        verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
     }
 
     @Test
@@ -304,6 +314,20 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
         return null;
     }
 
+    private void verifyOwner(final TestActorRef<EntityOwnershipShard> shard, String entityType, YangInstanceIdentifier entityId,
+            String localMemberName) {
+        verifyOwner(localMemberName, entityType, entityId, new Function<YangInstanceIdentifier, NormalizedNode<?,?>>() {
+            @Override
+            public NormalizedNode<?, ?> apply(YangInstanceIdentifier path) {
+                try {
+                    return AbstractShardTest.readStore(shard, path);
+                } catch(Exception e) {
+                    return null;
+                }
+            }
+        });
+    }
+
     private Props newShardProps() {
         return newShardProps(Collections.<String,String>emptyMap());
     }
index 468e2da..651d556 100644 (file)
@@ -63,6 +63,21 @@ public class CarsModel {
 
     }
 
+    public static NormalizedNode<?, ?> createEmptyCarsList(){
+
+        // Create a list builder
+        CollectionNodeBuilder<MapEntryNode, MapNode> cars =
+                ImmutableMapNodeBuilder.create().withNodeIdentifier(
+                        new YangInstanceIdentifier.NodeIdentifier(
+                                CAR_QNAME));
+
+        return ImmutableContainerNodeBuilder.create()
+                .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(BASE_QNAME))
+                .withChild(cars.build())
+                .build();
+
+    }
+
     public static NormalizedNode<?, ?> emptyContainer(){
         return ImmutableContainerNodeBuilder.create()
             .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(BASE_QNAME))