Do not special-case AbstractDistributedDataStoreIntegrationTest
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / AbstractDistributedDataStoreIntegrationTest.java
index 62986b2ebfe86f57a378d973e1be36e01eb27c29..3d00a2592b0a842b194f8e43a27a83b5d9de4bc3 100644 (file)
@@ -7,12 +7,14 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.junit.runners.Parameterized.Parameter;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.timeout;
@@ -25,7 +27,6 @@ import com.google.common.util.concurrent.FluentFuture;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.Uninterruptibles;
-import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -36,18 +37,23 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import org.junit.Ignore;
 import org.junit.Test;
+import org.junit.runners.Parameterized.Parameter;
 import org.mockito.Mockito;
 import org.opendaylight.controller.cluster.access.client.RequestTimeoutException;
 import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker;
-import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
+import org.opendaylight.controller.cluster.datastore.TestShard.RequestFrontendMetadata;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.FrontendClientMetadata;
+import org.opendaylight.controller.cluster.datastore.persisted.FrontendShardDataTreeSnapshotMetadata;
 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
+import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
@@ -65,16 +71,19 @@ import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain;
 import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.common.Uint64;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration;
+import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
-import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
+import org.opendaylight.yangtools.yang.data.tree.api.DataTree;
+import org.opendaylight.yangtools.yang.data.tree.api.DataTreeConfiguration;
+import org.opendaylight.yangtools.yang.data.tree.impl.di.InMemoryDataTreeFactory;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 public abstract class AbstractDistributedDataStoreIntegrationTest {
 
@@ -129,7 +138,7 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
 
             writeTx = dataStore.newWriteOnlyTransaction();
 
-            final MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
+            final MapEntryNode car = CarsModel.newCarEntry("optima", Uint64.valueOf(20000));
             final YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
             writeTx.write(carPath, car);
 
@@ -142,13 +151,8 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
             // Verify the data in the store
             final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
 
-            Optional<NormalizedNode<?, ?>> optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
-            assertTrue("isPresent", optional.isPresent());
-            assertEquals("Data node", car, optional.get());
-
-            optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
-            assertTrue("isPresent", optional.isPresent());
-            assertEquals("Data node", person, optional.get());
+            assertEquals(Optional.of(car), readTx.read(carPath).get(5, TimeUnit.SECONDS));
+            assertEquals(Optional.of(person), readTx.read(personPath).get(5, TimeUnit.SECONDS));
         }
     }
 
@@ -164,16 +168,14 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
 
             // 2. Write some data
             final YangInstanceIdentifier nodePath = TestModel.TEST_PATH;
-            final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+            final NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
             readWriteTx.write(nodePath, nodeToWrite);
 
             // 3. Read the data from Tx
             final Boolean exists = readWriteTx.exists(nodePath).get(5, TimeUnit.SECONDS);
             assertEquals("exists", Boolean.TRUE, exists);
 
-            Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS);
-            assertTrue("isPresent", optional.isPresent());
-            assertEquals("Data node", nodeToWrite, optional.get());
+            assertEquals(Optional.of(nodeToWrite), readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS));
 
             // 4. Ready the Tx for commit
             final DOMStoreThreePhaseCommitCohort cohort = readWriteTx.ready();
@@ -184,9 +186,7 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
             // 6. Verify the data in the store
             final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
 
-            optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
-            assertTrue("isPresent", optional.isPresent());
-            assertEquals("Data node", nodeToWrite, optional.get());
+            assertEquals(Optional.of(nodeToWrite), readTx.read(nodePath).get(5, TimeUnit.SECONDS));
         }
     }
 
@@ -213,7 +213,7 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
 
             readWriteTx = dataStore.newReadWriteTransaction();
 
-            final MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
+            final MapEntryNode car = CarsModel.newCarEntry("optima", Uint64.valueOf(20000));
             final YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
             readWriteTx.write(carPath, car);
 
@@ -224,22 +224,15 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
             final Boolean exists = readWriteTx.exists(carPath).get(5, TimeUnit.SECONDS);
             assertEquals("exists", Boolean.TRUE, exists);
 
-            Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
-            assertTrue("isPresent", optional.isPresent());
-            assertEquals("Data node", car, optional.get());
+            assertEquals("Data node", Optional.of(car), readWriteTx.read(carPath).get(5, TimeUnit.SECONDS));
 
             testKit.doCommit(readWriteTx.ready());
 
             // Verify the data in the store
             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
 
-            optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
-            assertTrue("isPresent", optional.isPresent());
-            assertEquals("Data node", car, optional.get());
-
-            optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
-            assertTrue("isPresent", optional.isPresent());
-            assertEquals("Data node", person, optional.get());
+            assertEquals(Optional.of(car), readTx.read(carPath).get(5, TimeUnit.SECONDS));
+            assertEquals(Optional.of(person), readTx.read(personPath).get(5, TimeUnit.SECONDS));
         }
     }
 
@@ -256,21 +249,57 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
             writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
             testKit.doCommit(writeTx.ready());
 
-            writeTx = txChain.newWriteOnlyTransaction();
-
             int numCars = 5;
             for (int i = 0; i < numCars; i++) {
-                writeTx.write(CarsModel.newCarPath("car" + i),
-                    CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
+                writeTx = txChain.newWriteOnlyTransaction();
+                writeTx.write(CarsModel.newCarPath("car" + i), CarsModel.newCarEntry("car" + i, Uint64.valueOf(20000)));
+
+                testKit.doCommit(writeTx.ready());
+
+                try (var tx = txChain.newReadOnlyTransaction()) {
+                    tx.read(CarsModel.BASE_PATH).get();
+                }
             }
 
-            testKit.doCommit(writeTx.ready());
+            // wait to let the shard catch up with purged
+            await("transaction state propagation").atMost(5, TimeUnit.SECONDS)
+                .pollInterval(500, TimeUnit.MILLISECONDS)
+                .untilAsserted(() -> {
+                    // verify frontend metadata has no holes in purged transactions causing overtime memory leak
+                    final var localShard = dataStore.getActorUtils().findLocalShard("cars-1") .orElseThrow();
+                    FrontendShardDataTreeSnapshotMetadata frontendMetadata =
+                        (FrontendShardDataTreeSnapshotMetadata) dataStore.getActorUtils()
+                            .executeOperation(localShard, new RequestFrontendMetadata());
+
+                    final var clientMeta = frontendMetadata.getClients().get(0);
+                    if (dataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) {
+                        assertTellMetadata(clientMeta);
+                    } else {
+                        assertAskMetadata(clientMeta);
+                    }
+                });
 
-            final Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction()
-                    .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
-            assertTrue("isPresent", optional.isPresent());
-            assertEquals("# cars", numCars, ((Collection<?>) optional.get().getValue()).size());
+            final var body = txChain.newReadOnlyTransaction().read(CarsModel.CAR_LIST_PATH)
+                .get(5, TimeUnit.SECONDS)
+                .orElseThrow()
+                .body();
+            assertThat(body, instanceOf(Collection.class));
+            assertEquals("# cars", numCars, ((Collection<?>) body).size());
+        }
+    }
+
+    private static void assertAskMetadata(final FrontendClientMetadata clientMeta) {
+        // ask based should track no metadata
+        assertEquals(List.of(), clientMeta.getCurrentHistories());
+    }
+
+    private static void assertTellMetadata(final FrontendClientMetadata clientMeta) {
+        final var iterator = clientMeta.getCurrentHistories().iterator();
+        var metadata = iterator.next();
+        while (iterator.hasNext() && metadata.getHistoryId() != 1) {
+            metadata = iterator.next();
         }
+        assertEquals("[[0..10]]", metadata.getPurgedTransactions().ranges().toString());
     }
 
     @SuppressWarnings("checkstyle:IllegalCatch")
@@ -335,18 +364,10 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
                 // leader was elected in time, the Tx
                 // should have timed out and throw an appropriate
                 // exception cause.
-                try {
-                    txCohort.get().canCommit().get(10, TimeUnit.SECONDS);
-                    fail("Expected NoShardLeaderException");
-                } catch (final ExecutionException e) {
-                    final String msg = "Unexpected exception: "
-                            + Throwables.getStackTraceAsString(e.getCause());
-                    if (DistributedDataStore.class.equals(testParameter)) {
-                        assertTrue(Throwables.getRootCause(e) instanceof NoShardLeaderException);
-                    } else {
-                        assertTrue(msg, Throwables.getRootCause(e) instanceof RequestTimeoutException);
-                    }
-                }
+                final var ex = assertThrows(ExecutionException.class,
+                    () -> txCohort.get().canCommit().get(10, TimeUnit.SECONDS));
+                assertTrue("Unexpected exception: " + Throwables.getStackTraceAsString(ex.getCause()),
+                    Throwables.getRootCause(ex) instanceof RequestTimeoutException);
             } finally {
                 try {
                     if (writeTxToClose != null) {
@@ -407,7 +428,7 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
 
             // 2. Write some data
-            final NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+            final NormalizedNode testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
             writeTx.write(TestModel.TEST_PATH, testNode);
 
             // 3. Ready the Tx for commit
@@ -433,9 +454,7 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
             // the data from the first
             // Tx is visible after being readied.
             DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction();
-            Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
-            assertTrue("isPresent", optional.isPresent());
-            assertEquals("Data node", testNode, optional.get());
+            assertEquals(Optional.of(testNode), readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS));
 
             // 6. Create a new RW Tx from the chain, write more data,
             // and ready it
@@ -451,9 +470,7 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
             // from the last RW Tx to
             // verify it is visible.
             readTx = txChain.newReadWriteTransaction();
-            optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
-            assertTrue("isPresent", optional.isPresent());
-            assertEquals("Data node", outerNode, optional.get());
+            assertEquals(Optional.of(outerNode), readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS));
 
             // 8. Wait for the 2 commits to complete and close the
             // chain.
@@ -471,9 +488,7 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
             // 9. Create a new read Tx from the data store and verify
             // committed data.
             readTx = dataStore.newReadOnlyTransaction();
-            optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
-            assertTrue("isPresent", optional.isPresent());
-            assertEquals("Data node", outerNode, optional.get());
+            assertEquals(Optional.of(outerNode), readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS));
         }
     }
 
@@ -498,7 +513,7 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
 
             final DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction();
 
-            final MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
+            final MapEntryNode car = CarsModel.newCarEntry("optima", Uint64.valueOf(20000));
             final YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
             readWriteTx.write(carPath, car);
 
@@ -506,13 +521,8 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
             final YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
             readWriteTx.merge(personPath, person);
 
-            Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
-            assertTrue("isPresent", optional.isPresent());
-            assertEquals("Data node", car, optional.get());
-
-            optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS);
-            assertTrue("isPresent", optional.isPresent());
-            assertEquals("Data node", person, optional.get());
+            assertEquals(Optional.of(car), readWriteTx.read(carPath).get(5, TimeUnit.SECONDS));
+            assertEquals(Optional.of(person), readWriteTx.read(personPath).get(5, TimeUnit.SECONDS));
 
             final DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready();
 
@@ -533,12 +543,8 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
 
             final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
 
-            optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
-            assertFalse("isPresent", optional.isPresent());
-
-            optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
-            assertTrue("isPresent", optional.isPresent());
-            assertEquals("Data node", person, optional.get());
+            assertEquals(Optional.empty(), readTx.read(carPath).get(5, TimeUnit.SECONDS));
+            assertEquals(Optional.of(person), readTx.read(personPath).get(5, TimeUnit.SECONDS));
         }
     }
 
@@ -568,7 +574,7 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
                 final DOMDataTreeReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
 
                 rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.newCarPath("car" + i),
-                    CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
+                    CarsModel.newCarEntry("car" + i, Uint64.valueOf(20000)));
 
                 futures.add(rwTx.commit());
             }
@@ -577,10 +583,10 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
                 f.get(5, TimeUnit.SECONDS);
             }
 
-            final Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction()
+            final Optional<NormalizedNode> optional = txChain.newReadOnlyTransaction()
                     .read(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
             assertTrue("isPresent", optional.isPresent());
-            assertEquals("# cars", numCars, ((Collection<?>) optional.get().getValue()).size());
+            assertEquals("# cars", numCars, ((Collection<?>) optional.orElseThrow().body()).size());
 
             txChain.close();
 
@@ -602,7 +608,7 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
 
             final DOMStoreReadWriteTransaction rwTx2 = txChain.newReadWriteTransaction();
 
-            final Optional<NormalizedNode<?, ?>> optional = rwTx2.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
+            final Optional<NormalizedNode> optional = rwTx2.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
             assertFalse("isPresent", optional.isPresent());
 
             txChain.close();
@@ -658,10 +664,10 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
             final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
 
             // Create read-only tx's and issue a read.
-            FluentFuture<Optional<NormalizedNode<?, ?>>> readFuture1 = txChain
+            FluentFuture<Optional<NormalizedNode>> readFuture1 = txChain
                     .newReadOnlyTransaction().read(TestModel.TEST_PATH);
 
-            FluentFuture<Optional<NormalizedNode<?, ?>>> readFuture2 = txChain
+            FluentFuture<Optional<NormalizedNode>> readFuture2 = txChain
                     .newReadOnlyTransaction().read(TestModel.TEST_PATH);
 
             // Create another write tx and issue the write.
@@ -689,35 +695,31 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
 
     @Test
     public void testChainedTransactionFailureWithSingleShard() throws Exception {
-        final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
-        try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
-            testParameter, "testChainedTransactionFailureWithSingleShard", "cars-1")) {
+        final var testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
+        try (var dataStore = testKit.setupAbstractDataStore(testParameter,
+                "testChainedTransactionFailureWithSingleShard", "cars-1")) {
 
-            final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
+            final var broker = new ConcurrentDOMDataBroker(
                 ImmutableMap.<LogicalDatastoreType, DOMStore>builder()
                 .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(),
                 MoreExecutors.directExecutor());
 
-            final DOMTransactionChainListener listener = Mockito.mock(DOMTransactionChainListener.class);
-            final DOMTransactionChain txChain = broker.createTransactionChain(listener);
+            final var listener = Mockito.mock(DOMTransactionChainListener.class);
+            final var txChain = broker.createTransactionChain(listener);
 
-            final DOMDataTreeReadWriteTransaction writeTx = txChain.newReadWriteTransaction();
+            final var writeTx = txChain.newReadWriteTransaction();
 
             writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH,
                 PeopleModel.emptyContainer());
 
-            final ContainerNode invalidData = ImmutableContainerNodeBuilder.create()
-                    .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME))
-                    .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
+            final var invalidData = Builders.containerBuilder()
+                    .withNodeIdentifier(new NodeIdentifier(CarsModel.BASE_QNAME))
+                    .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk"))
+                    .build();
 
             writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
 
-            try {
-                writeTx.commit().get(5, TimeUnit.SECONDS);
-                fail("Expected TransactionCommitFailedException");
-            } catch (final ExecutionException e) {
-                // Expected
-            }
+            assertThrows(ExecutionException.class, () -> writeTx.commit().get(5, TimeUnit.SECONDS));
 
             verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx),
                 any(Throwable.class));
@@ -746,21 +748,17 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
             writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH,
                 PeopleModel.emptyContainer());
 
-            final ContainerNode invalidData = ImmutableContainerNodeBuilder.create()
-                    .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME))
-                    .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
+            final ContainerNode invalidData = Builders.containerBuilder()
+                .withNodeIdentifier(new NodeIdentifier(CarsModel.BASE_QNAME))
+                .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk"))
+                .build();
 
             writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
 
             // Note that merge will validate the data and fail but put
             // succeeds b/c deep validation is not
             // done for put for performance reasons.
-            try {
-                writeTx.commit().get(5, TimeUnit.SECONDS);
-                fail("Expected TransactionCommitFailedException");
-            } catch (final ExecutionException e) {
-                // Expected
-            }
+            assertThrows(ExecutionException.class, () -> writeTx.commit().get(5, TimeUnit.SECONDS));
 
             verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx),
                 any(Throwable.class));
@@ -828,13 +826,13 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
         final String name = "transactionIntegrationTest";
 
         final ContainerNode carsNode = CarsModel.newCarsNode(
-            CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", BigInteger.valueOf(20000L)),
-                CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000L))));
+            CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", Uint64.valueOf(20000)),
+                CarsModel.newCarEntry("sportage", Uint64.valueOf(30000))));
 
         DataTree dataTree = new InMemoryDataTreeFactory().create(
             DataTreeConfiguration.DEFAULT_OPERATIONAL, SchemaContextHelper.full());
         AbstractShardTest.writeToStore(dataTree, CarsModel.BASE_PATH, carsNode);
-        NormalizedNode<?, ?> root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY);
+        NormalizedNode root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.of());
 
         final Snapshot carsSnapshot = Snapshot.create(
             new ShardSnapshotState(new MetadataShardDataTreeSnapshot(root)),
@@ -843,10 +841,10 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
         dataTree = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL,
             SchemaContextHelper.full());
 
-        final NormalizedNode<?, ?> peopleNode = PeopleModel.create();
+        final NormalizedNode peopleNode = PeopleModel.create();
         AbstractShardTest.writeToStore(dataTree, PeopleModel.BASE_PATH, peopleNode);
 
-        root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY);
+        root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.of());
 
         final Snapshot peopleSnapshot = Snapshot.create(
             new ShardSnapshotState(new MetadataShardDataTreeSnapshot(root)),
@@ -862,13 +860,63 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
             final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
 
             // two reads
-            Optional<NormalizedNode<?, ?>> optional = readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS);
-            assertTrue("isPresent", optional.isPresent());
-            assertEquals("Data node", carsNode, optional.get());
+            assertEquals(Optional.of(carsNode), readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS));
+            assertEquals(Optional.of(peopleNode), readTx.read(PeopleModel.BASE_PATH).get(5, TimeUnit.SECONDS));
+        }
+    }
 
-            optional = readTx.read(PeopleModel.BASE_PATH).get(5, TimeUnit.SECONDS);
-            assertTrue("isPresent", optional.isPresent());
-            assertEquals("Data node", peopleNode, optional.get());
+    @Test
+    @Ignore("ClientBackedDatastore does not have stable indexes/term, the snapshot index seems to fluctuate")
+    // FIXME: re-enable this test
+    public void testSnapshotOnRootOverwrite() throws Exception {
+        final var testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder.snapshotOnRootOverwrite(true));
+        try (var dataStore = testKit.setupAbstractDataStore(
+                testParameter, "testRootOverwrite", "module-shards-default-cars-member1.conf",
+                true, "cars", "default")) {
+
+            final var rootNode = Builders.containerBuilder()
+                .withNodeIdentifier(NodeIdentifier.create(SchemaContext.NAME))
+                .withChild(CarsModel.create())
+                .build();
+
+            testKit.testWriteTransaction(dataStore, YangInstanceIdentifier.of(), rootNode);
+            IntegrationTestKit.verifyShardState(dataStore, "cars",
+                state -> assertEquals(1, state.getSnapshotIndex()));
+
+            // root has been written expect snapshot at index 0
+            verifySnapshot("member-1-shard-cars-testRootOverwrite", 1, 1);
+
+            for (int i = 0; i < 10; i++) {
+                testKit.testWriteTransaction(dataStore, CarsModel.newCarPath("car " + i),
+                    CarsModel.newCarEntry("car " + i, Uint64.ONE));
+            }
+
+            // fake snapshot causes the snapshotIndex to move
+            IntegrationTestKit.verifyShardState(dataStore, "cars",
+                state -> assertEquals(10, state.getSnapshotIndex()));
+
+            // however the real snapshot still has not changed and was taken at index 0
+            verifySnapshot("member-1-shard-cars-testRootOverwrite", 1, 1);
+
+            // root overwrite so expect a snapshot
+            testKit.testWriteTransaction(dataStore, YangInstanceIdentifier.of(), rootNode);
+
+            // this was a real snapshot so everything should be in it(1 + 10 + 1)
+            IntegrationTestKit.verifyShardState(dataStore, "cars",
+                state -> assertEquals(12, state.getSnapshotIndex()));
+
+            verifySnapshot("member-1-shard-cars-testRootOverwrite", 12, 1);
         }
     }
+
+    private static void verifySnapshot(final String persistenceId, final long lastAppliedIndex,
+            final long lastAppliedTerm) {
+        await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
+                List<Snapshot> snap = InMemorySnapshotStore.getSnapshots(persistenceId, Snapshot.class);
+                assertEquals(1, snap.size());
+                assertEquals(lastAppliedIndex, snap.get(0).getLastAppliedIndex());
+                assertEquals(lastAppliedTerm, snap.get(0).getLastAppliedTerm());
+            }
+        );
+    }
 }