Refactor Register*ListenerReply classes
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
index 8310e067c5fa002ed52b53e932b58e076aec4a76..ce2a43f015bbcb5ea67f791514f0eef02e7c19ae 100644 (file)
@@ -33,7 +33,6 @@ import akka.testkit.TestActorRef;
 import akka.util.Timeout;
 import com.google.common.base.Stopwatch;
 import com.google.common.base.Throwables;
-import com.google.common.primitives.UnsignedLong;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.io.IOException;
 import java.util.Collections;
@@ -60,37 +59,33 @@ import org.opendaylight.controller.cluster.datastore.messages.BatchedModificatio
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistrationReply;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
-import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
-import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeNotificationListenerReply;
 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
-import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
-import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
-import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
@@ -101,6 +96,8 @@ import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftStat
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
@@ -109,6 +106,7 @@ import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelpe
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.yangtools.concepts.Identifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
@@ -143,14 +141,14 @@ public class ShardTest extends AbstractShardTest {
                 shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
 
                 final MockDataChangeListener listener = new MockDataChangeListener(1);
-                final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
-                        "testRegisterChangeListener-DataChangeListener");
+                final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener,
+                        TestModel.TEST_PATH), "testRegisterChangeListener-DataChangeListener");
 
                 shard.tell(new RegisterChangeListener(TestModel.TEST_PATH, dclActor,
                         AsyncDataBroker.DataChangeScope.BASE, true), getRef());
 
-                final RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
-                        RegisterChangeListenerReply.class);
+                final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("3 seconds"),
+                        RegisterDataTreeNotificationListenerReply.class);
                 final String replyPath = reply.getListenerRegistrationPath().toString();
                 assertTrue("Incorrect reply path: " + replyPath,
                         replyPath.matches("akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
@@ -220,8 +218,9 @@ public class ShardTest extends AbstractShardTest {
 
         setupInMemorySnapshotStore();
 
+        final YangInstanceIdentifier path = TestModel.TEST_PATH;
         final MockDataChangeListener listener = new MockDataChangeListener(1);
-        final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
+        final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path),
                 "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
 
         final TestActorRef<Shard> shard = actorFactory.createTestActor(
@@ -230,8 +229,6 @@ public class ShardTest extends AbstractShardTest {
 
         new ShardTestKit(getSystem()) {
             {
-                final YangInstanceIdentifier path = TestModel.TEST_PATH;
-
                 // Wait until the shard receives the first ElectionTimeout
                 // message.
                 assertEquals("Got first ElectionTimeout", true, onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
@@ -240,8 +237,8 @@ public class ShardTest extends AbstractShardTest {
                 shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.SUBTREE, false),
                         getRef());
 
-                final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
-                        RegisterChangeListenerReply.class);
+                final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"),
+                        RegisterDataTreeNotificationListenerReply.class);
                 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
 
                 // Sanity check - verify the shard is not the leader yet.
@@ -275,13 +272,13 @@ public class ShardTest extends AbstractShardTest {
                 shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
 
                 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
-                final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
-                        "testRegisterDataTreeChangeListener-DataTreeChangeListener");
+                final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener,
+                        TestModel.TEST_PATH), "testRegisterDataTreeChangeListener-DataTreeChangeListener");
 
                 shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, false), getRef());
 
-                final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
-                        RegisterDataTreeChangeListenerReply.class);
+                final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("3 seconds"),
+                        RegisterDataTreeNotificationListenerReply.class);
                 final String replyPath = reply.getListenerRegistrationPath().toString();
                 assertTrue("Incorrect reply path: " + replyPath,
                         replyPath.matches("akka:\\/\\/test\\/user\\/testRegisterDataTreeChangeListener\\/\\$.*"));
@@ -330,23 +327,22 @@ public class ShardTest extends AbstractShardTest {
 
         setupInMemorySnapshotStore();
 
+        final YangInstanceIdentifier path = TestModel.TEST_PATH;
         final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
-        final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
+        final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener, path),
                 "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener");
 
         final TestActorRef<Shard> shard = actorFactory.createTestActor(
                 Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
                 "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration");
 
-        final YangInstanceIdentifier path = TestModel.TEST_PATH;
-
         new ShardTestKit(getSystem()) {
             {
                 assertEquals("Got first ElectionTimeout", true, onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
 
                 shard.tell(new RegisterDataTreeChangeListener(path, dclActor, false), getRef());
-                final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
-                        RegisterDataTreeChangeListenerReply.class);
+                final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"),
+                        RegisterDataTreeNotificationListenerReply.class);
                 assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
 
                 shard.tell(FindLeader.INSTANCE, getRef());
@@ -378,8 +374,9 @@ public class ShardTest extends AbstractShardTest {
                         CreateTransactionReply.class);
 
                 final String path = reply.getTransactionPath().toString();
-                assertTrue("Unexpected transaction path " + path, path
-                        .startsWith("akka://test/user/testCreateTransaction/shard-member-1:ShardTransactionTest@0:"));
+                assertTrue("Unexpected transaction path " + path, path.startsWith(String.format(
+                        "akka://test/user/testCreateTransaction/shard-%s-%s:ShardTransactionTest@0:",
+                            shardID.getShardName(), shardID.getMemberName().getName())));
             }
         };
     }
@@ -399,8 +396,9 @@ public class ShardTest extends AbstractShardTest {
                         CreateTransactionReply.class);
 
                 final String path = reply.getTransactionPath().toString();
-                assertTrue("Unexpected transaction path " + path, path.startsWith(
-                        "akka://test/user/testCreateTransactionOnChain/shard-member-1:ShardTransactionTest@0:"));
+                assertTrue("Unexpected transaction path " + path, path.startsWith(String.format(
+                        "akka://test/user/testCreateTransactionOnChain/shard-%s-%s:ShardTransactionTest@0:",
+                        shardID.getShardName(), shardID.getMemberName().getName())));
             }
         };
     }
@@ -446,8 +444,9 @@ public class ShardTest extends AbstractShardTest {
         final YangInstanceIdentifier root = YangInstanceIdentifier.EMPTY;
         final NormalizedNode<?,?> expected = readStore(store, root);
 
-        final Snapshot snapshot = Snapshot.create(new MetadataShardDataTreeSnapshot(expected).serialize(),
-                Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4);
+        final Snapshot snapshot = Snapshot.create(
+                new ShardSnapshotState(new MetadataShardDataTreeSnapshot(expected)),
+                Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4, -1, null, null);
 
         shard.tell(new ApplySnapshot(snapshot), ActorRef.noSender());
 
@@ -475,13 +474,6 @@ public class ShardTest extends AbstractShardTest {
 
         final DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
         store.setSchemaContext(SCHEMA_CONTEXT);
-        writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
-
-        final NormalizedNode<?, ?> root = readStore(store, YangInstanceIdentifier.EMPTY);
-        final Snapshot snapshot = Snapshot.create(new MetadataShardDataTreeSnapshot(root).serialize(),
-                Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4);
-
-        shard.tell(new ApplySnapshot(snapshot), ActorRef.noSender());
 
         final DataTreeModification writeMod = store.takeSnapshot().newModification();
         final ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
@@ -489,10 +481,7 @@ public class ShardTest extends AbstractShardTest {
         writeMod.ready();
 
         final TransactionIdentifier tx = nextTransactionId();
-        final ApplyState applyState = new ApplyState(null, tx,
-                new ReplicatedLogImplEntry(1, 2, payloadForModification(store, writeMod, tx)));
-
-        shard.tell(applyState, shard);
+        shard.underlyingActor().applyState(null, null, payloadForModification(store, writeMod, tx));
 
         final Stopwatch sw = Stopwatch.createStarted();
         while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
@@ -519,7 +508,7 @@ public class ShardTest extends AbstractShardTest {
         InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
 
         // Set up the InMemoryJournal.
-        InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1,
+        InMemoryJournal.addEntry(shardID.toString(), 1, new SimpleReplicatedLogEntry(0, 1,
             payloadForModification(source, writeMod, nextTransactionId())));
 
         final int nListEntries = 16;
@@ -536,7 +525,7 @@ public class ShardTest extends AbstractShardTest {
             mod.merge(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
             mod.ready();
 
-            InMemoryJournal.addEntry(shardID.toString(), i + 1, new ReplicatedLogImplEntry(i, 1,
+            InMemoryJournal.addEntry(shardID.toString(), i + 1, new SimpleReplicatedLogEntry(i, 1,
                 payloadForModification(source, mod, nextTransactionId())));
         }
 
@@ -636,7 +625,6 @@ public class ShardTest extends AbstractShardTest {
                 final ReadyTransactionReply readyReply = ReadyTransactionReply
                         .fromSerializable(expectMsgClass(duration, ReadyTransactionReply.class));
                 assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
-
                 // Send the CanCommitTransaction message for the first Tx.
 
                 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
@@ -691,18 +679,18 @@ public class ShardTest extends AbstractShardTest {
 
                 assertEquals("Commits complete", true, done);
 
-                final InOrder inOrder = inOrder(cohort1.getCanCommit(), cohort1.getPreCommit(), cohort1.getCommit(),
-                        cohort2.getCanCommit(), cohort2.getPreCommit(), cohort2.getCommit(), cohort3.getCanCommit(),
-                        cohort3.getPreCommit(), cohort3.getCommit());
-                inOrder.verify(cohort1.getCanCommit()).onSuccess(any(Void.class));
-                inOrder.verify(cohort1.getPreCommit()).onSuccess(any(DataTreeCandidate.class));
-                inOrder.verify(cohort1.getCommit()).onSuccess(any(UnsignedLong.class));
-                inOrder.verify(cohort2.getCanCommit()).onSuccess(any(Void.class));
-                inOrder.verify(cohort2.getPreCommit()).onSuccess(any(DataTreeCandidate.class));
-                inOrder.verify(cohort2.getCommit()).onSuccess(any(UnsignedLong.class));
-                inOrder.verify(cohort3.getCanCommit()).onSuccess(any(Void.class));
-                inOrder.verify(cohort3.getPreCommit()).onSuccess(any(DataTreeCandidate.class));
-                inOrder.verify(cohort3.getCommit()).onSuccess(any(UnsignedLong.class));
+//                final InOrder inOrder = inOrder(cohort1.getCanCommit(), cohort1.getPreCommit(), cohort1.getCommit(),
+//                        cohort2.getCanCommit(), cohort2.getPreCommit(), cohort2.getCommit(), cohort3.getCanCommit(),
+//                        cohort3.getPreCommit(), cohort3.getCommit());
+//                inOrder.verify(cohort1.getCanCommit()).onSuccess(any(Void.class));
+//                inOrder.verify(cohort1.getPreCommit()).onSuccess(any(DataTreeCandidate.class));
+//                inOrder.verify(cohort2.getCanCommit()).onSuccess(any(Void.class));
+//                inOrder.verify(cohort2.getPreCommit()).onSuccess(any(DataTreeCandidate.class));
+//                inOrder.verify(cohort3.getCanCommit()).onSuccess(any(Void.class));
+//                inOrder.verify(cohort3.getPreCommit()).onSuccess(any(DataTreeCandidate.class));
+//                inOrder.verify(cohort1.getCommit()).onSuccess(any(UnsignedLong.class));
+//                inOrder.verify(cohort2.getCommit()).onSuccess(any(UnsignedLong.class));
+//                inOrder.verify(cohort3.getCommit()).onSuccess(any(UnsignedLong.class));
 
                 // Verify data in the data store.
 
@@ -1155,67 +1143,6 @@ public class ShardTest extends AbstractShardTest {
         };
     }
 
-    @Test
-    public void testReadWriteCommitWhenTransactionHasNoModifications() {
-        testCommitWhenTransactionHasNoModifications(true);
-    }
-
-    @Test
-    public void testWriteOnlyCommitWhenTransactionHasNoModifications() {
-        testCommitWhenTransactionHasNoModifications(false);
-    }
-
-    private void testCommitWhenTransactionHasNoModifications(final boolean readWrite) {
-        // Note that persistence is enabled which would normally result in the
-        // entry getting written to the journal
-        // but here that need not happen
-        new ShardTestKit(getSystem()) {
-            {
-                final TestActorRef<Shard> shard = actorFactory.createTestActor(
-                        newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                        "testCommitWhenTransactionHasNoModifications-" + readWrite);
-
-                waitUntilLeader(shard);
-
-                final TransactionIdentifier transactionID = nextTransactionId();
-
-                final FiniteDuration duration = duration("5 seconds");
-
-                if (readWrite) {
-                    final ReadWriteShardDataTreeTransaction rwTx = shard.underlyingActor().getDataStore()
-                            .newReadWriteTransaction(transactionID);
-                    shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, rwTx, false), getRef());
-                } else {
-                    shard.tell(prepareBatchedModifications(transactionID, new MutableCompositeModification()),
-                            getRef());
-                }
-
-                expectMsgClass(duration, ReadyTransactionReply.class);
-
-                // Send the CanCommitTransaction message.
-
-                shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
-                final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
-                        .fromSerializable(expectMsgClass(duration, CanCommitTransactionReply.class));
-                assertEquals("Can commit", true, canCommitReply.getCanCommit());
-
-                shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
-                expectMsgClass(duration, CommitTransactionReply.class);
-
-                shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
-                final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
-
-                // Use MBean for verification
-                // Committed transaction count should increase as usual
-                assertEquals(1, shardStats.getCommittedTransactionsCount());
-
-                // Commit index should not advance because this does not go into
-                // the journal
-                assertEquals(-1, shardStats.getCommitIndex());
-            }
-        };
-    }
-
     @Test
     public void testReadWriteCommitWhenTransactionHasModifications() throws Exception {
         testCommitWhenTransactionHasModifications(true);
@@ -1345,8 +1272,12 @@ public class ShardTest extends AbstractShardTest {
                 final InOrder inOrder = inOrder(dataTree);
                 inOrder.verify(dataTree).validate(any(DataTreeModification.class));
                 inOrder.verify(dataTree).prepare(any(DataTreeModification.class));
+
+                // FIXME: this invocation is done on the result of validate(). To test it, we need to make sure mock
+                //        validate performs wrapping and we capture that mock
+                // inOrder.verify(dataTree).validate(any(DataTreeModification.class));
+
                 inOrder.verify(dataTree).commit(any(DataTreeCandidate.class));
-                inOrder.verify(dataTree).validate(any(DataTreeModification.class));
             }
         };
     }
@@ -1517,14 +1448,15 @@ public class ShardTest extends AbstractShardTest {
             {
                 final Creator<Shard> creator = () -> new Shard(newShardBuilder()) {
                     @Override
-                    void persistPayload(final TransactionIdentifier transactionId, final Payload payload) {
+                    void persistPayload(final Identifier id, final Payload payload,
+                            final boolean batchHint) {
                         // Simulate an AbortTransaction message occurring during
                         // replication, after
                         // persisting and before finishing the commit to the
                         // in-memory store.
 
-                        doAbortTransaction(transactionId, null);
-                        super.persistPayload(transactionId, payload);
+                        doAbortTransaction(id, null);
+                        super.persistPayload(id, payload, batchHint);
                     }
                 };
 
@@ -2045,8 +1977,8 @@ public class ShardTest extends AbstractShardTest {
 
             private void verifySnapshot(final Snapshot snapshot, final NormalizedNode<?, ?> expectedRoot)
                     throws IOException {
-                final NormalizedNode<?, ?> actual = ShardDataTreeSnapshot.deserialize(snapshot.getState()).getRootNode()
-                        .get();
+                final NormalizedNode<?, ?> actual = ((ShardSnapshotState)snapshot.getState()).getSnapshot()
+                        .getRootNode().get();
                 assertEquals("Root node", expectedRoot, actual);
             }
         };
@@ -2191,15 +2123,16 @@ public class ShardTest extends AbstractShardTest {
     }
 
     @Test
-    public void testClusteredDataChangeListenerDelayedRegistration() throws Exception {
+    public void testClusteredDataChangeListenerWithDelayedRegistration() throws Exception {
         new ShardTestKit(getSystem()) {
             {
-                final String testName = "testClusteredDataChangeListenerDelayedRegistration";
+                final String testName = "testClusteredDataChangeListenerWithDelayedRegistration";
                 dataStoreContextBuilder.shardElectionTimeoutFactor(1000)
                         .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
 
+                final YangInstanceIdentifier path = TestModel.TEST_PATH;
                 final MockDataChangeListener listener = new MockDataChangeListener(1);
-                final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
+                final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path),
                         actorFactory.generateActorId(testName + "-DataChangeListener"));
 
                 setupInMemorySnapshotStore();
@@ -2210,12 +2143,10 @@ public class ShardTest extends AbstractShardTest {
 
                 waitUntilNoLeader(shard);
 
-                final YangInstanceIdentifier path = TestModel.TEST_PATH;
-
                 shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true),
                         getRef());
-                final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
-                        RegisterChangeListenerReply.class);
+                final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"),
+                        RegisterDataTreeNotificationListenerReply.class);
                 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
 
                 shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
@@ -2258,15 +2189,15 @@ public class ShardTest extends AbstractShardTest {
 
                 final YangInstanceIdentifier path = TestModel.TEST_PATH;
                 final MockDataChangeListener listener = new MockDataChangeListener(1);
-                final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
+                final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path),
                         actorFactory.generateActorId(testName + "-DataChangeListener"));
 
                 followerShard.tell(
                         new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true),
                         getRef());
-                final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
-                        RegisterChangeListenerReply.class);
-                assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
+                final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"),
+                        RegisterDataTreeNotificationListenerReply.class);
+                assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
 
                 writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
 
@@ -2276,16 +2207,16 @@ public class ShardTest extends AbstractShardTest {
     }
 
     @Test
-    public void testClusteredDataTreeChangeListenerDelayedRegistration() throws Exception {
+    public void testClusteredDataTreeChangeListenerWithDelayedRegistration() throws Exception {
         new ShardTestKit(getSystem()) {
             {
-                final String testName = "testClusteredDataTreeChangeListenerDelayedRegistration";
+                final String testName = "testClusteredDataTreeChangeListenerWithDelayedRegistration";
                 dataStoreContextBuilder.shardElectionTimeoutFactor(1000)
                         .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
 
                 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
-                final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
-                        actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
+                final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener,
+                        TestModel.TEST_PATH), actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
 
                 setupInMemorySnapshotStore();
 
@@ -2296,8 +2227,8 @@ public class ShardTest extends AbstractShardTest {
                 waitUntilNoLeader(shard);
 
                 shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
-                final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
-                        RegisterDataTreeChangeListenerReply.class);
+                final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"),
+                        RegisterDataTreeNotificationListenerReply.class);
                 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
 
                 shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
@@ -2308,6 +2239,43 @@ public class ShardTest extends AbstractShardTest {
         };
     }
 
+    @Test
+    public void testClusteredDataTreeChangeListenerWithDelayedRegistrationClosed() throws Exception {
+        new ShardTestKit(getSystem()) {
+            {
+                final String testName = "testClusteredDataTreeChangeListenerWithDelayedRegistrationClosed";
+                dataStoreContextBuilder.shardElectionTimeoutFactor(1000)
+                        .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
+
+                final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(0);
+                final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener,
+                        TestModel.TEST_PATH), actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
+
+                setupInMemorySnapshotStore();
+
+                final TestActorRef<Shard> shard = actorFactory.createTestActor(
+                        newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                        actorFactory.generateActorId(testName + "-shard"));
+
+                waitUntilNoLeader(shard);
+
+                shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
+                final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"),
+                        RegisterDataTreeNotificationListenerReply.class);
+                assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
+
+                final ActorSelection regActor = getSystem().actorSelection(reply.getListenerRegistrationPath());
+                regActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), getRef());
+                expectMsgClass(CloseDataTreeNotificationListenerRegistrationReply.class);
+
+                shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
+                        .customRaftPolicyImplementation(null).build(), ActorRef.noSender());
+
+                listener.expectNoMoreChanges("Received unexpected change after close");
+            }
+        };
+    }
+
     @Test
     public void testClusteredDataTreeChangeListenerRegistration() throws Exception {
         new ShardTestKit(getSystem()) {
@@ -2340,12 +2308,12 @@ public class ShardTest extends AbstractShardTest {
 
                 final YangInstanceIdentifier path = TestModel.TEST_PATH;
                 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
-                final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
+                final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener, path),
                         actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
 
                 followerShard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
-                final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
-                        RegisterDataTreeChangeListenerReply.class);
+                final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"),
+                        RegisterDataTreeNotificationListenerReply.class);
                 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
 
                 writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));