X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FDistributedDataStoreRemotingIntegrationTest.java;h=323e9d6c5a8b59f7fc29094b01d51c6ee732d5dc;hp=d5035b9577e4cca9784814dcb1a315aeaf41e7c2;hb=7bc006db12e2d24756192309515f3d0bc65442f1;hpb=4e097d8d56a7e3814c63a69da183765fd4c78a56 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java index d5035b9577..323e9d6c5a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java @@ -8,10 +8,12 @@ package org.opendaylight.controller.cluster.datastore; import static org.awaitility.Awaitility.await; +import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; @@ -37,7 +39,6 @@ import akka.testkit.javadsl.TestKit; import com.google.common.base.Stopwatch; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; -import com.google.common.primitives.UnsignedLong; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.Uninterruptibles; @@ -84,6 +85,7 @@ import org.opendaylight.controller.cluster.datastore.persisted.FrontendClientMet import org.opendaylight.controller.cluster.datastore.persisted.FrontendShardDataTreeSnapshotMetadata; import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot; import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState; +import org.opendaylight.controller.cluster.datastore.utils.UnsignedLongBitmap; import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow; import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState; import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; @@ -100,6 +102,7 @@ import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel; import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.mdsal.common.api.LogicalDatastoreType; +import org.opendaylight.mdsal.common.api.OptimisticLockFailedException; import org.opendaylight.mdsal.common.api.TransactionCommitFailedException; import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction; import org.opendaylight.mdsal.dom.api.DOMTransactionChain; @@ -117,6 +120,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.api.schema.SystemMapNode; import org.opendaylight.yangtools.yang.data.api.schema.builder.CollectionNodeBuilder; +import org.opendaylight.yangtools.yang.data.api.schema.tree.ConflictingModificationAppliedException; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; @@ -124,6 +128,7 @@ import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory; import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import scala.collection.Set; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; @@ -139,7 +144,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @Parameters(name = "{0}") public static Collection data() { return Arrays.asList(new Object[][] { - { TestDistributedDataStore.class, 7}, { TestClientBackedDataStore.class, 12 } + { TestDistributedDataStore.class, 7 }, { TestClientBackedDataStore.class, 12 } }); } @@ -370,10 +375,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { writeTx.write(CarsModel.newCarPath("car" + i), CarsModel.newCarEntry("car" + i, Uint64.valueOf(20000))); followerTestKit.doCommit(writeTx.ready()); - DOMStoreReadTransaction domStoreReadTransaction = txChain.newReadOnlyTransaction(); - domStoreReadTransaction.read(CarsModel.BASE_PATH).get(); - - domStoreReadTransaction.close(); + try (var tx = txChain.newReadOnlyTransaction()) { + tx.read(CarsModel.BASE_PATH).get(); + } } // wait to let the shard catch up with purged @@ -388,7 +392,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { final var clientMeta = frontendMetadata.getClients().get(0); if (leaderDistributedDataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) { - assertTellClientMetadata(clientMeta, numCars); + assertTellClientMetadata(clientMeta, numCars * 2); } else { assertAskClientMetadata(clientMeta); } @@ -401,35 +405,31 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { } } - private void assertAskClientMetadata(final FrontendClientMetadata clientMeta) { + private static void assertAskClientMetadata(final FrontendClientMetadata clientMeta) { // ask based should track no metadata assertEquals(List.of(), clientMeta.getCurrentHistories()); } - private void assertTellClientMetadata(final FrontendClientMetadata clientMeta, final int numCars) { + private static void assertTellClientMetadata(final FrontendClientMetadata clientMeta, final long lastPurged) { final var iterator = clientMeta.getCurrentHistories().iterator(); var metadata = iterator.next(); while (iterator.hasNext() && metadata.getHistoryId() != 1) { metadata = iterator.next(); } - assertEquals(0, metadata.getClosedTransactions().size()); - - final var purgedRanges = metadata.getPurgedTransactions().ranges(); - - // FIXME: CONTROLLER-1991: remove this assumption - assumeTrue(false); - - assertEquals(1, purgedRanges.size()); - final var purgedRange = purgedRanges.first(); - assertEquals(UnsignedLong.ZERO, purgedRange.lower()); - assertEquals(UnsignedLong.valueOf(10), purgedRange.upper()); + assertEquals(UnsignedLongBitmap.of(), metadata.getClosedTransactions()); + assertEquals("[[0.." + lastPurged + "]]", metadata.getPurgedTransactions().ranges().toString()); } @Test public void testCloseTransactionMetadataLeak() throws Exception { - // FIXME: Ask-based frontend seems to have some issues with back to back close - assumeTrue(testParameter.isAssignableFrom(TestClientBackedDataStore.class)); + // FIXME: CONTROLLER-2016: ask-based frontend triggers this: + // + // java.lang.IllegalStateException: Previous transaction + // member-2-datastore-testCloseTransactionMetadataLeak-fe-0-chn-1-txn-1-0 is not ready yet + // at org.opendaylight.controller.cluster.datastore.TransactionChainProxy$Allocated.checkReady() + // at org.opendaylight.controller.cluster.datastore.TransactionChainProxy.newReadOnlyTransaction() + assumeTrue(testParameter.isAssignableFrom(ClientBackedDataStore.class)); initDatastoresWithCars("testCloseTransactionMetadataLeak"); @@ -442,22 +442,17 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { int numCars = 5; for (int i = 0; i < numCars; i++) { - writeTx = txChain.newWriteOnlyTransaction(); - writeTx.close(); - - DOMStoreReadTransaction domStoreReadTransaction = txChain.newReadOnlyTransaction(); - domStoreReadTransaction.read(CarsModel.BASE_PATH).get(); + try (var tx = txChain.newWriteOnlyTransaction()) { + // Empty on purpose + } - domStoreReadTransaction.close(); + try (var tx = txChain.newReadOnlyTransaction()) { + tx.read(CarsModel.BASE_PATH).get(); + } } - writeTx = txChain.newWriteOnlyTransaction(); - writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); - writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); - followerTestKit.doCommit(writeTx.ready()); - // wait to let the shard catch up with purged - await("Close transaction purge leak test.").atMost(5, TimeUnit.SECONDS) + await("wait for purges to settle").atMost(5, TimeUnit.SECONDS) .pollInterval(500, TimeUnit.MILLISECONDS) .untilAsserted(() -> { final var localShard = leaderDistributedDataStore.getActorUtils().findLocalShard("cars") @@ -468,17 +463,11 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { final var clientMeta = frontendMetadata.getClients().get(0); if (leaderDistributedDataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) { - assertTellClientMetadata(clientMeta, numCars); + assertTellClientMetadata(clientMeta, numCars * 2); } else { assertAskClientMetadata(clientMeta); } }); - - try (var tx = txChain.newReadOnlyTransaction()) { - final var body = tx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS).orElseThrow().body(); - assertThat(body, instanceOf(Collection.class)); - assertEquals(numCars, ((Collection) body).size()); - } } @Test @@ -907,9 +896,6 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @Test public void testTransactionForwardedToLeaderAfterRetry() throws Exception { - // FIXME: remove when test passes also for ClientBackedDataStore - assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter)); - followerDatastoreContextBuilder.shardBatchedModificationCount(2); leaderDatastoreContextBuilder.shardBatchedModificationCount(2); initDatastoresWithCarsAndPeople("testTransactionForwardedToLeaderAfterRetry"); @@ -979,6 +965,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { cars.add(CarsModel.newCarEntry("car" + carIndex, Uint64.valueOf(carIndex))); readWriteTx.write(CarsModel.newCarPath("car" + carIndex), cars.getLast()); + // FIXME: CONTROLLER-2017: ClientBackedDataStore reports only 4 transactions + assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter)); IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", stats -> assertEquals("getReadWriteTransactionCount", 5, stats.getReadWriteTransactionCount())); @@ -1020,9 +1008,6 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @Test public void testLeadershipTransferOnShutdown() throws Exception { - // FIXME: remove when test passes also for ClientBackedDataStore - assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter)); - leaderDatastoreContextBuilder.shardBatchedModificationCount(1); followerDatastoreContextBuilder.shardElectionTimeoutFactor(10).customRaftPolicyImplementation(null); final String testName = "testLeadershipTransferOnShutdown"; @@ -1045,16 +1030,21 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer()); final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready(); - IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", - stats -> assertEquals("getTxCohortCacheSize", 1, stats.getTxCohortCacheSize())); + final var usesCohorts = DistributedDataStore.class.isAssignableFrom(testParameter); + if (usesCohorts) { + IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", + stats -> assertEquals("getTxCohortCacheSize", 1, stats.getTxCohortCacheSize())); + } writeTx = followerDistributedDataStore.newWriteOnlyTransaction(); final MapEntryNode car = CarsModel.newCarEntry("optima", Uint64.valueOf(20000)); writeTx.write(CarsModel.newCarPath("optima"), car); final DOMStoreThreePhaseCommitCohort cohort2 = writeTx.ready(); - IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", - stats -> assertEquals("getTxCohortCacheSize", 2, stats.getTxCohortCacheSize())); + if (usesCohorts) { + IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", + stats -> assertEquals("getTxCohortCacheSize", 2, stats.getTxCohortCacheSize())); + } // Gracefully stop the leader via a Shutdown message. @@ -1086,9 +1076,6 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @Test public void testTransactionWithIsolatedLeader() throws Exception { - // FIXME: remove when test passes also for ClientBackedDataStore - assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter)); - // Set the isolated leader check interval high so we can control the switch to IsolatedLeader. leaderDatastoreContextBuilder.shardIsolatedLeaderCheckIntervalInMillis(10000000); final String testName = "testTransactionWithIsolatedLeader"; @@ -1121,9 +1108,24 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { MemberNode.verifyRaftState(leaderDistributedDataStore, "cars", raftState -> assertEquals("getRaftState", "IsolatedLeader", raftState.getRaftState())); - final var ex = assertThrows(ExecutionException.class, - () -> leaderTestKit.doCommit(noShardLeaderWriteTx.ready())); - assertEquals(NoShardLeaderException.class, Throwables.getRootCause(ex).getClass()); + final var noShardLeaderCohort = noShardLeaderWriteTx.ready(); + final ListenableFuture canCommit; + + // There is difference in behavior here: + if (!leaderDistributedDataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) { + // ask-based canCommit() times out and aborts + final var ex = assertThrows(ExecutionException.class, + () -> leaderTestKit.doCommit(noShardLeaderCohort)).getCause(); + assertThat(ex, instanceOf(NoShardLeaderException.class)); + assertThat(ex.getMessage(), containsString( + "Shard member-1-shard-cars-testTransactionWithIsolatedLeader currently has no leader.")); + canCommit = null; + } else { + // tell-based canCommit() does not have a real timeout and hence continues + canCommit = noShardLeaderCohort.canCommit(); + Uninterruptibles.sleepUninterruptibly(commitTimeout, TimeUnit.SECONDS); + assertFalse(canCommit.isDone()); + } sendDatastoreContextUpdate(leaderDistributedDataStore, leaderDatastoreContextBuilder .shardElectionTimeoutFactor(100)); @@ -1135,6 +1137,19 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { leaderTestKit.doCommit(preIsolatedLeaderTxCohort); leaderTestKit.doCommit(successTxCohort); + + // continuation of tell-based protocol: readied transaction will complete commit, but will report an OLFE + if (canCommit != null) { + final var ex = assertThrows(ExecutionException.class, + () -> canCommit.get(commitTimeout, TimeUnit.SECONDS)).getCause(); + assertThat(ex, instanceOf(OptimisticLockFailedException.class)); + assertEquals("Optimistic lock failed for path " + CarsModel.BASE_PATH, ex.getMessage()); + final var cause = ex.getCause(); + assertThat(cause, instanceOf(ConflictingModificationAppliedException.class)); + final var cmae = (ConflictingModificationAppliedException) cause; + assertEquals("Node was created by other transaction.", cmae.getMessage()); + assertEquals(CarsModel.BASE_PATH, cmae.getPath()); + } } @Test @@ -1274,9 +1289,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { Member follower2Member = follower2Cluster.readView().self(); await().atMost(10, TimeUnit.SECONDS) - .until(() -> leaderCluster.readView().unreachableMembers().contains(follower2Member)); + .until(() -> containsUnreachable(leaderCluster, follower2Member)); await().atMost(10, TimeUnit.SECONDS) - .until(() -> followerCluster.readView().unreachableMembers().contains(follower2Member)); + .until(() -> containsUnreachable(followerCluster, follower2Member)); ActorRef followerCars = followerDistributedDataStore.getActorUtils().findLocalShard("cars").get(); @@ -1298,6 +1313,13 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { ds2.close(); } + private static Boolean containsUnreachable(final Cluster cluster, final Member member) { + // unreachableMembers() returns scala.collection.immutable.Set, but we are using scala.collection.Set to fix JDT + // see https://bugs.eclipse.org/bugs/show_bug.cgi?id=468276#c32 + final Set members = cluster.readView().unreachableMembers(); + return members.contains(member); + } + @Test public void testInstallSnapshot() throws Exception { final String testName = "testInstallSnapshot"; @@ -1418,14 +1440,10 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @Test public void testSnapshotOnRootOverwrite() throws Exception { - // FIXME: ClientBackedDatastore does not have stable indexes/term, the snapshot index seems to fluctuate - assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter)); - - final String testName = "testSnapshotOnRootOverwrite"; - final String[] shards = {"cars", "default"}; - initDatastores(testName, "module-shards-default-cars-member1-and-2.conf", shards, - leaderDatastoreContextBuilder.snapshotOnRootOverwrite(true), - followerDatastoreContextBuilder.snapshotOnRootOverwrite(true)); + initDatastores("testSnapshotOnRootOverwrite", "module-shards-default-cars-member1-and-2.conf", + new String[] {"cars", "default"}, + leaderDatastoreContextBuilder.snapshotOnRootOverwrite(true), + followerDatastoreContextBuilder.snapshotOnRootOverwrite(true)); leaderTestKit.waitForMembersUp("member-2"); final ContainerNode rootNode = ImmutableContainerNodeBuilder.create() @@ -1435,6 +1453,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { leaderTestKit.testWriteTransaction(leaderDistributedDataStore, YangInstanceIdentifier.empty(), rootNode); + // FIXME: CONTROLLER-2020: ClientBackedDatastore does not have stable indexes/term, + // the snapshot index seems to fluctuate + assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter)); IntegrationTestKit.verifyShardState(leaderDistributedDataStore, "cars", state -> assertEquals(1, state.getSnapshotIndex())); @@ -1472,7 +1493,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { verifySnapshot("member-2-shard-cars-testSnapshotOnRootOverwrite", 12); } - private void verifySnapshot(final String persistenceId, final long lastAppliedIndex) { + private static void verifySnapshot(final String persistenceId, final long lastAppliedIndex) { await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> { List snap = InMemorySnapshotStore.getSnapshots(persistenceId, Snapshot.class); assertEquals(1, snap.size());