Adjust to yangtools-2.0.0 changes
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
index 6cfde54af73e9e147867bf4c4c9f41be6c2dd6c8..76745a442ca0e9d09100e447e7c6a25ce54d6162 100644 (file)
@@ -59,6 +59,8 @@ import org.opendaylight.controller.cluster.datastore.messages.BatchedModificatio
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistrationReply;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
@@ -69,9 +71,8 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
-import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
-import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeNotificationListenerReply;
 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
@@ -112,10 +113,9 @@ import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
@@ -146,8 +146,8 @@ public class ShardTest extends AbstractShardTest {
                 shard.tell(new RegisterChangeListener(TestModel.TEST_PATH, dclActor,
                         AsyncDataBroker.DataChangeScope.BASE, true), getRef());
 
-                final RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
-                        RegisterChangeListenerReply.class);
+                final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("3 seconds"),
+                        RegisterDataTreeNotificationListenerReply.class);
                 final String replyPath = reply.getListenerRegistrationPath().toString();
                 assertTrue("Incorrect reply path: " + replyPath,
                         replyPath.matches("akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
@@ -197,14 +197,11 @@ public class ShardTest extends AbstractShardTest {
                             // original ElectionTimeout message to proceed with the election.
                             firstElectionTimeout = false;
                             final ActorRef self = getSelf();
-                            new Thread() {
-                                @Override
-                                public void run() {
-                                    Uninterruptibles.awaitUninterruptibly(
-                                            onChangeListenerRegistered, 5, TimeUnit.SECONDS);
-                                    self.tell(message, self);
-                                }
-                            }.start();
+                            new Thread(() -> {
+                                Uninterruptibles.awaitUninterruptibly(
+                                        onChangeListenerRegistered, 5, TimeUnit.SECONDS);
+                                self.tell(message, self);
+                            }).start();
 
                             onFirstElectionTimeout.countDown();
                         } else {
@@ -236,8 +233,8 @@ public class ShardTest extends AbstractShardTest {
                 shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.SUBTREE, false),
                         getRef());
 
-                final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
-                        RegisterChangeListenerReply.class);
+                final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"),
+                        RegisterDataTreeNotificationListenerReply.class);
                 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
 
                 // Sanity check - verify the shard is not the leader yet.
@@ -276,8 +273,8 @@ public class ShardTest extends AbstractShardTest {
 
                 shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, false), getRef());
 
-                final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
-                        RegisterDataTreeChangeListenerReply.class);
+                final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("3 seconds"),
+                        RegisterDataTreeNotificationListenerReply.class);
                 final String replyPath = reply.getListenerRegistrationPath().toString();
                 assertTrue("Incorrect reply path: " + replyPath,
                         replyPath.matches("akka:\\/\\/test\\/user\\/testRegisterDataTreeChangeListener\\/\\$.*"));
@@ -306,14 +303,11 @@ public class ShardTest extends AbstractShardTest {
                         if (message instanceof ElectionTimeout && firstElectionTimeout) {
                             firstElectionTimeout = false;
                             final ActorRef self = getSelf();
-                            new Thread() {
-                                @Override
-                                public void run() {
-                                    Uninterruptibles.awaitUninterruptibly(
-                                            onChangeListenerRegistered, 5, TimeUnit.SECONDS);
-                                    self.tell(message, self);
-                                }
-                            }.start();
+                            new Thread(() -> {
+                                Uninterruptibles.awaitUninterruptibly(
+                                        onChangeListenerRegistered, 5, TimeUnit.SECONDS);
+                                self.tell(message, self);
+                            }).start();
 
                             onFirstElectionTimeout.countDown();
                         } else {
@@ -340,8 +334,8 @@ public class ShardTest extends AbstractShardTest {
                 assertEquals("Got first ElectionTimeout", true, onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
 
                 shard.tell(new RegisterDataTreeChangeListener(path, dclActor, false), getRef());
-                final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
-                        RegisterDataTreeChangeListenerReply.class);
+                final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"),
+                        RegisterDataTreeNotificationListenerReply.class);
                 assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
 
                 shard.tell(FindLeader.INSTANCE, getRef());
@@ -373,8 +367,9 @@ public class ShardTest extends AbstractShardTest {
                         CreateTransactionReply.class);
 
                 final String path = reply.getTransactionPath().toString();
-                assertTrue("Unexpected transaction path " + path, path
-                        .startsWith("akka://test/user/testCreateTransaction/shard-member-1:ShardTransactionTest@0:"));
+                assertTrue("Unexpected transaction path " + path, path.startsWith(String.format(
+                        "akka://test/user/testCreateTransaction/shard-%s-%s:ShardTransactionTest@0:",
+                            shardID.getShardName(), shardID.getMemberName().getName())));
             }
         };
     }
@@ -394,8 +389,9 @@ public class ShardTest extends AbstractShardTest {
                         CreateTransactionReply.class);
 
                 final String path = reply.getTransactionPath().toString();
-                assertTrue("Unexpected transaction path " + path, path.startsWith(
-                        "akka://test/user/testCreateTransactionOnChain/shard-member-1:ShardTransactionTest@0:"));
+                assertTrue("Unexpected transaction path " + path, path.startsWith(String.format(
+                        "akka://test/user/testCreateTransactionOnChain/shard-%s-%s:ShardTransactionTest@0:",
+                        shardID.getShardName(), shardID.getMemberName().getName())));
             }
         };
     }
@@ -428,8 +424,8 @@ public class ShardTest extends AbstractShardTest {
 
         ShardTestKit.waitUntilLeader(shard);
 
-        final DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
-        store.setSchemaContext(SCHEMA_CONTEXT);
+        final DataTree store = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL,
+            SCHEMA_CONTEXT);
 
         final ContainerNode container = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
                 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
@@ -469,8 +465,8 @@ public class ShardTest extends AbstractShardTest {
 
         ShardTestKit.waitUntilLeader(shard);
 
-        final DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
-        store.setSchemaContext(SCHEMA_CONTEXT);
+        final DataTree store = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL,
+            SCHEMA_CONTEXT);
 
         final DataTreeModification writeMod = store.takeSnapshot().newModification();
         final ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
@@ -622,7 +618,6 @@ public class ShardTest extends AbstractShardTest {
                 final ReadyTransactionReply readyReply = ReadyTransactionReply
                         .fromSerializable(expectMsgClass(duration, ReadyTransactionReply.class));
                 assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
-
                 // Send the CanCommitTransaction message for the first Tx.
 
                 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
@@ -670,9 +665,10 @@ public class ShardTest extends AbstractShardTest {
 
                 final boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
 
-                if (caughtEx.get() != null) {
-                    Throwables.propagateIfInstanceOf(caughtEx.get(), Exception.class);
-                    Throwables.propagate(caughtEx.get());
+                final Throwable t = caughtEx.get();
+                if (t != null) {
+                    Throwables.propagateIfPossible(t, Exception.class);
+                    throw new RuntimeException(t);
                 }
 
                 assertEquals("Commits complete", true, done);
@@ -694,7 +690,7 @@ public class ShardTest extends AbstractShardTest {
 
                 verifyOuterListEntry(shard, 1);
 
-                verifyLastApplied(shard, 2);
+                verifyLastApplied(shard, 5);
             }
         };
     }
@@ -814,8 +810,8 @@ public class ShardTest extends AbstractShardTest {
                 final Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
 
                 if (failure != null) {
-                    Throwables.propagateIfInstanceOf(failure.cause(), Exception.class);
-                    Throwables.propagate(failure.cause());
+                    Throwables.propagateIfPossible(failure.cause(), Exception.class);
+                    throw new RuntimeException(failure.cause());
                 }
             }
         };
@@ -1154,7 +1150,7 @@ public class ShardTest extends AbstractShardTest {
     private void testCommitWhenTransactionHasModifications(final boolean readWrite) throws Exception {
         new ShardTestKit(getSystem()) {
             {
-                final TipProducingDataTree dataTree = createDelegatingMockDataTree();
+                final DataTree dataTree = createDelegatingMockDataTree();
                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
                         newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
                         "testCommitWhenTransactionHasModifications-" + readWrite);
@@ -1189,6 +1185,10 @@ public class ShardTest extends AbstractShardTest {
                 inOrder.verify(dataTree).prepare(any(DataTreeModification.class));
                 inOrder.verify(dataTree).commit(any(DataTreeCandidate.class));
 
+                // Purge request is scheduled as asynchronous, wait for two heartbeats to let it propagate into
+                // the journal
+                Thread.sleep(HEARTBEAT_MILLIS * 2);
+
                 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
                 final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
 
@@ -1198,7 +1198,7 @@ public class ShardTest extends AbstractShardTest {
 
                 // Commit index should advance as we do not have an empty
                 // modification
-                assertEquals(0, shardStats.getCommitIndex());
+                assertEquals(1, shardStats.getCommitIndex());
             }
         };
     }
@@ -1207,7 +1207,7 @@ public class ShardTest extends AbstractShardTest {
     public void testCommitPhaseFailure() throws Exception {
         new ShardTestKit(getSystem()) {
             {
-                final TipProducingDataTree dataTree = createDelegatingMockDataTree();
+                final DataTree dataTree = createDelegatingMockDataTree();
                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
                         newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
                         "testCommitPhaseFailure");
@@ -1284,7 +1284,7 @@ public class ShardTest extends AbstractShardTest {
     public void testPreCommitPhaseFailure() throws Exception {
         new ShardTestKit(getSystem()) {
             {
-                final TipProducingDataTree dataTree = createDelegatingMockDataTree();
+                final DataTree dataTree = createDelegatingMockDataTree();
                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
                         newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
                         "testPreCommitPhaseFailure");
@@ -1352,7 +1352,7 @@ public class ShardTest extends AbstractShardTest {
     public void testCanCommitPhaseFailure() throws Exception {
         new ShardTestKit(getSystem()) {
             {
-                final TipProducingDataTree dataTree = createDelegatingMockDataTree();
+                final DataTree dataTree = createDelegatingMockDataTree();
                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
                         newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
                         "testCanCommitPhaseFailure");
@@ -1399,7 +1399,7 @@ public class ShardTest extends AbstractShardTest {
     private void testImmediateCommitWithCanCommitPhaseFailure(final boolean readWrite) throws Exception {
         new ShardTestKit(getSystem()) {
             {
-                final TipProducingDataTree dataTree = createDelegatingMockDataTree();
+                final DataTree dataTree = createDelegatingMockDataTree();
                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
                         newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
                         "testImmediateCommitWithCanCommitPhaseFailure-" + readWrite);
@@ -1987,8 +1987,8 @@ public class ShardTest extends AbstractShardTest {
      */
     @Test
     public void testInMemoryDataTreeRestore() throws ReadFailedException, DataValidationFailedException {
-        final DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
-        store.setSchemaContext(SCHEMA_CONTEXT);
+        final DataTree store = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL,
+            SCHEMA_CONTEXT);
 
         final DataTreeModification putTransaction = store.takeSnapshot().newModification();
         putTransaction.write(TestModel.TEST_PATH,
@@ -2017,13 +2017,13 @@ public class ShardTest extends AbstractShardTest {
                 .shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
 
         final Props persistentProps = Shard.builder().id(shardID).datastoreContext(persistentContext)
-                .schemaContext(SCHEMA_CONTEXT).props();
+                .schemaContextProvider(() -> SCHEMA_CONTEXT).props();
 
         final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder()
                 .shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
 
         final Props nonPersistentProps = Shard.builder().id(shardID).datastoreContext(nonPersistentContext)
-                .schemaContext(SCHEMA_CONTEXT).props();
+                .schemaContextProvider(() -> SCHEMA_CONTEXT).props();
 
         new ShardTestKit(getSystem()) {
             {
@@ -2075,8 +2075,7 @@ public class ShardTest extends AbstractShardTest {
 
                 waitUntilLeader(shard);
 
-                final TestActorRef<MessageCollectorActor> listener =
-                        TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
+                final ActorRef listener = getSystem().actorOf(MessageCollectorActor.props());
 
                 shard.tell(new RegisterRoleChangeListener(), listener);
 
@@ -2121,10 +2120,10 @@ public class ShardTest extends AbstractShardTest {
     }
 
     @Test
-    public void testClusteredDataChangeListenerDelayedRegistration() throws Exception {
+    public void testClusteredDataChangeListenerWithDelayedRegistration() throws Exception {
         new ShardTestKit(getSystem()) {
             {
-                final String testName = "testClusteredDataChangeListenerDelayedRegistration";
+                final String testName = "testClusteredDataChangeListenerWithDelayedRegistration";
                 dataStoreContextBuilder.shardElectionTimeoutFactor(1000)
                         .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
 
@@ -2143,8 +2142,8 @@ public class ShardTest extends AbstractShardTest {
 
                 shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true),
                         getRef());
-                final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
-                        RegisterChangeListenerReply.class);
+                final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"),
+                        RegisterDataTreeNotificationListenerReply.class);
                 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
 
                 shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
@@ -2171,14 +2170,14 @@ public class ShardTest extends AbstractShardTest {
                                 .datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build())
                                 .peerAddresses(Collections.singletonMap(leaderShardID.toString(),
                                         "akka://test/user/" + leaderShardID.toString()))
-                                .schemaContext(SCHEMA_CONTEXT).props()
+                                .schemaContextProvider(() -> SCHEMA_CONTEXT).props()
                                 .withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
 
                 final TestActorRef<Shard> leaderShard = actorFactory
                         .createTestActor(Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext())
                                 .peerAddresses(Collections.singletonMap(followerShardID.toString(),
                                         "akka://test/user/" + followerShardID.toString()))
-                                .schemaContext(SCHEMA_CONTEXT).props()
+                                .schemaContextProvider(() -> SCHEMA_CONTEXT).props()
                                 .withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
 
                 leaderShard.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
@@ -2193,9 +2192,9 @@ public class ShardTest extends AbstractShardTest {
                 followerShard.tell(
                         new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true),
                         getRef());
-                final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
-                        RegisterChangeListenerReply.class);
-                assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
+                final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"),
+                        RegisterDataTreeNotificationListenerReply.class);
+                assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
 
                 writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
 
@@ -2205,10 +2204,10 @@ public class ShardTest extends AbstractShardTest {
     }
 
     @Test
-    public void testClusteredDataTreeChangeListenerDelayedRegistration() throws Exception {
+    public void testClusteredDataTreeChangeListenerWithDelayedRegistration() throws Exception {
         new ShardTestKit(getSystem()) {
             {
-                final String testName = "testClusteredDataTreeChangeListenerDelayedRegistration";
+                final String testName = "testClusteredDataTreeChangeListenerWithDelayedRegistration";
                 dataStoreContextBuilder.shardElectionTimeoutFactor(1000)
                         .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
 
@@ -2225,8 +2224,8 @@ public class ShardTest extends AbstractShardTest {
                 waitUntilNoLeader(shard);
 
                 shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
-                final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
-                        RegisterDataTreeChangeListenerReply.class);
+                final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"),
+                        RegisterDataTreeNotificationListenerReply.class);
                 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
 
                 shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
@@ -2237,6 +2236,43 @@ public class ShardTest extends AbstractShardTest {
         };
     }
 
+    @Test
+    public void testClusteredDataTreeChangeListenerWithDelayedRegistrationClosed() throws Exception {
+        new ShardTestKit(getSystem()) {
+            {
+                final String testName = "testClusteredDataTreeChangeListenerWithDelayedRegistrationClosed";
+                dataStoreContextBuilder.shardElectionTimeoutFactor(1000)
+                        .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
+
+                final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(0);
+                final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener,
+                        TestModel.TEST_PATH), actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
+
+                setupInMemorySnapshotStore();
+
+                final TestActorRef<Shard> shard = actorFactory.createTestActor(
+                        newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                        actorFactory.generateActorId(testName + "-shard"));
+
+                waitUntilNoLeader(shard);
+
+                shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
+                final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"),
+                        RegisterDataTreeNotificationListenerReply.class);
+                assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
+
+                final ActorSelection regActor = getSystem().actorSelection(reply.getListenerRegistrationPath());
+                regActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), getRef());
+                expectMsgClass(CloseDataTreeNotificationListenerRegistrationReply.class);
+
+                shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
+                        .customRaftPolicyImplementation(null).build(), ActorRef.noSender());
+
+                listener.expectNoMoreChanges("Received unexpected change after close");
+            }
+        };
+    }
+
     @Test
     public void testClusteredDataTreeChangeListenerRegistration() throws Exception {
         new ShardTestKit(getSystem()) {
@@ -2253,14 +2289,14 @@ public class ShardTest extends AbstractShardTest {
                                 .datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build())
                                 .peerAddresses(Collections.singletonMap(leaderShardID.toString(),
                                         "akka://test/user/" + leaderShardID.toString()))
-                                .schemaContext(SCHEMA_CONTEXT).props()
+                                .schemaContextProvider(() -> SCHEMA_CONTEXT).props()
                                 .withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
 
                 final TestActorRef<Shard> leaderShard = actorFactory
                         .createTestActor(Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext())
                                 .peerAddresses(Collections.singletonMap(followerShardID.toString(),
                                         "akka://test/user/" + followerShardID.toString()))
-                                .schemaContext(SCHEMA_CONTEXT).props()
+                                .schemaContextProvider(() -> SCHEMA_CONTEXT).props()
                                 .withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
 
                 leaderShard.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
@@ -2273,8 +2309,8 @@ public class ShardTest extends AbstractShardTest {
                         actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
 
                 followerShard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
-                final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
-                        RegisterDataTreeChangeListenerReply.class);
+                final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"),
+                        RegisterDataTreeNotificationListenerReply.class);
                 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
 
                 writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
@@ -2286,7 +2322,8 @@ public class ShardTest extends AbstractShardTest {
 
     @Test
     public void testServerRemoved() throws Exception {
-        final TestActorRef<MessageCollectorActor> parent = actorFactory.createTestActor(MessageCollectorActor.props());
+        final TestActorRef<MessageCollectorActor> parent = actorFactory.createTestActor(MessageCollectorActor.props()
+                .withDispatcher(Dispatchers.DefaultDispatcherId()));
 
         final ActorRef shard = parent.underlyingActor().context().actorOf(
                 newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),