Remove DataChangeListener and friends
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
index 76745a442ca0e9d09100e447e7c6a25ce54d6162..cca0bffaa806b444ff2a3befa19c005e6da378a7 100644 (file)
@@ -70,7 +70,6 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadData;
 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeNotificationListenerReply;
 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
@@ -79,7 +78,6 @@ import org.opendaylight.controller.cluster.datastore.modification.MergeModificat
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
-import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
@@ -104,7 +102,6 @@ import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.yangtools.concepts.Identifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -127,134 +124,6 @@ public class ShardTest extends AbstractShardTest {
     private static final String DUMMY_DATA = "Dummy data as snapshot sequence number is set to 0 in "
             + "InMemorySnapshotStore and journal recovery seq number will start from 1";
 
-    @Test
-    public void testRegisterChangeListener() throws Exception {
-        new ShardTestKit(getSystem()) {
-            {
-                final TestActorRef<Shard> shard = actorFactory.createTestActor(
-                        newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                        "testRegisterChangeListener");
-
-                waitUntilLeader(shard);
-
-                shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
-
-                final MockDataChangeListener listener = new MockDataChangeListener(1);
-                final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener,
-                        TestModel.TEST_PATH), "testRegisterChangeListener-DataChangeListener");
-
-                shard.tell(new RegisterChangeListener(TestModel.TEST_PATH, dclActor,
-                        AsyncDataBroker.DataChangeScope.BASE, true), getRef());
-
-                final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("3 seconds"),
-                        RegisterDataTreeNotificationListenerReply.class);
-                final String replyPath = reply.getListenerRegistrationPath().toString();
-                assertTrue("Incorrect reply path: " + replyPath,
-                        replyPath.matches("akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
-
-                final YangInstanceIdentifier path = TestModel.TEST_PATH;
-                writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
-
-                listener.waitForChangeEvents(path);
-            }
-        };
-    }
-
-    @SuppressWarnings("serial")
-    @Test
-    public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
-        // This test tests the timing window in which a change listener is registered before the
-        // shard becomes the leader. We verify that the listener is registered and notified of the
-        // existing data when the shard becomes the leader.
-        // For this test, we want to send the RegisterChangeListener message after the shard
-        // has recovered from persistence and before it becomes the leader. So we subclass
-        // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
-        // we know that the shard has been initialized to a follower and has started the
-        // election process. The following 2 CountDownLatches are used to coordinate the
-        // ElectionTimeout with the sending of the RegisterChangeListener message.
-        final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
-        final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
-        final Creator<Shard> creator = new Creator<Shard>() {
-            boolean firstElectionTimeout = true;
-
-            @Override
-            public Shard create() throws Exception {
-                // Use a non persistent provider because this test actually invokes persist on the journal
-                // this will cause all other messages to not be queued properly after that.
-                // The basic issue is that you cannot use TestActorRef with a persistent actor (at least when
-                // it does do a persist)
-                return new Shard(newShardBuilder()) {
-                    @Override
-                    public void handleCommand(final Object message) {
-                        if (message instanceof ElectionTimeout && firstElectionTimeout) {
-                            // Got the first ElectionTimeout. We don't forward it to the
-                            // base Shard yet until we've sent the RegisterChangeListener
-                            // message. So we signal the onFirstElectionTimeout latch to tell
-                            // the main thread to send the RegisterChangeListener message and
-                            // start a thread to wait on the onChangeListenerRegistered latch,
-                            // which the main thread signals after it has sent the message.
-                            // After the onChangeListenerRegistered is triggered, we send the
-                            // original ElectionTimeout message to proceed with the election.
-                            firstElectionTimeout = false;
-                            final ActorRef self = getSelf();
-                            new Thread(() -> {
-                                Uninterruptibles.awaitUninterruptibly(
-                                        onChangeListenerRegistered, 5, TimeUnit.SECONDS);
-                                self.tell(message, self);
-                            }).start();
-
-                            onFirstElectionTimeout.countDown();
-                        } else {
-                            super.handleCommand(message);
-                        }
-                    }
-                };
-            }
-        };
-
-        setupInMemorySnapshotStore();
-
-        final YangInstanceIdentifier path = TestModel.TEST_PATH;
-        final MockDataChangeListener listener = new MockDataChangeListener(1);
-        final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path),
-                "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
-
-        final TestActorRef<Shard> shard = actorFactory.createTestActor(
-                Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
-                "testRegisterChangeListenerWhenNotLeaderInitially");
-
-        new ShardTestKit(getSystem()) {
-            {
-                // Wait until the shard receives the first ElectionTimeout
-                // message.
-                assertEquals("Got first ElectionTimeout", true, onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
-
-                // Now send the RegisterChangeListener and wait for the reply.
-                shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.SUBTREE, false),
-                        getRef());
-
-                final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"),
-                        RegisterDataTreeNotificationListenerReply.class);
-                assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
-
-                // Sanity check - verify the shard is not the leader yet.
-                shard.tell(FindLeader.INSTANCE, getRef());
-                final FindLeaderReply findLeadeReply = expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
-                assertFalse("Expected the shard not to be the leader", findLeadeReply.getLeaderActor().isPresent());
-
-                // Signal the onChangeListenerRegistered latch to tell the
-                // thread above to proceed
-                // with the election process.
-                onChangeListenerRegistered.countDown();
-
-                // Wait for the shard to become the leader and notify our
-                // listener with the existing
-                // data in the store.
-                listener.waitForChangeEvents(path);
-            }
-        };
-    }
-
     @Test
     public void testRegisterDataTreeChangeListener() throws Exception {
         new ShardTestKit(getSystem()) {
@@ -2119,90 +1988,6 @@ public class ShardTest extends AbstractShardTest {
         assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
     }
 
-    @Test
-    public void testClusteredDataChangeListenerWithDelayedRegistration() throws Exception {
-        new ShardTestKit(getSystem()) {
-            {
-                final String testName = "testClusteredDataChangeListenerWithDelayedRegistration";
-                dataStoreContextBuilder.shardElectionTimeoutFactor(1000)
-                        .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
-
-                final YangInstanceIdentifier path = TestModel.TEST_PATH;
-                final MockDataChangeListener listener = new MockDataChangeListener(1);
-                final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path),
-                        actorFactory.generateActorId(testName + "-DataChangeListener"));
-
-                setupInMemorySnapshotStore();
-
-                final TestActorRef<Shard> shard = actorFactory.createTestActor(
-                        newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                        actorFactory.generateActorId(testName + "-shard"));
-
-                waitUntilNoLeader(shard);
-
-                shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true),
-                        getRef());
-                final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"),
-                        RegisterDataTreeNotificationListenerReply.class);
-                assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
-
-                shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
-                        .customRaftPolicyImplementation(null).build(), ActorRef.noSender());
-
-                listener.waitForChangeEvents();
-            }
-        };
-    }
-
-    @Test
-    public void testClusteredDataChangeListenerRegistration() throws Exception {
-        new ShardTestKit(getSystem()) {
-            {
-                final String testName = "testClusteredDataChangeListenerRegistration";
-                final ShardIdentifier followerShardID = ShardIdentifier.create("inventory",
-                        MemberName.forName(actorFactory.generateActorId(testName + "-follower")), "config");
-
-                final ShardIdentifier leaderShardID = ShardIdentifier.create("inventory",
-                        MemberName.forName(actorFactory.generateActorId(testName + "-leader")), "config");
-
-                final TestActorRef<Shard> followerShard = actorFactory
-                        .createTestActor(Shard.builder().id(followerShardID)
-                                .datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build())
-                                .peerAddresses(Collections.singletonMap(leaderShardID.toString(),
-                                        "akka://test/user/" + leaderShardID.toString()))
-                                .schemaContextProvider(() -> SCHEMA_CONTEXT).props()
-                                .withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
-
-                final TestActorRef<Shard> leaderShard = actorFactory
-                        .createTestActor(Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext())
-                                .peerAddresses(Collections.singletonMap(followerShardID.toString(),
-                                        "akka://test/user/" + followerShardID.toString()))
-                                .schemaContextProvider(() -> SCHEMA_CONTEXT).props()
-                                .withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
-
-                leaderShard.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
-                final String leaderPath = waitUntilLeader(followerShard);
-                assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);
-
-                final YangInstanceIdentifier path = TestModel.TEST_PATH;
-                final MockDataChangeListener listener = new MockDataChangeListener(1);
-                final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path),
-                        actorFactory.generateActorId(testName + "-DataChangeListener"));
-
-                followerShard.tell(
-                        new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true),
-                        getRef());
-                final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"),
-                        RegisterDataTreeNotificationListenerReply.class);
-                assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
-
-                writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
-
-                listener.waitForChangeEvents();
-            }
-        };
-    }
-
     @Test
     public void testClusteredDataTreeChangeListenerWithDelayedRegistration() throws Exception {
         new ShardTestKit(getSystem()) {