--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.entityownership;
+
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.CANDIDATE_NAME_QNAME;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_ID_QNAME;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNERS_PATH;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_QNAME;
+import akka.actor.ActorRef;
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.opendaylight.controller.cluster.datastore.ShardDataTree;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateAdded;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateRemoved;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.entity.owners.EntityType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.entity.owners.entity.type.entity.Candidate;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Listens for candidate entries added/removed and notifies the EntityOwnershipShard appropriately.
+ *
+ * @author Moiz Raja
+ * @author Thomas Pantelis
+ */
+public class CandidateListChangeListener implements DOMDataTreeChangeListener {
+ private static final Logger LOG = LoggerFactory.getLogger(CandidateListChangeListener.class);
+
+ private final ActorRef shard;
+ private final Map<YangInstanceIdentifier, Collection<String>> currentCandidates = new HashMap<>();
+
+ public CandidateListChangeListener(ActorRef shard, ShardDataTree shardDataTree) {
+ this.shard = Preconditions.checkNotNull(shard, "shard should not be null");
+
+ shardDataTree.registerTreeChangeListener(YangInstanceIdentifier.builder(ENTITY_OWNERS_PATH).
+ node(EntityType.QNAME).node(EntityType.QNAME).node(ENTITY_QNAME).node(ENTITY_QNAME).
+ node(Candidate.QNAME).node(Candidate.QNAME).build(), this);
+ }
+
+ @Override
+ public void onDataTreeChanged(Collection<DataTreeCandidate> changes) {
+ for(DataTreeCandidate change: changes) {
+ DataTreeCandidateNode changeRoot = change.getRootNode();
+
+ LOG.debug("Candidate node changed: {}, {}", changeRoot.getModificationType(), change.getRootPath());
+
+ NodeIdentifierWithPredicates candidateKey =
+ (NodeIdentifierWithPredicates) change.getRootPath().getLastPathArgument();
+ String candidate = candidateKey.getKeyValues().get(CANDIDATE_NAME_QNAME).toString();
+
+ YangInstanceIdentifier entityId = extractEntityPath(change.getRootPath());
+
+ if(changeRoot.getModificationType() == ModificationType.WRITE) {
+ LOG.debug("Candidate {} was added for entity {}", candidate, entityId);
+
+ Collection<String> currentCandidates = addToCurrentCandidates(entityId, candidate);
+ shard.tell(new CandidateAdded(entityId, candidate, new ArrayList<>(currentCandidates)), shard);
+ } else if(changeRoot.getModificationType() == ModificationType.DELETE) {
+ LOG.debug("Candidate {} was removed for entity {}", candidate, entityId);
+
+ Collection<String> currentCandidates = removeFromCurrentCandidates(entityId, candidate);
+ shard.tell(new CandidateRemoved(entityId, candidate, new ArrayList<>(currentCandidates)), shard);
+ }
+ }
+ }
+
+ private Collection<String> addToCurrentCandidates(YangInstanceIdentifier entityId, String newCandidate) {
+ Collection<String> candidates = currentCandidates.get(entityId);
+ if(candidates == null) {
+ candidates = new LinkedHashSet<>();
+ currentCandidates.put(entityId, candidates);
+ }
+
+ candidates.add(newCandidate);
+ return candidates;
+ }
+
+ private Collection<String> removeFromCurrentCandidates(YangInstanceIdentifier entityId, String candidateToRemove) {
+ Collection<String> candidates = currentCandidates.get(entityId);
+ if(candidates != null) {
+ candidates.remove(candidateToRemove);
+ return candidates;
+ }
+
+ // Shouldn't happen
+ return Collections.emptyList();
+ }
+
+ private YangInstanceIdentifier extractEntityPath(YangInstanceIdentifier candidatePath) {
+ List<PathArgument> newPathArgs = new ArrayList<>();
+ for(PathArgument pathArg: candidatePath.getPathArguments()) {
+ newPathArgs.add(pathArg);
+ if(pathArg instanceof NodeIdentifierWithPredicates) {
+ NodeIdentifierWithPredicates nodeKey = (NodeIdentifierWithPredicates) pathArg;
+ Entry<QName, Object> key = nodeKey.getKeyValues().entrySet().iterator().next();
+ if(ENTITY_ID_QNAME.equals(key.getKey())) {
+ break;
+ }
+ }
+ }
+
+ return YangInstanceIdentifier.create(newPathArgs);
+ }
+}
\ No newline at end of file
import org.slf4j.LoggerFactory;
/**
- * A DataChangeListener that listeners for entity owner changes and notifies the EntityOwnershipListenerSupport
- * appropriately.
+ * Listens for entity owner changes and notifies the EntityOwnershipListenerSupport appropriately.
*
* @author Thomas Pantelis
*/
}
+ static YangInstanceIdentifier candidatePath(String entityType, YangInstanceIdentifier entityId,
+ String candidateName) {
+ return YangInstanceIdentifier.builder(ENTITY_OWNERS_PATH).node(EntityType.QNAME).
+ nodeWithKey(EntityType.QNAME, ENTITY_TYPE_QNAME, entityType).node(ENTITY_QNAME).
+ nodeWithKey(ENTITY_QNAME, ENTITY_ID_QNAME, entityId).node(Candidate.QNAME).
+ nodeWithKey(Candidate.QNAME, CANDIDATE_NAME_QNAME, candidateName).build();
+
+ }
+
static NormalizedNode<?, ?> entityOwnersWithCandidate(String entityType, YangInstanceIdentifier entityId,
String candidateName) {
return entityOwnersWithEntityTypeEntry(entityTypeEntryWithEntityEntry(entityType,
package org.opendaylight.controller.cluster.datastore.entityownership;
import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNERS_PATH;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_NODE_ID;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_QNAME;
import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityOwnersWithCandidate;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.Props;
import akka.pattern.Patterns;
+import com.google.common.base.Optional;
+import java.util.Collection;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.Shard;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateAdded;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateRemoved;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.messages.SuccessReply;
import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
+import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import scala.concurrent.Future;
super.onDatastoreContext(noPersistenceDatastoreContext(context));
}
+ @Override
+ protected void onRecoveryComplete() {
+ super.onRecoveryComplete();
+
+ new CandidateListChangeListener(getSelf(), getDataStore());
+ }
+
@Override
public void onReceiveCommand(final Object message) throws Exception {
if(message instanceof RegisterCandidateLocal) {
onRegisterCandidateLocal((RegisterCandidateLocal)message);
} else if(message instanceof UnregisterCandidateLocal) {
onUnregisterCandidateLocal((UnregisterCandidateLocal)message);
+ } else if(message instanceof CandidateAdded){
+ onCandidateAdded((CandidateAdded) message);
+ } else if(message instanceof CandidateRemoved){
+ onCandidateRemoved((CandidateRemoved) message);
} else if(!commitCoordinator.handleMessage(message, this)) {
super.onReceiveCommand(message);
}
getSender().tell(SuccessReply.INSTANCE, getSelf());
}
+ private void onCandidateRemoved(CandidateRemoved message) {
+ if(!isLeader()){
+ return;
+ }
+
+ LOG.debug("onCandidateRemoved: {}", message);
+
+ String currentOwner = getCurrentOwner(message.getEntityPath());
+ if(message.getRemovedCandidate().equals(currentOwner)){
+ writeNewOwner(message.getEntityPath(), newOwner(message.getRemainingCandidates()));
+ }
+ }
+
+ private void onCandidateAdded(CandidateAdded message) {
+ if(!isLeader()){
+ return;
+ }
+
+ LOG.debug("onCandidateAdded: {}", message);
+
+ String currentOwner = getCurrentOwner(message.getEntityPath());
+ if(currentOwner == null){
+ writeNewOwner(message.getEntityPath(), newOwner(message.getAllCandidates()));
+ }
+ }
+
+ private void writeNewOwner(YangInstanceIdentifier entityPath, String newOwner) {
+ LOG.debug("Writing new owner {} for entity {}", newOwner, entityPath);
+
+ commitCoordinator.commitModification(new WriteModification(entityPath.node(ENTITY_OWNER_QNAME),
+ ImmutableNodes.leafNode(ENTITY_OWNER_NODE_ID, newOwner)), this);
+ }
+
+ private String newOwner(Collection<String> candidates) {
+ if(candidates.size() > 0){
+ return candidates.iterator().next();
+ }
+
+ return "";
+ }
+
+ private String getCurrentOwner(YangInstanceIdentifier entityId) {
+ DataTreeSnapshot snapshot = getDataStore().getDataTree().takeSnapshot();
+ Optional<NormalizedNode<?, ?>> optionalEntityOwner = snapshot.readNode(entityId.node(ENTITY_OWNER_QNAME));
+ if(optionalEntityOwner.isPresent()){
+ return optionalEntityOwner.get().getValue().toString();
+ }
+ return null;
+ }
+
public static Props props(final ShardIdentifier name, final Map<String, String> peerAddresses,
final DatastoreContext datastoreContext, final SchemaContext schemaContext, final String localMemberName) {
return Props.create(new Creator(name, peerAddresses, datastoreContext, schemaContext, localMemberName));
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.entityownership.messages;
+
+import java.util.Collection;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+/**
+ * Message sent when a new candidate is added for an entity.
+ *
+ * @author Moiz Raja
+ * @author Thomas Pantelis
+ */
+public class CandidateAdded {
+ private final YangInstanceIdentifier entityPath;
+ private final Collection<String> allCandidates;
+ private final String newCandidate;
+
+ public CandidateAdded(YangInstanceIdentifier entityPath, String newCandidate, Collection<String> allCandidates) {
+ this.entityPath = entityPath;
+ this.newCandidate = newCandidate;
+ this.allCandidates = allCandidates;
+ }
+
+ public YangInstanceIdentifier getEntityPath() {
+ return entityPath;
+ }
+
+ public Collection<String> getAllCandidates() {
+ return allCandidates;
+ }
+
+ public String getNewCandidate() {
+ return newCandidate;
+ }
+
+ @Override
+ public String toString() {
+ return "CandidateAdded [entityPath=" + entityPath + ", newCandidate=" + newCandidate + ", allCandidates="
+ + allCandidates + "]";
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.entityownership.messages;
+
+import java.util.Collection;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+/**
+ * Message sent when a candidate is removed for an entity.
+ *
+ * @author Moiz Raja
+ * @author Thomas Pantelis
+ */
+public class CandidateRemoved {
+ private final YangInstanceIdentifier entityPath;
+ private final String removedCandidate;
+ private final Collection<String> remainingCandidates;
+
+ public CandidateRemoved(YangInstanceIdentifier entityPath, String removedCandidate, Collection<String> remainingCandidates) {
+ this.entityPath = entityPath;
+ this.removedCandidate = removedCandidate;
+ this.remainingCandidates = remainingCandidates;
+ }
+
+ public YangInstanceIdentifier getEntityPath() {
+ return entityPath;
+ }
+
+ public String getRemovedCandidate() {
+ return removedCandidate;
+ }
+
+ public Collection<String> getRemainingCandidates() {
+ return remainingCandidates;
+ }
+
+ @Override
+ public String toString() {
+ return "CandidateRemoved [entityPath=" + entityPath + ", removedCandidate=" + removedCandidate
+ + ", remainingCandidates=" + remainingCandidates + "]";
+ }
+}
import static org.junit.Assert.fail;
import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.CANDIDATE_NAME_QNAME;
import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_ID_QNAME;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_QNAME;
import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_QNAME;
import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_TYPE_QNAME;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityPath;
+import com.google.common.base.Function;
import com.google.common.base.Optional;
+import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.concurrent.TimeUnit;
+import org.junit.Assert;
import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
+import org.opendaylight.controller.cluster.datastore.ShardDataTree;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.EntityOwners;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.entity.owners.EntityType;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.entity.owners.entity.type.entity.Candidate;
import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
/**
* Abstract base class providing utility methods.
getMapEntryNodeChild(entityEntry, Candidate.QNAME, CANDIDATE_NAME_QNAME, candidateName);
} catch(AssertionError e) {
- throw new AssertionError("Verification of enitity candidate failed - returned data was: " + node, e);
+ throw new AssertionError("Verification of entity candidate failed - returned data was: " + node, e);
}
}
}
return entityTypeEntry.get();
}
+
+ protected void verifyOwner(String expected, String entityType, YangInstanceIdentifier entityId,
+ Function<YangInstanceIdentifier,NormalizedNode<?,?>> reader) {
+ YangInstanceIdentifier entityPath = entityPath(entityType, entityId).node(ENTITY_OWNER_QNAME);
+ Stopwatch sw = Stopwatch.createStarted();
+ while(sw.elapsed(TimeUnit.MILLISECONDS) <= 5000) {
+ NormalizedNode<?, ?> node = reader.apply(entityPath);
+ if(node != null) {
+ Assert.assertEquals("Entity owner", expected, node.getValue().toString());
+ return;
+ } else {
+ Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ fail("Owner was not set for entityId: " + entityId);
+ }
+
+ static void writeNode(YangInstanceIdentifier path, NormalizedNode<?, ?> node, ShardDataTree shardDataTree)
+ throws DataValidationFailedException {
+ DataTreeModification modification = shardDataTree.getDataTree().takeSnapshot().newModification();
+ modification.merge(path, node);
+ commit(shardDataTree, modification);
+ }
+
+ static void deleteNode(YangInstanceIdentifier path, ShardDataTree shardDataTree)
+ throws DataValidationFailedException {
+ DataTreeModification modification = shardDataTree.getDataTree().takeSnapshot().newModification();
+ modification.delete(path);
+ commit(shardDataTree, modification);
+ }
+
+ static void commit(ShardDataTree shardDataTree, DataTreeModification modification)
+ throws DataValidationFailedException {
+ modification.ready();
+
+ shardDataTree.getDataTree().validate(modification);
+ DataTreeCandidateTip candidate = shardDataTree.getDataTree().prepare(modification);
+ shardDataTree.getDataTree().commit(candidate);
+ shardDataTree.notifyListeners(candidate);
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.entityownership;
+
+import static org.junit.Assert.assertEquals;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNERS_PATH;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.candidatePath;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityOwnersWithCandidate;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityPath;
+import akka.testkit.JavaTestKit;
+import com.google.common.collect.ImmutableSet;
+import java.util.concurrent.TimeUnit;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
+import org.opendaylight.controller.cluster.datastore.ShardDataTree;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateAdded;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateRemoved;
+import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Unit tests for CandidateListChangeListener.
+ *
+ * @author Thomas Pantelis
+ */
+public class CandidateListChangeListenerTest extends AbstractActorTest {
+ private static final String ENTITY_TYPE = "test";
+ private static final YangInstanceIdentifier ENTITY_ID1 =
+ YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity1"));
+ private static final YangInstanceIdentifier ENTITY_ID2 =
+ YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity2"));
+
+ private final ShardDataTree shardDataTree = new ShardDataTree(SchemaContextHelper.entityOwners());
+
+ @Test
+ public void testOnDataTreeChanged() throws Exception {
+ JavaTestKit kit = new JavaTestKit(getSystem());
+
+ CandidateListChangeListener listener = new CandidateListChangeListener(kit.getRef(), shardDataTree);
+
+ String memberName1 = "member-1";
+ writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, memberName1));
+
+ CandidateAdded candidateAdded = kit.expectMsgClass(CandidateAdded.class);
+ assertEquals("getEntityId", entityPath(ENTITY_TYPE, ENTITY_ID1), candidateAdded.getEntityPath());
+ assertEquals("getNewCandidate", memberName1, candidateAdded.getNewCandidate());
+ assertEquals("getAllCandidates", ImmutableSet.of(memberName1),
+ ImmutableSet.copyOf(candidateAdded.getAllCandidates()));
+
+ writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, memberName1));
+ kit.expectNoMsg(FiniteDuration.create(500, TimeUnit.MILLISECONDS));
+
+ String memberName2 = "member-2";
+ writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, memberName2));
+
+ candidateAdded = kit.expectMsgClass(CandidateAdded.class);
+ assertEquals("getEntityId", entityPath(ENTITY_TYPE, ENTITY_ID1), candidateAdded.getEntityPath());
+ assertEquals("getNewCandidate", memberName2, candidateAdded.getNewCandidate());
+ assertEquals("getAllCandidates", ImmutableSet.of(memberName1, memberName2),
+ ImmutableSet.copyOf(candidateAdded.getAllCandidates()));
+
+ writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID2, memberName1));
+
+ candidateAdded = kit.expectMsgClass(CandidateAdded.class);
+ assertEquals("getEntityId", entityPath(ENTITY_TYPE, ENTITY_ID2), candidateAdded.getEntityPath());
+ assertEquals("getNewCandidate", memberName1, candidateAdded.getNewCandidate());
+ assertEquals("getAllCandidates", ImmutableSet.of(memberName1),
+ ImmutableSet.copyOf(candidateAdded.getAllCandidates()));
+
+ deleteNode(candidatePath(ENTITY_TYPE, ENTITY_ID1, memberName1));
+
+ CandidateRemoved candidateRemoved = kit.expectMsgClass(CandidateRemoved.class);
+ assertEquals("getEntityId", entityPath(ENTITY_TYPE, ENTITY_ID1), candidateRemoved.getEntityPath());
+ assertEquals("getRemovedCandidate", memberName1, candidateRemoved.getRemovedCandidate());
+ assertEquals("getRemainingCandidates", ImmutableSet.of(memberName2),
+ ImmutableSet.copyOf(candidateRemoved.getRemainingCandidates()));
+
+ deleteNode(candidatePath(ENTITY_TYPE, ENTITY_ID1, memberName2));
+
+ candidateRemoved = kit.expectMsgClass(CandidateRemoved.class);
+ assertEquals("getEntityId", entityPath(ENTITY_TYPE, ENTITY_ID1), candidateRemoved.getEntityPath());
+ assertEquals("getRemovedCandidate", memberName2, candidateRemoved.getRemovedCandidate());
+ assertEquals("getRemainingCandidates", ImmutableSet.of(),
+ ImmutableSet.copyOf(candidateRemoved.getRemainingCandidates()));
+ }
+
+ private void writeNode(YangInstanceIdentifier path, NormalizedNode<?, ?> node) throws DataValidationFailedException {
+ AbstractEntityOwnershipTest.writeNode(path, node, shardDataTree);
+ }
+
+ private void deleteNode(YangInstanceIdentifier path) throws DataValidationFailedException {
+ AbstractEntityOwnershipTest.deleteNode(path, shardDataTree);
+ }
+}
}
private NormalizedNode<?, ?> readEntityOwners(ActorRef shard) throws Exception {
+ Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
Stopwatch sw = Stopwatch.createStarted();
while(sw.elapsed(TimeUnit.MILLISECONDS) <= 5000) {
DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
/**
}
private void writeNode(YangInstanceIdentifier path, NormalizedNode<?, ?> node) throws DataValidationFailedException {
- DataTreeModification modification = shardDataTree.getDataTree().takeSnapshot().newModification();
- modification.merge(path, node);
- modification.ready();
-
- shardDataTree.getDataTree().validate(modification);
- DataTreeCandidateTip candidate = shardDataTree.getDataTree().prepare(modification);
- shardDataTree.getDataTree().commit(candidate);
- shardDataTree.notifyListeners(candidate);
+ AbstractEntityOwnershipTest.writeNode(path, node, shardDataTree);
}
}
import akka.actor.UntypedActor;
import akka.dispatch.Dispatchers;
import akka.testkit.TestActorRef;
+import com.google.common.base.Function;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Uninterruptibles;
kit.expectMsgClass(SuccessReply.class);
verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
+
+ verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
}
@Test
peer.underlyingActor().grantVote = true;
verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
+
+ verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
}
@Test
// Resume AppendEntries - the follower should ack the commit which should then result in the candidate
// write being applied to the state.
follower.dropAppendEntries = false;
+
verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
+
+ verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
}
@Test
// Resume AppendEntries - the candidate write should now be committed.
follower.dropAppendEntries = false;
verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
+
+ verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
}
@Test
return null;
}
+ private void verifyOwner(final TestActorRef<EntityOwnershipShard> shard, String entityType, YangInstanceIdentifier entityId,
+ String localMemberName) {
+ verifyOwner(localMemberName, entityType, entityId, new Function<YangInstanceIdentifier, NormalizedNode<?,?>>() {
+ @Override
+ public NormalizedNode<?, ?> apply(YangInstanceIdentifier path) {
+ try {
+ return AbstractShardTest.readStore(shard, path);
+ } catch(Exception e) {
+ return null;
+ }
+ }
+ });
+ }
+
private Props newShardProps() {
return newShardProps(Collections.<String,String>emptyMap());
}
}
+ public static NormalizedNode<?, ?> createEmptyCarsList(){
+
+ // Create a list builder
+ CollectionNodeBuilder<MapEntryNode, MapNode> cars =
+ ImmutableMapNodeBuilder.create().withNodeIdentifier(
+ new YangInstanceIdentifier.NodeIdentifier(
+ CAR_QNAME));
+
+ return ImmutableContainerNodeBuilder.create()
+ .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(BASE_QNAME))
+ .withChild(cars.build())
+ .build();
+
+ }
+
public static NormalizedNode<?, ?> emptyContainer(){
return ImmutableContainerNodeBuilder.create()
.withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(BASE_QNAME))