Bug 4105: Implement EntityOwnershipListener registration/notification
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / entityownership / EntityOwnershipShardTest.java
index 8ba17c0cc8a535ce530fa9a4bc9cd20f5876a980..146916a9621481846c05e832c29d440aa1b32b51 100644 (file)
@@ -7,9 +7,13 @@
  */
 package org.opendaylight.controller.cluster.datastore.entityownership;
 
+import static org.hamcrest.CoreMatchers.either;
+import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.reset;
@@ -36,6 +40,7 @@ import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import org.hamcrest.Matcher;
 import org.junit.After;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.AbstractShardTest;
@@ -45,7 +50,9 @@ import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
 import org.opendaylight.controller.cluster.datastore.ShardDataTree;
 import org.opendaylight.controller.cluster.datastore.ShardTestKit;
 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterListenerLocal;
 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterListenerLocal;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
@@ -65,6 +72,7 @@ import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -234,16 +242,17 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
     public void testOnRegisterCandidateLocalWithRemoteLeader() throws Exception {
         ShardTestKit kit = new ShardTestKit(getSystem());
 
-        dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(100).
+        dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2).
                 shardBatchedModificationCount(5);
 
         String peerId = newShardId("leader").toString();
         TestActorRef<MockLeader> peer = actorFactory.createTestActor(Props.create(MockLeader.class).
                 withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
 
-        TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
-                ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
-                withDispatcher(Dispatchers.DefaultDispatcherId()));
+        TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(Props.create(
+                TestEntityOwnershipShard.class, newShardId(LOCAL_MEMBER_NAME),
+                        ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build(),
+                        dataStoreContextBuilder.build()).withDispatcher(Dispatchers.DefaultDispatcherId()));
 
         shard.tell(new AppendEntries(1L, peerId, -1L, -1L, Collections.<ReplicatedLogEntry>emptyList(), -1L, -1L,
                 DataStoreVersions.CURRENT_VERSION), peer);
@@ -259,8 +268,6 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
         verifyBatchedEntityCandidate(leader.getAndClearReceivedModifications(), ENTITY_TYPE, ENTITY_ID1,
                 LOCAL_MEMBER_NAME);
 
-        shard.tell(dataStoreContextBuilder.shardElectionTimeoutFactor(2).build(), ActorRef.noSender());
-
         // Test with initial commit timeout and subsequent retry.
 
         leader.modificationsReceived = new CountDownLatch(1);
@@ -588,6 +595,90 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, peerMemberName2);
     }
 
+    @Test
+    public void testListenerRegistration() throws Exception {
+        ShardTestKit kit = new ShardTestKit(getSystem());
+        TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps());
+        kit.waitUntilLeader(shard);
+        ShardDataTree shardDataTree = shard.underlyingActor().getDataStore();
+
+        String otherEntityType = "otherEntityType";
+        Entity entity1 = new Entity(ENTITY_TYPE, ENTITY_ID1);
+        Entity entity2 = new Entity(ENTITY_TYPE, ENTITY_ID2);
+        Entity entity3 = new Entity(ENTITY_TYPE, ENTITY_ID3);
+        Entity entity4 = new Entity(otherEntityType, ENTITY_ID3);
+        EntityOwnershipListener listener = mock(EntityOwnershipListener.class);
+        EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
+
+        // Register listener
+
+        shard.tell(new RegisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
+
+        // Register a couple candidates for the desired entity type and verify listener is notified.
+
+        shard.tell(new RegisterCandidateLocal(candidate, entity1), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
+
+        verify(listener, timeout(5000)).ownershipChanged(entity1, false, true);
+
+        shard.tell(new RegisterCandidateLocal(candidate, entity2), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
+
+        verify(listener, timeout(5000)).ownershipChanged(entity2, false, true);
+        reset(listener);
+
+        // Register another candidate for another entity type and verify listener is not notified.
+
+        shard.tell(new RegisterCandidateLocal(candidate, entity4), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
+
+        Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+        verify(listener, never()).ownershipChanged(eq(entity4), anyBoolean(), anyBoolean());
+
+        // Register remote candidate for entity1
+
+        String remoteMemberName = "remoteMember";
+        writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, entity1.getId(), remoteMemberName),
+                shardDataTree);
+        verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entity1.getId(), remoteMemberName);
+
+        // Unregister the local candidate for entity1 and verify listener is notified
+
+        shard.tell(new UnregisterCandidateLocal(candidate, entity1), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
+
+        verify(listener, timeout(5000)).ownershipChanged(entity1, true, false);
+        reset(listener);
+
+        // Unregister the listener, add a candidate for entity3 and verify listener isn't notified
+
+        shard.tell(new UnregisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
+
+        shard.tell(new RegisterCandidateLocal(candidate, entity3), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
+
+        verifyOwner(shard, ENTITY_TYPE, entity3.getId(), LOCAL_MEMBER_NAME);
+        Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+        verify(listener, never()).ownershipChanged(any(Entity.class), anyBoolean(), anyBoolean());
+
+        // Re-register the listener and verify it gets notified of current locally owned entities
+
+        reset(listener, candidate);
+
+        shard.tell(new RegisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
+
+        Matcher<Entity> entityMatcher = either(equalTo(entity2)).or(equalTo(entity3));
+        verify(listener, timeout(5000).times(2)).ownershipChanged(argThat(entityMatcher), eq(false), eq(true));
+        Uninterruptibles.sleepUninterruptibly(300, TimeUnit.MILLISECONDS);
+        verify(listener, never()).ownershipChanged(eq(entity4), anyBoolean(), anyBoolean());
+        verify(listener, never()).ownershipChanged(eq(entity1), anyBoolean(), anyBoolean());
+        verify(candidate, never()).ownershipChanged(eq(entity2), anyBoolean(), anyBoolean());
+        verify(candidate, never()).ownershipChanged(eq(entity3), anyBoolean(), anyBoolean());
+    }
+
     private void commitModification(TestActorRef<EntityOwnershipShard> shard, NormalizedNode<?, ?> node,
             JavaTestKit sender) {
         BatchedModifications modifications = new BatchedModifications("tnx", DataStoreVersions.CURRENT_VERSION, "");
@@ -674,6 +765,23 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
                 type("operational" + NEXT_SHARD_NUM.getAndIncrement()).build();
     }
 
+    public static class TestEntityOwnershipShard extends EntityOwnershipShard {
+
+        TestEntityOwnershipShard(ShardIdentifier name, Map<String, String> peerAddresses,
+                DatastoreContext datastoreContext) {
+            super(name, peerAddresses, datastoreContext, SCHEMA_CONTEXT, LOCAL_MEMBER_NAME);
+        }
+
+        @Override
+        public void onReceiveCommand(Object message) throws Exception {
+            if(!(message instanceof ElectionTimeout)) {
+                super.onReceiveCommand(message);
+            }
+        }
+
+
+    }
+
     public static class MockFollower extends UntypedActor {
         volatile boolean grantVote;
         volatile boolean dropAppendEntries;