Add unit test for FrontedMetadata memory leaks
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / DistributedDataStoreRemotingIntegrationTest.java
index 3298fa2c05c2464342e1254cbc0ff9b4dd3c043b..6fead6bbad2d0166d1183ba72146b45ee318139c 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ * Copyright (c) 2015, 2017 Brocade Communications Systems, Inc. and others.  All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
@@ -7,12 +7,14 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import static org.awaitility.Awaitility.await;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
 
@@ -23,14 +25,14 @@ import akka.actor.Address;
 import akka.actor.AddressFromURIString;
 import akka.cluster.Cluster;
 import akka.dispatch.Futures;
-import akka.pattern.AskTimeoutException;
 import akka.pattern.Patterns;
-import akka.testkit.JavaTestKit;
-import com.google.common.base.Optional;
+import akka.testkit.javadsl.TestKit;
 import com.google.common.base.Stopwatch;
 import com.google.common.base.Supplier;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Range;
+import com.google.common.primitives.UnsignedLong;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.Uninterruptibles;
@@ -39,14 +41,18 @@ import java.math.BigInteger;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import org.junit.After;
 import org.junit.Assume;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -54,10 +60,13 @@ import org.junit.runners.Parameterized.Parameter;
 import org.junit.runners.Parameterized.Parameters;
 import org.mockito.Mockito;
 import org.mockito.stubbing.Answer;
+import org.opendaylight.controller.cluster.access.client.RequestTimeoutException;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.databroker.ClientBackedDataStore;
 import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker;
+import org.opendaylight.controller.cluster.databroker.TestClientBackedDataStore;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
+import org.opendaylight.controller.cluster.datastore.TestShard.RequestFrontendMetadata;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
@@ -67,8 +76,11 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransact
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
+import org.opendaylight.controller.cluster.datastore.persisted.FrontendHistoryMetadata;
+import org.opendaylight.controller.cluster.datastore.persisted.FrontendShardDataTreeSnapshotMetadata;
 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
+import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
@@ -79,26 +91,24 @@ import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
-import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
-import org.opendaylight.controller.sal.core.spi.data.DOMStore;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
+import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
+import org.opendaylight.mdsal.dom.api.DOMTransactionChainListener;
+import org.opendaylight.mdsal.dom.spi.store.DOMStore;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder;
 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
@@ -118,12 +128,14 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
     @Parameters(name = "{0}")
     public static Collection<Object[]> data() {
         return Arrays.asList(new Object[][] {
-                { DistributedDataStore.class }, { ClientBackedDataStore.class }
+                { TestDistributedDataStore.class, 7}, { TestClientBackedDataStore.class, 12 }
         });
     }
 
-    @Parameter
+    @Parameter(0)
     public Class<? extends AbstractDataStore> testParameter;
+    @Parameter(1)
+    public int commitTimeout;
 
     private static final String[] CARS_AND_PEOPLE = {"cars", "people"};
     private static final String[] CARS = {"cars"};
@@ -136,6 +148,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
     private static final String MODULE_SHARDS_CARS_ONLY_1_2 = "module-shards-cars-member-1-and-2.conf";
     private static final String MODULE_SHARDS_CARS_PEOPLE_1_2 = "module-shards-member1-and-2.conf";
     private static final String MODULE_SHARDS_CARS_PEOPLE_1_2_3 = "module-shards-member1-and-2-and-3.conf";
+    private static final String MODULE_SHARDS_CARS_1_2_3 = "module-shards-cars-member-1-and-2-and-3.conf";
 
     private ActorSystem leaderSystem;
     private ActorSystem followerSystem;
@@ -179,9 +192,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
             leaderDistributedDataStore.close();
         }
 
-        JavaTestKit.shutdownActorSystem(leaderSystem);
-        JavaTestKit.shutdownActorSystem(followerSystem);
-        JavaTestKit.shutdownActorSystem(follower2System);
+        TestKit.shutdownActorSystem(leaderSystem);
+        TestKit.shutdownActorSystem(followerSystem);
+        TestKit.shutdownActorSystem(follower2System);
 
         InMemoryJournal.clear();
         InMemorySnapshotStore.clear();
@@ -197,22 +210,25 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
     private void initDatastores(final String type, final String moduleShardsConfig, final String[] shards)
             throws Exception {
-        leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder);
+        leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder, commitTimeout);
 
         leaderDistributedDataStore = leaderTestKit.setupAbstractDataStore(
                 testParameter, type, moduleShardsConfig, false, shards);
 
-        followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder);
+        followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder, commitTimeout);
         followerDistributedDataStore = followerTestKit.setupAbstractDataStore(
                 testParameter, type, moduleShardsConfig, false, shards);
 
-        leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), shards);
+        leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorUtils(), shards);
+
+        leaderTestKit.waitForMembersUp("member-2");
+        followerTestKit.waitForMembersUp("member-1");
     }
 
     private static void verifyCars(final DOMStoreReadTransaction readTx, final MapEntryNode... entries)
             throws Exception {
         final Optional<NormalizedNode<?, ?>> optional = readTx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
-        assertEquals("isPresent", true, optional.isPresent());
+        assertTrue("isPresent", optional.isPresent());
 
         final CollectionNodeBuilder<MapEntryNode, MapNode> listBuilder = ImmutableNodes.mapNodeBuilder(
                 CarsModel.CAR_QNAME);
@@ -226,14 +242,14 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
     private static void verifyNode(final DOMStoreReadTransaction readTx, final YangInstanceIdentifier path,
             final NormalizedNode<?, ?> expNode) throws Exception {
         final Optional<NormalizedNode<?, ?>> optional = readTx.read(path).get(5, TimeUnit.SECONDS);
-        assertEquals("isPresent", true, optional.isPresent());
+        assertTrue("isPresent", optional.isPresent());
         assertEquals("Data node", expNode, optional.get());
     }
 
     private static void verifyExists(final DOMStoreReadTransaction readTx, final YangInstanceIdentifier path)
             throws Exception {
         final Boolean exists = readTx.exists(path).get(5, TimeUnit.SECONDS);
-        assertEquals("exists", true, exists);
+        assertEquals("exists", Boolean.TRUE, exists);
     }
 
     @Test
@@ -310,17 +326,143 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
         }
 
-        JavaTestKit.shutdownActorSystem(leaderSystem, null, Boolean.TRUE);
-        JavaTestKit.shutdownActorSystem(followerSystem, null, Boolean.TRUE);
+        TestKit.shutdownActorSystem(leaderSystem, true);
+        TestKit.shutdownActorSystem(followerSystem, true);
 
         final ActorSystem newSystem = newActorSystem("reinstated-member2", "Member2");
 
-        try (final AbstractDataStore member2Datastore = new IntegrationTestKit(newSystem, leaderDatastoreContextBuilder)
+        try (AbstractDataStore member2Datastore = new IntegrationTestKit(newSystem, leaderDatastoreContextBuilder,
+                commitTimeout)
                 .setupAbstractDataStore(testParameter, testName, "module-shards-member2", true, CARS)) {
             verifyCars(member2Datastore.newReadOnlyTransaction(), car2);
         }
     }
 
+    @Test
+    public void testSingleTransactionsWritesInQuickSuccession() throws Exception {
+        final String testName = "testWriteTransactionWithSingleShard";
+        initDatastoresWithCars(testName);
+
+        final DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain();
+
+        DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+        writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+        writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
+        followerTestKit.doCommit(writeTx.ready());
+
+        int numCars = 5;
+        for (int i = 0; i < numCars; i++) {
+            writeTx = txChain.newWriteOnlyTransaction();
+            writeTx.write(CarsModel.newCarPath("car" + i),
+                    CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
+
+            followerTestKit.doCommit(writeTx.ready());
+
+            DOMStoreReadTransaction domStoreReadTransaction = txChain.newReadOnlyTransaction();
+            domStoreReadTransaction.read(CarsModel.BASE_PATH).get();
+
+            domStoreReadTransaction.close();
+        }
+
+        // wait to let the shard catch up with purged
+        await("Range set leak test").atMost(5, TimeUnit.SECONDS)
+                .pollInterval(500, TimeUnit.MILLISECONDS)
+                .untilAsserted(() -> {
+                    Optional<ActorRef> localShard =
+                            leaderDistributedDataStore.getActorUtils().findLocalShard("cars");
+                    FrontendShardDataTreeSnapshotMetadata frontendMetadata =
+                            (FrontendShardDataTreeSnapshotMetadata) leaderDistributedDataStore.getActorUtils()
+                                    .executeOperation(localShard.get(), new RequestFrontendMetadata());
+
+                    if (leaderDistributedDataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) {
+                        Iterator<FrontendHistoryMetadata> iterator =
+                                frontendMetadata.getClients().get(0).getCurrentHistories().iterator();
+                        FrontendHistoryMetadata metadata = iterator.next();
+                        while (iterator.hasNext() && metadata.getHistoryId() != 1) {
+                            metadata = iterator.next();
+                        }
+
+                        assertEquals(0, metadata.getClosedTransactions().size());
+                        assertEquals(Range.closedOpen(UnsignedLong.valueOf(0), UnsignedLong.valueOf(11)),
+                                metadata.getPurgedTransactions().asRanges().iterator().next());
+                    } else {
+                        // ask based should track no metadata
+                        assertTrue(frontendMetadata.getClients().get(0).getCurrentHistories().isEmpty());
+                    }
+                });
+
+        final Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction()
+                .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
+        assertTrue("isPresent", optional.isPresent());
+        assertEquals("# cars", numCars, ((Collection<?>) optional.get().getValue()).size());
+    }
+
+    @Test
+    @Ignore("Flushes out tell based leak needs to be handled separately")
+    public void testCloseTransactionMetadataLeak() throws Exception {
+        // Ask based frontend seems to have some issues with back to back close
+        Assume.assumeTrue(testParameter.isAssignableFrom(TestClientBackedDataStore.class));
+
+        final String testName = "testWriteTransactionWithSingleShard";
+        initDatastoresWithCars(testName);
+
+        final DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain();
+
+        DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+        writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+        writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
+        followerTestKit.doCommit(writeTx.ready());
+
+        int numCars = 5;
+        for (int i = 0; i < numCars; i++) {
+            writeTx = txChain.newWriteOnlyTransaction();
+            writeTx.close();
+
+            DOMStoreReadTransaction domStoreReadTransaction = txChain.newReadOnlyTransaction();
+            domStoreReadTransaction.read(CarsModel.BASE_PATH).get();
+
+            domStoreReadTransaction.close();
+        }
+
+        writeTx = txChain.newWriteOnlyTransaction();
+        writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+        writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
+        followerTestKit.doCommit(writeTx.ready());
+
+        // wait to let the shard catch up with purged
+        await("Close transaction purge leak test.").atMost(5, TimeUnit.SECONDS)
+                .pollInterval(500, TimeUnit.MILLISECONDS)
+                .untilAsserted(() -> {
+                    Optional<ActorRef> localShard =
+                            leaderDistributedDataStore.getActorUtils().findLocalShard("cars");
+                    FrontendShardDataTreeSnapshotMetadata frontendMetadata =
+                            (FrontendShardDataTreeSnapshotMetadata) leaderDistributedDataStore.getActorUtils()
+                                    .executeOperation(localShard.get(), new RequestFrontendMetadata());
+
+                    if (leaderDistributedDataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) {
+                        Iterator<FrontendHistoryMetadata> iterator =
+                                frontendMetadata.getClients().get(0).getCurrentHistories().iterator();
+                        FrontendHistoryMetadata metadata = iterator.next();
+                        while (iterator.hasNext() && metadata.getHistoryId() != 1) {
+                            metadata = iterator.next();
+                        }
+
+                        Set<Range<UnsignedLong>> ranges = metadata.getPurgedTransactions().asRanges();
+
+                        assertEquals(0, metadata.getClosedTransactions().size());
+                        assertEquals(1, ranges.size());
+                    } else {
+                        // ask based should track no metadata
+                        assertTrue(frontendMetadata.getClients().get(0).getCurrentHistories().isEmpty());
+                    }
+                });
+
+        final Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction()
+                .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
+        assertTrue("isPresent", optional.isPresent());
+        assertEquals("# cars", numCars, ((Collection<?>) optional.get().getValue()).size());
+    }
+
     @Test
     public void testReadWriteTransactionWithSingleShard() throws Exception {
         initDatastoresWithCars("testReadWriteTransactionWithSingleShard");
@@ -470,11 +612,11 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         readWriteTx.merge(personPath, person);
 
         Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
-        assertEquals("isPresent", true, optional.isPresent());
+        assertTrue("isPresent", optional.isPresent());
         assertEquals("Data node", car, optional.get());
 
         optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS);
-        assertEquals("isPresent", true, optional.isPresent());
+        assertTrue("isPresent", optional.isPresent());
         assertEquals("Data node", person, optional.get());
 
         final DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready();
@@ -494,7 +636,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         verifyCars(readTx, car);
 
         optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
-        assertEquals("isPresent", false, optional.isPresent());
+        assertFalse("isPresent", optional.isPresent());
     }
 
     @Test
@@ -506,10 +648,10 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
                         LogicalDatastoreType.CONFIGURATION, followerDistributedDataStore).build(),
                         MoreExecutors.directExecutor());
 
-        final TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
+        final DOMTransactionChainListener listener = Mockito.mock(DOMTransactionChainListener.class);
         final DOMTransactionChain txChain = broker.createTransactionChain(listener);
 
-        final DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+        final DOMDataTreeWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
 
         final ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
                 new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME))
@@ -518,9 +660,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
 
         try {
-            writeTx.submit().checkedGet(5, TimeUnit.SECONDS);
+            writeTx.commit().get(5, TimeUnit.SECONDS);
             fail("Expected TransactionCommitFailedException");
-        } catch (final TransactionCommitFailedException e) {
+        } catch (final ExecutionException e) {
             // Expected
         }
 
@@ -539,10 +681,10 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
                         LogicalDatastoreType.CONFIGURATION, followerDistributedDataStore).build(),
                         MoreExecutors.directExecutor());
 
-        final TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
+        final DOMTransactionChainListener listener = Mockito.mock(DOMTransactionChainListener.class);
         final DOMTransactionChain txChain = broker.createTransactionChain(listener);
 
-        final DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+        final DOMDataTreeWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
 
         writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
 
@@ -555,9 +697,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
 
         try {
-            writeTx.submit().checkedGet(5, TimeUnit.SECONDS);
+            writeTx.commit().get(5, TimeUnit.SECONDS);
             fail("Expected TransactionCommitFailedException");
-        } catch (final TransactionCommitFailedException e) {
+        } catch (final ExecutionException e) {
             // Expected
         }
 
@@ -569,8 +711,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
     @Test
     public void testSingleShardTransactionsWithLeaderChanges() throws Exception {
-        //TODO remove when test passes also for ClientBackedDataStore
-        Assume.assumeTrue(testParameter.equals(DistributedDataStore.class));
+        followerDatastoreContextBuilder.backendAlivenessTimerIntervalInSeconds(2);
         final String testName = "testSingleShardTransactionsWithLeaderChanges";
         initDatastoresWithCars(testName);
 
@@ -593,23 +734,23 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder
                 .shardElectionTimeoutFactor(1).customRaftPolicyImplementation(null));
 
-        JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
+        TestKit.shutdownActorSystem(leaderSystem, true);
         Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS);
 
-        followerTestKit.waitUntilNoLeader(followerDistributedDataStore.getActorContext(), CARS);
+        followerTestKit.waitUntilNoLeader(followerDistributedDataStore.getActorUtils(), CARS);
 
         leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
         Cluster.get(leaderSystem).join(MEMBER_2_ADDRESS);
 
         final DatastoreContext.Builder newMember1Builder = DatastoreContext.newBuilder()
                 .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);
-        IntegrationTestKit newMember1TestKit = new IntegrationTestKit(leaderSystem, newMember1Builder);
+        IntegrationTestKit newMember1TestKit = new IntegrationTestKit(leaderSystem, newMember1Builder, commitTimeout);
 
-        try (final AbstractDataStore ds =
+        try (AbstractDataStore ds =
                 newMember1TestKit.setupAbstractDataStore(
                         testParameter, testName, MODULE_SHARDS_CARS_ONLY_1_2, false, CARS)) {
 
-            followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorContext(), CARS);
+            followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorUtils(), CARS);
 
             // Write a car entry to the new leader - should switch to local Tx
 
@@ -629,14 +770,14 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
     @Test
     public void testReadyLocalTransactionForwardedToLeader() throws Exception {
         initDatastoresWithCars("testReadyLocalTransactionForwardedToLeader");
-        followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorContext(), "cars");
+        followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorUtils(), "cars");
 
-        final Optional<ActorRef> carsFollowerShard = followerDistributedDataStore.getActorContext()
-                .findLocalShard("cars");
-        assertEquals("Cars follower shard found", true, carsFollowerShard.isPresent());
+        final Optional<ActorRef> carsFollowerShard =
+                followerDistributedDataStore.getActorUtils().findLocalShard("cars");
+        assertTrue("Cars follower shard found", carsFollowerShard.isPresent());
 
-        final TipProducingDataTree dataTree = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
-        dataTree.setSchemaContext(SchemaContextHelper.full());
+        final DataTree dataTree = new InMemoryDataTreeFactory().create(
+            DataTreeConfiguration.DEFAULT_OPERATIONAL, SchemaContextHelper.full());
 
         // Send a tx with immediate commit.
 
@@ -648,7 +789,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         new WriteModification(CarsModel.newCarPath("optima"), car1).apply(modification);
         modification.ready();
 
-        ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(tx1 , modification, true);
+        ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(tx1 , modification, true, Optional.empty());
 
         carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef());
         Object resp = followerTestKit.expectMsgClass(Object.class);
@@ -667,7 +808,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         new WriteModification(CarsModel.newCarPath("sportage"), car2).apply(modification);
         modification.ready();
 
-        readyLocal = new ReadyLocalTransaction(tx2 , modification, false);
+        readyLocal = new ReadyLocalTransaction(tx2 , modification, false, Optional.empty());
 
         carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef());
         resp = followerTestKit.expectMsgClass(Object.class);
@@ -677,13 +818,13 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
         assertEquals("Response type", ReadyTransactionReply.class, resp.getClass());
 
-        final ActorSelection txActor = leaderDistributedDataStore.getActorContext().actorSelection(
+        final ActorSelection txActor = leaderDistributedDataStore.getActorUtils().actorSelection(
                 ((ReadyTransactionReply)resp).getCohortPath());
 
         final Supplier<Short> versionSupplier = Mockito.mock(Supplier.class);
         Mockito.doReturn(DataStoreVersions.CURRENT_VERSION).when(versionSupplier).get();
         ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy(
-                leaderDistributedDataStore.getActorContext(), Arrays.asList(
+                leaderDistributedDataStore.getActorUtils(), Arrays.asList(
                         new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor), versionSupplier)), tx2);
         cohort.canCommit().get(5, TimeUnit.SECONDS);
         cohort.preCommit().get(5, TimeUnit.SECONDS);
@@ -696,11 +837,11 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
     @Test
     public void testForwardedReadyTransactionForwardedToLeader() throws Exception {
         initDatastoresWithCars("testForwardedReadyTransactionForwardedToLeader");
-        followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorContext(), "cars");
+        followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorUtils(), "cars");
 
-        final Optional<ActorRef> carsFollowerShard = followerDistributedDataStore.getActorContext()
-                .findLocalShard("cars");
-        assertEquals("Cars follower shard found", true, carsFollowerShard.isPresent());
+        final Optional<ActorRef> carsFollowerShard =
+                followerDistributedDataStore.getActorUtils().findLocalShard("cars");
+        assertTrue("Cars follower shard found", carsFollowerShard.isPresent());
 
         carsFollowerShard.get().tell(GetShardDataTree.INSTANCE, followerTestKit.getRef());
         final DataTree dataTree = followerTestKit.expectMsgClass(DataTree.class);
@@ -716,7 +857,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
         ForwardedReadyTransaction forwardedReady = new ForwardedReadyTransaction(tx1,
                 DataStoreVersions.CURRENT_VERSION, new ReadWriteShardDataTreeTransaction(
-                        Mockito.mock(ShardDataTreeTransactionParent.class), tx1, modification), true);
+                        Mockito.mock(ShardDataTreeTransactionParent.class), tx1, modification), true,
+                Optional.empty());
 
         carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef());
         Object resp = followerTestKit.expectMsgClass(Object.class);
@@ -736,7 +878,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
         forwardedReady = new ForwardedReadyTransaction(tx2,
                 DataStoreVersions.CURRENT_VERSION, new ReadWriteShardDataTreeTransaction(
-                        Mockito.mock(ShardDataTreeTransactionParent.class), tx2, modification), false);
+                        Mockito.mock(ShardDataTreeTransactionParent.class), tx2, modification), false,
+                Optional.empty());
 
         carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef());
         resp = followerTestKit.expectMsgClass(Object.class);
@@ -746,13 +889,13 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
         assertEquals("Response type", ReadyTransactionReply.class, resp.getClass());
 
-        ActorSelection txActor = leaderDistributedDataStore.getActorContext().actorSelection(
+        ActorSelection txActor = leaderDistributedDataStore.getActorUtils().actorSelection(
                 ((ReadyTransactionReply)resp).getCohortPath());
 
         final Supplier<Short> versionSupplier = Mockito.mock(Supplier.class);
         Mockito.doReturn(DataStoreVersions.CURRENT_VERSION).when(versionSupplier).get();
         final ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy(
-                leaderDistributedDataStore.getActorContext(), Arrays.asList(
+                leaderDistributedDataStore.getActorUtils(), Arrays.asList(
                         new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor), versionSupplier)), tx2);
         cohort.canCommit().get(5, TimeUnit.SECONDS);
         cohort.preCommit().get(5, TimeUnit.SECONDS);
@@ -763,8 +906,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
     @Test
     public void testTransactionForwardedToLeaderAfterRetry() throws Exception {
-        //TODO remove when test passes also for ClientBackedDataStore
-        Assume.assumeTrue(testParameter.equals(DistributedDataStore.class));
+        // FIXME: remove when test passes also for ClientBackedDataStore
+        Assume.assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter));
         followerDatastoreContextBuilder.shardBatchedModificationCount(2);
         leaderDatastoreContextBuilder.shardBatchedModificationCount(2);
         initDatastoresWithCarsAndPeople("testTransactionForwardedToLeaderAfterRetry");
@@ -779,10 +922,10 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         // Wait for the commit to be replicated to the follower.
 
         MemberNode.verifyRaftState(followerDistributedDataStore, "cars",
-            raftState -> assertEquals("getLastApplied", 0, raftState.getLastApplied()));
+            raftState -> assertEquals("getLastApplied", 1, raftState.getLastApplied()));
 
         MemberNode.verifyRaftState(followerDistributedDataStore, "people",
-            raftState -> assertEquals("getLastApplied", 0, raftState.getLastApplied()));
+            raftState -> assertEquals("getLastApplied", 1, raftState.getLastApplied()));
 
         // Prepare, ready and canCommit a WO tx that writes to 2 shards. This will become the current tx in
         // the leader shard.
@@ -803,7 +946,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex)));
         writeTx2.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
         carIndex++;
-        NormalizedNode<?, ?> people = PeopleModel.newPersonMapNode();
+        NormalizedNode<?, ?> people = ImmutableNodes.mapNodeBuilder(PeopleModel.PERSON_QNAME)
+                .withChild(PeopleModel.newPersonEntry("Dude")).build();
         writeTx2.write(PeopleModel.PERSON_LIST_PATH, people);
         final DOMStoreThreePhaseCommitCohort writeTx2Cohort = writeTx2.ready();
 
@@ -834,7 +978,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         readWriteTx.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
 
         IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars",
-            stats -> assertEquals("getReadWriteTransactionCount", 1, stats.getReadWriteTransactionCount()));
+            stats -> assertEquals("getReadWriteTransactionCount", 5, stats.getReadWriteTransactionCount()));
 
         // Disable elections on the leader so it switches to follower.
 
@@ -842,8 +986,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
                 .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName())
                 .shardElectionTimeoutFactor(10));
 
-        Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS);
-        leaderTestKit.waitUntilNoLeader(leaderDistributedDataStore.getActorContext(), "cars");
+        leaderTestKit.waitUntilNoLeader(leaderDistributedDataStore.getActorUtils(), "cars");
 
         // Submit all tx's - the messages should get queued for retry.
 
@@ -857,6 +1000,10 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
         sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder
                 .customRaftPolicyImplementation(null).shardElectionTimeoutFactor(1));
+        IntegrationTestKit.findLocalShard(followerDistributedDataStore.getActorUtils(), "cars")
+                .tell(TimeoutNow.INSTANCE, ActorRef.noSender());
+        IntegrationTestKit.findLocalShard(followerDistributedDataStore.getActorUtils(), "people")
+                .tell(TimeoutNow.INSTANCE, ActorRef.noSender());
 
         followerTestKit.doCommit(writeTx1CanCommit, writeTx1Cohort);
         followerTestKit.doCommit(writeTx2CanCommit, writeTx2Cohort);
@@ -871,18 +1018,22 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
     @Test
     public void testLeadershipTransferOnShutdown() throws Exception {
-        //TODO remove when test passes also for ClientBackedDataStore
-        Assume.assumeTrue(testParameter.equals(DistributedDataStore.class));
+        // FIXME: remove when test passes also for ClientBackedDataStore
+        Assume.assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter));
         leaderDatastoreContextBuilder.shardBatchedModificationCount(1);
         followerDatastoreContextBuilder.shardElectionTimeoutFactor(10).customRaftPolicyImplementation(null);
         final String testName = "testLeadershipTransferOnShutdown";
         initDatastores(testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, CARS_AND_PEOPLE);
 
         final IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System,
-                DatastoreContext.newBuilderFrom(followerDatastoreContextBuilder.build()).operationTimeoutInMillis(100));
-        try (final AbstractDataStore follower2DistributedDataStore = follower2TestKit.setupAbstractDataStore(
+                DatastoreContext.newBuilderFrom(followerDatastoreContextBuilder.build()).operationTimeoutInMillis(500),
+                commitTimeout);
+        try (AbstractDataStore follower2DistributedDataStore = follower2TestKit.setupAbstractDataStore(
                 testParameter, testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, false)) {
 
+            followerTestKit.waitForMembersUp("member-3");
+            follower2TestKit.waitForMembersUp("member-1", "member-2");
+
             // Create and submit a couple tx's so they're pending.
 
             DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
@@ -908,7 +1059,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
                 .shardElectionTimeoutFactor(100));
 
             final FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
-            final Future<ActorRef> future = leaderDistributedDataStore.getActorContext().findLocalShardAsync("cars");
+            final Future<ActorRef> future = leaderDistributedDataStore.getActorUtils().findLocalShardAsync("cars");
             final ActorRef leaderActor = Await.result(future, duration);
 
             final Future<Boolean> stopFuture = Patterns.gracefulStop(leaderActor, duration, Shutdown.INSTANCE);
@@ -932,8 +1083,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
     @Test
     public void testTransactionWithIsolatedLeader() throws Exception {
-        //TODO remove when test passes also for ClientBackedDataStore
-        Assume.assumeTrue(testParameter.equals(DistributedDataStore.class));
+        // FIXME: remove when test passes also for ClientBackedDataStore
+        Assume.assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter));
         // Set the isolated leader check interval high so we can control the switch to IsolatedLeader.
         leaderDatastoreContextBuilder.shardIsolatedLeaderCheckIntervalInMillis(10000000);
         final String testName = "testTransactionWithIsolatedLeader";
@@ -952,9 +1103,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         successWriteTx.merge(CarsModel.BASE_PATH, CarsModel.emptyContainer());
 
         // Stop the follower
-        followerTestKit.watch(followerDistributedDataStore.getActorContext().getShardManager());
+        followerTestKit.watch(followerDistributedDataStore.getActorUtils().getShardManager());
         followerDistributedDataStore.close();
-        followerTestKit.expectTerminated(followerDistributedDataStore.getActorContext().getShardManager());
+        followerTestKit.expectTerminated(followerDistributedDataStore.getActorUtils().getShardManager());
 
         // Submit the preIsolatedLeaderWriteTx so it's pending
         final DOMStoreThreePhaseCommitCohort preIsolatedLeaderTxCohort = preIsolatedLeaderWriteTx.ready();
@@ -985,21 +1136,20 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         leaderTestKit.doCommit(successTxCohort);
     }
 
-    @Test(expected = AskTimeoutException.class)
+    @Test
     public void testTransactionWithShardLeaderNotResponding() throws Exception {
-        //TODO remove when test passes also for ClientBackedDataStore
-        Assume.assumeTrue(testParameter.equals(DistributedDataStore.class));
+        followerDatastoreContextBuilder.frontendRequestTimeoutInSeconds(2);
         followerDatastoreContextBuilder.shardElectionTimeoutFactor(50);
         initDatastoresWithCars("testTransactionWithShardLeaderNotResponding");
 
         // Do an initial read to get the primary shard info cached.
 
         final DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
-        readTx.read(CarsModel.BASE_PATH).checkedGet(5, TimeUnit.SECONDS);
+        readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS);
 
         // Shutdown the leader and try to create a new tx.
 
-        JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
+        TestKit.shutdownActorSystem(leaderSystem, true);
 
         followerDatastoreContextBuilder.operationTimeoutInMillis(50).shardElectionTimeoutFactor(1);
         sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder);
@@ -1010,29 +1160,31 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
         try {
             followerTestKit.doCommit(rwTx.ready());
+            fail("Exception expected");
         } catch (final ExecutionException e) {
-            assertTrue("Expected ShardLeaderNotRespondingException cause. Actual: " + e.getCause(),
-                    e.getCause() instanceof ShardLeaderNotRespondingException);
-            assertNotNull("Expected a nested cause", e.getCause().getCause());
-            Throwables.propagateIfInstanceOf(e.getCause().getCause(), Exception.class);
-            Throwables.propagate(e.getCause().getCause());
+            final String msg = "Unexpected exception: " + Throwables.getStackTraceAsString(e.getCause());
+            if (DistributedDataStore.class.isAssignableFrom(testParameter)) {
+                assertTrue(msg, Throwables.getRootCause(e) instanceof NoShardLeaderException
+                        || e.getCause() instanceof ShardLeaderNotRespondingException);
+            } else {
+                assertTrue(msg, Throwables.getRootCause(e) instanceof RequestTimeoutException);
+            }
         }
     }
 
-    @Test(expected = NoShardLeaderException.class)
+    @Test
     public void testTransactionWithCreateTxFailureDueToNoLeader() throws Exception {
-        //TODO remove when test passes also for ClientBackedDataStore
-        Assume.assumeTrue(testParameter.equals(DistributedDataStore.class));
+        followerDatastoreContextBuilder.frontendRequestTimeoutInSeconds(2);
         initDatastoresWithCars("testTransactionWithCreateTxFailureDueToNoLeader");
 
         // Do an initial read to get the primary shard info cached.
 
         final DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
-        readTx.read(CarsModel.BASE_PATH).checkedGet(5, TimeUnit.SECONDS);
+        readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS);
 
         // Shutdown the leader and try to create a new tx.
 
-        JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
+        TestKit.shutdownActorSystem(leaderSystem, true);
 
         Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS);
 
@@ -1047,27 +1199,31 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
         try {
             followerTestKit.doCommit(rwTx.ready());
+            fail("Exception expected");
         } catch (final ExecutionException e) {
-            Throwables.propagateIfInstanceOf(e.getCause(), Exception.class);
-            Throwables.propagate(e.getCause());
+            final String msg = "Unexpected exception: " + Throwables.getStackTraceAsString(e.getCause());
+            if (DistributedDataStore.class.isAssignableFrom(testParameter)) {
+                assertTrue(msg, Throwables.getRootCause(e) instanceof NoShardLeaderException);
+            } else {
+                assertTrue(msg, Throwables.getRootCause(e) instanceof RequestTimeoutException);
+            }
         }
     }
 
     @Test
     public void testTransactionRetryWithInitialAskTimeoutExOnCreateTx() throws Exception {
-        //TODO remove when test passes also for ClientBackedDataStore
-        Assume.assumeTrue(testParameter.equals(DistributedDataStore.class));
+        followerDatastoreContextBuilder.backendAlivenessTimerIntervalInSeconds(2);
         String testName = "testTransactionRetryWithInitialAskTimeoutExOnCreateTx";
-        initDatastores(testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, CARS);
+        initDatastores(testName, MODULE_SHARDS_CARS_1_2_3, CARS);
 
         final DatastoreContext.Builder follower2DatastoreContextBuilder = DatastoreContext.newBuilder()
-                .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);
+                .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(10);
         final IntegrationTestKit follower2TestKit = new IntegrationTestKit(
-                follower2System, follower2DatastoreContextBuilder);
+                follower2System, follower2DatastoreContextBuilder, commitTimeout);
 
-        try (final AbstractDataStore ds =
+        try (AbstractDataStore ds =
                 follower2TestKit.setupAbstractDataStore(
-                        testParameter, testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, false, CARS)) {
+                        testParameter, testName, MODULE_SHARDS_CARS_1_2_3, false, CARS)) {
 
             followerTestKit.waitForMembersUp("member-1", "member-3");
             follower2TestKit.waitForMembersUp("member-1", "member-2");
@@ -1075,16 +1231,16 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
             // Do an initial read to get the primary shard info cached.
 
             final DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
-            readTx.read(CarsModel.BASE_PATH).checkedGet(5, TimeUnit.SECONDS);
+            readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS);
 
             // Shutdown the leader and try to create a new tx.
 
-            JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
+            TestKit.shutdownActorSystem(leaderSystem, true);
 
             Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS);
 
             sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder
-                .operationTimeoutInMillis(500).shardElectionTimeoutFactor(1).customRaftPolicyImplementation(null));
+                .operationTimeoutInMillis(500).shardElectionTimeoutFactor(5).customRaftPolicyImplementation(null));
 
             final DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
 
@@ -1103,8 +1259,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         // Setup a saved snapshot on the leader. The follower will startup with no data and the leader should
         // install a snapshot to sync the follower.
 
-        TipProducingDataTree tree = InMemoryDataTreeFactory.getInstance().create(TreeType.CONFIGURATION);
-        tree.setSchemaContext(SchemaContextHelper.full());
+        DataTree tree = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_CONFIGURATION,
+            SchemaContextHelper.full());
 
         final ContainerNode carsNode = CarsModel.newCarsNode(
                 CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", BigInteger.valueOf(20000))));
@@ -1122,8 +1278,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         initDatastoresWithCars(testName);
 
         final Optional<NormalizedNode<?, ?>> readOptional = leaderDistributedDataStore.newReadOnlyTransaction().read(
-                CarsModel.BASE_PATH).checkedGet(5, TimeUnit.SECONDS);
-        assertEquals("isPresent", true, readOptional.isPresent());
+                CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS);
+        assertTrue("isPresent", readOptional.isPresent());
         assertEquals("Node", carsNode, readOptional.get());
 
         verifySnapshot(InMemorySnapshotStore.waitForSavedSnapshot(leaderCarShardName, Snapshot.class),
@@ -1133,6 +1289,23 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
                 initialSnapshot, snapshotRoot);
     }
 
+    @Test
+    public void testReadWriteMessageSlicing() throws Exception {
+        // The slicing is only implemented for tell-based protocol
+        Assume.assumeTrue(ClientBackedDataStore.class.isAssignableFrom(testParameter));
+
+        leaderDatastoreContextBuilder.maximumMessageSliceSize(100);
+        followerDatastoreContextBuilder.maximumMessageSliceSize(100);
+        initDatastoresWithCars("testLargeReadReplySlicing");
+
+        final DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
+
+        final NormalizedNode<?, ?> carsNode = CarsModel.create();
+        rwTx.write(CarsModel.BASE_PATH, carsNode);
+
+        verifyNode(rwTx, CarsModel.BASE_PATH, carsNode);
+    }
+
     private static void verifySnapshot(final Snapshot actual, final Snapshot expected,
                                        final NormalizedNode<?, ?> expRoot) {
         assertEquals("Snapshot getLastAppliedTerm", expected.getLastAppliedTerm(), actual.getLastAppliedTerm());