Remove unused exceptions
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
index d484f99696a6f89b7528eda2c9e64abc8c212eca..4dbdce0c079fbfa8e0b53b4304817184a73c2cfc 100644 (file)
@@ -34,10 +34,10 @@ import akka.util.Timeout;
 import com.google.common.base.Stopwatch;
 import com.google.common.base.Throwables;
 import com.google.common.util.concurrent.Uninterruptibles;
-import java.io.IOException;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -59,6 +59,8 @@ import org.opendaylight.controller.cluster.datastore.messages.BatchedModificatio
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistrationReply;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
@@ -68,17 +70,14 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadData;
 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
-import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
-import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeNotificationListenerReply;
 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
-import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
@@ -103,8 +102,6 @@ import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.yangtools.concepts.Identifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
@@ -112,10 +109,9 @@ import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
@@ -127,137 +123,6 @@ public class ShardTest extends AbstractShardTest {
     private static final String DUMMY_DATA = "Dummy data as snapshot sequence number is set to 0 in "
             + "InMemorySnapshotStore and journal recovery seq number will start from 1";
 
-    @Test
-    public void testRegisterChangeListener() throws Exception {
-        new ShardTestKit(getSystem()) {
-            {
-                final TestActorRef<Shard> shard = actorFactory.createTestActor(
-                        newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                        "testRegisterChangeListener");
-
-                waitUntilLeader(shard);
-
-                shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
-
-                final MockDataChangeListener listener = new MockDataChangeListener(1);
-                final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener,
-                        TestModel.TEST_PATH), "testRegisterChangeListener-DataChangeListener");
-
-                shard.tell(new RegisterChangeListener(TestModel.TEST_PATH, dclActor,
-                        AsyncDataBroker.DataChangeScope.BASE, true), getRef());
-
-                final RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
-                        RegisterChangeListenerReply.class);
-                final String replyPath = reply.getListenerRegistrationPath().toString();
-                assertTrue("Incorrect reply path: " + replyPath,
-                        replyPath.matches("akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
-
-                final YangInstanceIdentifier path = TestModel.TEST_PATH;
-                writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
-
-                listener.waitForChangeEvents(path);
-            }
-        };
-    }
-
-    @SuppressWarnings("serial")
-    @Test
-    public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
-        // This test tests the timing window in which a change listener is registered before the
-        // shard becomes the leader. We verify that the listener is registered and notified of the
-        // existing data when the shard becomes the leader.
-        // For this test, we want to send the RegisterChangeListener message after the shard
-        // has recovered from persistence and before it becomes the leader. So we subclass
-        // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
-        // we know that the shard has been initialized to a follower and has started the
-        // election process. The following 2 CountDownLatches are used to coordinate the
-        // ElectionTimeout with the sending of the RegisterChangeListener message.
-        final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
-        final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
-        final Creator<Shard> creator = new Creator<Shard>() {
-            boolean firstElectionTimeout = true;
-
-            @Override
-            public Shard create() throws Exception {
-                // Use a non persistent provider because this test actually invokes persist on the journal
-                // this will cause all other messages to not be queued properly after that.
-                // The basic issue is that you cannot use TestActorRef with a persistent actor (at least when
-                // it does do a persist)
-                return new Shard(newShardBuilder()) {
-                    @Override
-                    public void handleCommand(final Object message) {
-                        if (message instanceof ElectionTimeout && firstElectionTimeout) {
-                            // Got the first ElectionTimeout. We don't forward it to the
-                            // base Shard yet until we've sent the RegisterChangeListener
-                            // message. So we signal the onFirstElectionTimeout latch to tell
-                            // the main thread to send the RegisterChangeListener message and
-                            // start a thread to wait on the onChangeListenerRegistered latch,
-                            // which the main thread signals after it has sent the message.
-                            // After the onChangeListenerRegistered is triggered, we send the
-                            // original ElectionTimeout message to proceed with the election.
-                            firstElectionTimeout = false;
-                            final ActorRef self = getSelf();
-                            new Thread() {
-                                @Override
-                                public void run() {
-                                    Uninterruptibles.awaitUninterruptibly(
-                                            onChangeListenerRegistered, 5, TimeUnit.SECONDS);
-                                    self.tell(message, self);
-                                }
-                            }.start();
-
-                            onFirstElectionTimeout.countDown();
-                        } else {
-                            super.handleCommand(message);
-                        }
-                    }
-                };
-            }
-        };
-
-        setupInMemorySnapshotStore();
-
-        final YangInstanceIdentifier path = TestModel.TEST_PATH;
-        final MockDataChangeListener listener = new MockDataChangeListener(1);
-        final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path),
-                "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
-
-        final TestActorRef<Shard> shard = actorFactory.createTestActor(
-                Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
-                "testRegisterChangeListenerWhenNotLeaderInitially");
-
-        new ShardTestKit(getSystem()) {
-            {
-                // Wait until the shard receives the first ElectionTimeout
-                // message.
-                assertEquals("Got first ElectionTimeout", true, onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
-
-                // Now send the RegisterChangeListener and wait for the reply.
-                shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.SUBTREE, false),
-                        getRef());
-
-                final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
-                        RegisterChangeListenerReply.class);
-                assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
-
-                // Sanity check - verify the shard is not the leader yet.
-                shard.tell(FindLeader.INSTANCE, getRef());
-                final FindLeaderReply findLeadeReply = expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
-                assertFalse("Expected the shard not to be the leader", findLeadeReply.getLeaderActor().isPresent());
-
-                // Signal the onChangeListenerRegistered latch to tell the
-                // thread above to proceed
-                // with the election process.
-                onChangeListenerRegistered.countDown();
-
-                // Wait for the shard to become the leader and notify our
-                // listener with the existing
-                // data in the store.
-                listener.waitForChangeEvents(path);
-            }
-        };
-    }
-
     @Test
     public void testRegisterDataTreeChangeListener() throws Exception {
         new ShardTestKit(getSystem()) {
@@ -276,8 +141,8 @@ public class ShardTest extends AbstractShardTest {
 
                 shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, false), getRef());
 
-                final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
-                        RegisterDataTreeChangeListenerReply.class);
+                final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("3 seconds"),
+                        RegisterDataTreeNotificationListenerReply.class);
                 final String replyPath = reply.getListenerRegistrationPath().toString();
                 assertTrue("Incorrect reply path: " + replyPath,
                         replyPath.matches("akka:\\/\\/test\\/user\\/testRegisterDataTreeChangeListener\\/\\$.*"));
@@ -299,21 +164,18 @@ public class ShardTest extends AbstractShardTest {
             boolean firstElectionTimeout = true;
 
             @Override
-            public Shard create() throws Exception {
+            public Shard create() {
                 return new Shard(newShardBuilder()) {
                     @Override
                     public void handleCommand(final Object message) {
                         if (message instanceof ElectionTimeout && firstElectionTimeout) {
                             firstElectionTimeout = false;
                             final ActorRef self = getSelf();
-                            new Thread() {
-                                @Override
-                                public void run() {
-                                    Uninterruptibles.awaitUninterruptibly(
-                                            onChangeListenerRegistered, 5, TimeUnit.SECONDS);
-                                    self.tell(message, self);
-                                }
-                            }.start();
+                            new Thread(() -> {
+                                Uninterruptibles.awaitUninterruptibly(
+                                        onChangeListenerRegistered, 5, TimeUnit.SECONDS);
+                                self.tell(message, self);
+                            }).start();
 
                             onFirstElectionTimeout.countDown();
                         } else {
@@ -340,8 +202,8 @@ public class ShardTest extends AbstractShardTest {
                 assertEquals("Got first ElectionTimeout", true, onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
 
                 shard.tell(new RegisterDataTreeChangeListener(path, dclActor, false), getRef());
-                final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
-                        RegisterDataTreeChangeListenerReply.class);
+                final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"),
+                        RegisterDataTreeNotificationListenerReply.class);
                 assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
 
                 shard.tell(FindLeader.INSTANCE, getRef());
@@ -373,8 +235,9 @@ public class ShardTest extends AbstractShardTest {
                         CreateTransactionReply.class);
 
                 final String path = reply.getTransactionPath().toString();
-                assertTrue("Unexpected transaction path " + path, path
-                        .startsWith("akka://test/user/testCreateTransaction/shard-member-1:ShardTransactionTest@0:"));
+                assertTrue("Unexpected transaction path " + path, path.startsWith(String.format(
+                        "akka://test/user/testCreateTransaction/shard-%s-%s:ShardTransactionTest@0:",
+                            shardID.getShardName(), shardID.getMemberName().getName())));
             }
         };
     }
@@ -394,14 +257,15 @@ public class ShardTest extends AbstractShardTest {
                         CreateTransactionReply.class);
 
                 final String path = reply.getTransactionPath().toString();
-                assertTrue("Unexpected transaction path " + path, path.startsWith(
-                        "akka://test/user/testCreateTransactionOnChain/shard-member-1:ShardTransactionTest@0:"));
+                assertTrue("Unexpected transaction path " + path, path.startsWith(String.format(
+                        "akka://test/user/testCreateTransactionOnChain/shard-%s-%s:ShardTransactionTest@0:",
+                        shardID.getShardName(), shardID.getMemberName().getName())));
             }
         };
     }
 
     @Test
-    public void testPeerAddressResolved() throws Exception {
+    public void testPeerAddressResolved() {
         new ShardTestKit(getSystem()) {
             {
                 final ShardIdentifier peerID = ShardIdentifier.create("inventory", MemberName.forName("member-2"),
@@ -428,8 +292,8 @@ public class ShardTest extends AbstractShardTest {
 
         ShardTestKit.waitUntilLeader(shard);
 
-        final DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
-        store.setSchemaContext(SCHEMA_CONTEXT);
+        final DataTree store = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL,
+            SCHEMA_CONTEXT);
 
         final ContainerNode container = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
                 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
@@ -469,8 +333,8 @@ public class ShardTest extends AbstractShardTest {
 
         ShardTestKit.waitUntilLeader(shard);
 
-        final DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
-        store.setSchemaContext(SCHEMA_CONTEXT);
+        final DataTree store = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL,
+            SCHEMA_CONTEXT);
 
         final DataTreeModification writeMod = store.takeSnapshot().newModification();
         final ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
@@ -567,7 +431,7 @@ public class ShardTest extends AbstractShardTest {
                 }
             }
 
-            void onSuccess(final Object resp) throws Exception {
+            void onSuccess(final Object resp) {
             }
         }
 
@@ -592,7 +456,7 @@ public class ShardTest extends AbstractShardTest {
             }
 
             @Override
-            void onSuccess(final Object resp) throws Exception {
+            void onSuccess(final Object resp) {
                 final CanCommitTransactionReply canCommitReply =
                         CanCommitTransactionReply.fromSerializable(resp);
                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
@@ -669,9 +533,10 @@ public class ShardTest extends AbstractShardTest {
 
                 final boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
 
-                if (caughtEx.get() != null) {
-                    Throwables.propagateIfInstanceOf(caughtEx.get(), Exception.class);
-                    Throwables.propagate(caughtEx.get());
+                final Throwable t = caughtEx.get();
+                if (t != null) {
+                    Throwables.propagateIfPossible(t, Exception.class);
+                    throw new RuntimeException(t);
                 }
 
                 assertEquals("Commits complete", true, done);
@@ -693,13 +558,13 @@ public class ShardTest extends AbstractShardTest {
 
                 verifyOuterListEntry(shard, 1);
 
-                verifyLastApplied(shard, 2);
+                verifyLastApplied(shard, 5);
             }
         };
     }
 
     @Test
-    public void testBatchedModificationsWithNoCommitOnReady() throws Exception {
+    public void testBatchedModificationsWithNoCommitOnReady() {
         new ShardTestKit(getSystem()) {
             {
                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
@@ -752,7 +617,7 @@ public class ShardTest extends AbstractShardTest {
     }
 
     @Test
-    public void testBatchedModificationsWithCommitOnReady() throws Exception {
+    public void testBatchedModificationsWithCommitOnReady() {
         new ShardTestKit(getSystem()) {
             {
                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
@@ -805,7 +670,7 @@ public class ShardTest extends AbstractShardTest {
                 final TransactionIdentifier transactionID = nextTransactionId();
                 final BatchedModifications batched = new BatchedModifications(transactionID,
                         DataStoreVersions.CURRENT_VERSION);
-                batched.setReady(true);
+                batched.setReady();
                 batched.setTotalMessagesSent(2);
 
                 shard.tell(batched, getRef());
@@ -813,15 +678,15 @@ public class ShardTest extends AbstractShardTest {
                 final Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
 
                 if (failure != null) {
-                    Throwables.propagateIfInstanceOf(failure.cause(), Exception.class);
-                    Throwables.propagate(failure.cause());
+                    Throwables.propagateIfPossible(failure.cause(), Exception.class);
+                    throw new RuntimeException(failure.cause());
                 }
             }
         };
     }
 
     @Test
-    public void testBatchedModificationsWithOperationFailure() throws Exception {
+    public void testBatchedModificationsWithOperationFailure() {
         new ShardTestKit(getSystem()) {
             {
                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
@@ -848,7 +713,7 @@ public class ShardTest extends AbstractShardTest {
                 final Throwable cause = failure.cause();
 
                 batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION);
-                batched.setReady(true);
+                batched.setReady();
                 batched.setTotalMessagesSent(2);
 
                 shard.tell(batched, getRef());
@@ -860,7 +725,7 @@ public class ShardTest extends AbstractShardTest {
     }
 
     @Test
-    public void testBatchedModificationsOnTransactionChain() throws Exception {
+    public void testBatchedModificationsOnTransactionChain() {
         new ShardTestKit(getSystem()) {
             {
                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
@@ -923,7 +788,7 @@ public class ShardTest extends AbstractShardTest {
                     private static final long serialVersionUID = 1L;
 
                     @Override
-                    public Shard create() throws Exception {
+                    public Shard create() {
                         return new Shard(newShardBuilder()) {
                             @Override
                             protected boolean isLeader() {
@@ -979,7 +844,8 @@ public class ShardTest extends AbstractShardTest {
                 failure = expectMsgClass(Failure.class);
                 assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
 
-                shard.tell(new ReadyLocalTransaction(txId, mock(DataTreeModification.class), true), getRef());
+                shard.tell(new ReadyLocalTransaction(txId, mock(DataTreeModification.class), true, Optional.empty()),
+                        getRef());
                 failure = expectMsgClass(Failure.class);
                 assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
             }
@@ -987,16 +853,16 @@ public class ShardTest extends AbstractShardTest {
     }
 
     @Test
-    public void testReadyWithReadWriteImmediateCommit() throws Exception {
+    public void testReadyWithReadWriteImmediateCommit() {
         testReadyWithImmediateCommit(true);
     }
 
     @Test
-    public void testReadyWithWriteOnlyImmediateCommit() throws Exception {
+    public void testReadyWithWriteOnlyImmediateCommit() {
         testReadyWithImmediateCommit(false);
     }
 
-    private void testReadyWithImmediateCommit(final boolean readWrite) throws Exception {
+    private void testReadyWithImmediateCommit(final boolean readWrite) {
         new ShardTestKit(getSystem()) {
             {
                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
@@ -1024,7 +890,7 @@ public class ShardTest extends AbstractShardTest {
     }
 
     @Test
-    public void testReadyLocalTransactionWithImmediateCommit() throws Exception {
+    public void testReadyLocalTransactionWithImmediateCommit() {
         new ShardTestKit(getSystem()) {
             {
                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
@@ -1044,7 +910,8 @@ public class ShardTest extends AbstractShardTest {
 
                 final TransactionIdentifier txId = nextTransactionId();
                 modification.ready();
-                final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, true);
+                final ReadyLocalTransaction readyMessage =
+                        new ReadyLocalTransaction(txId, modification, true, Optional.empty());
 
                 shard.tell(readyMessage, getRef());
 
@@ -1057,7 +924,7 @@ public class ShardTest extends AbstractShardTest {
     }
 
     @Test
-    public void testReadyLocalTransactionWithThreePhaseCommit() throws Exception {
+    public void testReadyLocalTransactionWithThreePhaseCommit() {
         new ShardTestKit(getSystem()) {
             {
                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
@@ -1077,7 +944,8 @@ public class ShardTest extends AbstractShardTest {
 
                 final TransactionIdentifier txId = nextTransactionId();
                 modification.ready();
-                final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, false);
+                final ReadyLocalTransaction readyMessage =
+                        new ReadyLocalTransaction(txId, modification, false, Optional.empty());
 
                 shard.tell(readyMessage, getRef());
 
@@ -1102,7 +970,7 @@ public class ShardTest extends AbstractShardTest {
     }
 
     @Test
-    public void testReadWriteCommitWithPersistenceDisabled() throws Exception {
+    public void testReadWriteCommitWithPersistenceDisabled() {
         dataStoreContextBuilder.persistent(false);
         new ShardTestKit(getSystem()) {
             {
@@ -1153,7 +1021,7 @@ public class ShardTest extends AbstractShardTest {
     private void testCommitWhenTransactionHasModifications(final boolean readWrite) throws Exception {
         new ShardTestKit(getSystem()) {
             {
-                final TipProducingDataTree dataTree = createDelegatingMockDataTree();
+                final DataTree dataTree = createDelegatingMockDataTree();
                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
                         newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
                         "testCommitWhenTransactionHasModifications-" + readWrite);
@@ -1188,6 +1056,10 @@ public class ShardTest extends AbstractShardTest {
                 inOrder.verify(dataTree).prepare(any(DataTreeModification.class));
                 inOrder.verify(dataTree).commit(any(DataTreeCandidate.class));
 
+                // Purge request is scheduled as asynchronous, wait for two heartbeats to let it propagate into
+                // the journal
+                Thread.sleep(HEARTBEAT_MILLIS * 2);
+
                 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
                 final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
 
@@ -1197,7 +1069,7 @@ public class ShardTest extends AbstractShardTest {
 
                 // Commit index should advance as we do not have an empty
                 // modification
-                assertEquals(0, shardStats.getCommitIndex());
+                assertEquals(1, shardStats.getCommitIndex());
             }
         };
     }
@@ -1206,7 +1078,7 @@ public class ShardTest extends AbstractShardTest {
     public void testCommitPhaseFailure() throws Exception {
         new ShardTestKit(getSystem()) {
             {
-                final TipProducingDataTree dataTree = createDelegatingMockDataTree();
+                final DataTree dataTree = createDelegatingMockDataTree();
                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
                         newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
                         "testCommitPhaseFailure");
@@ -1283,7 +1155,7 @@ public class ShardTest extends AbstractShardTest {
     public void testPreCommitPhaseFailure() throws Exception {
         new ShardTestKit(getSystem()) {
             {
-                final TipProducingDataTree dataTree = createDelegatingMockDataTree();
+                final DataTree dataTree = createDelegatingMockDataTree();
                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
                         newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
                         "testPreCommitPhaseFailure");
@@ -1351,7 +1223,7 @@ public class ShardTest extends AbstractShardTest {
     public void testCanCommitPhaseFailure() throws Exception {
         new ShardTestKit(getSystem()) {
             {
-                final TipProducingDataTree dataTree = createDelegatingMockDataTree();
+                final DataTree dataTree = createDelegatingMockDataTree();
                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
                         newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
                         "testCanCommitPhaseFailure");
@@ -1398,7 +1270,7 @@ public class ShardTest extends AbstractShardTest {
     private void testImmediateCommitWithCanCommitPhaseFailure(final boolean readWrite) throws Exception {
         new ShardTestKit(getSystem()) {
             {
-                final TipProducingDataTree dataTree = createDelegatingMockDataTree();
+                final DataTree dataTree = createDelegatingMockDataTree();
                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
                         newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
                         "testImmediateCommitWithCanCommitPhaseFailure-" + readWrite);
@@ -1440,7 +1312,7 @@ public class ShardTest extends AbstractShardTest {
     }
 
     @Test
-    public void testAbortWithCommitPending() throws Exception {
+    public void testAbortWithCommitPending() {
         new ShardTestKit(getSystem()) {
             {
                 final Creator<Shard> creator = () -> new Shard(newShardBuilder()) {
@@ -1623,7 +1495,7 @@ public class ShardTest extends AbstractShardTest {
 //    }
 
     @Test
-    public void testTransactionCommitWithPriorExpiredCohortEntries() throws Exception {
+    public void testTransactionCommitWithPriorExpiredCohortEntries() {
         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
         new ShardTestKit(getSystem()) {
             {
@@ -1662,7 +1534,7 @@ public class ShardTest extends AbstractShardTest {
     }
 
     @Test
-    public void testTransactionCommitWithSubsequentExpiredCohortEntry() throws Exception {
+    public void testTransactionCommitWithSubsequentExpiredCohortEntry() {
         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
         new ShardTestKit(getSystem()) {
             {
@@ -1701,7 +1573,7 @@ public class ShardTest extends AbstractShardTest {
                         .apply(modification3);
                 modification3.ready();
                 final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(transactionID3, modification3,
-                        true);
+                        true, Optional.empty());
                 shard.tell(readyMessage, getRef());
 
                 // Commit the first Tx. After completing, the second should
@@ -1722,7 +1594,7 @@ public class ShardTest extends AbstractShardTest {
     }
 
     @Test
-    public void testCanCommitBeforeReadyFailure() throws Exception {
+    public void testCanCommitBeforeReadyFailure() {
         new ShardTestKit(getSystem()) {
             {
                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
@@ -1789,7 +1661,7 @@ public class ShardTest extends AbstractShardTest {
     }
 
     @Test
-    public void testAbortAfterReady() throws Exception {
+    public void testAbortAfterReady() {
         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
         new ShardTestKit(getSystem()) {
             {
@@ -1834,7 +1706,7 @@ public class ShardTest extends AbstractShardTest {
     }
 
     @Test
-    public void testAbortQueuedTransaction() throws Exception {
+    public void testAbortQueuedTransaction() {
         new ShardTestKit(getSystem()) {
             {
                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
@@ -1961,7 +1833,7 @@ public class ShardTest extends AbstractShardTest {
             }
 
             private void awaitAndValidateSnapshot(final NormalizedNode<?, ?> expectedRoot)
-                    throws InterruptedException, IOException {
+                    throws InterruptedException {
                 assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
 
                 assertTrue("Invalid saved snapshot " + savedSnapshot.get(), savedSnapshot.get() instanceof Snapshot);
@@ -1972,8 +1844,7 @@ public class ShardTest extends AbstractShardTest {
                 savedSnapshot.set(null);
             }
 
-            private void verifySnapshot(final Snapshot snapshot, final NormalizedNode<?, ?> expectedRoot)
-                    throws IOException {
+            private void verifySnapshot(final Snapshot snapshot, final NormalizedNode<?, ?> expectedRoot) {
                 final NormalizedNode<?, ?> actual = ((ShardSnapshotState)snapshot.getState()).getSnapshot()
                         .getRootNode().get();
                 assertEquals("Root node", expectedRoot, actual);
@@ -1985,9 +1856,9 @@ public class ShardTest extends AbstractShardTest {
      * This test simply verifies that the applySnapShot logic will work.
      */
     @Test
-    public void testInMemoryDataTreeRestore() throws ReadFailedException, DataValidationFailedException {
-        final DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
-        store.setSchemaContext(SCHEMA_CONTEXT);
+    public void testInMemoryDataTreeRestore() throws DataValidationFailedException {
+        final DataTree store = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL,
+            SCHEMA_CONTEXT);
 
         final DataTreeModification putTransaction = store.takeSnapshot().newModification();
         putTransaction.write(TestModel.TEST_PATH,
@@ -2016,13 +1887,13 @@ public class ShardTest extends AbstractShardTest {
                 .shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
 
         final Props persistentProps = Shard.builder().id(shardID).datastoreContext(persistentContext)
-                .schemaContext(SCHEMA_CONTEXT).props();
+                .schemaContextProvider(() -> SCHEMA_CONTEXT).props();
 
         final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder()
                 .shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
 
         final Props nonPersistentProps = Shard.builder().id(shardID).datastoreContext(nonPersistentContext)
-                .schemaContext(SCHEMA_CONTEXT).props();
+                .schemaContextProvider(() -> SCHEMA_CONTEXT).props();
 
         new ShardTestKit(getSystem()) {
             {
@@ -2065,7 +1936,7 @@ public class ShardTest extends AbstractShardTest {
     }
 
     @Test
-    public void testRegisterRoleChangeListener() throws Exception {
+    public void testRegisterRoleChangeListener() {
         new ShardTestKit(getSystem()) {
             {
                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
@@ -2074,8 +1945,7 @@ public class ShardTest extends AbstractShardTest {
 
                 waitUntilLeader(shard);
 
-                final TestActorRef<MessageCollectorActor> listener =
-                        TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
+                final ActorRef listener = getSystem().actorOf(MessageCollectorActor.props());
 
                 shard.tell(new RegisterRoleChangeListener(), listener);
 
@@ -2103,7 +1973,7 @@ public class ShardTest extends AbstractShardTest {
     }
 
     @Test
-    public void testFollowerInitialSyncStatus() throws Exception {
+    public void testFollowerInitialSyncStatus() {
         final TestActorRef<Shard> shard = actorFactory.createTestActor(
                 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
                 "testFollowerInitialSyncStatus");
@@ -2120,17 +1990,16 @@ public class ShardTest extends AbstractShardTest {
     }
 
     @Test
-    public void testClusteredDataChangeListenerDelayedRegistration() throws Exception {
+    public void testClusteredDataTreeChangeListenerWithDelayedRegistration() throws Exception {
         new ShardTestKit(getSystem()) {
             {
-                final String testName = "testClusteredDataChangeListenerDelayedRegistration";
+                final String testName = "testClusteredDataTreeChangeListenerWithDelayedRegistration";
                 dataStoreContextBuilder.shardElectionTimeoutFactor(1000)
                         .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
 
-                final YangInstanceIdentifier path = TestModel.TEST_PATH;
-                final MockDataChangeListener listener = new MockDataChangeListener(1);
-                final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path),
-                        actorFactory.generateActorId(testName + "-DataChangeListener"));
+                final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
+                final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener,
+                        TestModel.TEST_PATH), actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
 
                 setupInMemorySnapshotStore();
 
@@ -2140,10 +2009,9 @@ public class ShardTest extends AbstractShardTest {
 
                 waitUntilNoLeader(shard);
 
-                shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true),
-                        getRef());
-                final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
-                        RegisterChangeListenerReply.class);
+                shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
+                final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"),
+                        RegisterDataTreeNotificationListenerReply.class);
                 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
 
                 shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
@@ -2155,63 +2023,14 @@ public class ShardTest extends AbstractShardTest {
     }
 
     @Test
-    public void testClusteredDataChangeListenerRegistration() throws Exception {
-        new ShardTestKit(getSystem()) {
-            {
-                final String testName = "testClusteredDataChangeListenerRegistration";
-                final ShardIdentifier followerShardID = ShardIdentifier.create("inventory",
-                        MemberName.forName(actorFactory.generateActorId(testName + "-follower")), "config");
-
-                final ShardIdentifier leaderShardID = ShardIdentifier.create("inventory",
-                        MemberName.forName(actorFactory.generateActorId(testName + "-leader")), "config");
-
-                final TestActorRef<Shard> followerShard = actorFactory
-                        .createTestActor(Shard.builder().id(followerShardID)
-                                .datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build())
-                                .peerAddresses(Collections.singletonMap(leaderShardID.toString(),
-                                        "akka://test/user/" + leaderShardID.toString()))
-                                .schemaContext(SCHEMA_CONTEXT).props()
-                                .withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
-
-                final TestActorRef<Shard> leaderShard = actorFactory
-                        .createTestActor(Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext())
-                                .peerAddresses(Collections.singletonMap(followerShardID.toString(),
-                                        "akka://test/user/" + followerShardID.toString()))
-                                .schemaContext(SCHEMA_CONTEXT).props()
-                                .withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
-
-                leaderShard.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
-                final String leaderPath = waitUntilLeader(followerShard);
-                assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);
-
-                final YangInstanceIdentifier path = TestModel.TEST_PATH;
-                final MockDataChangeListener listener = new MockDataChangeListener(1);
-                final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path),
-                        actorFactory.generateActorId(testName + "-DataChangeListener"));
-
-                followerShard.tell(
-                        new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true),
-                        getRef());
-                final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
-                        RegisterChangeListenerReply.class);
-                assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
-
-                writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
-
-                listener.waitForChangeEvents();
-            }
-        };
-    }
-
-    @Test
-    public void testClusteredDataTreeChangeListenerDelayedRegistration() throws Exception {
+    public void testClusteredDataTreeChangeListenerWithDelayedRegistrationClosed() throws Exception {
         new ShardTestKit(getSystem()) {
             {
-                final String testName = "testClusteredDataTreeChangeListenerDelayedRegistration";
+                final String testName = "testClusteredDataTreeChangeListenerWithDelayedRegistrationClosed";
                 dataStoreContextBuilder.shardElectionTimeoutFactor(1000)
                         .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
 
-                final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
+                final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(0);
                 final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener,
                         TestModel.TEST_PATH), actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
 
@@ -2224,14 +2043,18 @@ public class ShardTest extends AbstractShardTest {
                 waitUntilNoLeader(shard);
 
                 shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
-                final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
-                        RegisterDataTreeChangeListenerReply.class);
+                final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"),
+                        RegisterDataTreeNotificationListenerReply.class);
                 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
 
+                final ActorSelection regActor = getSystem().actorSelection(reply.getListenerRegistrationPath());
+                regActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), getRef());
+                expectMsgClass(CloseDataTreeNotificationListenerRegistrationReply.class);
+
                 shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
                         .customRaftPolicyImplementation(null).build(), ActorRef.noSender());
 
-                listener.waitForChangeEvents();
+                listener.expectNoMoreChanges("Received unexpected change after close");
             }
         };
     }
@@ -2252,14 +2075,14 @@ public class ShardTest extends AbstractShardTest {
                                 .datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build())
                                 .peerAddresses(Collections.singletonMap(leaderShardID.toString(),
                                         "akka://test/user/" + leaderShardID.toString()))
-                                .schemaContext(SCHEMA_CONTEXT).props()
+                                .schemaContextProvider(() -> SCHEMA_CONTEXT).props()
                                 .withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
 
                 final TestActorRef<Shard> leaderShard = actorFactory
                         .createTestActor(Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext())
                                 .peerAddresses(Collections.singletonMap(followerShardID.toString(),
                                         "akka://test/user/" + followerShardID.toString()))
-                                .schemaContext(SCHEMA_CONTEXT).props()
+                                .schemaContextProvider(() -> SCHEMA_CONTEXT).props()
                                 .withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
 
                 leaderShard.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
@@ -2272,8 +2095,8 @@ public class ShardTest extends AbstractShardTest {
                         actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
 
                 followerShard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
-                final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
-                        RegisterDataTreeChangeListenerReply.class);
+                final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"),
+                        RegisterDataTreeNotificationListenerReply.class);
                 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
 
                 writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
@@ -2284,8 +2107,9 @@ public class ShardTest extends AbstractShardTest {
     }
 
     @Test
-    public void testServerRemoved() throws Exception {
-        final TestActorRef<MessageCollectorActor> parent = actorFactory.createTestActor(MessageCollectorActor.props());
+    public void testServerRemoved() {
+        final TestActorRef<MessageCollectorActor> parent = actorFactory.createTestActor(MessageCollectorActor.props()
+                .withDispatcher(Dispatchers.DefaultDispatcherId()));
 
         final ActorRef shard = parent.underlyingActor().context().actorOf(
                 newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),