Separate out RaftEntryMeta
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / AbstractDistributedDataStoreIntegrationTest.java
index 983dcda591d5cf15b6eb1859fbb188e3983c1d91..9f19ca045d6144639ecdcfb19da7301013036775 100644 (file)
@@ -13,10 +13,10 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
 
@@ -24,6 +24,7 @@ import akka.actor.ActorSystem;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.FluentFuture;
+import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.Uninterruptibles;
@@ -37,17 +38,16 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runners.Parameterized.Parameter;
-import org.mockito.Mockito;
 import org.opendaylight.controller.cluster.access.client.RequestTimeoutException;
+import org.opendaylight.controller.cluster.databroker.ClientBackedDataStore;
 import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker;
 import org.opendaylight.controller.cluster.datastore.TestShard.RequestFrontendMetadata;
-import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
-import org.opendaylight.controller.cluster.datastore.persisted.FrontendClientMetadata;
 import org.opendaylight.controller.cluster.datastore.persisted.FrontendShardDataTreeSnapshotMetadata;
 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
@@ -63,14 +63,12 @@ import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
 import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
 import org.opendaylight.mdsal.dom.api.DOMTransactionChainClosedException;
-import org.opendaylight.mdsal.dom.api.DOMTransactionChainListener;
 import org.opendaylight.mdsal.dom.spi.store.DOMStore;
 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain;
 import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.common.Uint64;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
@@ -86,9 +84,8 @@ import org.opendaylight.yangtools.yang.data.tree.impl.di.InMemoryDataTreeFactory
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 public abstract class AbstractDistributedDataStoreIntegrationTest {
-
     @Parameter
-    public Class<? extends AbstractDataStore> testParameter;
+    public Class<? extends ClientBackedDataStore> testParameter;
 
     protected ActorSystem system;
 
@@ -102,8 +99,7 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
     @Test
     public void testWriteTransactionWithSingleShard() throws Exception {
         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
-        try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
-            testParameter, "transactionIntegrationTest", "test-1")) {
+        try (var dataStore = testKit.setupDataStore(testParameter, "transactionIntegrationTest", "test-1")) {
 
             testKit.testWriteTransaction(dataStore, TestModel.TEST_PATH,
                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
@@ -118,8 +114,8 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
     @Test
     public void testWriteTransactionWithMultipleShards() throws Exception {
         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
-        try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
-            testParameter, "testWriteTransactionWithMultipleShards", "cars-1", "people-1")) {
+        try (var dataStore = testKit.setupDataStore(testParameter, "testWriteTransactionWithMultipleShards",
+            "cars-1", "people-1")) {
 
             DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
@@ -151,21 +147,16 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
             // Verify the data in the store
             final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
 
-            Optional<NormalizedNode> optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
-            assertTrue("isPresent", optional.isPresent());
-            assertEquals("Data node", car, optional.get());
-
-            optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
-            assertTrue("isPresent", optional.isPresent());
-            assertEquals("Data node", person, optional.get());
+            assertEquals(Optional.of(car), readTx.read(carPath).get(5, TimeUnit.SECONDS));
+            assertEquals(Optional.of(person), readTx.read(personPath).get(5, TimeUnit.SECONDS));
         }
     }
 
     @Test
     public void testReadWriteTransactionWithSingleShard() throws Exception {
         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
-        try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
-            testParameter, "testReadWriteTransactionWithSingleShard", "test-1")) {
+        try (var dataStore = testKit.setupDataStore(testParameter, "testReadWriteTransactionWithSingleShard",
+            "test-1")) {
 
             // 1. Create a read-write Tx
             final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
@@ -180,9 +171,7 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
             final Boolean exists = readWriteTx.exists(nodePath).get(5, TimeUnit.SECONDS);
             assertEquals("exists", Boolean.TRUE, exists);
 
-            Optional<NormalizedNode> optional = readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS);
-            assertTrue("isPresent", optional.isPresent());
-            assertEquals("Data node", nodeToWrite, optional.get());
+            assertEquals(Optional.of(nodeToWrite), readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS));
 
             // 4. Ready the Tx for commit
             final DOMStoreThreePhaseCommitCohort cohort = readWriteTx.ready();
@@ -193,17 +182,15 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
             // 6. Verify the data in the store
             final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
 
-            optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
-            assertTrue("isPresent", optional.isPresent());
-            assertEquals("Data node", nodeToWrite, optional.get());
+            assertEquals(Optional.of(nodeToWrite), readTx.read(nodePath).get(5, TimeUnit.SECONDS));
         }
     }
 
     @Test
     public void testReadWriteTransactionWithMultipleShards() throws Exception {
         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
-        try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
-            testParameter, "testReadWriteTransactionWithMultipleShards", "cars-1", "people-1")) {
+        try (var dataStore = testKit.setupDataStore(testParameter, "testReadWriteTransactionWithMultipleShards",
+            "cars-1", "people-1")) {
 
             DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
             assertNotNull("newReadWriteTransaction returned null", readWriteTx);
@@ -233,34 +220,27 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
             final Boolean exists = readWriteTx.exists(carPath).get(5, TimeUnit.SECONDS);
             assertEquals("exists", Boolean.TRUE, exists);
 
-            Optional<NormalizedNode> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
-            assertTrue("isPresent", optional.isPresent());
-            assertEquals("Data node", car, optional.get());
+            assertEquals("Data node", Optional.of(car), readWriteTx.read(carPath).get(5, TimeUnit.SECONDS));
 
             testKit.doCommit(readWriteTx.ready());
 
             // Verify the data in the store
             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
 
-            optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
-            assertTrue("isPresent", optional.isPresent());
-            assertEquals("Data node", car, optional.get());
-
-            optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
-            assertTrue("isPresent", optional.isPresent());
-            assertEquals("Data node", person, optional.get());
+            assertEquals(Optional.of(car), readTx.read(carPath).get(5, TimeUnit.SECONDS));
+            assertEquals(Optional.of(person), readTx.read(personPath).get(5, TimeUnit.SECONDS));
         }
     }
 
     @Test
     public void testSingleTransactionsWritesInQuickSuccession() throws Exception {
-        final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
-        try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
-            testParameter, "testSingleTransactionsWritesInQuickSuccession", "cars-1")) {
+        final var testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
+        try (var dataStore = testKit.setupDataStore(testParameter, "testSingleTransactionsWritesInQuickSuccession",
+            "cars-1")) {
 
-            final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
+            final var txChain = dataStore.createTransactionChain();
 
-            DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+            var writeTx = txChain.newWriteOnlyTransaction();
             writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
             writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
             testKit.doCommit(writeTx.ready());
@@ -283,16 +263,16 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
                 .untilAsserted(() -> {
                     // verify frontend metadata has no holes in purged transactions causing overtime memory leak
                     final var localShard = dataStore.getActorUtils().findLocalShard("cars-1") .orElseThrow();
-                    FrontendShardDataTreeSnapshotMetadata frontendMetadata =
-                        (FrontendShardDataTreeSnapshotMetadata) dataStore.getActorUtils()
+                    final var frontendMetadata = (FrontendShardDataTreeSnapshotMetadata) dataStore.getActorUtils()
                             .executeOperation(localShard, new RequestFrontendMetadata());
 
                     final var clientMeta = frontendMetadata.getClients().get(0);
-                    if (dataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) {
-                        assertTellMetadata(clientMeta);
-                    } else {
-                        assertAskMetadata(clientMeta);
+                    final var iterator = clientMeta.getCurrentHistories().iterator();
+                    var metadata = iterator.next();
+                    while (iterator.hasNext() && metadata.getHistoryId() != 1) {
+                        metadata = iterator.next();
                     }
+                    assertEquals("[[0..10]]", metadata.getPurgedTransactions().ranges().toString());
                 });
 
             final var body = txChain.newReadOnlyTransaction().read(CarsModel.CAR_LIST_PATH)
@@ -304,20 +284,6 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
         }
     }
 
-    private static void assertAskMetadata(final FrontendClientMetadata clientMeta) {
-        // ask based should track no metadata
-        assertEquals(List.of(), clientMeta.getCurrentHistories());
-    }
-
-    private static void assertTellMetadata(final FrontendClientMetadata clientMeta) {
-        final var iterator = clientMeta.getCurrentHistories().iterator();
-        var metadata = iterator.next();
-        while (iterator.hasNext() && metadata.getHistoryId() != 1) {
-            metadata = iterator.next();
-        }
-        assertEquals("[[0..10]]", metadata.getPurgedTransactions().ranges().toString());
-    }
-
     @SuppressWarnings("checkstyle:IllegalCatch")
     private void testTransactionCommitFailureWithNoShardLeader(final boolean writeOnly, final String testName)
             throws Exception {
@@ -334,8 +300,7 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
         datastoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(1)
         .shardInitializationTimeout(200, TimeUnit.MILLISECONDS).frontendRequestTimeoutInSeconds(2);
 
-        try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(testParameter, testName, false, shardName)) {
-
+        try (var dataStore = testKit.setupDataStore(testParameter, testName, false, shardName)) {
             final Object result = dataStore.getActorUtils().executeOperation(
                 dataStore.getActorUtils().getShardManager(), new FindLocalShard(shardName, true));
             assertTrue("Expected LocalShardFound. Actual: " + result, result instanceof LocalShardFound);
@@ -380,18 +345,10 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
                 // leader was elected in time, the Tx
                 // should have timed out and throw an appropriate
                 // exception cause.
-                try {
-                    txCohort.get().canCommit().get(10, TimeUnit.SECONDS);
-                    fail("Expected NoShardLeaderException");
-                } catch (final ExecutionException e) {
-                    final String msg = "Unexpected exception: "
-                            + Throwables.getStackTraceAsString(e.getCause());
-                    if (DistributedDataStore.class.isAssignableFrom(testParameter)) {
-                        assertTrue(Throwables.getRootCause(e) instanceof NoShardLeaderException);
-                    } else {
-                        assertTrue(msg, Throwables.getRootCause(e) instanceof RequestTimeoutException);
-                    }
-                }
+                final var ex = assertThrows(ExecutionException.class,
+                    () -> txCohort.get().canCommit().get(10, TimeUnit.SECONDS));
+                assertTrue("Unexpected exception: " + Throwables.getStackTraceAsString(ex.getCause()),
+                    Throwables.getRootCause(ex) instanceof RequestTimeoutException);
             } finally {
                 try {
                     if (writeTxToClose != null) {
@@ -419,8 +376,7 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
     @Test
     public void testTransactionAbort() throws Exception {
         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
-        try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
-            testParameter, "transactionAbortIntegrationTest", "test-1")) {
+        try (var dataStore = testKit.setupDataStore(testParameter, "transactionAbortIntegrationTest", "test-1")) {
 
             final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
@@ -442,8 +398,7 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
     @SuppressWarnings("checkstyle:IllegalCatch")
     public void testTransactionChainWithSingleShard() throws Exception {
         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
-        try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
-            testParameter, "testTransactionChainWithSingleShard", "test-1")) {
+        try (var dataStore = testKit.setupDataStore(testParameter, "testTransactionChainWithSingleShard", "test-1")) {
 
             // 1. Create a Tx chain and write-only Tx
             final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
@@ -478,9 +433,7 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
             // the data from the first
             // Tx is visible after being readied.
             DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction();
-            Optional<NormalizedNode> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
-            assertTrue("isPresent", optional.isPresent());
-            assertEquals("Data node", testNode, optional.get());
+            assertEquals(Optional.of(testNode), readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS));
 
             // 6. Create a new RW Tx from the chain, write more data,
             // and ready it
@@ -496,9 +449,7 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
             // from the last RW Tx to
             // verify it is visible.
             readTx = txChain.newReadWriteTransaction();
-            optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
-            assertTrue("isPresent", optional.isPresent());
-            assertEquals("Data node", outerNode, optional.get());
+            assertEquals(Optional.of(outerNode), readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS));
 
             // 8. Wait for the 2 commits to complete and close the
             // chain.
@@ -516,17 +467,15 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
             // 9. Create a new read Tx from the data store and verify
             // committed data.
             readTx = dataStore.newReadOnlyTransaction();
-            optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
-            assertTrue("isPresent", optional.isPresent());
-            assertEquals("Data node", outerNode, optional.get());
+            assertEquals(Optional.of(outerNode), readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS));
         }
     }
 
     @Test
     public void testTransactionChainWithMultipleShards() throws Exception {
         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
-        try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
-            testParameter, "testTransactionChainWithMultipleShards", "cars-1", "people-1")) {
+        try (var dataStore = testKit.setupDataStore(testParameter, "testTransactionChainWithMultipleShards",
+            "cars-1", "people-1")) {
 
             final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
 
@@ -551,13 +500,8 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
             final YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
             readWriteTx.merge(personPath, person);
 
-            Optional<NormalizedNode> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
-            assertTrue("isPresent", optional.isPresent());
-            assertEquals("Data node", car, optional.get());
-
-            optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS);
-            assertTrue("isPresent", optional.isPresent());
-            assertEquals("Data node", person, optional.get());
+            assertEquals(Optional.of(car), readWriteTx.read(carPath).get(5, TimeUnit.SECONDS));
+            assertEquals(Optional.of(person), readWriteTx.read(personPath).get(5, TimeUnit.SECONDS));
 
             final DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready();
 
@@ -578,28 +522,23 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
 
             final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
 
-            optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
-            assertFalse("isPresent", optional.isPresent());
-
-            optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
-            assertTrue("isPresent", optional.isPresent());
-            assertEquals("Data node", person, optional.get());
+            assertEquals(Optional.empty(), readTx.read(carPath).get(5, TimeUnit.SECONDS));
+            assertEquals(Optional.of(person), readTx.read(personPath).get(5, TimeUnit.SECONDS));
         }
     }
 
     @Test
     public void testCreateChainedTransactionsInQuickSuccession() throws Exception {
         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
-        try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
-            testParameter, "testCreateChainedTransactionsInQuickSuccession", "cars-1")) {
+        try (var dataStore = testKit.setupDataStore(testParameter, "testCreateChainedTransactionsInQuickSuccession",
+            "cars-1")) {
 
             final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
                 ImmutableMap.<LogicalDatastoreType, DOMStore>builder()
                 .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(),
                 MoreExecutors.directExecutor());
 
-            final DOMTransactionChainListener listener = Mockito.mock(DOMTransactionChainListener.class);
-            DOMTransactionChain txChain = broker.createTransactionChain(listener);
+            DOMTransactionChain txChain = broker.createTransactionChain();
 
             final List<ListenableFuture<?>> futures = new ArrayList<>();
 
@@ -625,7 +564,7 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
             final Optional<NormalizedNode> optional = txChain.newReadOnlyTransaction()
                     .read(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
             assertTrue("isPresent", optional.isPresent());
-            assertEquals("# cars", numCars, ((Collection<?>) optional.get().body()).size());
+            assertEquals("# cars", numCars, ((Collection<?>) optional.orElseThrow().body()).size());
 
             txChain.close();
 
@@ -636,8 +575,8 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
     @Test
     public void testCreateChainedTransactionAfterEmptyTxReadied() throws Exception {
         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
-        try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
-            testParameter, "testCreateChainedTransactionAfterEmptyTxReadied", "test-1")) {
+        try (var dataStore = testKit.setupDataStore(testParameter, "testCreateChainedTransactionAfterEmptyTxReadied",
+            "test-1")) {
 
             final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
 
@@ -657,8 +596,8 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
     @Test
     public void testCreateChainedTransactionWhenPreviousNotReady() throws Exception {
         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
-        try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
-            testParameter, "testCreateChainedTransactionWhenPreviousNotReady", "test-1")) {
+        try (var dataStore = testKit.setupDataStore(testParameter, "testCreateChainedTransactionWhenPreviousNotReady",
+            "test-1")) {
 
             final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
 
@@ -677,8 +616,8 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
     @Test
     public void testCreateChainedTransactionAfterClose() throws Exception {
         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
-        try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
-            testParameter, "testCreateChainedTransactionAfterClose", "test-1")) {
+        try (var dataStore = testKit.setupDataStore(testParameter, "testCreateChainedTransactionAfterClose",
+            "test-1")) {
 
             final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
             txChain.close();
@@ -692,8 +631,8 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
     @Test
     public void testChainWithReadOnlyTxAfterPreviousReady() throws Exception {
         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
-        try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
-            testParameter, "testChainWithReadOnlyTxAfterPreviousReady", "test-1")) {
+        try (var dataStore = testKit.setupDataStore(testParameter, "testChainWithReadOnlyTxAfterPreviousReady",
+            "test-1")) {
 
             final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
 
@@ -734,39 +673,34 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
 
     @Test
     public void testChainedTransactionFailureWithSingleShard() throws Exception {
-        final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
-        try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
-            testParameter, "testChainedTransactionFailureWithSingleShard", "cars-1")) {
+        final var testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
+        try (var dataStore = testKit.setupDataStore(testParameter, "testChainedTransactionFailureWithSingleShard",
+            "cars-1")) {
 
-            final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
+            final var broker = new ConcurrentDOMDataBroker(
                 ImmutableMap.<LogicalDatastoreType, DOMStore>builder()
                 .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(),
                 MoreExecutors.directExecutor());
 
-            final DOMTransactionChainListener listener = Mockito.mock(DOMTransactionChainListener.class);
-            final DOMTransactionChain txChain = broker.createTransactionChain(listener);
+            final var listener = mock(FutureCallback.class);
+            final var txChain = broker.createTransactionChain();
+            txChain.addCallback(listener);
 
-            final DOMDataTreeReadWriteTransaction writeTx = txChain.newReadWriteTransaction();
+            final var writeTx = txChain.newReadWriteTransaction();
 
             writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH,
                 PeopleModel.emptyContainer());
 
-            final ContainerNode invalidData = Builders.containerBuilder()
+            final var invalidData = Builders.containerBuilder()
                     .withNodeIdentifier(new NodeIdentifier(CarsModel.BASE_QNAME))
                     .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk"))
                     .build();
 
             writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
 
-            try {
-                writeTx.commit().get(5, TimeUnit.SECONDS);
-                fail("Expected TransactionCommitFailedException");
-            } catch (final ExecutionException e) {
-                // Expected
-            }
+            assertThrows(ExecutionException.class, () -> writeTx.commit().get(5, TimeUnit.SECONDS));
 
-            verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx),
-                any(Throwable.class));
+            verify(listener, timeout(5000)).onFailure(any());
 
             txChain.close();
             broker.close();
@@ -776,16 +710,17 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
     @Test
     public void testChainedTransactionFailureWithMultipleShards() throws Exception {
         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
-        try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
-            testParameter, "testChainedTransactionFailureWithMultipleShards", "cars-1", "people-1")) {
+        try (var dataStore = testKit.setupDataStore(testParameter, "testChainedTransactionFailureWithMultipleShards",
+            "cars-1", "people-1")) {
 
             final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
                 ImmutableMap.<LogicalDatastoreType, DOMStore>builder()
                 .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(),
                 MoreExecutors.directExecutor());
 
-            final DOMTransactionChainListener listener = Mockito.mock(DOMTransactionChainListener.class);
-            final DOMTransactionChain txChain = broker.createTransactionChain(listener);
+            final var listener = mock(FutureCallback.class);
+            final DOMTransactionChain txChain = broker.createTransactionChain();
+            txChain.addCallback(listener);
 
             final DOMDataTreeWriteTransaction writeTx = txChain.newReadWriteTransaction();
 
@@ -802,15 +737,9 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
             // Note that merge will validate the data and fail but put
             // succeeds b/c deep validation is not
             // done for put for performance reasons.
-            try {
-                writeTx.commit().get(5, TimeUnit.SECONDS);
-                fail("Expected TransactionCommitFailedException");
-            } catch (final ExecutionException e) {
-                // Expected
-            }
+            assertThrows(ExecutionException.class, () -> writeTx.commit().get(5, TimeUnit.SECONDS));
 
-            verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx),
-                any(Throwable.class));
+            verify(listener, timeout(5000)).onFailure(any());
 
             txChain.close();
             broker.close();
@@ -820,16 +749,15 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
     @Test
     public void testDataTreeChangeListenerRegistration() throws Exception {
         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
-        try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
-            testParameter, "testDataTreeChangeListenerRegistration", "test-1")) {
+        try (var dataStore = testKit.setupDataStore(testParameter, "testDataTreeChangeListenerRegistration",
+            "test-1")) {
 
             testKit.testWriteTransaction(dataStore, TestModel.TEST_PATH,
                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
 
             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
 
-            ListenerRegistration<MockDataTreeChangeListener> listenerReg = dataStore
-                    .registerTreeChangeListener(TestModel.TEST_PATH, listener);
+            final var listenerReg = dataStore.registerTreeChangeListener(TestModel.TEST_PATH, listener);
 
             assertNotNull("registerTreeChangeListener returned null", listenerReg);
 
@@ -881,7 +809,7 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
         DataTree dataTree = new InMemoryDataTreeFactory().create(
             DataTreeConfiguration.DEFAULT_OPERATIONAL, SchemaContextHelper.full());
         AbstractShardTest.writeToStore(dataTree, CarsModel.BASE_PATH, carsNode);
-        NormalizedNode root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.empty());
+        NormalizedNode root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.of());
 
         final Snapshot carsSnapshot = Snapshot.create(
             new ShardSnapshotState(new MetadataShardDataTreeSnapshot(root)),
@@ -893,7 +821,7 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
         final NormalizedNode peopleNode = PeopleModel.create();
         AbstractShardTest.writeToStore(dataTree, PeopleModel.BASE_PATH, peopleNode);
 
-        root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.empty());
+        root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.of());
 
         final Snapshot peopleSnapshot = Snapshot.create(
             new ShardSnapshotState(new MetadataShardDataTreeSnapshot(root)),
@@ -903,41 +831,31 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
             new DatastoreSnapshot.ShardSnapshot("cars", carsSnapshot),
             new DatastoreSnapshot.ShardSnapshot("people", peopleSnapshot)));
 
-        try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
-            testParameter, name, "module-shards-member1.conf", true, "cars", "people")) {
+        try (var dataStore = testKit.setupDataStore(testParameter, name, "module-shards-member1.conf", true,
+            "cars", "people")) {
 
             final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
 
             // two reads
-            Optional<NormalizedNode> optional = readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS);
-            assertTrue("isPresent", optional.isPresent());
-            assertEquals("Data node", carsNode, optional.get());
-
-            optional = readTx.read(PeopleModel.BASE_PATH).get(5, TimeUnit.SECONDS);
-            assertTrue("isPresent", optional.isPresent());
-            assertEquals("Data node", peopleNode, optional.get());
+            assertEquals(Optional.of(carsNode), readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS));
+            assertEquals(Optional.of(peopleNode), readTx.read(PeopleModel.BASE_PATH).get(5, TimeUnit.SECONDS));
         }
     }
 
     @Test
+    @Ignore("ClientBackedDatastore does not have stable indexes/term, the snapshot index seems to fluctuate")
+    // FIXME: re-enable this test
     public void testSnapshotOnRootOverwrite() throws Exception {
-        if (!DistributedDataStore.class.isAssignableFrom(testParameter)) {
-            // FIXME: ClientBackedDatastore does not have stable indexes/term, the snapshot index seems to fluctuate
-            return;
-        }
-
-        final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(),
-                datastoreContextBuilder.snapshotOnRootOverwrite(true));
-        try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
-                testParameter, "testRootOverwrite", "module-shards-default-cars-member1.conf",
-                true, "cars", "default")) {
+        final var testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder.snapshotOnRootOverwrite(true));
+        try (var dataStore = testKit.setupDataStore(testParameter, "testRootOverwrite",
+            "module-shards-default-cars-member1.conf", true, "cars", "default")) {
 
-            ContainerNode rootNode = Builders.containerBuilder()
+            final var rootNode = Builders.containerBuilder()
                 .withNodeIdentifier(NodeIdentifier.create(SchemaContext.NAME))
                 .withChild(CarsModel.create())
                 .build();
 
-            testKit.testWriteTransaction(dataStore, YangInstanceIdentifier.empty(), rootNode);
+            testKit.testWriteTransaction(dataStore, YangInstanceIdentifier.of(), rootNode);
             IntegrationTestKit.verifyShardState(dataStore, "cars",
                 state -> assertEquals(1, state.getSnapshotIndex()));
 
@@ -957,7 +875,7 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
             verifySnapshot("member-1-shard-cars-testRootOverwrite", 1, 1);
 
             // root overwrite so expect a snapshot
-            testKit.testWriteTransaction(dataStore, YangInstanceIdentifier.empty(), rootNode);
+            testKit.testWriteTransaction(dataStore, YangInstanceIdentifier.of(), rootNode);
 
             // this was a real snapshot so everything should be in it(1 + 10 + 1)
             IntegrationTestKit.verifyShardState(dataStore, "cars",