@Override
public void close() {
- service.unregisterCandidate(getEntity());
+ service.unregisterCandidate(getEntity(), getInstance());
}
}
public void onComplete(Throwable failure, Object response) {
if(failure != null) {
LOG.debug("Error sending message {} to {}", message, shardActor, failure);
- // TODO - queue for retry
} else {
LOG.debug("{} message to {} succeeded", message, shardActor, failure);
}
return new DistributedEntityOwnershipCandidateRegistration(candidate, entity, this);
}
- void unregisterCandidate(Entity entity) {
+ void unregisterCandidate(Entity entity, EntityOwnershipCandidate entityOwnershipCandidate) {
LOG.debug("Unregistering candidate for {}", entity);
- executeLocalEntityOwnershipShardOperation(new UnregisterCandidateLocal(entity));
+ executeLocalEntityOwnershipShardOperation(new UnregisterCandidateLocal(entityOwnershipCandidate, entity));
registeredEntities.remove(entity);
}
import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Override
public void onDataTreeChanged(Collection<DataTreeCandidate> changes) {
for(DataTreeCandidate change: changes) {
- MapEntryNode entityNode = (MapEntryNode) change.getRootNode().getDataAfter().get();
+ DataTreeCandidateNode changeRoot = change.getRootNode();
+ MapEntryNode entityNode = (MapEntryNode) changeRoot.getDataAfter().get();
- LOG.debug("Entity node updated: {}", change.getRootPath());
+ LOG.debug("Entity node changed: {}, {}", changeRoot.getModificationType(), change.getRootPath());
String newOwner = extractOwner(entityNode);
String origOwner = null;
- Optional<NormalizedNode<?, ?>> dataBefore = change.getRootNode().getDataBefore();
+ Optional<NormalizedNode<?, ?>> dataBefore = changeRoot.getDataBefore();
if(dataBefore.isPresent()) {
- MapEntryNode origEntityNode = (MapEntryNode) change.getRootNode().getDataBefore().get();
+ MapEntryNode origEntityNode = (MapEntryNode) changeRoot.getDataBefore().get();
origOwner = extractOwner(origEntityNode);
}
LOG.debug("New owner: {}, Original owner: {}", newOwner, origOwner);
- boolean isOwner = Objects.equal(localMemberName, newOwner);
- boolean wasOwner = Objects.equal(localMemberName, origOwner);
- if(isOwner || wasOwner) {
- Entity entity = createEntity(change.getRootPath());
+ if(!Objects.equal(origOwner, newOwner)) {
+ boolean isOwner = Objects.equal(localMemberName, newOwner);
+ boolean wasOwner = Objects.equal(localMemberName, origOwner);
+ if(isOwner || wasOwner) {
+ Entity entity = createEntity(change.getRootPath());
- LOG.debug("Calling notifyEntityOwnershipListeners: entity: {}, wasOwner: {}, isOwner: {}",
- entity, wasOwner, isOwner);
+ LOG.debug("Calling notifyEntityOwnershipListeners: entity: {}, wasOwner: {}, isOwner: {}",
+ entity, wasOwner, isOwner);
- listenerSupport.notifyEntityOwnershipListeners(entity, wasOwner, isOwner);
+ listenerSupport.notifyEntityOwnershipListeners(entity, wasOwner, isOwner);
+ }
}
}
}
import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNERS_PATH;
import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_NODE_ID;
import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_QNAME;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.candidatePath;
import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityOwnersWithCandidate;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.Props;
import akka.pattern.Patterns;
import com.google.common.base.Optional;
+import com.google.common.base.Strings;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.messages.SuccessReply;
+import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
+import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
getSender().tell(SuccessReply.INSTANCE, getSelf());
}
+ private void onUnregisterCandidateLocal(UnregisterCandidateLocal unregisterCandidate) {
+ LOG.debug("onUnregisterCandidateLocal: {}", unregisterCandidate);
+
+ Entity entity = unregisterCandidate.getEntity();
+ listenerSupport.removeEntityOwnershipListener(entity, unregisterCandidate.getCandidate());
+
+ YangInstanceIdentifier candidatePath = candidatePath(entity.getType(), entity.getId(), localMemberName);
+ commitCoordinator.commitModification(new DeleteModification(candidatePath), this);
+
+ getSender().tell(SuccessReply.INSTANCE, getSelf());
+ }
+
void tryCommitModifications(final BatchedModifications modifications) {
if(isLeader()) {
LOG.debug("Committing BatchedModifications {} locally", modifications.getTransactionID());
commitCoordinator.onStateChanged(this, isLeader());
}
- private void onUnregisterCandidateLocal(UnregisterCandidateLocal unregisterCandidate) {
- // TODO - implement
- getSender().tell(SuccessReply.INSTANCE, getSelf());
- }
-
private void onCandidateRemoved(CandidateRemoved message) {
if(!isLeader()){
return;
LOG.debug("onCandidateAdded: {}", message);
String currentOwner = getCurrentOwner(message.getEntityPath());
- if(currentOwner == null){
+ if(Strings.isNullOrEmpty(currentOwner)){
writeNewOwner(message.getEntityPath(), newOwner(message.getAllCandidates()));
}
}
package org.opendaylight.controller.cluster.datastore.entityownership.messages;
import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate;
/**
* Message sent to the local EntityOwnershipShard to unregister a candidate.
* @author Thomas Pantelis
*/
public class UnregisterCandidateLocal {
+ private final EntityOwnershipCandidate candidate;
private final Entity entity;
- public UnregisterCandidateLocal(Entity entity) {
+ public UnregisterCandidateLocal(EntityOwnershipCandidate candidate, Entity entity) {
+ this.candidate = candidate;
this.entity = entity;
}
+ public EntityOwnershipCandidate getCandidate() {
+ return candidate;
+ }
+
public Entity getEntity() {
return entity;
}
@Override
public String toString() {
- StringBuilder builder = new StringBuilder();
- builder.append("UnregisterCandidateLocal [entity=").append(entity).append("]");
- return builder.toString();
+ return "UnregisterCandidateLocal [entity=" + entity + ", candidate=" + candidate + "]";
}
}
import static org.junit.Assert.fail;
import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.CANDIDATE_NAME_QNAME;
import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_ID_QNAME;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNERS_PATH;
import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_QNAME;
import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_QNAME;
import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_TYPE_QNAME;
}
}
+ protected void verifyEntityCandidate(String entityType, YangInstanceIdentifier entityId, String candidateName,
+ Function<YangInstanceIdentifier,NormalizedNode<?,?>> reader) {
+ AssertionError lastError = null;
+ Stopwatch sw = Stopwatch.createStarted();
+ while(sw.elapsed(TimeUnit.MILLISECONDS) <= 5000) {
+ NormalizedNode<?, ?> node = reader.apply(ENTITY_OWNERS_PATH);
+ try {
+ verifyEntityCandidate(node, entityType, entityId, candidateName);
+ return;
+ } catch (AssertionError e) {
+ lastError = e;
+ Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ throw lastError;
+ }
+
protected MapEntryNode getMapEntryNodeChild(DataContainerNode<? extends PathArgument> parent, QName childMap,
QName child, Object key) {
Optional<DataContainerChild<? extends PathArgument, ?>> childNode =
protected void verifyOwner(String expected, String entityType, YangInstanceIdentifier entityId,
Function<YangInstanceIdentifier,NormalizedNode<?,?>> reader) {
+ AssertionError lastError = null;
YangInstanceIdentifier entityPath = entityPath(entityType, entityId).node(ENTITY_OWNER_QNAME);
Stopwatch sw = Stopwatch.createStarted();
while(sw.elapsed(TimeUnit.MILLISECONDS) <= 5000) {
- NormalizedNode<?, ?> node = reader.apply(entityPath);
- if(node != null) {
+ try {
+ NormalizedNode<?, ?> node = reader.apply(entityPath);
+ Assert.assertNotNull("Owner was not set for entityId: " + entityId, node);
Assert.assertEquals("Entity owner", expected, node.getValue().toString());
return;
- } else {
+ } catch(AssertionError e) {
+ lastError = e;
+ Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ throw lastError;
+ }
+
+ protected void verifyNodeRemoved(YangInstanceIdentifier path,
+ Function<YangInstanceIdentifier,NormalizedNode<?,?>> reader) {
+ AssertionError lastError = null;
+ Stopwatch sw = Stopwatch.createStarted();
+ while(sw.elapsed(TimeUnit.MILLISECONDS) <= 5000) {
+ try {
+ NormalizedNode<?, ?> node = reader.apply(path);
+ Assert.assertNull("Node was not removed at path: " + path, node);
+ return;
+ } catch(AssertionError e) {
+ lastError = e;
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
}
}
- fail("Owner was not set for entityId: " + entityId);
+ throw lastError;
}
static void writeNode(YangInstanceIdentifier path, NormalizedNode<?, ?> node, ShardDataTree shardDataTree)
UnregisterCandidateLocal unregCandidate = shardPropsCreator.waitForShardMessage();
assertEquals("getEntity", entity, unregCandidate.getEntity());
+ assertSame("getCandidate", candidate, unregCandidate.getCandidate());
// Re-register - should succeed.
}
@Test
- public void testOnDataChanged() throws Exception {
+ public void testOnDataTreeChanged() throws Exception {
writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME));
writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID2, LOCAL_MEMBER_NAME));
- writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, REMOTE_MEMBER_NAME1));
-
verify(mockListenerSupport, never()).notifyEntityOwnershipListeners(any(Entity.class), anyBoolean(), anyBoolean());
writeNode(entityPath(ENTITY_TYPE, ENTITY_ID1), entityEntryWithOwner(ENTITY_ID1, LOCAL_MEMBER_NAME));
-
verify(mockListenerSupport).notifyEntityOwnershipListeners(ENTITY1, false, true);
reset(mockListenerSupport);
- writeNode(entityPath(ENTITY_TYPE, ENTITY_ID1), entityEntryWithOwner(ENTITY_ID1, REMOTE_MEMBER_NAME1));
+ writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, REMOTE_MEMBER_NAME1));
+ verify(mockListenerSupport, never()).notifyEntityOwnershipListeners(any(Entity.class), anyBoolean(), anyBoolean());
+ reset(mockListenerSupport);
+ writeNode(entityPath(ENTITY_TYPE, ENTITY_ID1), entityEntryWithOwner(ENTITY_ID1, REMOTE_MEMBER_NAME1));
verify(mockListenerSupport).notifyEntityOwnershipListeners(ENTITY1, true, false);
reset(mockListenerSupport);
writeNode(entityPath(ENTITY_TYPE, ENTITY_ID1), entityEntryWithOwner(ENTITY_ID1, REMOTE_MEMBER_NAME2));
-
verify(mockListenerSupport, never()).notifyEntityOwnershipListeners(any(Entity.class), anyBoolean(), anyBoolean());
writeNode(entityPath(ENTITY_TYPE, ENTITY_ID1), entityEntryWithOwner(ENTITY_ID1, LOCAL_MEMBER_NAME));
-
verify(mockListenerSupport).notifyEntityOwnershipListeners(ENTITY1, false, true);
reset(mockListenerSupport);
writeNode(entityPath(ENTITY_TYPE, ENTITY_ID2), entityEntryWithOwner(ENTITY_ID2, REMOTE_MEMBER_NAME1));
-
verify(mockListenerSupport, never()).notifyEntityOwnershipListeners(any(Entity.class), anyBoolean(), anyBoolean());
reset(mockListenerSupport);
writeNode(entityPath(ENTITY_TYPE, ENTITY_ID2), entityEntryWithOwner(ENTITY_ID2, LOCAL_MEMBER_NAME));
-
verify(mockListenerSupport).notifyEntityOwnershipListeners(ENTITY2, false, true);
}
package org.opendaylight.controller.cluster.datastore.entityownership;
import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNERS_PATH;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.candidatePath;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityOwnersWithCandidate;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.dispatch.Dispatchers;
import akka.testkit.TestActorRef;
import com.google.common.base.Function;
-import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.ArrayList;
import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
+import org.opendaylight.controller.cluster.datastore.ShardDataTree;
import org.opendaylight.controller.cluster.datastore.ShardTestKit;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
kit.expectMsgClass(SuccessReply.class);
verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
-
verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
-
verify(candidate, timeout(5000)).ownershipChanged(entity, false, true);
}
assertEquals("# modifications received", max, receivedMods.size());
}
- private void verifyCommittedEntityCandidate(TestActorRef<EntityOwnershipShard> shard, String entityType,
- YangInstanceIdentifier entityId, String candidateName) throws Exception {
- verifyEntityCandidate(readEntityOwners(shard), entityType, entityId, candidateName);
+ @Test
+ public void testOnUnregisterCandidateLocal() throws Exception {
+ ShardTestKit kit = new ShardTestKit(getSystem());
+ TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps());
+ kit.waitUntilLeader(shard);
+
+ Entity entity = new Entity(ENTITY_TYPE, ENTITY_ID1);
+ EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
+
+ // Register
+
+ shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
+ kit.expectMsgClass(SuccessReply.class);
+
+ verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
+ verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
+ verify(candidate, timeout(5000)).ownershipChanged(entity, false, true);
+
+ // Unregister
+
+ reset(candidate);
+
+ shard.tell(new UnregisterCandidateLocal(candidate, entity), kit.getRef());
+ kit.expectMsgClass(SuccessReply.class);
+
+ verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, "");
+ verify(candidate, never()).ownershipChanged(any(Entity.class), anyBoolean(), anyBoolean());
+
+ // Register again
+
+ shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
+ kit.expectMsgClass(SuccessReply.class);
+
+ verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
+ verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
+ verify(candidate, timeout(5000)).ownershipChanged(entity, false, true);
+ }
+
+ @Test
+ public void testOwnershipChanges() throws Exception {
+ ShardTestKit kit = new ShardTestKit(getSystem());
+ TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps());
+ kit.waitUntilLeader(shard);
+
+ Entity entity = new Entity(ENTITY_TYPE, ENTITY_ID1);
+ EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
+ ShardDataTree shardDataTree = shard.underlyingActor().getDataStore();
+
+ // Add a remote candidate
+
+ String remoteMemberName1 = "remoteMember1";
+ writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, remoteMemberName1), shardDataTree);
+
+ // Register local
+
+ shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
+ kit.expectMsgClass(SuccessReply.class);
+
+ // Verify the remote candidate becomes owner
+
+ verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1);
+ verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
+ verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1);
+ verify(candidate, never()).ownershipChanged(any(Entity.class), anyBoolean(), anyBoolean());
+
+ // Add another remote candidate and verify ownership doesn't change
+
+ reset(candidate);
+ String remoteMemberName2 = "remoteMember2";
+ writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, remoteMemberName2), shardDataTree);
+
+ verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName2);
+ Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+ verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1);
+ verify(candidate, never()).ownershipChanged(any(Entity.class), anyBoolean(), anyBoolean());
+
+ // Remove the second remote candidate and verify ownership doesn't change
+
+ reset(candidate);
+ deleteNode(candidatePath(ENTITY_TYPE, ENTITY_ID1, remoteMemberName2), shardDataTree);
+
+ verifyEntityCandidateRemoved(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName2);
+ Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+ verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1);
+ verify(candidate, never()).ownershipChanged(any(Entity.class), anyBoolean(), anyBoolean());
+
+ // Remove the first remote candidate and verify the local candidate becomes owner
+
+ reset(candidate);
+ deleteNode(candidatePath(ENTITY_TYPE, ENTITY_ID1, remoteMemberName1), shardDataTree);
+
+ verifyEntityCandidateRemoved(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1);
+ verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
+ verify(candidate, timeout(5000)).ownershipChanged(entity, false, true);
+
+ // Add the second remote candidate back and verify ownership doesn't change
+
+ reset(candidate);
+ writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, remoteMemberName2), shardDataTree);
+
+ verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName2);
+ Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+ verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
+ verify(candidate, never()).ownershipChanged(any(Entity.class), anyBoolean(), anyBoolean());
+
+ // Unregister the local candidate and verify the second remote candidate becomes owner
+
+ shard.tell(new UnregisterCandidateLocal(candidate, entity), kit.getRef());
+ kit.expectMsgClass(SuccessReply.class);
+
+ verifyEntityCandidateRemoved(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
+ verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName2);
+ }
+
+ private void verifyEntityCandidateRemoved(final TestActorRef<EntityOwnershipShard> shard, String entityType,
+ YangInstanceIdentifier entityId, String candidateName) {
+ verifyNodeRemoved(candidatePath(entityType, entityId, candidateName),
+ new Function<YangInstanceIdentifier, NormalizedNode<?,?>>() {
+ @Override
+ public NormalizedNode<?, ?> apply(YangInstanceIdentifier path) {
+ try {
+ return AbstractShardTest.readStore(shard, path);
+ } catch(Exception e) {
+ throw new AssertionError("Failed to read " + path, e);
+ }
+ }
+ });
+ }
+
+ private void verifyCommittedEntityCandidate(final TestActorRef<EntityOwnershipShard> shard, String entityType,
+ YangInstanceIdentifier entityId, String candidateName) {
+ verifyEntityCandidate(entityType, entityId, candidateName, new Function<YangInstanceIdentifier, NormalizedNode<?,?>>() {
+ @Override
+ public NormalizedNode<?, ?> apply(YangInstanceIdentifier path) {
+ try {
+ return AbstractShardTest.readStore(shard, path);
+ } catch(Exception e) {
+ throw new AssertionError("Failed to read " + path, e);
+ }
+ }
+ });
}
private void verifyBatchedEntityCandidate(List<Modification> mods, String entityType,
entityId, candidateName);
}
- private NormalizedNode<?, ?> readEntityOwners(TestActorRef<EntityOwnershipShard> shard) throws Exception {
- Stopwatch sw = Stopwatch.createStarted();
- while(sw.elapsed(TimeUnit.MILLISECONDS) <= 5000) {
- NormalizedNode<?, ?> node = AbstractShardTest.readStore(shard, ENTITY_OWNERS_PATH);
- if(node != null) {
- return node;
- }
-
- Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
- }
-
- return null;
- }
-
private void verifyOwner(final TestActorRef<EntityOwnershipShard> shard, String entityType, YangInstanceIdentifier entityId,
String localMemberName) {
verifyOwner(localMemberName, entityType, entityId, new Function<YangInstanceIdentifier, NormalizedNode<?,?>>() {