package org.opendaylight.controller.cluster.datastore.entityownership;
import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNERS_PATH;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.candidatePath;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityOwnersWithCandidate;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.dispatch.Dispatchers;
import akka.testkit.TestActorRef;
import com.google.common.base.Function;
-import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.ArrayList;
import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
+import org.opendaylight.controller.cluster.datastore.ShardDataTree;
import org.opendaylight.controller.cluster.datastore.ShardTestKit;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
kit.expectMsgClass(SuccessReply.class);
verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
-
verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
-
verify(candidate, timeout(5000)).ownershipChanged(entity, false, true);
}
assertEquals("# modifications received", max, receivedMods.size());
}
- private void verifyCommittedEntityCandidate(TestActorRef<EntityOwnershipShard> shard, String entityType,
- YangInstanceIdentifier entityId, String candidateName) throws Exception {
- verifyEntityCandidate(readEntityOwners(shard), entityType, entityId, candidateName);
+ @Test
+ public void testOnUnregisterCandidateLocal() throws Exception {
+ ShardTestKit kit = new ShardTestKit(getSystem());
+ TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps());
+ kit.waitUntilLeader(shard);
+
+ Entity entity = new Entity(ENTITY_TYPE, ENTITY_ID1);
+ EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
+
+ // Register
+
+ shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
+ kit.expectMsgClass(SuccessReply.class);
+
+ verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
+ verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
+ verify(candidate, timeout(5000)).ownershipChanged(entity, false, true);
+
+ // Unregister
+
+ reset(candidate);
+
+ shard.tell(new UnregisterCandidateLocal(candidate, entity), kit.getRef());
+ kit.expectMsgClass(SuccessReply.class);
+
+ verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, "");
+ verify(candidate, never()).ownershipChanged(any(Entity.class), anyBoolean(), anyBoolean());
+
+ // Register again
+
+ shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
+ kit.expectMsgClass(SuccessReply.class);
+
+ verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
+ verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
+ verify(candidate, timeout(5000)).ownershipChanged(entity, false, true);
+ }
+
+ @Test
+ public void testOwnershipChanges() throws Exception {
+ ShardTestKit kit = new ShardTestKit(getSystem());
+ TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps());
+ kit.waitUntilLeader(shard);
+
+ Entity entity = new Entity(ENTITY_TYPE, ENTITY_ID1);
+ EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
+ ShardDataTree shardDataTree = shard.underlyingActor().getDataStore();
+
+ // Add a remote candidate
+
+ String remoteMemberName1 = "remoteMember1";
+ writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, remoteMemberName1), shardDataTree);
+
+ // Register local
+
+ shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
+ kit.expectMsgClass(SuccessReply.class);
+
+ // Verify the remote candidate becomes owner
+
+ verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1);
+ verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
+ verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1);
+ verify(candidate, never()).ownershipChanged(any(Entity.class), anyBoolean(), anyBoolean());
+
+ // Add another remote candidate and verify ownership doesn't change
+
+ reset(candidate);
+ String remoteMemberName2 = "remoteMember2";
+ writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, remoteMemberName2), shardDataTree);
+
+ verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName2);
+ Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+ verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1);
+ verify(candidate, never()).ownershipChanged(any(Entity.class), anyBoolean(), anyBoolean());
+
+ // Remove the second remote candidate and verify ownership doesn't change
+
+ reset(candidate);
+ deleteNode(candidatePath(ENTITY_TYPE, ENTITY_ID1, remoteMemberName2), shardDataTree);
+
+ verifyEntityCandidateRemoved(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName2);
+ Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+ verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1);
+ verify(candidate, never()).ownershipChanged(any(Entity.class), anyBoolean(), anyBoolean());
+
+ // Remove the first remote candidate and verify the local candidate becomes owner
+
+ reset(candidate);
+ deleteNode(candidatePath(ENTITY_TYPE, ENTITY_ID1, remoteMemberName1), shardDataTree);
+
+ verifyEntityCandidateRemoved(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1);
+ verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
+ verify(candidate, timeout(5000)).ownershipChanged(entity, false, true);
+
+ // Add the second remote candidate back and verify ownership doesn't change
+
+ reset(candidate);
+ writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, remoteMemberName2), shardDataTree);
+
+ verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName2);
+ Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+ verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
+ verify(candidate, never()).ownershipChanged(any(Entity.class), anyBoolean(), anyBoolean());
+
+ // Unregister the local candidate and verify the second remote candidate becomes owner
+
+ shard.tell(new UnregisterCandidateLocal(candidate, entity), kit.getRef());
+ kit.expectMsgClass(SuccessReply.class);
+
+ verifyEntityCandidateRemoved(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
+ verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName2);
+ }
+
+ private void verifyEntityCandidateRemoved(final TestActorRef<EntityOwnershipShard> shard, String entityType,
+ YangInstanceIdentifier entityId, String candidateName) {
+ verifyNodeRemoved(candidatePath(entityType, entityId, candidateName),
+ new Function<YangInstanceIdentifier, NormalizedNode<?,?>>() {
+ @Override
+ public NormalizedNode<?, ?> apply(YangInstanceIdentifier path) {
+ try {
+ return AbstractShardTest.readStore(shard, path);
+ } catch(Exception e) {
+ throw new AssertionError("Failed to read " + path, e);
+ }
+ }
+ });
+ }
+
+ private void verifyCommittedEntityCandidate(final TestActorRef<EntityOwnershipShard> shard, String entityType,
+ YangInstanceIdentifier entityId, String candidateName) {
+ verifyEntityCandidate(entityType, entityId, candidateName, new Function<YangInstanceIdentifier, NormalizedNode<?,?>>() {
+ @Override
+ public NormalizedNode<?, ?> apply(YangInstanceIdentifier path) {
+ try {
+ return AbstractShardTest.readStore(shard, path);
+ } catch(Exception e) {
+ throw new AssertionError("Failed to read " + path, e);
+ }
+ }
+ });
}
private void verifyBatchedEntityCandidate(List<Modification> mods, String entityType,
entityId, candidateName);
}
- private NormalizedNode<?, ?> readEntityOwners(TestActorRef<EntityOwnershipShard> shard) throws Exception {
- Stopwatch sw = Stopwatch.createStarted();
- while(sw.elapsed(TimeUnit.MILLISECONDS) <= 5000) {
- NormalizedNode<?, ?> node = AbstractShardTest.readStore(shard, ENTITY_OWNERS_PATH);
- if(node != null) {
- return node;
- }
-
- Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
- }
-
- return null;
- }
-
private void verifyOwner(final TestActorRef<EntityOwnershipShard> shard, String entityType, YangInstanceIdentifier entityId,
String localMemberName) {
verifyOwner(localMemberName, entityType, entityId, new Function<YangInstanceIdentifier, NormalizedNode<?,?>>() {