X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FDistributedDataStoreRemotingIntegrationTest.java;h=32718440c1d4b0ed9eeda84ddea7174a31ba8874;hb=f40e99a51cd3fc2c9be3ac8aa0772bdb6b6ce479;hp=4795cb2ae46b89a7328ccf0e755f56e30ad2e262;hpb=1c7ddb7ffc8a78636c731ca589945636f28007a2;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java index 4795cb2ae4..32718440c1 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java @@ -119,8 +119,6 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdent import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.data.api.schema.SystemMapNode; -import org.opendaylight.yangtools.yang.data.api.schema.builder.CollectionNodeBuilder; import org.opendaylight.yangtools.yang.data.impl.schema.Builders; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.data.tree.api.ConflictingModificationAppliedException; @@ -251,16 +249,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { private static void verifyCars(final DOMStoreReadTransaction readTx, final MapEntryNode... entries) throws Exception { - final Optional optional = readTx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS); - assertTrue("isPresent", optional.isPresent()); - - final CollectionNodeBuilder listBuilder = ImmutableNodes.mapNodeBuilder( - CarsModel.CAR_QNAME); - for (final NormalizedNode entry: entries) { - listBuilder.withChild((MapEntryNode) entry); - } - - assertEquals("Car list node", listBuilder.build(), optional.get()); + assertEquals("Car list node", + Optional.of(ImmutableNodes.mapNodeBuilder(CarsModel.CAR_QNAME).withValue(Arrays.asList(entries)).build()), + readTx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS)); } private static void verifyNode(final DOMStoreReadTransaction readTx, final YangInstanceIdentifier path, @@ -784,7 +775,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(tx1 , modification, true, Optional.empty()); - carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef()); + carsFollowerShard.orElseThrow().tell(readyLocal, followerTestKit.getRef()); Object resp = followerTestKit.expectMsgClass(Object.class); if (resp instanceof akka.actor.Status.Failure) { throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause()); @@ -803,7 +794,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { readyLocal = new ReadyLocalTransaction(tx2 , modification, false, Optional.empty()); - carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef()); + carsFollowerShard.orElseThrow().tell(readyLocal, followerTestKit.getRef()); resp = followerTestKit.expectMsgClass(Object.class); if (resp instanceof akka.actor.Status.Failure) { throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause()); @@ -833,7 +824,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { followerDistributedDataStore.getActorUtils().findLocalShard("cars"); assertTrue("Cars follower shard found", carsFollowerShard.isPresent()); - carsFollowerShard.get().tell(GetShardDataTree.INSTANCE, followerTestKit.getRef()); + carsFollowerShard.orElseThrow().tell(GetShardDataTree.INSTANCE, followerTestKit.getRef()); final DataTree dataTree = followerTestKit.expectMsgClass(DataTree.class); // Send a tx with immediate commit. @@ -849,7 +840,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { new ReadWriteShardDataTreeTransaction(mock(ShardDataTreeTransactionParent.class), tx1, modification), true, Optional.empty()); - carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef()); + carsFollowerShard.orElseThrow().tell(forwardedReady, followerTestKit.getRef()); Object resp = followerTestKit.expectMsgClass(Object.class); if (resp instanceof akka.actor.Status.Failure) { throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause()); @@ -869,7 +860,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { new ReadWriteShardDataTreeTransaction(mock(ShardDataTreeTransactionParent.class), tx2, modification), false, Optional.empty()); - carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef()); + carsFollowerShard.orElseThrow().tell(forwardedReady, followerTestKit.getRef()); resp = followerTestKit.expectMsgClass(Object.class); if (resp instanceof akka.actor.Status.Failure) { throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause()); @@ -897,6 +888,10 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { leaderDatastoreContextBuilder.shardBatchedModificationCount(2); initDatastoresWithCarsAndPeople("testTransactionForwardedToLeaderAfterRetry"); + // Verify backend statistics on start + verifyCarsReadWriteTransactions(leaderDistributedDataStore, 0); + verifyCarsReadWriteTransactions(followerDistributedDataStore, 0); + // Do an initial write to get the primary shard info cached. final DOMStoreWriteTransaction initialWriteTx = followerDistributedDataStore.newWriteOnlyTransaction(); @@ -936,6 +931,10 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { writeTx2.write(PeopleModel.PERSON_LIST_PATH, people); final DOMStoreThreePhaseCommitCohort writeTx2Cohort = writeTx2.ready(); + // At this point only leader should see the transactions + verifyCarsReadWriteTransactions(leaderDistributedDataStore, 2); + verifyCarsReadWriteTransactions(followerDistributedDataStore, 0); + // Prepare another WO that writes to a single shard and thus will be directly committed on ready. This // tx writes 5 cars so 2 BatchedModifications messages will be sent initially and cached in the leader shard // (with shardBatchedModificationCount set to 2). The 3rd BatchedModifications will be sent on ready. @@ -959,12 +958,15 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { final DOMStoreReadWriteTransaction readWriteTx = followerDistributedDataStore.newReadWriteTransaction(); cars.add(CarsModel.newCarEntry("car" + carIndex, Uint64.valueOf(carIndex))); - readWriteTx.write(CarsModel.newCarPath("car" + carIndex), cars.getLast()); + final YangInstanceIdentifier carPath = CarsModel.newCarPath("car" + carIndex); + readWriteTx.write(carPath, cars.getLast()); - // FIXME: CONTROLLER-2017: ClientBackedDataStore reports only 4 transactions - assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter)); - IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", - stats -> assertEquals("getReadWriteTransactionCount", 5, stats.getReadWriteTransactionCount())); + // There is a difference here between implementations: tell-based protocol enforces batching on per-transaction + // level whereas ask-based protocol has a global limit towards a shard -- and hence flushes out last two + // transactions eagerly. + final int earlyTxCount = DistributedDataStore.class.isAssignableFrom(testParameter) ? 5 : 3; + verifyCarsReadWriteTransactions(leaderDistributedDataStore, earlyTxCount); + verifyCarsReadWriteTransactions(followerDistributedDataStore, 0); // Disable elections on the leader so it switches to follower. @@ -997,11 +999,22 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { followerTestKit.doCommit(writeTx4Cohort); followerTestKit.doCommit(rwTxCohort); + // At this point everything is committed and the follower datastore should see 5 transactions, but leader should + // only see the initial transactions + verifyCarsReadWriteTransactions(leaderDistributedDataStore, earlyTxCount); + verifyCarsReadWriteTransactions(followerDistributedDataStore, 5); + DOMStoreReadTransaction readTx = leaderDistributedDataStore.newReadOnlyTransaction(); verifyCars(readTx, cars.toArray(new MapEntryNode[cars.size()])); verifyNode(readTx, PeopleModel.PERSON_LIST_PATH, people); } + private static void verifyCarsReadWriteTransactions(final AbstractDataStore datastore, final int expected) + throws Exception { + IntegrationTestKit.verifyShardStats(datastore, "cars", + stats -> assertEquals("getReadWriteTransactionCount", expected, stats.getReadWriteTransactionCount())); + } + @Test public void testLeadershipTransferOnShutdown() throws Exception { leaderDatastoreContextBuilder.shardBatchedModificationCount(1); @@ -1274,7 +1287,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { // behavior is controlled by akka.coordinated-shutdown.run-by-actor-system-terminate configuration option TestKit.shutdownActorSystem(follower2System, true); - ActorRef cars = leaderDistributedDataStore.getActorUtils().findLocalShard("cars").get(); + ActorRef cars = leaderDistributedDataStore.getActorUtils().findLocalShard("cars").orElseThrow(); final OnDemandRaftState initialState = (OnDemandRaftState) leaderDistributedDataStore.getActorUtils() .executeOperation(cars, GetOnDemandRaftState.INSTANCE); @@ -1289,7 +1302,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { await().atMost(10, TimeUnit.SECONDS) .until(() -> containsUnreachable(followerCluster, follower2Member)); - ActorRef followerCars = followerDistributedDataStore.getActorUtils().findLocalShard("cars").get(); + ActorRef followerCars = followerDistributedDataStore.getActorUtils().findLocalShard("cars").orElseThrow(); // to simulate a follower not being able to receive messages, but still being able to send messages and becoming // candidate, we can just send a couple of RequestVotes to both leader and follower. @@ -1332,7 +1345,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", Uint64.valueOf(20000)))); AbstractShardTest.writeToStore(tree, CarsModel.BASE_PATH, carsNode); - final NormalizedNode snapshotRoot = AbstractShardTest.readStore(tree, YangInstanceIdentifier.empty()); + final NormalizedNode snapshotRoot = AbstractShardTest.readStore(tree, YangInstanceIdentifier.of()); final Snapshot initialSnapshot = Snapshot.create( new ShardSnapshotState(new MetadataShardDataTreeSnapshot(snapshotRoot)), Collections.emptyList(), 5, 1, 5, 1, 1, null, null); @@ -1447,7 +1460,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { .withChild(CarsModel.create()) .build(); - leaderTestKit.testWriteTransaction(leaderDistributedDataStore, YangInstanceIdentifier.empty(), rootNode); + leaderTestKit.testWriteTransaction(leaderDistributedDataStore, YangInstanceIdentifier.of(), rootNode); // FIXME: CONTROLLER-2020: ClientBackedDatastore does not have stable indexes/term, // the snapshot index seems to fluctuate @@ -1477,7 +1490,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { verifySnapshot("member-2-shard-cars-testSnapshotOnRootOverwrite", 1); // root overwrite so expect a snapshot - leaderTestKit.testWriteTransaction(leaderDistributedDataStore, YangInstanceIdentifier.empty(), rootNode); + leaderTestKit.testWriteTransaction(leaderDistributedDataStore, YangInstanceIdentifier.of(), rootNode); // this was a real snapshot so everything should be in it(1(DisableTrackingPayload) + 1 + 10 + 1) IntegrationTestKit.verifyShardState(leaderDistributedDataStore, "cars", @@ -1507,7 +1520,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { assertEquals("Snapshot state type", ShardSnapshotState.class, actual.getState().getClass()); MetadataShardDataTreeSnapshot shardSnapshot = (MetadataShardDataTreeSnapshot) ((ShardSnapshotState)actual.getState()).getSnapshot(); - assertEquals("Snapshot root node", expRoot, shardSnapshot.getRootNode().get()); + assertEquals("Snapshot root node", expRoot, shardSnapshot.getRootNode().orElseThrow()); } private static void sendDatastoreContextUpdate(final AbstractDataStore dataStore, final Builder builder) {