Bump upstreams
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / DistributedDataStoreRemotingIntegrationTest.java
index 60d03a7e69d0dedea952f5b5bbe577399617cb18..265f4f2c454d0cdeaac6b6a124685ca9c2534689 100644 (file)
@@ -8,10 +8,12 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import static org.awaitility.Awaitility.await;
+import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
@@ -100,6 +102,7 @@ import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.common.api.OptimisticLockFailedException;
 import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
 import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
@@ -112,17 +115,17 @@ import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain;
 import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.common.Uint64;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.SystemMapNode;
-import org.opendaylight.yangtools.yang.data.api.schema.builder.CollectionNodeBuilder;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
-import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
+import org.opendaylight.yangtools.yang.data.tree.api.ConflictingModificationAppliedException;
+import org.opendaylight.yangtools.yang.data.tree.api.DataTree;
+import org.opendaylight.yangtools.yang.data.tree.api.DataTreeConfiguration;
+import org.opendaylight.yangtools.yang.data.tree.api.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.tree.impl.di.InMemoryDataTreeFactory;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.collection.Set;
 import scala.concurrent.Await;
@@ -140,7 +143,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
     @Parameters(name = "{0}")
     public static Collection<Object[]> data() {
         return Arrays.asList(new Object[][] {
-                { TestDistributedDataStore.class, 7}, { TestClientBackedDataStore.class, 12 }
+                { TestDistributedDataStore.class, 7 }, { TestClientBackedDataStore.class, 12 }
         });
     }
 
@@ -246,16 +249,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
     private static void verifyCars(final DOMStoreReadTransaction readTx, final MapEntryNode... entries)
             throws Exception {
-        final Optional<NormalizedNode> optional = readTx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
-        assertTrue("isPresent", optional.isPresent());
-
-        final CollectionNodeBuilder<MapEntryNode, SystemMapNode> listBuilder = ImmutableNodes.mapNodeBuilder(
-                CarsModel.CAR_QNAME);
-        for (final NormalizedNode entry: entries) {
-            listBuilder.withChild((MapEntryNode) entry);
-        }
-
-        assertEquals("Car list node", listBuilder.build(), optional.get());
+        assertEquals("Car list node",
+            Optional.of(ImmutableNodes.mapNodeBuilder(CarsModel.CAR_QNAME).withValue(Arrays.asList(entries)).build()),
+            readTx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS));
     }
 
     private static void verifyNode(final DOMStoreReadTransaction readTx, final YangInstanceIdentifier path,
@@ -401,21 +397,18 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         }
     }
 
-    private void assertAskClientMetadata(final FrontendClientMetadata clientMeta) {
+    private static void assertAskClientMetadata(final FrontendClientMetadata clientMeta) {
         // ask based should track no metadata
         assertEquals(List.of(), clientMeta.getCurrentHistories());
     }
 
-    private void assertTellClientMetadata(final FrontendClientMetadata clientMeta, final long lastPurged) {
+    private static void assertTellClientMetadata(final FrontendClientMetadata clientMeta, final long lastPurged) {
         final var iterator = clientMeta.getCurrentHistories().iterator();
         var metadata = iterator.next();
         while (iterator.hasNext() && metadata.getHistoryId() != 1) {
             metadata = iterator.next();
         }
 
-        // FIXME: CONTROLLER-1991: remove this assumption
-        assumeTrue(false);
-
         assertEquals(UnsignedLongBitmap.of(), metadata.getClosedTransactions());
         assertEquals("[[0.." + lastPurged + "]]", metadata.getPurgedTransactions().ranges().toString());
     }
@@ -441,21 +434,17 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
         int numCars = 5;
         for (int i = 0; i < numCars; i++) {
-            writeTx = txChain.newWriteOnlyTransaction();
-            writeTx.close();
+            try (var tx = txChain.newWriteOnlyTransaction()) {
+                // Empty on purpose
+            }
 
             try (var tx = txChain.newReadOnlyTransaction()) {
                 tx.read(CarsModel.BASE_PATH).get();
             }
         }
 
-        writeTx = txChain.newWriteOnlyTransaction();
-        writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
-        writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
-        followerTestKit.doCommit(writeTx.ready());
-
         // wait to let the shard catch up with purged
-        await("Close transaction purge leak test.").atMost(5, TimeUnit.SECONDS)
+        await("wait for purges to settle").atMost(5, TimeUnit.SECONDS)
                 .pollInterval(500, TimeUnit.MILLISECONDS)
                 .untilAsserted(() -> {
                     final var localShard = leaderDistributedDataStore.getActorUtils().findLocalShard("cars")
@@ -466,17 +455,11 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
                     final var clientMeta = frontendMetadata.getClients().get(0);
                     if (leaderDistributedDataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) {
-                        assertTellClientMetadata(clientMeta, numCars * 2 + 1);
+                        assertTellClientMetadata(clientMeta, numCars * 2);
                     } else {
                         assertAskClientMetadata(clientMeta);
                     }
                 });
-
-        try (var tx = txChain.newReadOnlyTransaction()) {
-            final var body = tx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS).orElseThrow().body();
-            assertThat(body, instanceOf(Collection.class));
-            assertEquals(numCars, ((Collection<?>) body).size());
-        }
     }
 
     @Test
@@ -663,11 +646,10 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
         final DOMDataTreeWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
 
-        final ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
-                new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME))
-                    .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
-
-        writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
+        writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, Builders.containerBuilder()
+            .withNodeIdentifier(new NodeIdentifier(CarsModel.BASE_QNAME))
+            .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk"))
+            .build());
 
         final var ex = assertThrows(ExecutionException.class, () -> writeTx.commit().get(5, TimeUnit.SECONDS))
             .getCause();
@@ -695,13 +677,12 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
         writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
 
-        final ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
-                new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME))
-                    .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
-
         // Note that merge will validate the data and fail but put succeeds b/c deep validation is not
         // done for put for performance reasons.
-        writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
+        writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, Builders.containerBuilder()
+            .withNodeIdentifier(new NodeIdentifier(CarsModel.BASE_QNAME))
+            .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk"))
+            .build());
 
         final var ex = assertThrows(ExecutionException.class, () -> writeTx.commit().get(5, TimeUnit.SECONDS))
             .getCause();
@@ -770,7 +751,6 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         }
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void testReadyLocalTransactionForwardedToLeader() throws Exception {
         initDatastoresWithCars("testReadyLocalTransactionForwardedToLeader");
@@ -795,7 +775,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
         ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(tx1 , modification, true, Optional.empty());
 
-        carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef());
+        carsFollowerShard.orElseThrow().tell(readyLocal, followerTestKit.getRef());
         Object resp = followerTestKit.expectMsgClass(Object.class);
         if (resp instanceof akka.actor.Status.Failure) {
             throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause());
@@ -814,7 +794,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
         readyLocal = new ReadyLocalTransaction(tx2 , modification, false, Optional.empty());
 
-        carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef());
+        carsFollowerShard.orElseThrow().tell(readyLocal, followerTestKit.getRef());
         resp = followerTestKit.expectMsgClass(Object.class);
         if (resp instanceof akka.actor.Status.Failure) {
             throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause());
@@ -835,7 +815,6 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2);
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void testForwardedReadyTransactionForwardedToLeader() throws Exception {
         initDatastoresWithCars("testForwardedReadyTransactionForwardedToLeader");
@@ -845,7 +824,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
                 followerDistributedDataStore.getActorUtils().findLocalShard("cars");
         assertTrue("Cars follower shard found", carsFollowerShard.isPresent());
 
-        carsFollowerShard.get().tell(GetShardDataTree.INSTANCE, followerTestKit.getRef());
+        carsFollowerShard.orElseThrow().tell(GetShardDataTree.INSTANCE, followerTestKit.getRef());
         final DataTree dataTree = followerTestKit.expectMsgClass(DataTree.class);
 
         // Send a tx with immediate commit.
@@ -861,7 +840,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
             new ReadWriteShardDataTreeTransaction(mock(ShardDataTreeTransactionParent.class), tx1, modification),
             true, Optional.empty());
 
-        carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef());
+        carsFollowerShard.orElseThrow().tell(forwardedReady, followerTestKit.getRef());
         Object resp = followerTestKit.expectMsgClass(Object.class);
         if (resp instanceof akka.actor.Status.Failure) {
             throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause());
@@ -881,7 +860,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
             new ReadWriteShardDataTreeTransaction(mock(ShardDataTreeTransactionParent.class), tx2, modification),
             false, Optional.empty());
 
-        carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef());
+        carsFollowerShard.orElseThrow().tell(forwardedReady, followerTestKit.getRef());
         resp = followerTestKit.expectMsgClass(Object.class);
         if (resp instanceof akka.actor.Status.Failure) {
             throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause());
@@ -909,6 +888,10 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         leaderDatastoreContextBuilder.shardBatchedModificationCount(2);
         initDatastoresWithCarsAndPeople("testTransactionForwardedToLeaderAfterRetry");
 
+        // Verify backend statistics on start
+        verifyCarsReadWriteTransactions(leaderDistributedDataStore, 0);
+        verifyCarsReadWriteTransactions(followerDistributedDataStore, 0);
+
         // Do an initial write to get the primary shard info cached.
 
         final DOMStoreWriteTransaction initialWriteTx = followerDistributedDataStore.newWriteOnlyTransaction();
@@ -948,10 +931,13 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         writeTx2.write(PeopleModel.PERSON_LIST_PATH, people);
         final DOMStoreThreePhaseCommitCohort writeTx2Cohort = writeTx2.ready();
 
+        // At this point only leader should see the transactions
+        verifyCarsReadWriteTransactions(leaderDistributedDataStore, 2);
+        verifyCarsReadWriteTransactions(followerDistributedDataStore, 0);
+
         // Prepare another WO that writes to a single shard and thus will be directly committed on ready. This
-        // tx writes 5 cars so 2 BatchedModidifications messages will be sent initially and cached in the
-        // leader shard (with shardBatchedModificationCount set to 2). The 3rd BatchedModidifications will be
-        // sent on ready.
+        // tx writes 5 cars so 2 BatchedModifications messages will be sent initially and cached in the leader shard
+        // (with shardBatchedModificationCount set to 2). The 3rd BatchedModifications will be sent on ready.
 
         final DOMStoreWriteTransaction writeTx3 = followerDistributedDataStore.newWriteOnlyTransaction();
         for (int i = 1; i <= 5; i++, carIndex++) {
@@ -959,25 +945,28 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
             writeTx3.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
         }
 
-        // Prepare another WO that writes to a single shard. This will send a single BatchedModidifications
-        // message on ready.
+        // Prepare another WO that writes to a single shard. This will send a single BatchedModifications message
+        // on ready.
 
         final DOMStoreWriteTransaction writeTx4 = followerDistributedDataStore.newWriteOnlyTransaction();
         cars.add(CarsModel.newCarEntry("car" + carIndex, Uint64.valueOf(carIndex)));
         writeTx4.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
         carIndex++;
 
-        // Prepare a RW tx that will create a tx actor and send a ForwardedReadyTransaciton message to the
-        // leader shard on ready.
+        // Prepare a RW tx that will create a tx actor and send a ForwardedReadyTransaction message to the leader shard
+        // on ready.
 
         final DOMStoreReadWriteTransaction readWriteTx = followerDistributedDataStore.newReadWriteTransaction();
         cars.add(CarsModel.newCarEntry("car" + carIndex, Uint64.valueOf(carIndex)));
-        readWriteTx.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
+        final YangInstanceIdentifier carPath = CarsModel.newCarPath("car" + carIndex);
+        readWriteTx.write(carPath, cars.getLast());
 
-        // FIXME: CONTROLLER-2017: ClientBackedDataStore reports only 4 transactions
-        assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter));
-        IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars",
-            stats -> assertEquals("getReadWriteTransactionCount", 5, stats.getReadWriteTransactionCount()));
+        // There is a difference here between implementations: tell-based protocol enforces batching on per-transaction
+        // level whereas ask-based protocol has a global limit towards a shard -- and hence flushes out last two
+        // transactions eagerly.
+        final int earlyTxCount = DistributedDataStore.class.isAssignableFrom(testParameter) ? 5 : 3;
+        verifyCarsReadWriteTransactions(leaderDistributedDataStore, earlyTxCount);
+        verifyCarsReadWriteTransactions(followerDistributedDataStore, 0);
 
         // Disable elections on the leader so it switches to follower.
 
@@ -1010,16 +999,24 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         followerTestKit.doCommit(writeTx4Cohort);
         followerTestKit.doCommit(rwTxCohort);
 
+        // At this point everything is committed and the follower datastore should see 5 transactions, but leader should
+        // only see the initial transactions
+        verifyCarsReadWriteTransactions(leaderDistributedDataStore, earlyTxCount);
+        verifyCarsReadWriteTransactions(followerDistributedDataStore, 5);
+
         DOMStoreReadTransaction readTx = leaderDistributedDataStore.newReadOnlyTransaction();
         verifyCars(readTx, cars.toArray(new MapEntryNode[cars.size()]));
         verifyNode(readTx, PeopleModel.PERSON_LIST_PATH, people);
     }
 
+    private static void verifyCarsReadWriteTransactions(final AbstractDataStore datastore, final int expected)
+            throws Exception {
+        IntegrationTestKit.verifyShardStats(datastore, "cars",
+            stats -> assertEquals("getReadWriteTransactionCount", expected, stats.getReadWriteTransactionCount()));
+    }
+
     @Test
     public void testLeadershipTransferOnShutdown() throws Exception {
-        // FIXME: remove when test passes also for ClientBackedDataStore
-        assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter));
-
         leaderDatastoreContextBuilder.shardBatchedModificationCount(1);
         followerDatastoreContextBuilder.shardElectionTimeoutFactor(10).customRaftPolicyImplementation(null);
         final String testName = "testLeadershipTransferOnShutdown";
@@ -1042,16 +1039,21 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
             writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
             final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
 
-            IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars",
-                stats -> assertEquals("getTxCohortCacheSize", 1, stats.getTxCohortCacheSize()));
+            final var usesCohorts = DistributedDataStore.class.isAssignableFrom(testParameter);
+            if (usesCohorts) {
+                IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars",
+                    stats -> assertEquals("getTxCohortCacheSize", 1, stats.getTxCohortCacheSize()));
+            }
 
             writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
             final MapEntryNode car = CarsModel.newCarEntry("optima", Uint64.valueOf(20000));
             writeTx.write(CarsModel.newCarPath("optima"), car);
             final DOMStoreThreePhaseCommitCohort cohort2 = writeTx.ready();
 
-            IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars",
-                stats -> assertEquals("getTxCohortCacheSize", 2, stats.getTxCohortCacheSize()));
+            if (usesCohorts) {
+                IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars",
+                    stats -> assertEquals("getTxCohortCacheSize", 2, stats.getTxCohortCacheSize()));
+            }
 
             // Gracefully stop the leader via a Shutdown message.
 
@@ -1083,9 +1085,6 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
     @Test
     public void testTransactionWithIsolatedLeader() throws Exception {
-        // FIXME: CONTROLLER-2018: remove when test passes also for ClientBackedDataStore
-        assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter));
-
         // Set the isolated leader check interval high so we can control the switch to IsolatedLeader.
         leaderDatastoreContextBuilder.shardIsolatedLeaderCheckIntervalInMillis(10000000);
         final String testName = "testTransactionWithIsolatedLeader";
@@ -1118,9 +1117,24 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         MemberNode.verifyRaftState(leaderDistributedDataStore, "cars",
             raftState -> assertEquals("getRaftState", "IsolatedLeader", raftState.getRaftState()));
 
-        final var ex = assertThrows(ExecutionException.class,
-            () -> leaderTestKit.doCommit(noShardLeaderWriteTx.ready()));
-        assertEquals(NoShardLeaderException.class, Throwables.getRootCause(ex).getClass());
+        final var noShardLeaderCohort = noShardLeaderWriteTx.ready();
+        final ListenableFuture<Boolean> canCommit;
+
+        // There is difference in behavior here:
+        if (!leaderDistributedDataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) {
+            // ask-based canCommit() times out and aborts
+            final var ex = assertThrows(ExecutionException.class,
+                () -> leaderTestKit.doCommit(noShardLeaderCohort)).getCause();
+            assertThat(ex, instanceOf(NoShardLeaderException.class));
+            assertThat(ex.getMessage(), containsString(
+                "Shard member-1-shard-cars-testTransactionWithIsolatedLeader currently has no leader."));
+            canCommit = null;
+        } else {
+            // tell-based canCommit() does not have a real timeout and hence continues
+            canCommit = noShardLeaderCohort.canCommit();
+            Uninterruptibles.sleepUninterruptibly(commitTimeout, TimeUnit.SECONDS);
+            assertFalse(canCommit.isDone());
+        }
 
         sendDatastoreContextUpdate(leaderDistributedDataStore, leaderDatastoreContextBuilder
                 .shardElectionTimeoutFactor(100));
@@ -1132,6 +1146,19 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
         leaderTestKit.doCommit(preIsolatedLeaderTxCohort);
         leaderTestKit.doCommit(successTxCohort);
+
+        // continuation of tell-based protocol: readied transaction will complete commit, but will report an OLFE
+        if (canCommit != null) {
+            final var ex = assertThrows(ExecutionException.class,
+                () -> canCommit.get(commitTimeout, TimeUnit.SECONDS)).getCause();
+            assertThat(ex, instanceOf(OptimisticLockFailedException.class));
+            assertEquals("Optimistic lock failed for path " + CarsModel.BASE_PATH, ex.getMessage());
+            final var cause = ex.getCause();
+            assertThat(cause, instanceOf(ConflictingModificationAppliedException.class));
+            final var cmae = (ConflictingModificationAppliedException) cause;
+            assertEquals("Node was created by other transaction.", cmae.getMessage());
+            assertEquals(CarsModel.BASE_PATH, cmae.getPath());
+        }
     }
 
     @Test
@@ -1260,7 +1287,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         // behavior is controlled by akka.coordinated-shutdown.run-by-actor-system-terminate configuration option
         TestKit.shutdownActorSystem(follower2System, true);
 
-        ActorRef cars = leaderDistributedDataStore.getActorUtils().findLocalShard("cars").get();
+        ActorRef cars = leaderDistributedDataStore.getActorUtils().findLocalShard("cars").orElseThrow();
         final OnDemandRaftState initialState = (OnDemandRaftState) leaderDistributedDataStore.getActorUtils()
                 .executeOperation(cars, GetOnDemandRaftState.INSTANCE);
 
@@ -1275,7 +1302,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         await().atMost(10, TimeUnit.SECONDS)
                 .until(() -> containsUnreachable(followerCluster, follower2Member));
 
-        ActorRef followerCars = followerDistributedDataStore.getActorUtils().findLocalShard("cars").get();
+        ActorRef followerCars = followerDistributedDataStore.getActorUtils().findLocalShard("cars").orElseThrow();
 
         // to simulate a follower not being able to receive messages, but still being able to send messages and becoming
         // candidate, we can just send a couple of RequestVotes to both leader and follower.
@@ -1422,23 +1449,22 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
     @Test
     public void testSnapshotOnRootOverwrite() throws Exception {
-        // FIXME: ClientBackedDatastore does not have stable indexes/term, the snapshot index seems to fluctuate
-        assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter));
-
-        final String testName = "testSnapshotOnRootOverwrite";
-        final String[] shards = {"cars", "default"};
-        initDatastores(testName, "module-shards-default-cars-member1-and-2.conf", shards,
-                leaderDatastoreContextBuilder.snapshotOnRootOverwrite(true),
-                followerDatastoreContextBuilder.snapshotOnRootOverwrite(true));
+        initDatastores("testSnapshotOnRootOverwrite", "module-shards-default-cars-member1-and-2.conf",
+            new String[] {"cars", "default"},
+            leaderDatastoreContextBuilder.snapshotOnRootOverwrite(true),
+            followerDatastoreContextBuilder.snapshotOnRootOverwrite(true));
 
         leaderTestKit.waitForMembersUp("member-2");
-        final ContainerNode rootNode = ImmutableContainerNodeBuilder.create()
-                .withNodeIdentifier(YangInstanceIdentifier.NodeIdentifier.create(SchemaContext.NAME))
+        final ContainerNode rootNode = Builders.containerBuilder()
+                .withNodeIdentifier(NodeIdentifier.create(SchemaContext.NAME))
                 .withChild(CarsModel.create())
                 .build();
 
         leaderTestKit.testWriteTransaction(leaderDistributedDataStore, YangInstanceIdentifier.empty(), rootNode);
 
+        // FIXME: CONTROLLER-2020: ClientBackedDatastore does not have stable indexes/term,
+        //                         the snapshot index seems to fluctuate
+        assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter));
         IntegrationTestKit.verifyShardState(leaderDistributedDataStore, "cars",
             state -> assertEquals(1, state.getSnapshotIndex()));
 
@@ -1476,7 +1502,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         verifySnapshot("member-2-shard-cars-testSnapshotOnRootOverwrite", 12);
     }
 
-    private void verifySnapshot(final String persistenceId, final long lastAppliedIndex) {
+    private static void verifySnapshot(final String persistenceId, final long lastAppliedIndex) {
         await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
                 List<Snapshot> snap = InMemorySnapshotStore.getSnapshots(persistenceId, Snapshot.class);
                 assertEquals(1, snap.size());
@@ -1494,7 +1520,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         assertEquals("Snapshot state type", ShardSnapshotState.class, actual.getState().getClass());
         MetadataShardDataTreeSnapshot shardSnapshot =
                 (MetadataShardDataTreeSnapshot) ((ShardSnapshotState)actual.getState()).getSnapshot();
-        assertEquals("Snapshot root node", expRoot, shardSnapshot.getRootNode().get());
+        assertEquals("Snapshot root node", expRoot, shardSnapshot.getRootNode().orElseThrow());
     }
 
     private static void sendDatastoreContextUpdate(final AbstractDataStore dataStore, final Builder builder) {