X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FDistributedDataStoreRemotingIntegrationTest.java;h=265f4f2c454d0cdeaac6b6a124685ca9c2534689;hb=47c9d6a2bdf5897a8e6abbe2cb753dddedbc3b61;hp=323e9d6c5a8b59f7fc29094b01d51c6ee732d5dc;hpb=7bc006db12e2d24756192309515f3d0bc65442f1;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java index 323e9d6c5a..265f4f2c45 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java @@ -115,18 +115,17 @@ import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain; import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction; import org.opendaylight.yangtools.yang.common.Uint64; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.data.api.schema.SystemMapNode; -import org.opendaylight.yangtools.yang.data.api.schema.builder.CollectionNodeBuilder; -import org.opendaylight.yangtools.yang.data.api.schema.tree.ConflictingModificationAppliedException; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; +import org.opendaylight.yangtools.yang.data.impl.schema.Builders; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; -import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; -import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory; +import org.opendaylight.yangtools.yang.data.tree.api.ConflictingModificationAppliedException; +import org.opendaylight.yangtools.yang.data.tree.api.DataTree; +import org.opendaylight.yangtools.yang.data.tree.api.DataTreeConfiguration; +import org.opendaylight.yangtools.yang.data.tree.api.DataTreeModification; +import org.opendaylight.yangtools.yang.data.tree.impl.di.InMemoryDataTreeFactory; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.collection.Set; import scala.concurrent.Await; @@ -250,16 +249,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { private static void verifyCars(final DOMStoreReadTransaction readTx, final MapEntryNode... entries) throws Exception { - final Optional optional = readTx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS); - assertTrue("isPresent", optional.isPresent()); - - final CollectionNodeBuilder listBuilder = ImmutableNodes.mapNodeBuilder( - CarsModel.CAR_QNAME); - for (final NormalizedNode entry: entries) { - listBuilder.withChild((MapEntryNode) entry); - } - - assertEquals("Car list node", listBuilder.build(), optional.get()); + assertEquals("Car list node", + Optional.of(ImmutableNodes.mapNodeBuilder(CarsModel.CAR_QNAME).withValue(Arrays.asList(entries)).build()), + readTx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS)); } private static void verifyNode(final DOMStoreReadTransaction readTx, final YangInstanceIdentifier path, @@ -654,11 +646,10 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { final DOMDataTreeWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); - final ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier( - new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)) - .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build(); - - writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData); + writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, Builders.containerBuilder() + .withNodeIdentifier(new NodeIdentifier(CarsModel.BASE_QNAME)) + .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")) + .build()); final var ex = assertThrows(ExecutionException.class, () -> writeTx.commit().get(5, TimeUnit.SECONDS)) .getCause(); @@ -686,13 +677,12 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH, PeopleModel.emptyContainer()); - final ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier( - new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)) - .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build(); - // Note that merge will validate the data and fail but put succeeds b/c deep validation is not // done for put for performance reasons. - writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData); + writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, Builders.containerBuilder() + .withNodeIdentifier(new NodeIdentifier(CarsModel.BASE_QNAME)) + .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")) + .build()); final var ex = assertThrows(ExecutionException.class, () -> writeTx.commit().get(5, TimeUnit.SECONDS)) .getCause(); @@ -761,7 +751,6 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { } } - @SuppressWarnings("unchecked") @Test public void testReadyLocalTransactionForwardedToLeader() throws Exception { initDatastoresWithCars("testReadyLocalTransactionForwardedToLeader"); @@ -786,7 +775,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(tx1 , modification, true, Optional.empty()); - carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef()); + carsFollowerShard.orElseThrow().tell(readyLocal, followerTestKit.getRef()); Object resp = followerTestKit.expectMsgClass(Object.class); if (resp instanceof akka.actor.Status.Failure) { throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause()); @@ -805,7 +794,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { readyLocal = new ReadyLocalTransaction(tx2 , modification, false, Optional.empty()); - carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef()); + carsFollowerShard.orElseThrow().tell(readyLocal, followerTestKit.getRef()); resp = followerTestKit.expectMsgClass(Object.class); if (resp instanceof akka.actor.Status.Failure) { throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause()); @@ -826,7 +815,6 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2); } - @SuppressWarnings("unchecked") @Test public void testForwardedReadyTransactionForwardedToLeader() throws Exception { initDatastoresWithCars("testForwardedReadyTransactionForwardedToLeader"); @@ -836,7 +824,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { followerDistributedDataStore.getActorUtils().findLocalShard("cars"); assertTrue("Cars follower shard found", carsFollowerShard.isPresent()); - carsFollowerShard.get().tell(GetShardDataTree.INSTANCE, followerTestKit.getRef()); + carsFollowerShard.orElseThrow().tell(GetShardDataTree.INSTANCE, followerTestKit.getRef()); final DataTree dataTree = followerTestKit.expectMsgClass(DataTree.class); // Send a tx with immediate commit. @@ -852,7 +840,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { new ReadWriteShardDataTreeTransaction(mock(ShardDataTreeTransactionParent.class), tx1, modification), true, Optional.empty()); - carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef()); + carsFollowerShard.orElseThrow().tell(forwardedReady, followerTestKit.getRef()); Object resp = followerTestKit.expectMsgClass(Object.class); if (resp instanceof akka.actor.Status.Failure) { throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause()); @@ -872,7 +860,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { new ReadWriteShardDataTreeTransaction(mock(ShardDataTreeTransactionParent.class), tx2, modification), false, Optional.empty()); - carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef()); + carsFollowerShard.orElseThrow().tell(forwardedReady, followerTestKit.getRef()); resp = followerTestKit.expectMsgClass(Object.class); if (resp instanceof akka.actor.Status.Failure) { throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause()); @@ -900,6 +888,10 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { leaderDatastoreContextBuilder.shardBatchedModificationCount(2); initDatastoresWithCarsAndPeople("testTransactionForwardedToLeaderAfterRetry"); + // Verify backend statistics on start + verifyCarsReadWriteTransactions(leaderDistributedDataStore, 0); + verifyCarsReadWriteTransactions(followerDistributedDataStore, 0); + // Do an initial write to get the primary shard info cached. final DOMStoreWriteTransaction initialWriteTx = followerDistributedDataStore.newWriteOnlyTransaction(); @@ -939,10 +931,13 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { writeTx2.write(PeopleModel.PERSON_LIST_PATH, people); final DOMStoreThreePhaseCommitCohort writeTx2Cohort = writeTx2.ready(); + // At this point only leader should see the transactions + verifyCarsReadWriteTransactions(leaderDistributedDataStore, 2); + verifyCarsReadWriteTransactions(followerDistributedDataStore, 0); + // Prepare another WO that writes to a single shard and thus will be directly committed on ready. This - // tx writes 5 cars so 2 BatchedModidifications messages will be sent initially and cached in the - // leader shard (with shardBatchedModificationCount set to 2). The 3rd BatchedModidifications will be - // sent on ready. + // tx writes 5 cars so 2 BatchedModifications messages will be sent initially and cached in the leader shard + // (with shardBatchedModificationCount set to 2). The 3rd BatchedModifications will be sent on ready. final DOMStoreWriteTransaction writeTx3 = followerDistributedDataStore.newWriteOnlyTransaction(); for (int i = 1; i <= 5; i++, carIndex++) { @@ -950,25 +945,28 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { writeTx3.write(CarsModel.newCarPath("car" + carIndex), cars.getLast()); } - // Prepare another WO that writes to a single shard. This will send a single BatchedModidifications - // message on ready. + // Prepare another WO that writes to a single shard. This will send a single BatchedModifications message + // on ready. final DOMStoreWriteTransaction writeTx4 = followerDistributedDataStore.newWriteOnlyTransaction(); cars.add(CarsModel.newCarEntry("car" + carIndex, Uint64.valueOf(carIndex))); writeTx4.write(CarsModel.newCarPath("car" + carIndex), cars.getLast()); carIndex++; - // Prepare a RW tx that will create a tx actor and send a ForwardedReadyTransaciton message to the - // leader shard on ready. + // Prepare a RW tx that will create a tx actor and send a ForwardedReadyTransaction message to the leader shard + // on ready. final DOMStoreReadWriteTransaction readWriteTx = followerDistributedDataStore.newReadWriteTransaction(); cars.add(CarsModel.newCarEntry("car" + carIndex, Uint64.valueOf(carIndex))); - readWriteTx.write(CarsModel.newCarPath("car" + carIndex), cars.getLast()); + final YangInstanceIdentifier carPath = CarsModel.newCarPath("car" + carIndex); + readWriteTx.write(carPath, cars.getLast()); - // FIXME: CONTROLLER-2017: ClientBackedDataStore reports only 4 transactions - assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter)); - IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", - stats -> assertEquals("getReadWriteTransactionCount", 5, stats.getReadWriteTransactionCount())); + // There is a difference here between implementations: tell-based protocol enforces batching on per-transaction + // level whereas ask-based protocol has a global limit towards a shard -- and hence flushes out last two + // transactions eagerly. + final int earlyTxCount = DistributedDataStore.class.isAssignableFrom(testParameter) ? 5 : 3; + verifyCarsReadWriteTransactions(leaderDistributedDataStore, earlyTxCount); + verifyCarsReadWriteTransactions(followerDistributedDataStore, 0); // Disable elections on the leader so it switches to follower. @@ -1001,11 +999,22 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { followerTestKit.doCommit(writeTx4Cohort); followerTestKit.doCommit(rwTxCohort); + // At this point everything is committed and the follower datastore should see 5 transactions, but leader should + // only see the initial transactions + verifyCarsReadWriteTransactions(leaderDistributedDataStore, earlyTxCount); + verifyCarsReadWriteTransactions(followerDistributedDataStore, 5); + DOMStoreReadTransaction readTx = leaderDistributedDataStore.newReadOnlyTransaction(); verifyCars(readTx, cars.toArray(new MapEntryNode[cars.size()])); verifyNode(readTx, PeopleModel.PERSON_LIST_PATH, people); } + private static void verifyCarsReadWriteTransactions(final AbstractDataStore datastore, final int expected) + throws Exception { + IntegrationTestKit.verifyShardStats(datastore, "cars", + stats -> assertEquals("getReadWriteTransactionCount", expected, stats.getReadWriteTransactionCount())); + } + @Test public void testLeadershipTransferOnShutdown() throws Exception { leaderDatastoreContextBuilder.shardBatchedModificationCount(1); @@ -1278,7 +1287,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { // behavior is controlled by akka.coordinated-shutdown.run-by-actor-system-terminate configuration option TestKit.shutdownActorSystem(follower2System, true); - ActorRef cars = leaderDistributedDataStore.getActorUtils().findLocalShard("cars").get(); + ActorRef cars = leaderDistributedDataStore.getActorUtils().findLocalShard("cars").orElseThrow(); final OnDemandRaftState initialState = (OnDemandRaftState) leaderDistributedDataStore.getActorUtils() .executeOperation(cars, GetOnDemandRaftState.INSTANCE); @@ -1293,7 +1302,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { await().atMost(10, TimeUnit.SECONDS) .until(() -> containsUnreachable(followerCluster, follower2Member)); - ActorRef followerCars = followerDistributedDataStore.getActorUtils().findLocalShard("cars").get(); + ActorRef followerCars = followerDistributedDataStore.getActorUtils().findLocalShard("cars").orElseThrow(); // to simulate a follower not being able to receive messages, but still being able to send messages and becoming // candidate, we can just send a couple of RequestVotes to both leader and follower. @@ -1446,8 +1455,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { followerDatastoreContextBuilder.snapshotOnRootOverwrite(true)); leaderTestKit.waitForMembersUp("member-2"); - final ContainerNode rootNode = ImmutableContainerNodeBuilder.create() - .withNodeIdentifier(YangInstanceIdentifier.NodeIdentifier.create(SchemaContext.NAME)) + final ContainerNode rootNode = Builders.containerBuilder() + .withNodeIdentifier(NodeIdentifier.create(SchemaContext.NAME)) .withChild(CarsModel.create()) .build(); @@ -1511,7 +1520,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { assertEquals("Snapshot state type", ShardSnapshotState.class, actual.getState().getClass()); MetadataShardDataTreeSnapshot shardSnapshot = (MetadataShardDataTreeSnapshot) ((ShardSnapshotState)actual.getState()).getSnapshot(); - assertEquals("Snapshot root node", expRoot, shardSnapshot.getRootNode().get()); + assertEquals("Snapshot root node", expRoot, shardSnapshot.getRootNode().orElseThrow()); } private static void sendDatastoreContextUpdate(final AbstractDataStore dataStore, final Builder builder) {