Bug 4651: Implement handling of ClusteredDOMDataTreeChangeListener in CDS
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
index f097c19e512a3766a3d836c5cc7e061862b29666..6c517a464f5ed66b4304ed0d62fbbab7c35e9eff 100644 (file)
@@ -279,7 +279,7 @@ public class ShardTest extends AbstractShardTest {
             final ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener),
                     "testRegisterDataTreeChangeListener-DataTreeChangeListener");
 
-            shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor), getRef());
+            shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, false), getRef());
 
             final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
                     RegisterDataTreeChangeListenerReply.class);
@@ -347,7 +347,7 @@ public class ShardTest extends AbstractShardTest {
             assertEquals("Got first ElectionTimeout", true,
                 onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
 
-            shard.tell(new RegisterDataTreeChangeListener(path, dclActor), getRef());
+            shard.tell(new RegisterDataTreeChangeListener(path, dclActor, false), getRef());
             final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
                 RegisterDataTreeChangeListenerReply.class);
             assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
@@ -2517,146 +2517,150 @@ public class ShardTest extends AbstractShardTest {
     }
 
     @Test
-    public void testClusteredDataChangeListernerDelayedRegistration() throws Exception {
+    public void testClusteredDataChangeListenerDelayedRegistration() throws Exception {
         new ShardTestKit(getSystem()) {{
-            dataStoreContextBuilder.persistent(false);
-            final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
-            final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
-            final Creator<Shard> creator = new Creator<Shard>() {
-                private static final long serialVersionUID = 1L;
-                boolean firstElectionTimeout = true;
-
-                @Override
-                public Shard create() throws Exception {
-                    return new Shard(newShardBuilder()) {
-                        @Override
-                        public void onReceiveCommand(final Object message) throws Exception {
-                            if(message instanceof ElectionTimeout && firstElectionTimeout) {
-                                firstElectionTimeout = false;
-                                final ActorRef self = getSelf();
-                                new Thread() {
-                                    @Override
-                                    public void run() {
-                                        Uninterruptibles.awaitUninterruptibly(
-                                            onChangeListenerRegistered, 5, TimeUnit.SECONDS);
-                                        self.tell(message, self);
-                                    }
-                                }.start();
-
-                                onFirstElectionTimeout.countDown();
-                            } else {
-                                super.onReceiveCommand(message);
-                            }
-                        }
-                    };
-                }
-            };
+            String testName = "testClusteredDataChangeListenerDelayedRegistration";
+            dataStoreContextBuilder.shardElectionTimeoutFactor(1000);
 
             final MockDataChangeListener listener = new MockDataChangeListener(1);
-            final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
-                "testDataChangeListenerOnFollower-DataChangeListener");
+            final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
+                    actorFactory.generateActorId(testName + "-DataChangeListener"));
 
-            final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
-                Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()).
-                    withDispatcher(Dispatchers.DefaultDispatcherId()),"testDataChangeListenerOnFollower");
+            final TestActorRef<Shard> shard = actorFactory.createTestActor(
+                    newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                    actorFactory.generateActorId(testName + "-shard"));
 
-            assertEquals("Got first ElectionTimeout", true,
-                onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
-
-            shard.tell(new FindLeader(), getRef());
-            final FindLeaderReply findLeadeReply =
-                expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
-            assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
+            waitUntilNoLeader(shard);
 
             final YangInstanceIdentifier path = TestModel.TEST_PATH;
 
             shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
             final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
                 RegisterChangeListenerReply.class);
-            assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
+            assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
 
             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
 
-            onChangeListenerRegistered.countDown();
+            shard.tell(new ElectionTimeout(), ActorRef.noSender());
 
             listener.waitForChangeEvents();
-
-            dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
-            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
         }};
     }
 
     @Test
-    public void testClusteredDataChangeListernerRegistration() throws Exception {
-        dataStoreContextBuilder.persistent(false).build();
+    public void testClusteredDataChangeListenerRegistration() throws Exception {
         new ShardTestKit(getSystem()) {{
-            final ShardIdentifier member1ShardID = ShardIdentifier.builder().memberName("member-1")
-                .shardName("inventory").type("config").build();
-
-            final ShardIdentifier member2ShardID = ShardIdentifier.builder().memberName("member-2")
-                .shardName("inventory").type("config").build();
-            final Creator<Shard> followerShardCreator = new Creator<Shard>() {
-                private static final long serialVersionUID = 1L;
+            String testName = "testClusteredDataChangeListenerRegistration";
+            final ShardIdentifier followerShardID = ShardIdentifier.builder().memberName(
+                    actorFactory.generateActorId(testName + "-follower")).shardName("inventory").type("config").build();
+
+            final ShardIdentifier leaderShardID = ShardIdentifier.builder().memberName(
+                    actorFactory.generateActorId(testName + "-leader")).shardName("inventory").type("config").build();
+
+            final TestActorRef<Shard> followerShard = actorFactory.createTestActor(
+                    Shard.builder().id(followerShardID).
+                        datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build()).
+                        peerAddresses(Collections.singletonMap(leaderShardID.toString(),
+                            "akka://test/user/" + leaderShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
+                    withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
+
+            final TestActorRef<Shard> leaderShard = actorFactory.createTestActor(
+                    Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext()).
+                        peerAddresses(Collections.singletonMap(followerShardID.toString(),
+                            "akka://test/user/" + followerShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
+                    withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
+
+            leaderShard.tell(new ElectionTimeout(), ActorRef.noSender());
+            String leaderPath = waitUntilLeader(followerShard);
+            assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);
 
-                @Override
-                public Shard create() throws Exception {
-                    return new Shard(Shard.builder().id(member1ShardID).datastoreContext(newDatastoreContext()).
-                            peerAddresses(Collections.singletonMap(member2ShardID.toString(),
-                                    "akka://test/user/" + member2ShardID.toString())).schemaContext(SCHEMA_CONTEXT)) {
-                        @Override
-                        public void onReceiveCommand(final Object message) throws Exception {
+            final YangInstanceIdentifier path = TestModel.TEST_PATH;
+            final MockDataChangeListener listener = new MockDataChangeListener(1);
+            final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
+                    actorFactory.generateActorId(testName + "-DataChangeListener"));
 
-                            if(!(message instanceof ElectionTimeout)) {
-                                super.onReceiveCommand(message);
-                            }
-                        }
-                    };
-                }
-            };
+            followerShard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
+            final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
+                RegisterChangeListenerReply.class);
+            assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
 
-            final Creator<Shard> leaderShardCreator = new Creator<Shard>() {
-                private static final long serialVersionUID = 1L;
+            writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
 
-                @Override
-                public Shard create() throws Exception {
-                    return new Shard(Shard.builder().id(member2ShardID).datastoreContext(newDatastoreContext()).
-                            peerAddresses(Collections.singletonMap(member1ShardID.toString(),
-                                    "akka://test/user/" + member1ShardID.toString())).schemaContext(SCHEMA_CONTEXT)) {};
-                }
-            };
+            listener.waitForChangeEvents();
+        }};
+    }
 
+    @Test
+    public void testClusteredDataTreeChangeListenerDelayedRegistration() throws Exception {
+        new ShardTestKit(getSystem()) {{
+            String testName = "testClusteredDataTreeChangeListenerDelayedRegistration";
+            dataStoreContextBuilder.shardElectionTimeoutFactor(1000);
 
-            final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
-                Props.create(new DelegatingShardCreator(followerShardCreator)),
-                member1ShardID.toString());
+            final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
+            final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
+                    actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
 
-            final TestActorRef<Shard> shardLeader = TestActorRef.create(getSystem(),
-                Props.create(new DelegatingShardCreator(leaderShardCreator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
-                member2ShardID.toString());
-            // Sleep to let election happen
-            Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
+            final TestActorRef<Shard> shard = actorFactory.createTestActor(
+                    newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                    actorFactory.generateActorId(testName + "-shard"));
 
-            shard.tell(new FindLeader(), getRef());
-            final FindLeaderReply findLeaderReply =
-                    expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
-            assertEquals("Shard leader does not match", shardLeader.path().toString(), findLeaderReply.getLeaderActor());
+            waitUntilNoLeader(shard);
 
             final YangInstanceIdentifier path = TestModel.TEST_PATH;
-            final MockDataChangeListener listener = new MockDataChangeListener(1);
-            final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
-                "testDataChangeListenerOnFollower-DataChangeListener");
 
-            shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
-            final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
-                RegisterChangeListenerReply.class);
-            assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
+            shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
+            final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
+                    RegisterDataTreeChangeListenerReply.class);
+            assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
 
             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
 
+            shard.tell(new ElectionTimeout(), ActorRef.noSender());
+
             listener.waitForChangeEvents();
+        }};
+    }
 
-            dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
-            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+    @Test
+    public void testClusteredDataTreeChangeListenerRegistration() throws Exception {
+        new ShardTestKit(getSystem()) {{
+            String testName = "testClusteredDataTreeChangeListenerRegistration";
+            final ShardIdentifier followerShardID = ShardIdentifier.builder().memberName(
+                    actorFactory.generateActorId(testName + "-follower")).shardName("inventory").type("config").build();
+
+            final ShardIdentifier leaderShardID = ShardIdentifier.builder().memberName(
+                    actorFactory.generateActorId(testName + "-leader")).shardName("inventory").type("config").build();
+
+            final TestActorRef<Shard> followerShard = actorFactory.createTestActor(
+                    Shard.builder().id(followerShardID).
+                        datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build()).
+                        peerAddresses(Collections.singletonMap(leaderShardID.toString(),
+                            "akka://test/user/" + leaderShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
+                    withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
+
+            final TestActorRef<Shard> leaderShard = actorFactory.createTestActor(
+                    Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext()).
+                        peerAddresses(Collections.singletonMap(followerShardID.toString(),
+                            "akka://test/user/" + followerShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
+                    withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
+
+            leaderShard.tell(new ElectionTimeout(), ActorRef.noSender());
+            String leaderPath = waitUntilLeader(followerShard);
+            assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);
+
+            final YangInstanceIdentifier path = TestModel.TEST_PATH;
+            final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
+            final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
+                    actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
+
+            followerShard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
+            final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
+                    RegisterDataTreeChangeListenerReply.class);
+            assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
+
+            writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+            listener.waitForChangeEvents();
         }};
     }