Fix Eclipse compilation error
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / DistributedDataStoreRemotingIntegrationTest.java
index 24371d925f1ea02f624a33bb7352103a55da14db..a298508cc9fff0e40ca6def256fab38babb12ba7 100644 (file)
@@ -7,13 +7,20 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeTrue;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
 
@@ -23,18 +30,17 @@ import akka.actor.ActorSystem;
 import akka.actor.Address;
 import akka.actor.AddressFromURIString;
 import akka.cluster.Cluster;
+import akka.cluster.Member;
 import akka.dispatch.Futures;
 import akka.pattern.Patterns;
 import akka.testkit.javadsl.TestKit;
 import com.google.common.base.Stopwatch;
-import com.google.common.base.Supplier;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.typesafe.config.ConfigFactory;
-import java.math.BigInteger;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -42,23 +48,28 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import org.junit.After;
-import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameter;
 import org.junit.runners.Parameterized.Parameters;
-import org.mockito.Mockito;
 import org.mockito.stubbing.Answer;
 import org.opendaylight.controller.cluster.access.client.RequestTimeoutException;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.databroker.ClientBackedDataStore;
 import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker;
+import org.opendaylight.controller.cluster.databroker.TestClientBackedDataStore;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
+import org.opendaylight.controller.cluster.datastore.TestShard.RequestFrontendMetadata;
+import org.opendaylight.controller.cluster.datastore.TestShard.StartDropMessages;
+import org.opendaylight.controller.cluster.datastore.TestShard.StopDropMessages;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
@@ -68,10 +79,17 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransact
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
+import org.opendaylight.controller.cluster.datastore.persisted.FrontendClientMetadata;
+import org.opendaylight.controller.cluster.datastore.persisted.FrontendShardDataTreeSnapshotMetadata;
 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
+import org.opendaylight.controller.cluster.datastore.utils.UnsignedLongBitmap;
 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
+import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
+import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
@@ -82,6 +100,7 @@ import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
 import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
 import org.opendaylight.mdsal.dom.api.DOMTransactionChainListener;
@@ -91,18 +110,21 @@ import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain;
 import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.yang.common.Uint64;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
-import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.SystemMapNode;
+import org.opendaylight.yangtools.yang.data.api.schema.builder.CollectionNodeBuilder;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder;
 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import scala.collection.Set;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
@@ -118,7 +140,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
     @Parameters(name = "{0}")
     public static Collection<Object[]> data() {
         return Arrays.asList(new Object[][] {
-                { DistributedDataStore.class, 7}, { ClientBackedDataStore.class, 12 }
+                { TestDistributedDataStore.class, 7}, { TestClientBackedDataStore.class, 12 }
         });
     }
 
@@ -182,9 +204,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
             leaderDistributedDataStore.close();
         }
 
-        TestKit.shutdownActorSystem(leaderSystem);
-        TestKit.shutdownActorSystem(followerSystem);
-        TestKit.shutdownActorSystem(follower2System);
+        TestKit.shutdownActorSystem(leaderSystem, true);
+        TestKit.shutdownActorSystem(followerSystem, true);
+        TestKit.shutdownActorSystem(follower2System,true);
 
         InMemoryJournal.clear();
         InMemorySnapshotStore.clear();
@@ -200,16 +222,23 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
     private void initDatastores(final String type, final String moduleShardsConfig, final String[] shards)
             throws Exception {
-        leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder, commitTimeout);
+        initDatastores(type, moduleShardsConfig, shards, leaderDatastoreContextBuilder,
+                followerDatastoreContextBuilder);
+    }
+
+    private void initDatastores(final String type, final String moduleShardsConfig, final String[] shards,
+            final DatastoreContext.Builder leaderBuilder, final DatastoreContext.Builder followerBuilder)
+                    throws Exception {
+        leaderTestKit = new IntegrationTestKit(leaderSystem, leaderBuilder, commitTimeout);
 
         leaderDistributedDataStore = leaderTestKit.setupAbstractDataStore(
                 testParameter, type, moduleShardsConfig, false, shards);
 
-        followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder, commitTimeout);
+        followerTestKit = new IntegrationTestKit(followerSystem, followerBuilder, commitTimeout);
         followerDistributedDataStore = followerTestKit.setupAbstractDataStore(
                 testParameter, type, moduleShardsConfig, false, shards);
 
-        leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), shards);
+        leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorUtils(), shards);
 
         leaderTestKit.waitForMembersUp("member-2");
         followerTestKit.waitForMembersUp("member-1");
@@ -217,12 +246,12 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
     private static void verifyCars(final DOMStoreReadTransaction readTx, final MapEntryNode... entries)
             throws Exception {
-        final Optional<NormalizedNode<?, ?>> optional = readTx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
+        final Optional<NormalizedNode> optional = readTx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
         assertTrue("isPresent", optional.isPresent());
 
-        final CollectionNodeBuilder<MapEntryNode, MapNode> listBuilder = ImmutableNodes.mapNodeBuilder(
+        final CollectionNodeBuilder<MapEntryNode, SystemMapNode> listBuilder = ImmutableNodes.mapNodeBuilder(
                 CarsModel.CAR_QNAME);
-        for (final NormalizedNode<?, ?> entry: entries) {
+        for (final NormalizedNode entry: entries) {
             listBuilder.withChild((MapEntryNode) entry);
         }
 
@@ -230,16 +259,13 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
     }
 
     private static void verifyNode(final DOMStoreReadTransaction readTx, final YangInstanceIdentifier path,
-            final NormalizedNode<?, ?> expNode) throws Exception {
-        final Optional<NormalizedNode<?, ?>> optional = readTx.read(path).get(5, TimeUnit.SECONDS);
-        assertTrue("isPresent", optional.isPresent());
-        assertEquals("Data node", expNode, optional.get());
+            final NormalizedNode expNode) throws Exception {
+        assertEquals(Optional.of(expNode), readTx.read(path).get(5, TimeUnit.SECONDS));
     }
 
     private static void verifyExists(final DOMStoreReadTransaction readTx, final YangInstanceIdentifier path)
             throws Exception {
-        final Boolean exists = readTx.exists(path).get(5, TimeUnit.SECONDS);
-        assertEquals("exists", Boolean.TRUE, exists);
+        assertEquals("exists", Boolean.TRUE, readTx.exists(path).get(5, TimeUnit.SECONDS));
     }
 
     @Test
@@ -255,11 +281,11 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
         writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
 
-        final MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
+        final MapEntryNode car1 = CarsModel.newCarEntry("optima", Uint64.valueOf(20000));
         final YangInstanceIdentifier car1Path = CarsModel.newCarPath("optima");
         writeTx.merge(car1Path, car1);
 
-        final MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(25000));
+        final MapEntryNode car2 = CarsModel.newCarEntry("sportage", Uint64.valueOf(25000));
         final YangInstanceIdentifier car2Path = CarsModel.newCarPath("sportage");
         writeTx.merge(car2Path, car2);
 
@@ -328,6 +354,126 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         }
     }
 
+    @Test
+    public void testSingleTransactionsWritesInQuickSuccession() throws Exception {
+        initDatastoresWithCars("testSingleTransactionsWritesInQuickSuccession");
+
+        final DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain();
+
+        DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+        writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+        writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
+        followerTestKit.doCommit(writeTx.ready());
+
+        int numCars = 5;
+        for (int i = 0; i < numCars; i++) {
+            writeTx = txChain.newWriteOnlyTransaction();
+            writeTx.write(CarsModel.newCarPath("car" + i), CarsModel.newCarEntry("car" + i, Uint64.valueOf(20000)));
+            followerTestKit.doCommit(writeTx.ready());
+
+            try (var tx = txChain.newReadOnlyTransaction()) {
+                tx.read(CarsModel.BASE_PATH).get();
+            }
+        }
+
+        // wait to let the shard catch up with purged
+        await("Range set leak test").atMost(5, TimeUnit.SECONDS)
+                .pollInterval(500, TimeUnit.MILLISECONDS)
+                .untilAsserted(() -> {
+                    final var localShard = leaderDistributedDataStore.getActorUtils().findLocalShard("cars")
+                        .orElseThrow();
+                    final var frontendMetadata =
+                        (FrontendShardDataTreeSnapshotMetadata) leaderDistributedDataStore.getActorUtils()
+                            .executeOperation(localShard, new RequestFrontendMetadata());
+
+                    final var clientMeta = frontendMetadata.getClients().get(0);
+                    if (leaderDistributedDataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) {
+                        assertTellClientMetadata(clientMeta, numCars * 2);
+                    } else {
+                        assertAskClientMetadata(clientMeta);
+                    }
+                });
+
+        try (var tx = txChain.newReadOnlyTransaction()) {
+            final var body = tx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS).orElseThrow().body();
+            assertThat(body, instanceOf(Collection.class));
+            assertEquals(numCars, ((Collection<?>) body).size());
+        }
+    }
+
+    private void assertAskClientMetadata(final FrontendClientMetadata clientMeta) {
+        // ask based should track no metadata
+        assertEquals(List.of(), clientMeta.getCurrentHistories());
+    }
+
+    private void assertTellClientMetadata(final FrontendClientMetadata clientMeta, final long lastPurged) {
+        final var iterator = clientMeta.getCurrentHistories().iterator();
+        var metadata = iterator.next();
+        while (iterator.hasNext() && metadata.getHistoryId() != 1) {
+            metadata = iterator.next();
+        }
+
+        // FIXME: CONTROLLER-1991: remove this assumption
+        assumeTrue(false);
+
+        assertEquals(UnsignedLongBitmap.of(), metadata.getClosedTransactions());
+        assertEquals("[[0.." + lastPurged + "]]", metadata.getPurgedTransactions().ranges().toString());
+    }
+
+    @Test
+    public void testCloseTransactionMetadataLeak() throws Exception {
+        // FIXME: Ask-based frontend seems to have some issues with back to back close
+        assumeTrue(testParameter.isAssignableFrom(TestClientBackedDataStore.class));
+
+        initDatastoresWithCars("testCloseTransactionMetadataLeak");
+
+        final DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain();
+
+        DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+        writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+        writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
+        followerTestKit.doCommit(writeTx.ready());
+
+        int numCars = 5;
+        for (int i = 0; i < numCars; i++) {
+            writeTx = txChain.newWriteOnlyTransaction();
+            writeTx.close();
+
+            try (var tx = txChain.newReadOnlyTransaction()) {
+                tx.read(CarsModel.BASE_PATH).get();
+            }
+        }
+
+        writeTx = txChain.newWriteOnlyTransaction();
+        writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+        writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
+        followerTestKit.doCommit(writeTx.ready());
+
+        // wait to let the shard catch up with purged
+        await("Close transaction purge leak test.").atMost(5, TimeUnit.SECONDS)
+                .pollInterval(500, TimeUnit.MILLISECONDS)
+                .untilAsserted(() -> {
+                    final var localShard = leaderDistributedDataStore.getActorUtils().findLocalShard("cars")
+                        .orElseThrow();
+                    final var frontendMetadata =
+                            (FrontendShardDataTreeSnapshotMetadata) leaderDistributedDataStore.getActorUtils()
+                                    .executeOperation(localShard, new RequestFrontendMetadata());
+
+                    final var clientMeta = frontendMetadata.getClients().get(0);
+                    if (leaderDistributedDataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) {
+                        assertTellClientMetadata(clientMeta, numCars * 2 + 1);
+                    } else {
+                        assertAskClientMetadata(clientMeta);
+                    }
+                });
+
+        try (var tx = txChain.newReadOnlyTransaction()) {
+            final var body = tx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS).orElseThrow().body();
+            assertThat(body, instanceOf(Collection.class));
+            assertEquals(numCars, ((Collection<?>) body).size());
+        }
+    }
+
     @Test
     public void testReadWriteTransactionWithSingleShard() throws Exception {
         initDatastoresWithCars("testReadWriteTransactionWithSingleShard");
@@ -338,12 +484,12 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         rwTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
         rwTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
 
-        final MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
+        final MapEntryNode car1 = CarsModel.newCarEntry("optima", Uint64.valueOf(20000));
         rwTx.merge(CarsModel.newCarPath("optima"), car1);
 
         verifyCars(rwTx, car1);
 
-        final MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(25000));
+        final MapEntryNode car2 = CarsModel.newCarEntry("sportage", Uint64.valueOf(25000));
         final YangInstanceIdentifier car2Path = CarsModel.newCarPath("sportage");
         rwTx.merge(car2Path, car2);
 
@@ -362,11 +508,11 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         assertNotNull("newWriteOnlyTransaction returned null", writeTx);
 
         final YangInstanceIdentifier carsPath = CarsModel.BASE_PATH;
-        final NormalizedNode<?, ?> carsNode = CarsModel.emptyContainer();
+        final NormalizedNode carsNode = CarsModel.emptyContainer();
         writeTx.write(carsPath, carsNode);
 
         final YangInstanceIdentifier peoplePath = PeopleModel.BASE_PATH;
-        final NormalizedNode<?, ?> peopleNode = PeopleModel.emptyContainer();
+        final NormalizedNode peopleNode = PeopleModel.emptyContainer();
         writeTx.write(peoplePath, peopleNode);
 
         followerTestKit.doCommit(writeTx.ready());
@@ -385,11 +531,11 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         assertNotNull("newReadWriteTransaction returned null", rwTx);
 
         final YangInstanceIdentifier carsPath = CarsModel.BASE_PATH;
-        final NormalizedNode<?, ?> carsNode = CarsModel.emptyContainer();
+        final NormalizedNode carsNode = CarsModel.emptyContainer();
         rwTx.write(carsPath, carsNode);
 
         final YangInstanceIdentifier peoplePath = PeopleModel.BASE_PATH;
-        final NormalizedNode<?, ?> peopleNode = PeopleModel.emptyContainer();
+        final NormalizedNode peopleNode = PeopleModel.emptyContainer();
         rwTx.write(peoplePath, peopleNode);
 
         followerTestKit.doCommit(rwTx.ready());
@@ -427,7 +573,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
         rwTx.merge(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
 
-        final MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
+        final MapEntryNode car1 = CarsModel.newCarEntry("optima", Uint64.valueOf(20000));
         final YangInstanceIdentifier car1Path = CarsModel.newCarPath("optima");
         rwTx.write(car1Path, car1);
 
@@ -435,7 +581,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
         verifyCars(rwTx, car1);
 
-        final MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(25000));
+        final MapEntryNode car2 = CarsModel.newCarEntry("sportage", Uint64.valueOf(25000));
         rwTx.merge(CarsModel.newCarPath("sportage"), car2);
 
         rwTx.delete(car1Path);
@@ -468,7 +614,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
         final DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction();
 
-        final MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
+        final MapEntryNode car = CarsModel.newCarEntry("optima", Uint64.valueOf(20000));
         final YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
         readWriteTx.write(carPath, car);
 
@@ -476,13 +622,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         final YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
         readWriteTx.merge(personPath, person);
 
-        Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
-        assertTrue("isPresent", optional.isPresent());
-        assertEquals("Data node", car, optional.get());
-
-        optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS);
-        assertTrue("isPresent", optional.isPresent());
-        assertEquals("Data node", person, optional.get());
+        assertEquals(Optional.of(car), readWriteTx.read(carPath).get(5, TimeUnit.SECONDS));
+        assertEquals(Optional.of(person), readWriteTx.read(personPath).get(5, TimeUnit.SECONDS));
 
         final DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready();
 
@@ -500,8 +641,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         final DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
         verifyCars(readTx, car);
 
-        optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
-        assertFalse("isPresent", optional.isPresent());
+        assertEquals(Optional.empty(), readTx.read(personPath).get(5, TimeUnit.SECONDS));
     }
 
     @Test
@@ -513,7 +653,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
                         LogicalDatastoreType.CONFIGURATION, followerDistributedDataStore).build(),
                         MoreExecutors.directExecutor());
 
-        final DOMTransactionChainListener listener = Mockito.mock(DOMTransactionChainListener.class);
+        final DOMTransactionChainListener listener = mock(DOMTransactionChainListener.class);
         final DOMTransactionChain txChain = broker.createTransactionChain(listener);
 
         final DOMDataTreeWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
@@ -524,12 +664,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
         writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
 
-        try {
-            writeTx.commit().get(5, TimeUnit.SECONDS);
-            fail("Expected TransactionCommitFailedException");
-        } catch (final ExecutionException e) {
-            // Expected
-        }
+        final var ex = assertThrows(ExecutionException.class, () -> writeTx.commit().get(5, TimeUnit.SECONDS))
+            .getCause();
+        assertThat(ex, instanceOf(TransactionCommitFailedException.class));
 
         verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx), any(Throwable.class));
 
@@ -546,7 +683,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
                         LogicalDatastoreType.CONFIGURATION, followerDistributedDataStore).build(),
                         MoreExecutors.directExecutor());
 
-        final DOMTransactionChainListener listener = Mockito.mock(DOMTransactionChainListener.class);
+        final DOMTransactionChainListener listener = mock(DOMTransactionChainListener.class);
         final DOMTransactionChain txChain = broker.createTransactionChain(listener);
 
         final DOMDataTreeWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
@@ -561,12 +698,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         // done for put for performance reasons.
         writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
 
-        try {
-            writeTx.commit().get(5, TimeUnit.SECONDS);
-            fail("Expected TransactionCommitFailedException");
-        } catch (final ExecutionException e) {
-            // Expected
-        }
+        final var ex = assertThrows(ExecutionException.class, () -> writeTx.commit().get(5, TimeUnit.SECONDS))
+            .getCause();
+        assertThat(ex, instanceOf(TransactionCommitFailedException.class));
 
         verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx), any(Throwable.class));
 
@@ -602,7 +736,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         TestKit.shutdownActorSystem(leaderSystem, true);
         Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS);
 
-        followerTestKit.waitUntilNoLeader(followerDistributedDataStore.getActorContext(), CARS);
+        followerTestKit.waitUntilNoLeader(followerDistributedDataStore.getActorUtils(), CARS);
 
         leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
         Cluster.get(leaderSystem).join(MEMBER_2_ADDRESS);
@@ -615,13 +749,13 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
                 newMember1TestKit.setupAbstractDataStore(
                         testParameter, testName, MODULE_SHARDS_CARS_ONLY_1_2, false, CARS)) {
 
-            followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorContext(), CARS);
+            followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorUtils(), CARS);
 
             // Write a car entry to the new leader - should switch to local Tx
 
             writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
 
-            MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
+            MapEntryNode car1 = CarsModel.newCarEntry("optima", Uint64.valueOf(20000));
             YangInstanceIdentifier car1Path = CarsModel.newCarPath("optima");
             writeTx.merge(car1Path, car1);
 
@@ -635,10 +769,10 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
     @Test
     public void testReadyLocalTransactionForwardedToLeader() throws Exception {
         initDatastoresWithCars("testReadyLocalTransactionForwardedToLeader");
-        followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorContext(), "cars");
+        followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorUtils(), "cars");
 
-        final com.google.common.base.Optional<ActorRef> carsFollowerShard =
-                followerDistributedDataStore.getActorContext().findLocalShard("cars");
+        final Optional<ActorRef> carsFollowerShard =
+                followerDistributedDataStore.getActorUtils().findLocalShard("cars");
         assertTrue("Cars follower shard found", carsFollowerShard.isPresent());
 
         final DataTree dataTree = new InMemoryDataTreeFactory().create(
@@ -650,12 +784,11 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         new WriteModification(CarsModel.BASE_PATH, CarsModel.emptyContainer()).apply(modification);
         new MergeModification(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()).apply(modification);
 
-        final MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
+        final MapEntryNode car1 = CarsModel.newCarEntry("optima", Uint64.valueOf(20000));
         new WriteModification(CarsModel.newCarPath("optima"), car1).apply(modification);
         modification.ready();
 
-        ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(tx1 , modification, true,
-                java.util.Optional.empty());
+        ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(tx1 , modification, true, Optional.empty());
 
         carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef());
         Object resp = followerTestKit.expectMsgClass(Object.class);
@@ -670,11 +803,11 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         // Send another tx without immediate commit.
 
         modification = dataTree.takeSnapshot().newModification();
-        MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000));
+        MapEntryNode car2 = CarsModel.newCarEntry("sportage", Uint64.valueOf(30000));
         new WriteModification(CarsModel.newCarPath("sportage"), car2).apply(modification);
         modification.ready();
 
-        readyLocal = new ReadyLocalTransaction(tx2 , modification, false, java.util.Optional.empty());
+        readyLocal = new ReadyLocalTransaction(tx2 , modification, false, Optional.empty());
 
         carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef());
         resp = followerTestKit.expectMsgClass(Object.class);
@@ -684,14 +817,12 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
         assertEquals("Response type", ReadyTransactionReply.class, resp.getClass());
 
-        final ActorSelection txActor = leaderDistributedDataStore.getActorContext().actorSelection(
+        final ActorSelection txActor = leaderDistributedDataStore.getActorUtils().actorSelection(
                 ((ReadyTransactionReply)resp).getCohortPath());
 
-        final Supplier<Short> versionSupplier = Mockito.mock(Supplier.class);
-        Mockito.doReturn(DataStoreVersions.CURRENT_VERSION).when(versionSupplier).get();
-        ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy(
-                leaderDistributedDataStore.getActorContext(), Arrays.asList(
-                        new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor), versionSupplier)), tx2);
+        ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy(leaderDistributedDataStore.getActorUtils(),
+            List.of(new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor),
+                () -> DataStoreVersions.CURRENT_VERSION)), tx2);
         cohort.canCommit().get(5, TimeUnit.SECONDS);
         cohort.preCommit().get(5, TimeUnit.SECONDS);
         cohort.commit().get(5, TimeUnit.SECONDS);
@@ -703,10 +834,10 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
     @Test
     public void testForwardedReadyTransactionForwardedToLeader() throws Exception {
         initDatastoresWithCars("testForwardedReadyTransactionForwardedToLeader");
-        followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorContext(), "cars");
+        followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorUtils(), "cars");
 
-        final com.google.common.base.Optional<ActorRef> carsFollowerShard =
-                followerDistributedDataStore.getActorContext().findLocalShard("cars");
+        final Optional<ActorRef> carsFollowerShard =
+                followerDistributedDataStore.getActorUtils().findLocalShard("cars");
         assertTrue("Cars follower shard found", carsFollowerShard.isPresent());
 
         carsFollowerShard.get().tell(GetShardDataTree.INSTANCE, followerTestKit.getRef());
@@ -718,13 +849,12 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         new WriteModification(CarsModel.BASE_PATH, CarsModel.emptyContainer()).apply(modification);
         new MergeModification(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()).apply(modification);
 
-        final MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
+        final MapEntryNode car1 = CarsModel.newCarEntry("optima", Uint64.valueOf(20000));
         new WriteModification(CarsModel.newCarPath("optima"), car1).apply(modification);
 
-        ForwardedReadyTransaction forwardedReady = new ForwardedReadyTransaction(tx1,
-                DataStoreVersions.CURRENT_VERSION, new ReadWriteShardDataTreeTransaction(
-                        Mockito.mock(ShardDataTreeTransactionParent.class), tx1, modification), true,
-                java.util.Optional.empty());
+        ForwardedReadyTransaction forwardedReady = new ForwardedReadyTransaction(tx1, DataStoreVersions.CURRENT_VERSION,
+            new ReadWriteShardDataTreeTransaction(mock(ShardDataTreeTransactionParent.class), tx1, modification),
+            true, Optional.empty());
 
         carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef());
         Object resp = followerTestKit.expectMsgClass(Object.class);
@@ -739,13 +869,12 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         // Send another tx without immediate commit.
 
         modification = dataTree.takeSnapshot().newModification();
-        MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000));
+        MapEntryNode car2 = CarsModel.newCarEntry("sportage", Uint64.valueOf(30000));
         new WriteModification(CarsModel.newCarPath("sportage"), car2).apply(modification);
 
-        forwardedReady = new ForwardedReadyTransaction(tx2,
-                DataStoreVersions.CURRENT_VERSION, new ReadWriteShardDataTreeTransaction(
-                        Mockito.mock(ShardDataTreeTransactionParent.class), tx2, modification), false,
-                java.util.Optional.empty());
+        forwardedReady = new ForwardedReadyTransaction(tx2, DataStoreVersions.CURRENT_VERSION,
+            new ReadWriteShardDataTreeTransaction(mock(ShardDataTreeTransactionParent.class), tx2, modification),
+            false, Optional.empty());
 
         carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef());
         resp = followerTestKit.expectMsgClass(Object.class);
@@ -755,14 +884,13 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
         assertEquals("Response type", ReadyTransactionReply.class, resp.getClass());
 
-        ActorSelection txActor = leaderDistributedDataStore.getActorContext().actorSelection(
+        ActorSelection txActor = leaderDistributedDataStore.getActorUtils().actorSelection(
                 ((ReadyTransactionReply)resp).getCohortPath());
 
-        final Supplier<Short> versionSupplier = Mockito.mock(Supplier.class);
-        Mockito.doReturn(DataStoreVersions.CURRENT_VERSION).when(versionSupplier).get();
         final ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy(
-                leaderDistributedDataStore.getActorContext(), Arrays.asList(
-                        new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor), versionSupplier)), tx2);
+            leaderDistributedDataStore.getActorUtils(), List.of(
+                new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor),
+                    () -> DataStoreVersions.CURRENT_VERSION)), tx2);
         cohort.canCommit().get(5, TimeUnit.SECONDS);
         cohort.preCommit().get(5, TimeUnit.SECONDS);
         cohort.commit().get(5, TimeUnit.SECONDS);
@@ -772,8 +900,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
     @Test
     public void testTransactionForwardedToLeaderAfterRetry() throws Exception {
-        //TODO remove when test passes also for ClientBackedDataStore
-        Assume.assumeTrue(testParameter.equals(DistributedDataStore.class));
+        // FIXME: remove when test passes also for ClientBackedDataStore
+        assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter));
+
         followerDatastoreContextBuilder.shardBatchedModificationCount(2);
         leaderDatastoreContextBuilder.shardBatchedModificationCount(2);
         initDatastoresWithCarsAndPeople("testTransactionForwardedToLeaderAfterRetry");
@@ -809,10 +938,10 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         final DOMStoreWriteTransaction writeTx2 = followerDistributedDataStore.newWriteOnlyTransaction();
         final LinkedList<MapEntryNode> cars = new LinkedList<>();
         int carIndex = 1;
-        cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex)));
+        cars.add(CarsModel.newCarEntry("car" + carIndex, Uint64.valueOf(carIndex)));
         writeTx2.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
         carIndex++;
-        NormalizedNode<?, ?> people = ImmutableNodes.mapNodeBuilder(PeopleModel.PERSON_QNAME)
+        NormalizedNode people = ImmutableNodes.mapNodeBuilder(PeopleModel.PERSON_QNAME)
                 .withChild(PeopleModel.newPersonEntry("Dude")).build();
         writeTx2.write(PeopleModel.PERSON_LIST_PATH, people);
         final DOMStoreThreePhaseCommitCohort writeTx2Cohort = writeTx2.ready();
@@ -824,7 +953,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
         final DOMStoreWriteTransaction writeTx3 = followerDistributedDataStore.newWriteOnlyTransaction();
         for (int i = 1; i <= 5; i++, carIndex++) {
-            cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex)));
+            cars.add(CarsModel.newCarEntry("car" + carIndex, Uint64.valueOf(carIndex)));
             writeTx3.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
         }
 
@@ -832,7 +961,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         // message on ready.
 
         final DOMStoreWriteTransaction writeTx4 = followerDistributedDataStore.newWriteOnlyTransaction();
-        cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex)));
+        cars.add(CarsModel.newCarEntry("car" + carIndex, Uint64.valueOf(carIndex)));
         writeTx4.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
         carIndex++;
 
@@ -840,11 +969,11 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         // leader shard on ready.
 
         final DOMStoreReadWriteTransaction readWriteTx = followerDistributedDataStore.newReadWriteTransaction();
-        cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex)));
+        cars.add(CarsModel.newCarEntry("car" + carIndex, Uint64.valueOf(carIndex)));
         readWriteTx.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
 
         IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars",
-            stats -> assertEquals("getReadWriteTransactionCount", 1, stats.getReadWriteTransactionCount()));
+            stats -> assertEquals("getReadWriteTransactionCount", 5, stats.getReadWriteTransactionCount()));
 
         // Disable elections on the leader so it switches to follower.
 
@@ -852,7 +981,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
                 .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName())
                 .shardElectionTimeoutFactor(10));
 
-        leaderTestKit.waitUntilNoLeader(leaderDistributedDataStore.getActorContext(), "cars");
+        leaderTestKit.waitUntilNoLeader(leaderDistributedDataStore.getActorUtils(), "cars");
 
         // Submit all tx's - the messages should get queued for retry.
 
@@ -866,9 +995,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
         sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder
                 .customRaftPolicyImplementation(null).shardElectionTimeoutFactor(1));
-        IntegrationTestKit.findLocalShard(followerDistributedDataStore.getActorContext(), "cars")
+        IntegrationTestKit.findLocalShard(followerDistributedDataStore.getActorUtils(), "cars")
                 .tell(TimeoutNow.INSTANCE, ActorRef.noSender());
-        IntegrationTestKit.findLocalShard(followerDistributedDataStore.getActorContext(), "people")
+        IntegrationTestKit.findLocalShard(followerDistributedDataStore.getActorUtils(), "people")
                 .tell(TimeoutNow.INSTANCE, ActorRef.noSender());
 
         followerTestKit.doCommit(writeTx1CanCommit, writeTx1Cohort);
@@ -884,15 +1013,16 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
     @Test
     public void testLeadershipTransferOnShutdown() throws Exception {
-        //TODO remove when test passes also for ClientBackedDataStore
-        Assume.assumeTrue(testParameter.equals(DistributedDataStore.class));
+        // FIXME: remove when test passes also for ClientBackedDataStore
+        assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter));
+
         leaderDatastoreContextBuilder.shardBatchedModificationCount(1);
         followerDatastoreContextBuilder.shardElectionTimeoutFactor(10).customRaftPolicyImplementation(null);
         final String testName = "testLeadershipTransferOnShutdown";
         initDatastores(testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, CARS_AND_PEOPLE);
 
         final IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System,
-                DatastoreContext.newBuilderFrom(followerDatastoreContextBuilder.build()).operationTimeoutInMillis(100),
+                DatastoreContext.newBuilderFrom(followerDatastoreContextBuilder.build()).operationTimeoutInMillis(500),
                 commitTimeout);
         try (AbstractDataStore follower2DistributedDataStore = follower2TestKit.setupAbstractDataStore(
                 testParameter, testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, false)) {
@@ -912,7 +1042,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
                 stats -> assertEquals("getTxCohortCacheSize", 1, stats.getTxCohortCacheSize()));
 
             writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
-            final MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
+            final MapEntryNode car = CarsModel.newCarEntry("optima", Uint64.valueOf(20000));
             writeTx.write(CarsModel.newCarPath("optima"), car);
             final DOMStoreThreePhaseCommitCohort cohort2 = writeTx.ready();
 
@@ -925,7 +1055,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
                 .shardElectionTimeoutFactor(100));
 
             final FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
-            final Future<ActorRef> future = leaderDistributedDataStore.getActorContext().findLocalShardAsync("cars");
+            final Future<ActorRef> future = leaderDistributedDataStore.getActorUtils().findLocalShardAsync("cars");
             final ActorRef leaderActor = Await.result(future, duration);
 
             final Future<Boolean> stopFuture = Patterns.gracefulStop(leaderActor, duration, Shutdown.INSTANCE);
@@ -949,8 +1079,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
     @Test
     public void testTransactionWithIsolatedLeader() throws Exception {
-        //TODO remove when test passes also for ClientBackedDataStore
-        Assume.assumeTrue(testParameter.equals(DistributedDataStore.class));
+        // FIXME: remove when test passes also for ClientBackedDataStore
+        assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter));
+
         // Set the isolated leader check interval high so we can control the switch to IsolatedLeader.
         leaderDatastoreContextBuilder.shardIsolatedLeaderCheckIntervalInMillis(10000000);
         final String testName = "testTransactionWithIsolatedLeader";
@@ -969,9 +1100,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         successWriteTx.merge(CarsModel.BASE_PATH, CarsModel.emptyContainer());
 
         // Stop the follower
-        followerTestKit.watch(followerDistributedDataStore.getActorContext().getShardManager());
+        followerTestKit.watch(followerDistributedDataStore.getActorUtils().getShardManager());
         followerDistributedDataStore.close();
-        followerTestKit.expectTerminated(followerDistributedDataStore.getActorContext().getShardManager());
+        followerTestKit.expectTerminated(followerDistributedDataStore.getActorUtils().getShardManager());
 
         // Submit the preIsolatedLeaderWriteTx so it's pending
         final DOMStoreThreePhaseCommitCohort preIsolatedLeaderTxCohort = preIsolatedLeaderWriteTx.ready();
@@ -983,12 +1114,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         MemberNode.verifyRaftState(leaderDistributedDataStore, "cars",
             raftState -> assertEquals("getRaftState", "IsolatedLeader", raftState.getRaftState()));
 
-        try {
-            leaderTestKit.doCommit(noShardLeaderWriteTx.ready());
-            fail("Expected NoShardLeaderException");
-        } catch (final ExecutionException e) {
-            assertEquals("getCause", NoShardLeaderException.class, Throwables.getRootCause(e).getClass());
-        }
+        final var ex = assertThrows(ExecutionException.class,
+            () -> leaderTestKit.doCommit(noShardLeaderWriteTx.ready()));
+        assertEquals(NoShardLeaderException.class, Throwables.getRootCause(ex).getClass());
 
         sendDatastoreContextUpdate(leaderDistributedDataStore, leaderDatastoreContextBuilder
                 .shardElectionTimeoutFactor(100));
@@ -1024,17 +1152,13 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
         rwTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
 
-        try {
-            followerTestKit.doCommit(rwTx.ready());
-            fail("Exception expected");
-        } catch (final ExecutionException e) {
-            final String msg = "Unexpected exception: " + Throwables.getStackTraceAsString(e.getCause());
-            if (DistributedDataStore.class.equals(testParameter)) {
-                assertTrue(msg, Throwables.getRootCause(e) instanceof NoShardLeaderException
-                        || e.getCause() instanceof ShardLeaderNotRespondingException);
-            } else {
-                assertTrue(msg, Throwables.getRootCause(e) instanceof RequestTimeoutException);
-            }
+        final var ex = assertThrows(ExecutionException.class, () -> followerTestKit.doCommit(rwTx.ready()));
+        final String msg = "Unexpected exception: " + Throwables.getStackTraceAsString(ex.getCause());
+        if (DistributedDataStore.class.isAssignableFrom(testParameter)) {
+            assertTrue(msg, Throwables.getRootCause(ex) instanceof NoShardLeaderException
+                || ex.getCause() instanceof ShardLeaderNotRespondingException);
+        } else {
+            assertThat(msg, Throwables.getRootCause(ex), instanceOf(RequestTimeoutException.class));
         }
     }
 
@@ -1063,16 +1187,12 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
         rwTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
 
-        try {
-            followerTestKit.doCommit(rwTx.ready());
-            fail("Exception expected");
-        } catch (final ExecutionException e) {
-            final String msg = "Unexpected exception: " + Throwables.getStackTraceAsString(e.getCause());
-            if (DistributedDataStore.class.equals(testParameter)) {
-                assertTrue(msg, Throwables.getRootCause(e) instanceof NoShardLeaderException);
-            } else {
-                assertTrue(msg, Throwables.getRootCause(e) instanceof RequestTimeoutException);
-            }
+        final var ex = assertThrows(ExecutionException.class, () -> followerTestKit.doCommit(rwTx.ready()));
+        final String msg = "Unexpected exception: " + Throwables.getStackTraceAsString(ex.getCause());
+        if (DistributedDataStore.class.isAssignableFrom(testParameter)) {
+            assertThat(msg, Throwables.getRootCause(ex), instanceOf(NoShardLeaderException.class));
+        } else {
+            assertThat(msg, Throwables.getRootCause(ex), instanceOf(RequestTimeoutException.class));
         }
     }
 
@@ -1116,6 +1236,68 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         }
     }
 
+    @Test
+    public void testSemiReachableCandidateNotDroppingLeader() throws Exception {
+        final String testName = "testSemiReachableCandidateNotDroppingLeader";
+        initDatastores(testName, MODULE_SHARDS_CARS_1_2_3, CARS);
+
+        final DatastoreContext.Builder follower2DatastoreContextBuilder = DatastoreContext.newBuilder()
+                .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(10);
+        final IntegrationTestKit follower2TestKit = new IntegrationTestKit(
+                follower2System, follower2DatastoreContextBuilder, commitTimeout);
+
+        final AbstractDataStore ds2 =
+                     follower2TestKit.setupAbstractDataStore(
+                             testParameter, testName, MODULE_SHARDS_CARS_1_2_3, false, CARS);
+
+        followerTestKit.waitForMembersUp("member-1", "member-3");
+        follower2TestKit.waitForMembersUp("member-1", "member-2");
+
+        // behavior is controlled by akka.coordinated-shutdown.run-by-actor-system-terminate configuration option
+        TestKit.shutdownActorSystem(follower2System, true);
+
+        ActorRef cars = leaderDistributedDataStore.getActorUtils().findLocalShard("cars").get();
+        final OnDemandRaftState initialState = (OnDemandRaftState) leaderDistributedDataStore.getActorUtils()
+                .executeOperation(cars, GetOnDemandRaftState.INSTANCE);
+
+        Cluster leaderCluster = Cluster.get(leaderSystem);
+        Cluster followerCluster = Cluster.get(followerSystem);
+        Cluster follower2Cluster = Cluster.get(follower2System);
+
+        Member follower2Member = follower2Cluster.readView().self();
+
+        await().atMost(10, TimeUnit.SECONDS)
+                .until(() -> containsUnreachable(leaderCluster, follower2Member));
+        await().atMost(10, TimeUnit.SECONDS)
+                .until(() -> containsUnreachable(followerCluster, follower2Member));
+
+        ActorRef followerCars = followerDistributedDataStore.getActorUtils().findLocalShard("cars").get();
+
+        // to simulate a follower not being able to receive messages, but still being able to send messages and becoming
+        // candidate, we can just send a couple of RequestVotes to both leader and follower.
+        cars.tell(new RequestVote(initialState.getCurrentTerm() + 1, "member-3-shard-cars", -1, -1), null);
+        followerCars.tell(new RequestVote(initialState.getCurrentTerm() + 1, "member-3-shard-cars", -1, -1), null);
+        cars.tell(new RequestVote(initialState.getCurrentTerm() + 3, "member-3-shard-cars", -1, -1), null);
+        followerCars.tell(new RequestVote(initialState.getCurrentTerm() + 3, "member-3-shard-cars", -1, -1), null);
+
+        OnDemandRaftState stateAfter = (OnDemandRaftState) leaderDistributedDataStore.getActorUtils()
+                .executeOperation(cars, GetOnDemandRaftState.INSTANCE);
+        OnDemandRaftState followerState = (OnDemandRaftState) followerDistributedDataStore.getActorUtils()
+                .executeOperation(cars, GetOnDemandRaftState.INSTANCE);
+
+        assertEquals(initialState.getCurrentTerm(), stateAfter.getCurrentTerm());
+        assertEquals(initialState.getCurrentTerm(), followerState.getCurrentTerm());
+
+        ds2.close();
+    }
+
+    private static Boolean containsUnreachable(final Cluster cluster, final Member member) {
+        // unreachableMembers() returns scala.collection.immutable.Set, but we are using scala.collection.Set to fix JDT
+        // see https://bugs.eclipse.org/bugs/show_bug.cgi?id=468276#c32
+        final Set<Member> members = cluster.readView().unreachableMembers();
+        return members.contains(member);
+    }
+
     @Test
     public void testInstallSnapshot() throws Exception {
         final String testName = "testInstallSnapshot";
@@ -1129,10 +1311,10 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
             SchemaContextHelper.full());
 
         final ContainerNode carsNode = CarsModel.newCarsNode(
-                CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", BigInteger.valueOf(20000))));
+                CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", Uint64.valueOf(20000))));
         AbstractShardTest.writeToStore(tree, CarsModel.BASE_PATH, carsNode);
 
-        final NormalizedNode<?, ?> snapshotRoot = AbstractShardTest.readStore(tree, YangInstanceIdentifier.EMPTY);
+        final NormalizedNode snapshotRoot = AbstractShardTest.readStore(tree, YangInstanceIdentifier.empty());
         final Snapshot initialSnapshot = Snapshot.create(
                 new ShardSnapshotState(new MetadataShardDataTreeSnapshot(snapshotRoot)),
                 Collections.emptyList(), 5, 1, 5, 1, 1, null, null);
@@ -1143,10 +1325,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
         initDatastoresWithCars(testName);
 
-        final Optional<NormalizedNode<?, ?>> readOptional = leaderDistributedDataStore.newReadOnlyTransaction().read(
-                CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS);
-        assertTrue("isPresent", readOptional.isPresent());
-        assertEquals("Node", carsNode, readOptional.get());
+        assertEquals(Optional.of(carsNode), leaderDistributedDataStore.newReadOnlyTransaction().read(
+            CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS));
 
         verifySnapshot(InMemorySnapshotStore.waitForSavedSnapshot(leaderCarShardName, Snapshot.class),
                 initialSnapshot, snapshotRoot);
@@ -1158,7 +1338,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
     @Test
     public void testReadWriteMessageSlicing() throws Exception {
         // The slicing is only implemented for tell-based protocol
-        Assume.assumeTrue(testParameter.equals(ClientBackedDataStore.class));
+        assumeTrue(ClientBackedDataStore.class.isAssignableFrom(testParameter));
 
         leaderDatastoreContextBuilder.maximumMessageSliceSize(100);
         followerDatastoreContextBuilder.maximumMessageSliceSize(100);
@@ -1166,14 +1346,143 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
         final DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
 
-        final NormalizedNode<?, ?> carsNode = CarsModel.create();
+        final NormalizedNode carsNode = CarsModel.create();
         rwTx.write(CarsModel.BASE_PATH, carsNode);
 
         verifyNode(rwTx, CarsModel.BASE_PATH, carsNode);
     }
 
+    @SuppressWarnings("IllegalCatch")
+    @Test
+    public void testRaftCallbackDuringLeadershipDrop() throws Exception {
+        final String testName = "testRaftCallbackDuringLeadershipDrop";
+        initDatastores(testName, MODULE_SHARDS_CARS_1_2_3, CARS);
+
+        final ExecutorService executor = Executors.newSingleThreadExecutor();
+
+        final IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System,
+                DatastoreContext.newBuilderFrom(followerDatastoreContextBuilder.build()).operationTimeoutInMillis(500)
+                        .shardLeaderElectionTimeoutInSeconds(3600),
+                commitTimeout);
+
+        final DOMStoreWriteTransaction initialWriteTx = leaderDistributedDataStore.newWriteOnlyTransaction();
+        initialWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+        leaderTestKit.doCommit(initialWriteTx.ready());
+
+        try (AbstractDataStore follower2DistributedDataStore = follower2TestKit.setupAbstractDataStore(
+                testParameter, testName, MODULE_SHARDS_CARS_1_2_3, false)) {
+
+            final ActorRef member3Cars = ((LocalShardStore) follower2DistributedDataStore).getLocalShards()
+                    .getLocalShards().get("cars").getActor();
+            final ActorRef member2Cars = ((LocalShardStore)followerDistributedDataStore).getLocalShards()
+                    .getLocalShards().get("cars").getActor();
+            member2Cars.tell(new StartDropMessages(AppendEntries.class), null);
+            member3Cars.tell(new StartDropMessages(AppendEntries.class), null);
+
+            final DOMStoreWriteTransaction newTx = leaderDistributedDataStore.newWriteOnlyTransaction();
+            newTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
+            final AtomicBoolean submitDone = new AtomicBoolean(false);
+            executor.submit(() -> {
+                try {
+                    leaderTestKit.doCommit(newTx.ready());
+                    submitDone.set(true);
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            });
+            final ActorRef leaderCars = ((LocalShardStore) leaderDistributedDataStore).getLocalShards()
+                    .getLocalShards().get("cars").getActor();
+            await().atMost(10, TimeUnit.SECONDS)
+                    .until(() -> ((OnDemandRaftState) leaderDistributedDataStore.getActorUtils()
+                            .executeOperation(leaderCars, GetOnDemandRaftState.INSTANCE)).getLastIndex() >= 1);
+
+            final OnDemandRaftState raftState = (OnDemandRaftState)leaderDistributedDataStore.getActorUtils()
+                    .executeOperation(leaderCars, GetOnDemandRaftState.INSTANCE);
+
+            // Simulate a follower not receiving heartbeats but still being able to send messages ie RequestVote with
+            // new term(switching to candidate after election timeout)
+            leaderCars.tell(new RequestVote(raftState.getCurrentTerm() + 1,
+                    "member-3-shard-cars-testRaftCallbackDuringLeadershipDrop", -1,
+                            -1), member3Cars);
+
+            member2Cars.tell(new StopDropMessages(AppendEntries.class), null);
+            member3Cars.tell(new StopDropMessages(AppendEntries.class), null);
+
+            await("Is tx stuck in COMMIT_PENDING")
+                    .atMost(10, TimeUnit.SECONDS).untilAtomic(submitDone, equalTo(true));
+
+        }
+
+        executor.shutdownNow();
+    }
+
+    @Test
+    public void testSnapshotOnRootOverwrite() throws Exception {
+        // FIXME: ClientBackedDatastore does not have stable indexes/term, the snapshot index seems to fluctuate
+        assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter));
+
+        final String testName = "testSnapshotOnRootOverwrite";
+        final String[] shards = {"cars", "default"};
+        initDatastores(testName, "module-shards-default-cars-member1-and-2.conf", shards,
+                leaderDatastoreContextBuilder.snapshotOnRootOverwrite(true),
+                followerDatastoreContextBuilder.snapshotOnRootOverwrite(true));
+
+        leaderTestKit.waitForMembersUp("member-2");
+        final ContainerNode rootNode = ImmutableContainerNodeBuilder.create()
+                .withNodeIdentifier(YangInstanceIdentifier.NodeIdentifier.create(SchemaContext.NAME))
+                .withChild(CarsModel.create())
+                .build();
+
+        leaderTestKit.testWriteTransaction(leaderDistributedDataStore, YangInstanceIdentifier.empty(), rootNode);
+
+        IntegrationTestKit.verifyShardState(leaderDistributedDataStore, "cars",
+            state -> assertEquals(1, state.getSnapshotIndex()));
+
+        IntegrationTestKit.verifyShardState(followerDistributedDataStore, "cars",
+            state -> assertEquals(1, state.getSnapshotIndex()));
+
+        verifySnapshot("member-1-shard-cars-testSnapshotOnRootOverwrite", 1);
+        verifySnapshot("member-2-shard-cars-testSnapshotOnRootOverwrite", 1);
+
+        for (int i = 0; i < 10; i++) {
+            leaderTestKit.testWriteTransaction(leaderDistributedDataStore, CarsModel.newCarPath("car " + i),
+                    CarsModel.newCarEntry("car " + i, Uint64.ONE));
+        }
+
+        // fake snapshot causes the snapshotIndex to move
+        IntegrationTestKit.verifyShardState(leaderDistributedDataStore, "cars",
+            state -> assertEquals(10, state.getSnapshotIndex()));
+        IntegrationTestKit.verifyShardState(followerDistributedDataStore, "cars",
+            state -> assertEquals(10, state.getSnapshotIndex()));
+
+        // however the real snapshot still has not changed and was taken at index 1
+        verifySnapshot("member-1-shard-cars-testSnapshotOnRootOverwrite", 1);
+        verifySnapshot("member-2-shard-cars-testSnapshotOnRootOverwrite", 1);
+
+        // root overwrite so expect a snapshot
+        leaderTestKit.testWriteTransaction(leaderDistributedDataStore, YangInstanceIdentifier.empty(), rootNode);
+
+        // this was a real snapshot so everything should be in it(1(DisableTrackingPayload) + 1 + 10 + 1)
+        IntegrationTestKit.verifyShardState(leaderDistributedDataStore, "cars",
+            state -> assertEquals(12, state.getSnapshotIndex()));
+        IntegrationTestKit.verifyShardState(followerDistributedDataStore, "cars",
+            state -> assertEquals(12, state.getSnapshotIndex()));
+
+        verifySnapshot("member-1-shard-cars-testSnapshotOnRootOverwrite", 12);
+        verifySnapshot("member-2-shard-cars-testSnapshotOnRootOverwrite", 12);
+    }
+
+    private void verifySnapshot(final String persistenceId, final long lastAppliedIndex) {
+        await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
+                List<Snapshot> snap = InMemorySnapshotStore.getSnapshots(persistenceId, Snapshot.class);
+                assertEquals(1, snap.size());
+                assertEquals(lastAppliedIndex, snap.get(0).getLastAppliedIndex());
+            }
+        );
+    }
+
     private static void verifySnapshot(final Snapshot actual, final Snapshot expected,
-                                       final NormalizedNode<?, ?> expRoot) {
+                                       final NormalizedNode expRoot) {
         assertEquals("Snapshot getLastAppliedTerm", expected.getLastAppliedTerm(), actual.getLastAppliedTerm());
         assertEquals("Snapshot getLastAppliedIndex", expected.getLastAppliedIndex(), actual.getLastAppliedIndex());
         assertEquals("Snapshot getLastTerm", expected.getLastTerm(), actual.getLastTerm());
@@ -1186,10 +1495,10 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
     private static void sendDatastoreContextUpdate(final AbstractDataStore dataStore, final Builder builder) {
         final Builder newBuilder = DatastoreContext.newBuilderFrom(builder.build());
-        final DatastoreContextFactory mockContextFactory = Mockito.mock(DatastoreContextFactory.class);
+        final DatastoreContextFactory mockContextFactory = mock(DatastoreContextFactory.class);
         final Answer<DatastoreContext> answer = invocation -> newBuilder.build();
-        Mockito.doAnswer(answer).when(mockContextFactory).getBaseDatastoreContext();
-        Mockito.doAnswer(answer).when(mockContextFactory).getShardDatastoreContext(Mockito.anyString());
+        doAnswer(answer).when(mockContextFactory).getBaseDatastoreContext();
+        doAnswer(answer).when(mockContextFactory).getShardDatastoreContext(anyString());
         dataStore.onDatastoreContextUpdated(mockContextFactory);
     }
 }