Bump upstreams
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java
index e5f3c958354114fa0e2dade15b728b60882be941..265f4f2c454d0cdeaac6b6a124685ca9c2534689 100644
@@ -119,8 +119,6 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdent
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.SystemMapNode;
-import org.opendaylight.yangtools.yang.data.api.schema.builder.CollectionNodeBuilder;
 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.data.tree.api.ConflictingModificationAppliedException;
@@ -251,16 +249,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
     private static void verifyCars(final DOMStoreReadTransaction readTx, final MapEntryNode... entries)
             throws Exception {
-        final Optional<NormalizedNode> optional = readTx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
-        assertTrue("isPresent", optional.isPresent());
-
-        final CollectionNodeBuilder<MapEntryNode, SystemMapNode> listBuilder = ImmutableNodes.mapNodeBuilder(
-                CarsModel.CAR_QNAME);
-        for (final NormalizedNode entry: entries) {
-            listBuilder.withChild((MapEntryNode) entry);
-        }
-
-        assertEquals("Car list node", listBuilder.build(), optional.get());
+        assertEquals("Car list node",
+            Optional.of(ImmutableNodes.mapNodeBuilder(CarsModel.CAR_QNAME).withValue(Arrays.asList(entries)).build()),
+            readTx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS));
     }
 
     private static void verifyNode(final DOMStoreReadTransaction readTx, final YangInstanceIdentifier path,
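Note on the verifyCars() rewrite above: it relies on CollectionNodeBuilder.withValue(Collection) producing the same map node as adding each entry individually through withChild(), which is why the SystemMapNode/CollectionNodeBuilder imports could be dropped. A minimal sketch of that equivalence, assuming the yangtools types imported by this file (car1 and car2 are hypothetical MapEntryNode instances):

    // Both builders yield equal SystemMapNode values for the same entries.
    SystemMapNode viaChildren = ImmutableNodes.mapNodeBuilder(CarsModel.CAR_QNAME)
            .withChild(car1)
            .withChild(car2)
            .build();
    SystemMapNode viaValue = ImmutableNodes.mapNodeBuilder(CarsModel.CAR_QNAME)
            .withValue(Arrays.asList(car1, car2))
            .build();
    assertEquals(viaChildren, viaValue);
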
@@ -784,7 +775,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
         ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(tx1 , modification, true, Optional.empty());
 
-        carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef());
+        carsFollowerShard.orElseThrow().tell(readyLocal, followerTestKit.getRef());
         Object resp = followerTestKit.expectMsgClass(Object.class);
         if (resp instanceof akka.actor.Status.Failure) {
             throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause());
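Note: this and the following hunks migrate Optional.get() call sites to the no-argument Optional.orElseThrow(), available since Java 10. The two methods are behaviorally identical; orElseThrow() simply names the failure mode at the call site. A self-contained illustration:

    import java.util.NoSuchElementException;
    import java.util.Optional;

    public class OrElseThrowDemo {
        public static void main(String[] args) {
            Optional<String> shard = Optional.of("cars");
            // Identical results on a present value.
            System.out.println(shard.get());          // cars
            System.out.println(shard.orElseThrow());  // cars

            try {
                Optional.empty().orElseThrow();
            } catch (NoSuchElementException e) {
                // Same exception type that get() would throw on empty.
                System.out.println("empty Optional throws NoSuchElementException");
            }
        }
    }
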
@@ -803,7 +794,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
         readyLocal = new ReadyLocalTransaction(tx2 , modification, false, Optional.empty());
 
-        carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef());
+        carsFollowerShard.orElseThrow().tell(readyLocal, followerTestKit.getRef());
         resp = followerTestKit.expectMsgClass(Object.class);
         if (resp instanceof akka.actor.Status.Failure) {
             throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause());
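Note: both ReadyLocalTransaction exchanges above follow the same Akka TestKit pattern: tell the shard actor a message with the test probe as sender, expect a reply, and unwrap an akka.actor.Status.Failure into a test failure. A stripped-down sketch of that pattern against a trivial echo actor (EchoActor and the message are hypothetical stand-ins; the TestKit calls are the same ones this test uses):

    import akka.actor.AbstractActor;
    import akka.actor.ActorRef;
    import akka.actor.ActorSystem;
    import akka.actor.Props;
    import akka.testkit.javadsl.TestKit;

    public class TellExpectSketch {
        // Trivial actor standing in for the shard: echoes every message back.
        static class EchoActor extends AbstractActor {
            @Override
            public Receive createReceive() {
                return receiveBuilder()
                        .matchAny(msg -> getSender().tell(msg, getSelf()))
                        .build();
            }
        }

        public static void main(String[] args) {
            ActorSystem system = ActorSystem.create("sketch");
            TestKit testKit = new TestKit(system);
            ActorRef shard = system.actorOf(Props.create(EchoActor.class));

            // Same shape as the test: tell, expect a reply, unwrap failures.
            shard.tell("readyLocal", testKit.getRef());
            Object resp = testKit.expectMsgClass(Object.class);
            if (resp instanceof akka.actor.Status.Failure) {
                throw new AssertionError("Unexpected failure response",
                    ((akka.actor.Status.Failure) resp).cause());
            }
            TestKit.shutdownActorSystem(system);
        }
    }
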
@@ -833,7 +824,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
                 followerDistributedDataStore.getActorUtils().findLocalShard("cars");
         assertTrue("Cars follower shard found", carsFollowerShard.isPresent());
 
-        carsFollowerShard.get().tell(GetShardDataTree.INSTANCE, followerTestKit.getRef());
+        carsFollowerShard.orElseThrow().tell(GetShardDataTree.INSTANCE, followerTestKit.getRef());
         final DataTree dataTree = followerTestKit.expectMsgClass(DataTree.class);
 
         // Send a tx with immediate commit.
@@ -849,7 +840,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
             new ReadWriteShardDataTreeTransaction(mock(ShardDataTreeTransactionParent.class), tx1, modification),
             true, Optional.empty());
 
-        carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef());
+        carsFollowerShard.orElseThrow().tell(forwardedReady, followerTestKit.getRef());
         Object resp = followerTestKit.expectMsgClass(Object.class);
         if (resp instanceof akka.actor.Status.Failure) {
             throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause());
@@ -869,7 +860,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
             new ReadWriteShardDataTreeTransaction(mock(ShardDataTreeTransactionParent.class), tx2, modification),
             false, Optional.empty());
 
-        carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef());
+        carsFollowerShard.orElseThrow().tell(forwardedReady, followerTestKit.getRef());
         resp = followerTestKit.expectMsgClass(Object.class);
         if (resp instanceof akka.actor.Status.Failure) {
             throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause());
@@ -898,10 +889,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         initDatastoresWithCarsAndPeople("testTransactionForwardedToLeaderAfterRetry");
 
         // Verify backend statistics on start
-        IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars",
-            stats -> assertEquals("getReadWriteTransactionCount", 0, stats.getReadWriteTransactionCount()));
-        IntegrationTestKit.verifyShardStats(followerDistributedDataStore, "cars",
-            stats -> assertEquals("getReadWriteTransactionCount", 0, stats.getReadWriteTransactionCount()));
+        verifyCarsReadWriteTransactions(leaderDistributedDataStore, 0);
+        verifyCarsReadWriteTransactions(followerDistributedDataStore, 0);
 
         // Do an initial write to get the primary shard info cached.
 
@@ -943,10 +932,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         final DOMStoreThreePhaseCommitCohort writeTx2Cohort = writeTx2.ready();
 
         // At this point only leader should see the transactions
-        IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars",
-            stats -> assertEquals("getReadWriteTransactionCount", 2, stats.getReadWriteTransactionCount()));
-        IntegrationTestKit.verifyShardStats(followerDistributedDataStore, "cars",
-            stats -> assertEquals("getReadWriteTransactionCount", 0, stats.getReadWriteTransactionCount()));
+        verifyCarsReadWriteTransactions(leaderDistributedDataStore, 2);
+        verifyCarsReadWriteTransactions(followerDistributedDataStore, 0);
 
         // Prepare another WO that writes to a single shard and thus will be directly committed on ready. This
         // tx writes 5 cars so 2 BatchedModifications messages will be sent initially and cached in the leader shard
@@ -974,13 +961,12 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         final YangInstanceIdentifier carPath = CarsModel.newCarPath("car" + carIndex);
         readWriteTx.write(carPath, cars.getLast());
 
-        // There is a difference here between implementations: tell-based protocol will postpone write operations until
-        // either a read is made or the transaction is submitted. Here we flush out the last transaction, so we see
-        // three transactions, not just the ones we have started committing
-        assertTrue(readWriteTx.exists(carPath).get(2, TimeUnit.SECONDS));
+        // There is a difference here between implementations: the tell-based protocol enforces batching at the
+        // per-transaction level, whereas the ask-based protocol has a global limit towards the shard -- and hence
+        // flushes out the last two transactions eagerly.
         final int earlyTxCount = DistributedDataStore.class.isAssignableFrom(testParameter) ? 5 : 3;
-        IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars",
-            stats -> assertEquals("getReadWriteTransactionCount", earlyTxCount, stats.getReadWriteTransactionCount()));
+        verifyCarsReadWriteTransactions(leaderDistributedDataStore, earlyTxCount);
+        verifyCarsReadWriteTransactions(followerDistributedDataStore, 0);
 
         // Disable elections on the leader so it switches to follower.
 
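Note: the updated comment above is why earlyTxCount differs per implementation: with per-transaction batching a small transaction stays buffered and invisible to the shard, while a shard-wide limit pushes everything out once the combined total crosses it. A toy, purely illustrative model of the two policies (all names and the threshold are hypothetical, not controller APIs):

    import java.util.List;

    public class FlushPolicySketch {
        public static void main(String[] args) {
            List<Integer> writesPerOpenTx = List.of(2, 2, 5); // buffered writes per tx
            int threshold = 4;

            // Per-transaction policy: a tx flushes early only when its own
            // buffer reaches the threshold; the small txs stay buffered.
            long earlyFlushes = writesPerOpenTx.stream()
                    .filter(n -> n >= threshold)
                    .count();

            // Global policy: one shared counter; once the combined total
            // crosses the threshold, buffered txs reach the shard eagerly.
            int total = writesPerOpenTx.stream().mapToInt(Integer::intValue).sum();
            boolean eagerFlush = total >= threshold;

            System.out.println("per-tx early flushes: " + earlyFlushes); // 1
            System.out.println("global eager flush: " + eagerFlush);     // true
        }
    }
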
@@ -1015,16 +1001,20 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
         // At this point everything is committed and the follower datastore should see 5 transactions, but leader should
         // only see the initial transactions
-        IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars",
-            stats -> assertEquals("getReadWriteTransactionCount", earlyTxCount, stats.getReadWriteTransactionCount()));
-        IntegrationTestKit.verifyShardStats(followerDistributedDataStore, "cars",
-            stats -> assertEquals("getReadWriteTransactionCount", 5, stats.getReadWriteTransactionCount()));
+        verifyCarsReadWriteTransactions(leaderDistributedDataStore, earlyTxCount);
+        verifyCarsReadWriteTransactions(followerDistributedDataStore, 5);
 
         DOMStoreReadTransaction readTx = leaderDistributedDataStore.newReadOnlyTransaction();
         verifyCars(readTx, cars.toArray(new MapEntryNode[cars.size()]));
         verifyNode(readTx, PeopleModel.PERSON_LIST_PATH, people);
     }
 
+    private static void verifyCarsReadWriteTransactions(final AbstractDataStore datastore, final int expected)
+            throws Exception {
+        IntegrationTestKit.verifyShardStats(datastore, "cars",
+            stats -> assertEquals("getReadWriteTransactionCount", expected, stats.getReadWriteTransactionCount()));
+    }
+
     @Test
     public void testLeadershipTransferOnShutdown() throws Exception {
         leaderDatastoreContextBuilder.shardBatchedModificationCount(1);
@@ -1297,7 +1287,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         // behavior is controlled by akka.coordinated-shutdown.run-by-actor-system-terminate configuration option
         TestKit.shutdownActorSystem(follower2System, true);
 
-        ActorRef cars = leaderDistributedDataStore.getActorUtils().findLocalShard("cars").get();
+        ActorRef cars = leaderDistributedDataStore.getActorUtils().findLocalShard("cars").orElseThrow();
         final OnDemandRaftState initialState = (OnDemandRaftState) leaderDistributedDataStore.getActorUtils()
                 .executeOperation(cars, GetOnDemandRaftState.INSTANCE);
 
@@ -1312,7 +1302,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         await().atMost(10, TimeUnit.SECONDS)
                 .until(() -> containsUnreachable(followerCluster, follower2Member));
 
-        ActorRef followerCars = followerDistributedDataStore.getActorUtils().findLocalShard("cars").get();
+        ActorRef followerCars = followerDistributedDataStore.getActorUtils().findLocalShard("cars").orElseThrow();
 
         // to simulate a follower not being able to receive messages, but still being able to send messages and becoming
         // candidate, we can just send a couple of RequestVotes to both leader and follower.
@@ -1530,7 +1520,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         assertEquals("Snapshot state type", ShardSnapshotState.class, actual.getState().getClass());
         MetadataShardDataTreeSnapshot shardSnapshot =
                 (MetadataShardDataTreeSnapshot) ((ShardSnapshotState)actual.getState()).getSnapshot();
-        assertEquals("Snapshot root node", expRoot, shardSnapshot.getRootNode().get());
+        assertEquals("Snapshot root node", expRoot, shardSnapshot.getRootNode().orElseThrow());
     }
 
     private static void sendDatastoreContextUpdate(final AbstractDataStore dataStore, final Builder builder) {