Bump upstreams
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / DistributedDataStoreRemotingIntegrationTest.java
index bb2093bcc34f94420a5715b41a8c08784d54d4aa..265f4f2c454d0cdeaac6b6a124685ca9c2534689 100644 (file)
@@ -115,13 +115,12 @@ import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain;
 import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.common.Uint64;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.SystemMapNode;
-import org.opendaylight.yangtools.yang.data.api.schema.builder.CollectionNodeBuilder;
+import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
 import org.opendaylight.yangtools.yang.data.tree.api.ConflictingModificationAppliedException;
 import org.opendaylight.yangtools.yang.data.tree.api.DataTree;
 import org.opendaylight.yangtools.yang.data.tree.api.DataTreeConfiguration;
@@ -250,16 +249,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
     private static void verifyCars(final DOMStoreReadTransaction readTx, final MapEntryNode... entries)
             throws Exception {
-        final Optional<NormalizedNode> optional = readTx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
-        assertTrue("isPresent", optional.isPresent());
-
-        final CollectionNodeBuilder<MapEntryNode, SystemMapNode> listBuilder = ImmutableNodes.mapNodeBuilder(
-                CarsModel.CAR_QNAME);
-        for (final NormalizedNode entry: entries) {
-            listBuilder.withChild((MapEntryNode) entry);
-        }
-
-        assertEquals("Car list node", listBuilder.build(), optional.get());
+        assertEquals("Car list node",
+            Optional.of(ImmutableNodes.mapNodeBuilder(CarsModel.CAR_QNAME).withValue(Arrays.asList(entries)).build()),
+            readTx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS));
     }
 
     private static void verifyNode(final DOMStoreReadTransaction readTx, final YangInstanceIdentifier path,
@@ -654,11 +646,10 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
         final DOMDataTreeWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
 
-        final ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
-                new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME))
-                    .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
-
-        writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
+        writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, Builders.containerBuilder()
+            .withNodeIdentifier(new NodeIdentifier(CarsModel.BASE_QNAME))
+            .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk"))
+            .build());
 
         final var ex = assertThrows(ExecutionException.class, () -> writeTx.commit().get(5, TimeUnit.SECONDS))
             .getCause();
@@ -686,13 +677,12 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
         writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
 
-        final ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
-                new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME))
-                    .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
-
         // Note that merge will validate the data and fail but put succeeds b/c deep validation is not
         // done for put for performance reasons.
-        writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
+        writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, Builders.containerBuilder()
+            .withNodeIdentifier(new NodeIdentifier(CarsModel.BASE_QNAME))
+            .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk"))
+            .build());
 
         final var ex = assertThrows(ExecutionException.class, () -> writeTx.commit().get(5, TimeUnit.SECONDS))
             .getCause();
@@ -785,7 +775,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
         ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(tx1 , modification, true, Optional.empty());
 
-        carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef());
+        carsFollowerShard.orElseThrow().tell(readyLocal, followerTestKit.getRef());
         Object resp = followerTestKit.expectMsgClass(Object.class);
         if (resp instanceof akka.actor.Status.Failure) {
             throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause());
@@ -804,7 +794,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
         readyLocal = new ReadyLocalTransaction(tx2 , modification, false, Optional.empty());
 
-        carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef());
+        carsFollowerShard.orElseThrow().tell(readyLocal, followerTestKit.getRef());
         resp = followerTestKit.expectMsgClass(Object.class);
         if (resp instanceof akka.actor.Status.Failure) {
             throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause());
@@ -834,7 +824,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
                 followerDistributedDataStore.getActorUtils().findLocalShard("cars");
         assertTrue("Cars follower shard found", carsFollowerShard.isPresent());
 
-        carsFollowerShard.get().tell(GetShardDataTree.INSTANCE, followerTestKit.getRef());
+        carsFollowerShard.orElseThrow().tell(GetShardDataTree.INSTANCE, followerTestKit.getRef());
         final DataTree dataTree = followerTestKit.expectMsgClass(DataTree.class);
 
         // Send a tx with immediate commit.
@@ -850,7 +840,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
             new ReadWriteShardDataTreeTransaction(mock(ShardDataTreeTransactionParent.class), tx1, modification),
             true, Optional.empty());
 
-        carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef());
+        carsFollowerShard.orElseThrow().tell(forwardedReady, followerTestKit.getRef());
         Object resp = followerTestKit.expectMsgClass(Object.class);
         if (resp instanceof akka.actor.Status.Failure) {
             throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause());
@@ -870,7 +860,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
             new ReadWriteShardDataTreeTransaction(mock(ShardDataTreeTransactionParent.class), tx2, modification),
             false, Optional.empty());
 
-        carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef());
+        carsFollowerShard.orElseThrow().tell(forwardedReady, followerTestKit.getRef());
         resp = followerTestKit.expectMsgClass(Object.class);
         if (resp instanceof akka.actor.Status.Failure) {
             throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause());
@@ -898,6 +888,10 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         leaderDatastoreContextBuilder.shardBatchedModificationCount(2);
         initDatastoresWithCarsAndPeople("testTransactionForwardedToLeaderAfterRetry");
 
+        // Verify backend statistics on start
+        verifyCarsReadWriteTransactions(leaderDistributedDataStore, 0);
+        verifyCarsReadWriteTransactions(followerDistributedDataStore, 0);
+
         // Do an initial write to get the primary shard info cached.
 
         final DOMStoreWriteTransaction initialWriteTx = followerDistributedDataStore.newWriteOnlyTransaction();
@@ -937,10 +931,13 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         writeTx2.write(PeopleModel.PERSON_LIST_PATH, people);
         final DOMStoreThreePhaseCommitCohort writeTx2Cohort = writeTx2.ready();
 
+        // At this point only leader should see the transactions
+        verifyCarsReadWriteTransactions(leaderDistributedDataStore, 2);
+        verifyCarsReadWriteTransactions(followerDistributedDataStore, 0);
+
         // Prepare another WO that writes to a single shard and thus will be directly committed on ready. This
-        // tx writes 5 cars so 2 BatchedModidifications messages will be sent initially and cached in the
-        // leader shard (with shardBatchedModificationCount set to 2). The 3rd BatchedModidifications will be
-        // sent on ready.
+        // tx writes 5 cars so 2 BatchedModifications messages will be sent initially and cached in the leader shard
+        // (with shardBatchedModificationCount set to 2). The 3rd BatchedModifications will be sent on ready.
 
         final DOMStoreWriteTransaction writeTx3 = followerDistributedDataStore.newWriteOnlyTransaction();
         for (int i = 1; i <= 5; i++, carIndex++) {
@@ -948,25 +945,28 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
             writeTx3.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
         }
 
-        // Prepare another WO that writes to a single shard. This will send a single BatchedModidifications
-        // message on ready.
+        // Prepare another WO that writes to a single shard. This will send a single BatchedModifications message
+        // on ready.
 
         final DOMStoreWriteTransaction writeTx4 = followerDistributedDataStore.newWriteOnlyTransaction();
         cars.add(CarsModel.newCarEntry("car" + carIndex, Uint64.valueOf(carIndex)));
         writeTx4.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
         carIndex++;
 
-        // Prepare a RW tx that will create a tx actor and send a ForwardedReadyTransaciton message to the
-        // leader shard on ready.
+        // Prepare a RW tx that will create a tx actor and send a ForwardedReadyTransaction message to the leader shard
+        // on ready.
 
         final DOMStoreReadWriteTransaction readWriteTx = followerDistributedDataStore.newReadWriteTransaction();
         cars.add(CarsModel.newCarEntry("car" + carIndex, Uint64.valueOf(carIndex)));
-        readWriteTx.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
+        final YangInstanceIdentifier carPath = CarsModel.newCarPath("car" + carIndex);
+        readWriteTx.write(carPath, cars.getLast());
 
-        // FIXME: CONTROLLER-2017: ClientBackedDataStore reports only 4 transactions
-        assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter));
-        IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars",
-            stats -> assertEquals("getReadWriteTransactionCount", 5, stats.getReadWriteTransactionCount()));
+        // There is a difference here between implementations: tell-based protocol enforces batching on per-transaction
+        // level whereas ask-based protocol has a global limit towards a shard -- and hence flushes out last two
+        // transactions eagerly.
+        final int earlyTxCount = DistributedDataStore.class.isAssignableFrom(testParameter) ? 5 : 3;
+        verifyCarsReadWriteTransactions(leaderDistributedDataStore, earlyTxCount);
+        verifyCarsReadWriteTransactions(followerDistributedDataStore, 0);
 
         // Disable elections on the leader so it switches to follower.
 
@@ -999,11 +999,22 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         followerTestKit.doCommit(writeTx4Cohort);
         followerTestKit.doCommit(rwTxCohort);
 
+        // At this point everything is committed and the follower datastore should see 5 transactions, but leader should
+        // only see the initial transactions
+        verifyCarsReadWriteTransactions(leaderDistributedDataStore, earlyTxCount);
+        verifyCarsReadWriteTransactions(followerDistributedDataStore, 5);
+
         DOMStoreReadTransaction readTx = leaderDistributedDataStore.newReadOnlyTransaction();
         verifyCars(readTx, cars.toArray(new MapEntryNode[cars.size()]));
         verifyNode(readTx, PeopleModel.PERSON_LIST_PATH, people);
     }
 
+    private static void verifyCarsReadWriteTransactions(final AbstractDataStore datastore, final int expected)
+            throws Exception {
+        IntegrationTestKit.verifyShardStats(datastore, "cars",
+            stats -> assertEquals("getReadWriteTransactionCount", expected, stats.getReadWriteTransactionCount()));
+    }
+
     @Test
     public void testLeadershipTransferOnShutdown() throws Exception {
         leaderDatastoreContextBuilder.shardBatchedModificationCount(1);
@@ -1276,7 +1287,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         // behavior is controlled by akka.coordinated-shutdown.run-by-actor-system-terminate configuration option
         TestKit.shutdownActorSystem(follower2System, true);
 
-        ActorRef cars = leaderDistributedDataStore.getActorUtils().findLocalShard("cars").get();
+        ActorRef cars = leaderDistributedDataStore.getActorUtils().findLocalShard("cars").orElseThrow();
         final OnDemandRaftState initialState = (OnDemandRaftState) leaderDistributedDataStore.getActorUtils()
                 .executeOperation(cars, GetOnDemandRaftState.INSTANCE);
 
@@ -1291,7 +1302,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         await().atMost(10, TimeUnit.SECONDS)
                 .until(() -> containsUnreachable(followerCluster, follower2Member));
 
-        ActorRef followerCars = followerDistributedDataStore.getActorUtils().findLocalShard("cars").get();
+        ActorRef followerCars = followerDistributedDataStore.getActorUtils().findLocalShard("cars").orElseThrow();
 
         // to simulate a follower not being able to receive messages, but still being able to send messages and becoming
         // candidate, we can just send a couple of RequestVotes to both leader and follower.
@@ -1444,8 +1455,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
             followerDatastoreContextBuilder.snapshotOnRootOverwrite(true));
 
         leaderTestKit.waitForMembersUp("member-2");
-        final ContainerNode rootNode = ImmutableContainerNodeBuilder.create()
-                .withNodeIdentifier(YangInstanceIdentifier.NodeIdentifier.create(SchemaContext.NAME))
+        final ContainerNode rootNode = Builders.containerBuilder()
+                .withNodeIdentifier(NodeIdentifier.create(SchemaContext.NAME))
                 .withChild(CarsModel.create())
                 .build();
 
@@ -1509,7 +1520,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         assertEquals("Snapshot state type", ShardSnapshotState.class, actual.getState().getClass());
         MetadataShardDataTreeSnapshot shardSnapshot =
                 (MetadataShardDataTreeSnapshot) ((ShardSnapshotState)actual.getState()).getSnapshot();
-        assertEquals("Snapshot root node", expRoot, shardSnapshot.getRootNode().get());
+        assertEquals("Snapshot root node", expRoot, shardSnapshot.getRootNode().orElseThrow());
     }
 
     private static void sendDatastoreContextUpdate(final AbstractDataStore dataStore, final Builder builder) {