Fix followerDistributedDataStore tear down
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / DistributedDataStoreRemotingIntegrationTest.java
index 816d2acff7c874c4a3113c138b22d4583a20c689..91c00f7eb153c04a6fc64f6782a7349dab80e2f3 100644 (file)
@@ -8,7 +8,6 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import static org.awaitility.Awaitility.await;
-import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
@@ -18,9 +17,9 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assume.assumeTrue;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.timeout;
@@ -39,6 +38,7 @@ import akka.testkit.javadsl.TestKit;
 import com.google.common.base.Stopwatch;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.Uninterruptibles;
@@ -48,6 +48,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -72,8 +73,6 @@ import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
 import org.opendaylight.controller.cluster.datastore.TestShard.RequestFrontendMetadata;
 import org.opendaylight.controller.cluster.datastore.TestShard.StartDropMessages;
 import org.opendaylight.controller.cluster.datastore.TestShard.StopDropMessages;
-import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
-import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree;
@@ -106,7 +105,6 @@ import org.opendaylight.mdsal.common.api.OptimisticLockFailedException;
 import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
 import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
-import org.opendaylight.mdsal.dom.api.DOMTransactionChainListener;
 import org.opendaylight.mdsal.dom.spi.store.DOMStore;
 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
@@ -148,7 +146,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
     }
 
     @Parameter(0)
-    public Class<? extends AbstractDataStore> testParameter;
+    public Class<? extends ClientBackedDataStore> testParameter;
     @Parameter(1)
     public int commitTimeout;
 
@@ -178,8 +176,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
     private final TransactionIdentifier tx1 = nextTransactionId();
     private final TransactionIdentifier tx2 = nextTransactionId();
 
-    private AbstractDataStore followerDistributedDataStore;
-    private AbstractDataStore leaderDistributedDataStore;
+    private ClientBackedDataStore followerDistributedDataStore;
+    private ClientBackedDataStore leaderDistributedDataStore;
     private IntegrationTestKit followerTestKit;
     private IntegrationTestKit leaderTestKit;
 
@@ -201,7 +199,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
     @After
     public void tearDown() {
         if (followerDistributedDataStore != null) {
-            leaderDistributedDataStore.close();
+            followerDistributedDataStore.close();
         }
         if (leaderDistributedDataStore != null) {
             leaderDistributedDataStore.close();
@@ -234,11 +232,11 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
                     throws Exception {
         leaderTestKit = new IntegrationTestKit(leaderSystem, leaderBuilder, commitTimeout);
 
-        leaderDistributedDataStore = leaderTestKit.setupAbstractDataStore(
-                testParameter, type, moduleShardsConfig, false, shards);
+        leaderDistributedDataStore = leaderTestKit.setupDataStore(testParameter, type, moduleShardsConfig, false,
+            shards);
 
         followerTestKit = new IntegrationTestKit(followerSystem, followerBuilder, commitTimeout);
-        followerDistributedDataStore = followerTestKit.setupAbstractDataStore(
+        followerDistributedDataStore = followerTestKit.setupDataStore(
                 testParameter, type, moduleShardsConfig, false, shards);
 
         leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorUtils(), shards);
@@ -343,9 +341,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
         final ActorSystem newSystem = newActorSystem("reinstated-member2", "Member2");
 
-        try (AbstractDataStore member2Datastore = new IntegrationTestKit(newSystem, leaderDatastoreContextBuilder,
-                commitTimeout)
-                .setupAbstractDataStore(testParameter, testName, "module-shards-member2", true, CARS)) {
+        try (var member2Datastore = new IntegrationTestKit(newSystem, leaderDatastoreContextBuilder, commitTimeout)
+                .setupDataStore(testParameter, testName, "module-shards-member2", true, CARS)) {
             verifyCars(member2Datastore.newReadOnlyTransaction(), car2);
         }
     }
@@ -374,35 +371,24 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
         // wait to let the shard catch up with purged
         await("Range set leak test").atMost(5, TimeUnit.SECONDS)
-                .pollInterval(500, TimeUnit.MILLISECONDS)
-                .untilAsserted(() -> {
-                    final var localShard = leaderDistributedDataStore.getActorUtils().findLocalShard("cars")
-                        .orElseThrow();
-                    final var frontendMetadata =
-                        (FrontendShardDataTreeSnapshotMetadata) leaderDistributedDataStore.getActorUtils()
-                            .executeOperation(localShard, new RequestFrontendMetadata());
-
-                    final var clientMeta = frontendMetadata.getClients().get(0);
-                    if (leaderDistributedDataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) {
-                        assertTellClientMetadata(clientMeta, numCars * 2);
-                    } else {
-                        assertAskClientMetadata(clientMeta);
-                    }
-                });
+            .pollInterval(500, TimeUnit.MILLISECONDS)
+            .untilAsserted(() -> {
+                final var localShard = leaderDistributedDataStore.getActorUtils().findLocalShard("cars").orElseThrow();
+                final var frontendMetadata =
+                    (FrontendShardDataTreeSnapshotMetadata) leaderDistributedDataStore.getActorUtils()
+                    .executeOperation(localShard, new RequestFrontendMetadata());
+
+                assertClientMetadata(frontendMetadata.getClients().get(0), numCars * 2);
+            });
 
         try (var tx = txChain.newReadOnlyTransaction()) {
-            final var body = tx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS).orElseThrow().body();
-            assertThat(body, instanceOf(Collection.class));
+            final var body = assertInstanceOf(Collection.class,
+                tx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS).orElseThrow().body());
             assertEquals(numCars, ((Collection<?>) body).size());
         }
     }
 
-    private static void assertAskClientMetadata(final FrontendClientMetadata clientMeta) {
-        // ask based should track no metadata
-        assertEquals(List.of(), clientMeta.getCurrentHistories());
-    }
-
-    private static void assertTellClientMetadata(final FrontendClientMetadata clientMeta, final long lastPurged) {
+    private static void assertClientMetadata(final FrontendClientMetadata clientMeta, final long lastPurged) {
         final var iterator = clientMeta.getCurrentHistories().iterator();
         var metadata = iterator.next();
         while (iterator.hasNext() && metadata.getHistoryId() != 1) {
@@ -415,19 +401,11 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
     @Test
     public void testCloseTransactionMetadataLeak() throws Exception {
-        // FIXME: CONTROLLER-2016: ask-based frontend triggers this:
-        //
-        // java.lang.IllegalStateException: Previous transaction
-        //            member-2-datastore-testCloseTransactionMetadataLeak-fe-0-chn-1-txn-1-0 is not ready yet
-        //        at org.opendaylight.controller.cluster.datastore.TransactionChainProxy$Allocated.checkReady()
-        //        at org.opendaylight.controller.cluster.datastore.TransactionChainProxy.newReadOnlyTransaction()
-        assumeTrue(testParameter.isAssignableFrom(ClientBackedDataStore.class));
-
         initDatastoresWithCars("testCloseTransactionMetadataLeak");
 
-        final DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain();
+        final var txChain = followerDistributedDataStore.createTransactionChain();
 
-        DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+        var writeTx = txChain.newWriteOnlyTransaction();
         writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
         writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
         followerTestKit.doCommit(writeTx.ready());
@@ -445,21 +423,15 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
         // wait to let the shard catch up with purged
         await("wait for purges to settle").atMost(5, TimeUnit.SECONDS)
-                .pollInterval(500, TimeUnit.MILLISECONDS)
-                .untilAsserted(() -> {
-                    final var localShard = leaderDistributedDataStore.getActorUtils().findLocalShard("cars")
-                        .orElseThrow();
-                    final var frontendMetadata =
-                            (FrontendShardDataTreeSnapshotMetadata) leaderDistributedDataStore.getActorUtils()
-                                    .executeOperation(localShard, new RequestFrontendMetadata());
-
-                    final var clientMeta = frontendMetadata.getClients().get(0);
-                    if (leaderDistributedDataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) {
-                        assertTellClientMetadata(clientMeta, numCars * 2);
-                    } else {
-                        assertAskClientMetadata(clientMeta);
-                    }
-                });
+            .pollInterval(500, TimeUnit.MILLISECONDS)
+            .untilAsserted(() -> {
+                final var localShard = leaderDistributedDataStore.getActorUtils().findLocalShard("cars").orElseThrow();
+                final var frontendMetadata =
+                    (FrontendShardDataTreeSnapshotMetadata) leaderDistributedDataStore.getActorUtils()
+                    .executeOperation(localShard, new RequestFrontendMetadata());
+
+                assertClientMetadata(frontendMetadata.getClients().get(0), numCars * 2);
+            });
     }
 
     @Test
@@ -641,8 +613,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
                         LogicalDatastoreType.CONFIGURATION, followerDistributedDataStore).build(),
                         MoreExecutors.directExecutor());
 
-        final DOMTransactionChainListener listener = mock(DOMTransactionChainListener.class);
-        final DOMTransactionChain txChain = broker.createTransactionChain(listener);
+        final var listener = mock(FutureCallback.class);
+        final DOMTransactionChain txChain = broker.createTransactionChain();
+        txChain.addCallback(listener);
 
         final DOMDataTreeWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
 
@@ -651,11 +624,10 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
             .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk"))
             .build());
 
-        final var ex = assertThrows(ExecutionException.class, () -> writeTx.commit().get(5, TimeUnit.SECONDS))
-            .getCause();
-        assertThat(ex, instanceOf(TransactionCommitFailedException.class));
+        final var ex = assertThrows(ExecutionException.class, () -> writeTx.commit().get(5, TimeUnit.SECONDS));
+        assertInstanceOf(TransactionCommitFailedException.class, ex.getCause());
 
-        verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx), any(Throwable.class));
+        verify(listener, timeout(5000)).onFailure(any());
 
         txChain.close();
         broker.close();
@@ -665,33 +637,32 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
     public void testChainedTransactionFailureWithMultipleShards() throws Exception {
         initDatastoresWithCarsAndPeople("testChainedTransactionFailureWithMultipleShards");
 
-        final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
-                ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
-                        LogicalDatastoreType.CONFIGURATION, followerDistributedDataStore).build(),
-                        MoreExecutors.directExecutor());
+        try (var broker = new ConcurrentDOMDataBroker(
+            Map.of(LogicalDatastoreType.CONFIGURATION, followerDistributedDataStore), MoreExecutors.directExecutor())) {
 
-        final DOMTransactionChainListener listener = mock(DOMTransactionChainListener.class);
-        final DOMTransactionChain txChain = broker.createTransactionChain(listener);
+            final var listener = mock(FutureCallback.class);
+            final DOMTransactionChain txChain = broker.createTransactionChain();
+            txChain.addCallback(listener);
 
-        final DOMDataTreeWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+            final DOMDataTreeWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
 
-        writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
+            writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
 
-        // Note that merge will validate the data and fail but put succeeds b/c deep validation is not
-        // done for put for performance reasons.
-        writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, Builders.containerBuilder()
-            .withNodeIdentifier(new NodeIdentifier(CarsModel.BASE_QNAME))
-            .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk"))
-            .build());
+            // Note that merge will validate the data and fail but put succeeds b/c deep validation is not
+            // done for put for performance reasons.
+            writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, Builders.containerBuilder()
+                .withNodeIdentifier(new NodeIdentifier(CarsModel.BASE_QNAME))
+                .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk"))
+                .build());
 
-        final var ex = assertThrows(ExecutionException.class, () -> writeTx.commit().get(5, TimeUnit.SECONDS))
-            .getCause();
-        assertThat(ex, instanceOf(TransactionCommitFailedException.class));
+            final var ex = assertThrows(ExecutionException.class, () -> writeTx.commit().get(5, TimeUnit.SECONDS))
+                .getCause();
+            assertThat(ex, instanceOf(TransactionCommitFailedException.class));
 
-        verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx), any(Throwable.class));
+            verify(listener, timeout(5000)).onFailure(any());
 
-        txChain.close();
-        broker.close();
+            txChain.close();
+        }
     }
 
     @Test
@@ -731,9 +702,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
                 .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);
         IntegrationTestKit newMember1TestKit = new IntegrationTestKit(leaderSystem, newMember1Builder, commitTimeout);
 
-        try (AbstractDataStore ds =
-                newMember1TestKit.setupAbstractDataStore(
-                        testParameter, testName, MODULE_SHARDS_CARS_ONLY_1_2, false, CARS)) {
+        try (var ds = newMember1TestKit.setupDataStore(testParameter, testName, MODULE_SHARDS_CARS_ONLY_1_2, false,
+            CARS)) {
 
             followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorUtils(), CARS);
 
@@ -964,8 +934,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         // There is a difference here between implementations: tell-based protocol enforces batching on per-transaction
         // level whereas ask-based protocol has a global limit towards a shard -- and hence flushes out last two
         // transactions eagerly.
-        final int earlyTxCount = DistributedDataStore.class.isAssignableFrom(testParameter) ? 5 : 3;
-        verifyCarsReadWriteTransactions(leaderDistributedDataStore, earlyTxCount);
+        verifyCarsReadWriteTransactions(leaderDistributedDataStore, 3);
         verifyCarsReadWriteTransactions(followerDistributedDataStore, 0);
 
         // Disable elections on the leader so it switches to follower.
@@ -1001,7 +970,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
         // At this point everything is committed and the follower datastore should see 5 transactions, but leader should
         // only see the initial transactions
-        verifyCarsReadWriteTransactions(leaderDistributedDataStore, earlyTxCount);
+        verifyCarsReadWriteTransactions(leaderDistributedDataStore, 3);
         verifyCarsReadWriteTransactions(followerDistributedDataStore, 5);
 
         DOMStoreReadTransaction readTx = leaderDistributedDataStore.newReadOnlyTransaction();
@@ -1009,7 +978,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         verifyNode(readTx, PeopleModel.PERSON_LIST_PATH, people);
     }
 
-    private static void verifyCarsReadWriteTransactions(final AbstractDataStore datastore, final int expected)
+    private static void verifyCarsReadWriteTransactions(final ClientBackedDataStore datastore, final int expected)
             throws Exception {
         IntegrationTestKit.verifyShardStats(datastore, "cars",
             stats -> assertEquals("getReadWriteTransactionCount", expected, stats.getReadWriteTransactionCount()));
@@ -1025,8 +994,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         final IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System,
                 DatastoreContext.newBuilderFrom(followerDatastoreContextBuilder.build()).operationTimeoutInMillis(500),
                 commitTimeout);
-        try (AbstractDataStore follower2DistributedDataStore = follower2TestKit.setupAbstractDataStore(
-                testParameter, testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, false)) {
+        try (var follower2DistributedDataStore = follower2TestKit.setupDataStore(testParameter, testName,
+            MODULE_SHARDS_CARS_PEOPLE_1_2_3, false)) {
 
             followerTestKit.waitForMembersUp("member-3");
             follower2TestKit.waitForMembersUp("member-1", "member-2");
@@ -1039,21 +1008,18 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
             writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
             final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
 
-            final var usesCohorts = DistributedDataStore.class.isAssignableFrom(testParameter);
-            if (usesCohorts) {
-                IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars",
-                    stats -> assertEquals("getTxCohortCacheSize", 1, stats.getTxCohortCacheSize()));
-            }
+            // FIXME: this assertion should be made in an explicit Shard test
+            //            IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars",
+            //                stats -> assertEquals("getTxCohortCacheSize", 1, stats.getTxCohortCacheSize()));
 
             writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
             final MapEntryNode car = CarsModel.newCarEntry("optima", Uint64.valueOf(20000));
             writeTx.write(CarsModel.newCarPath("optima"), car);
             final DOMStoreThreePhaseCommitCohort cohort2 = writeTx.ready();
 
-            if (usesCohorts) {
-                IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars",
-                    stats -> assertEquals("getTxCohortCacheSize", 2, stats.getTxCohortCacheSize()));
-            }
+            // FIXME: this assertion should be made in an explicit Shard test
+            //            IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars",
+            //                stats -> assertEquals("getTxCohortCacheSize", 2, stats.getTxCohortCacheSize()));
 
             // Gracefully stop the leader via a Shutdown message.
 
@@ -1118,47 +1084,32 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
             raftState -> assertEquals("getRaftState", "IsolatedLeader", raftState.getRaftState()));
 
         final var noShardLeaderCohort = noShardLeaderWriteTx.ready();
-        final ListenableFuture<Boolean> canCommit;
-
-        // There is difference in behavior here:
-        if (!leaderDistributedDataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) {
-            // ask-based canCommit() times out and aborts
-            final var ex = assertThrows(ExecutionException.class,
-                () -> leaderTestKit.doCommit(noShardLeaderCohort)).getCause();
-            assertThat(ex, instanceOf(NoShardLeaderException.class));
-            assertThat(ex.getMessage(), containsString(
-                "Shard member-1-shard-cars-testTransactionWithIsolatedLeader currently has no leader."));
-            canCommit = null;
-        } else {
-            // tell-based canCommit() does not have a real timeout and hence continues
-            canCommit = noShardLeaderCohort.canCommit();
-            Uninterruptibles.sleepUninterruptibly(commitTimeout, TimeUnit.SECONDS);
-            assertFalse(canCommit.isDone());
-        }
+        // tell-based canCommit() does not have a real timeout and hence continues
+        final var canCommit = noShardLeaderCohort.canCommit();
+        Uninterruptibles.sleepUninterruptibly(commitTimeout, TimeUnit.SECONDS);
+        assertFalse(canCommit.isDone());
 
         sendDatastoreContextUpdate(leaderDistributedDataStore, leaderDatastoreContextBuilder
                 .shardElectionTimeoutFactor(100));
 
         final DOMStoreThreePhaseCommitCohort successTxCohort = successWriteTx.ready();
 
-        followerDistributedDataStore = followerTestKit.setupAbstractDataStore(
-                testParameter, testName, MODULE_SHARDS_CARS_ONLY_1_2, false, CARS);
+        followerDistributedDataStore = followerTestKit.setupDataStore(testParameter, testName,
+            MODULE_SHARDS_CARS_ONLY_1_2, false, CARS);
 
         leaderTestKit.doCommit(preIsolatedLeaderTxCohort);
         leaderTestKit.doCommit(successTxCohort);
 
-        // continuation of tell-based protocol: readied transaction will complete commit, but will report an OLFE
-        if (canCommit != null) {
-            final var ex = assertThrows(ExecutionException.class,
-                () -> canCommit.get(commitTimeout, TimeUnit.SECONDS)).getCause();
-            assertThat(ex, instanceOf(OptimisticLockFailedException.class));
-            assertEquals("Optimistic lock failed for path " + CarsModel.BASE_PATH, ex.getMessage());
-            final var cause = ex.getCause();
-            assertThat(cause, instanceOf(ConflictingModificationAppliedException.class));
-            final var cmae = (ConflictingModificationAppliedException) cause;
-            assertEquals("Node was created by other transaction.", cmae.getMessage());
-            assertEquals(CarsModel.BASE_PATH, cmae.getPath());
-        }
+        // continuation of canCommit(): readied transaction will complete commit, but will report an OLFE
+        final var ex = assertThrows(ExecutionException.class,
+            () -> canCommit.get(commitTimeout, TimeUnit.SECONDS)).getCause();
+        assertThat(ex, instanceOf(OptimisticLockFailedException.class));
+        assertEquals("Optimistic lock failed for path " + CarsModel.BASE_PATH, ex.getMessage());
+        final var cause = ex.getCause();
+        assertThat(cause, instanceOf(ConflictingModificationAppliedException.class));
+        final var cmae = (ConflictingModificationAppliedException) cause;
+        assertEquals("Node was created by other transaction.", cmae.getMessage());
+        assertEquals(CarsModel.BASE_PATH, cmae.getPath());
     }
 
     @Test
@@ -1184,13 +1135,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         rwTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
 
         final var ex = assertThrows(ExecutionException.class, () -> followerTestKit.doCommit(rwTx.ready()));
-        final String msg = "Unexpected exception: " + Throwables.getStackTraceAsString(ex.getCause());
-        if (DistributedDataStore.class.isAssignableFrom(testParameter)) {
-            assertTrue(msg, Throwables.getRootCause(ex) instanceof NoShardLeaderException
-                || ex.getCause() instanceof ShardLeaderNotRespondingException);
-        } else {
-            assertThat(msg, Throwables.getRootCause(ex), instanceOf(RequestTimeoutException.class));
-        }
+        assertThat("Unexpected exception: " + Throwables.getStackTraceAsString(ex.getCause()),
+            Throwables.getRootCause(ex), instanceOf(RequestTimeoutException.class));
     }
 
     @Test
@@ -1219,12 +1165,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         rwTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
 
         final var ex = assertThrows(ExecutionException.class, () -> followerTestKit.doCommit(rwTx.ready()));
-        final String msg = "Unexpected exception: " + Throwables.getStackTraceAsString(ex.getCause());
-        if (DistributedDataStore.class.isAssignableFrom(testParameter)) {
-            assertThat(msg, Throwables.getRootCause(ex), instanceOf(NoShardLeaderException.class));
-        } else {
-            assertThat(msg, Throwables.getRootCause(ex), instanceOf(RequestTimeoutException.class));
-        }
+        assertThat("Unexpected exception: " + Throwables.getStackTraceAsString(ex.getCause()),
+            Throwables.getRootCause(ex), instanceOf(RequestTimeoutException.class));
     }
 
     @Test
@@ -1238,9 +1180,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         final IntegrationTestKit follower2TestKit = new IntegrationTestKit(
                 follower2System, follower2DatastoreContextBuilder, commitTimeout);
 
-        try (AbstractDataStore ds =
-                follower2TestKit.setupAbstractDataStore(
-                        testParameter, testName, MODULE_SHARDS_CARS_1_2_3, false, CARS)) {
+        try (var ds = follower2TestKit.setupDataStore(testParameter, testName, MODULE_SHARDS_CARS_1_2_3, false, CARS)) {
 
             followerTestKit.waitForMembersUp("member-1", "member-3");
             follower2TestKit.waitForMembersUp("member-1", "member-2");
@@ -1277,9 +1217,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         final IntegrationTestKit follower2TestKit = new IntegrationTestKit(
                 follower2System, follower2DatastoreContextBuilder, commitTimeout);
 
-        final AbstractDataStore ds2 =
-                     follower2TestKit.setupAbstractDataStore(
-                             testParameter, testName, MODULE_SHARDS_CARS_1_2_3, false, CARS);
+        final var ds2 = follower2TestKit.setupDataStore(testParameter, testName, MODULE_SHARDS_CARS_1_2_3, false, CARS);
 
         followerTestKit.waitForMembersUp("member-1", "member-3");
         follower2TestKit.waitForMembersUp("member-1", "member-2");
@@ -1368,9 +1306,6 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
     @Test
     public void testReadWriteMessageSlicing() throws Exception {
-        // The slicing is only implemented for tell-based protocol
-        assumeTrue(ClientBackedDataStore.class.isAssignableFrom(testParameter));
-
         leaderDatastoreContextBuilder.maximumMessageSliceSize(100);
         followerDatastoreContextBuilder.maximumMessageSliceSize(100);
         initDatastoresWithCars("testLargeReadReplySlicing");
@@ -1400,8 +1335,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         initialWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
         leaderTestKit.doCommit(initialWriteTx.ready());
 
-        try (AbstractDataStore follower2DistributedDataStore = follower2TestKit.setupAbstractDataStore(
-                testParameter, testName, MODULE_SHARDS_CARS_1_2_3, false)) {
+        try (var follower2DistributedDataStore = follower2TestKit.setupDataStore(testParameter, testName,
+            MODULE_SHARDS_CARS_1_2_3, false)) {
 
             final ActorRef member3Cars = ((LocalShardStore) follower2DistributedDataStore).getLocalShards()
                     .getLocalShards().get("cars").getActor();
@@ -1464,7 +1399,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
         // FIXME: CONTROLLER-2020: ClientBackedDatastore does not have stable indexes/term,
         //                         the snapshot index seems to fluctuate
-        assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter));
+        assumeTrue(false);
         IntegrationTestKit.verifyShardState(leaderDistributedDataStore, "cars",
             state -> assertEquals(1, state.getSnapshotIndex()));
 
@@ -1523,7 +1458,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         assertEquals("Snapshot root node", expRoot, shardSnapshot.getRootNode().orElseThrow());
     }
 
-    private static void sendDatastoreContextUpdate(final AbstractDataStore dataStore, final Builder builder) {
+    private static void sendDatastoreContextUpdate(final ClientBackedDataStore dataStore, final Builder builder) {
         final Builder newBuilder = DatastoreContext.newBuilderFrom(builder.build());
         final DatastoreContextFactory mockContextFactory = mock(DatastoreContextFactory.class);
         final Answer<DatastoreContext> answer = invocation -> newBuilder.build();