X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FDistributedDataStoreRemotingIntegrationTest.java;h=542a79840d6b7fa753616dfb823e3844b2e2637d;hp=c01949c9306b8f31d2e28b77a649de5705042365;hb=8e4ea1b1109f2c4bf167cd2cca2a7ffe2e1f7b73;hpb=abaef4a5ae37f27542155457fe7306a4662b1eeb diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java index c01949c930..542a79840d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java @@ -8,14 +8,19 @@ package org.opendaylight.controller.cluster.datastore; import static org.awaitility.Awaitility.await; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.junit.Assume.assumeTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; @@ -32,8 +37,6 @@ import akka.testkit.javadsl.TestKit; import com.google.common.base.Stopwatch; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Range; -import com.google.common.primitives.UnsignedLong; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.Uninterruptibles; @@ -41,28 +44,22 @@ import com.typesafe.config.ConfigFactory; import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Optional; -import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Supplier; import org.junit.After; -import org.junit.Assume; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameter; import org.junit.runners.Parameterized.Parameters; -import org.mockito.Mockito; import org.mockito.stubbing.Answer; import org.opendaylight.controller.cluster.access.client.RequestTimeoutException; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; @@ -82,10 +79,11 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransact import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.modification.MergeModification; import org.opendaylight.controller.cluster.datastore.modification.WriteModification; -import org.opendaylight.controller.cluster.datastore.persisted.FrontendHistoryMetadata; +import org.opendaylight.controller.cluster.datastore.persisted.FrontendClientMetadata; import org.opendaylight.controller.cluster.datastore.persisted.FrontendShardDataTreeSnapshotMetadata; import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot; import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState; +import org.opendaylight.controller.cluster.datastore.utils.UnsignedLongBitmap; import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow; import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState; import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; @@ -102,6 +100,7 @@ import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel; import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.mdsal.common.api.LogicalDatastoreType; +import org.opendaylight.mdsal.common.api.TransactionCommitFailedException; import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction; import org.opendaylight.mdsal.dom.api.DOMTransactionChain; import org.opendaylight.mdsal.dom.api.DOMTransactionChainListener; @@ -260,15 +259,12 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { private static void verifyNode(final DOMStoreReadTransaction readTx, final YangInstanceIdentifier path, final NormalizedNode expNode) throws Exception { - final Optional optional = readTx.read(path).get(5, TimeUnit.SECONDS); - assertTrue("isPresent", optional.isPresent()); - assertEquals("Data node", expNode, optional.get()); + assertEquals(Optional.of(expNode), readTx.read(path).get(5, TimeUnit.SECONDS)); } private static void verifyExists(final DOMStoreReadTransaction readTx, final YangInstanceIdentifier path) throws Exception { - final Boolean exists = readTx.exists(path).get(5, TimeUnit.SECONDS); - assertEquals("exists", Boolean.TRUE, exists); + assertEquals("exists", Boolean.TRUE, readTx.exists(path).get(5, TimeUnit.SECONDS)); } @Test @@ -359,8 +355,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @Test public void testSingleTransactionsWritesInQuickSuccession() throws Exception { - final String testName = "testWriteTransactionWithSingleShard"; - initDatastoresWithCars(testName); + initDatastoresWithCars("testSingleTransactionsWritesInQuickSuccession"); final DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain(); @@ -372,58 +367,64 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { int numCars = 5; for (int i = 0; i < numCars; i++) { writeTx = txChain.newWriteOnlyTransaction(); - writeTx.write(CarsModel.newCarPath("car" + i), - CarsModel.newCarEntry("car" + i, Uint64.valueOf(20000))); - + writeTx.write(CarsModel.newCarPath("car" + i), CarsModel.newCarEntry("car" + i, Uint64.valueOf(20000))); followerTestKit.doCommit(writeTx.ready()); - DOMStoreReadTransaction domStoreReadTransaction = txChain.newReadOnlyTransaction(); - domStoreReadTransaction.read(CarsModel.BASE_PATH).get(); - - domStoreReadTransaction.close(); + try (var tx = txChain.newReadOnlyTransaction()) { + tx.read(CarsModel.BASE_PATH).get(); + } } // wait to let the shard catch up with purged await("Range set leak test").atMost(5, TimeUnit.SECONDS) .pollInterval(500, TimeUnit.MILLISECONDS) .untilAsserted(() -> { - Optional localShard = - leaderDistributedDataStore.getActorUtils().findLocalShard("cars"); - FrontendShardDataTreeSnapshotMetadata frontendMetadata = - (FrontendShardDataTreeSnapshotMetadata) leaderDistributedDataStore.getActorUtils() - .executeOperation(localShard.get(), new RequestFrontendMetadata()); + final var localShard = leaderDistributedDataStore.getActorUtils().findLocalShard("cars") + .orElseThrow(); + final var frontendMetadata = + (FrontendShardDataTreeSnapshotMetadata) leaderDistributedDataStore.getActorUtils() + .executeOperation(localShard, new RequestFrontendMetadata()); + final var clientMeta = frontendMetadata.getClients().get(0); if (leaderDistributedDataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) { - Iterator iterator = - frontendMetadata.getClients().get(0).getCurrentHistories().iterator(); - FrontendHistoryMetadata metadata = iterator.next(); - while (iterator.hasNext() && metadata.getHistoryId() != 1) { - metadata = iterator.next(); - } - - assertEquals(0, metadata.getClosedTransactions().size()); - assertEquals(Range.closedOpen(UnsignedLong.valueOf(0), UnsignedLong.valueOf(11)), - metadata.getPurgedTransactions().asRanges().iterator().next()); + assertTellClientMetadata(clientMeta, numCars); } else { - // ask based should track no metadata - assertTrue(frontendMetadata.getClients().get(0).getCurrentHistories().isEmpty()); + assertAskClientMetadata(clientMeta); } }); - final Optional optional = txChain.newReadOnlyTransaction() - .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS); - assertTrue("isPresent", optional.isPresent()); - assertEquals("# cars", numCars, ((Collection) optional.get().body()).size()); + try (var tx = txChain.newReadOnlyTransaction()) { + final var body = tx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS).orElseThrow().body(); + assertThat(body, instanceOf(Collection.class)); + assertEquals(numCars, ((Collection) body).size()); + } + } + + private void assertAskClientMetadata(final FrontendClientMetadata clientMeta) { + // ask based should track no metadata + assertEquals(List.of(), clientMeta.getCurrentHistories()); + } + + private void assertTellClientMetadata(final FrontendClientMetadata clientMeta, final int numCars) { + final var iterator = clientMeta.getCurrentHistories().iterator(); + var metadata = iterator.next(); + while (iterator.hasNext() && metadata.getHistoryId() != 1) { + metadata = iterator.next(); + } + + // FIXME: CONTROLLER-1991: remove this assumption + assumeTrue(false); + + assertEquals(UnsignedLongBitmap.of(), metadata.getClosedTransactions()); + assertEquals("[[0.." + numCars * 2 + "]]", metadata.getPurgedTransactions().toString()); } @Test - @Ignore("Flushes out tell based leak needs to be handled separately") public void testCloseTransactionMetadataLeak() throws Exception { - // Ask based frontend seems to have some issues with back to back close - Assume.assumeTrue(testParameter.isAssignableFrom(TestClientBackedDataStore.class)); + // FIXME: Ask-based frontend seems to have some issues with back to back close + assumeTrue(testParameter.isAssignableFrom(TestClientBackedDataStore.class)); - final String testName = "testWriteTransactionWithSingleShard"; - initDatastoresWithCars(testName); + initDatastoresWithCars("testCloseTransactionMetadataLeak"); final DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain(); @@ -437,10 +438,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { writeTx = txChain.newWriteOnlyTransaction(); writeTx.close(); - DOMStoreReadTransaction domStoreReadTransaction = txChain.newReadOnlyTransaction(); - domStoreReadTransaction.read(CarsModel.BASE_PATH).get(); - - domStoreReadTransaction.close(); + try (var tx = txChain.newReadOnlyTransaction()) { + tx.read(CarsModel.BASE_PATH).get(); + } } writeTx = txChain.newWriteOnlyTransaction(); @@ -452,34 +452,25 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { await("Close transaction purge leak test.").atMost(5, TimeUnit.SECONDS) .pollInterval(500, TimeUnit.MILLISECONDS) .untilAsserted(() -> { - Optional localShard = - leaderDistributedDataStore.getActorUtils().findLocalShard("cars"); - FrontendShardDataTreeSnapshotMetadata frontendMetadata = + final var localShard = leaderDistributedDataStore.getActorUtils().findLocalShard("cars") + .orElseThrow(); + final var frontendMetadata = (FrontendShardDataTreeSnapshotMetadata) leaderDistributedDataStore.getActorUtils() - .executeOperation(localShard.get(), new RequestFrontendMetadata()); + .executeOperation(localShard, new RequestFrontendMetadata()); + final var clientMeta = frontendMetadata.getClients().get(0); if (leaderDistributedDataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) { - Iterator iterator = - frontendMetadata.getClients().get(0).getCurrentHistories().iterator(); - FrontendHistoryMetadata metadata = iterator.next(); - while (iterator.hasNext() && metadata.getHistoryId() != 1) { - metadata = iterator.next(); - } - - Set> ranges = metadata.getPurgedTransactions().asRanges(); - - assertEquals(0, metadata.getClosedTransactions().size()); - assertEquals(1, ranges.size()); + assertTellClientMetadata(clientMeta, numCars); } else { - // ask based should track no metadata - assertTrue(frontendMetadata.getClients().get(0).getCurrentHistories().isEmpty()); + assertAskClientMetadata(clientMeta); } }); - final Optional optional = txChain.newReadOnlyTransaction() - .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS); - assertTrue("isPresent", optional.isPresent()); - assertEquals("# cars", numCars, ((Collection) optional.get().body()).size()); + try (var tx = txChain.newReadOnlyTransaction()) { + final var body = tx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS).orElseThrow().body(); + assertThat(body, instanceOf(Collection.class)); + assertEquals(numCars, ((Collection) body).size()); + } } @Test @@ -630,13 +621,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { final YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack"); readWriteTx.merge(personPath, person); - Optional optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS); - assertTrue("isPresent", optional.isPresent()); - assertEquals("Data node", car, optional.get()); - - optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS); - assertTrue("isPresent", optional.isPresent()); - assertEquals("Data node", person, optional.get()); + assertEquals(Optional.of(car), readWriteTx.read(carPath).get(5, TimeUnit.SECONDS)); + assertEquals(Optional.of(person), readWriteTx.read(personPath).get(5, TimeUnit.SECONDS)); final DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready(); @@ -654,8 +640,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { final DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction(); verifyCars(readTx, car); - optional = readTx.read(personPath).get(5, TimeUnit.SECONDS); - assertFalse("isPresent", optional.isPresent()); + assertEquals(Optional.empty(), readTx.read(personPath).get(5, TimeUnit.SECONDS)); } @Test @@ -667,7 +652,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { LogicalDatastoreType.CONFIGURATION, followerDistributedDataStore).build(), MoreExecutors.directExecutor()); - final DOMTransactionChainListener listener = Mockito.mock(DOMTransactionChainListener.class); + final DOMTransactionChainListener listener = mock(DOMTransactionChainListener.class); final DOMTransactionChain txChain = broker.createTransactionChain(listener); final DOMDataTreeWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); @@ -678,12 +663,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData); - try { - writeTx.commit().get(5, TimeUnit.SECONDS); - fail("Expected TransactionCommitFailedException"); - } catch (final ExecutionException e) { - // Expected - } + final var ex = assertThrows(ExecutionException.class, () -> writeTx.commit().get(5, TimeUnit.SECONDS)) + .getCause(); + assertThat(ex, instanceOf(TransactionCommitFailedException.class)); verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx), any(Throwable.class)); @@ -700,7 +682,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { LogicalDatastoreType.CONFIGURATION, followerDistributedDataStore).build(), MoreExecutors.directExecutor()); - final DOMTransactionChainListener listener = Mockito.mock(DOMTransactionChainListener.class); + final DOMTransactionChainListener listener = mock(DOMTransactionChainListener.class); final DOMTransactionChain txChain = broker.createTransactionChain(listener); final DOMDataTreeWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); @@ -715,12 +697,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { // done for put for performance reasons. writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData); - try { - writeTx.commit().get(5, TimeUnit.SECONDS); - fail("Expected TransactionCommitFailedException"); - } catch (final ExecutionException e) { - // Expected - } + final var ex = assertThrows(ExecutionException.class, () -> writeTx.commit().get(5, TimeUnit.SECONDS)) + .getCause(); + assertThat(ex, instanceOf(TransactionCommitFailedException.class)); verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx), any(Throwable.class)); @@ -840,11 +819,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { final ActorSelection txActor = leaderDistributedDataStore.getActorUtils().actorSelection( ((ReadyTransactionReply)resp).getCohortPath()); - final Supplier versionSupplier = Mockito.mock(Supplier.class); - Mockito.doReturn(DataStoreVersions.CURRENT_VERSION).when(versionSupplier).get(); - ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy( - leaderDistributedDataStore.getActorUtils(), Arrays.asList( - new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor), versionSupplier)), tx2); + ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy(leaderDistributedDataStore.getActorUtils(), + List.of(new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor), + () -> DataStoreVersions.CURRENT_VERSION)), tx2); cohort.canCommit().get(5, TimeUnit.SECONDS); cohort.preCommit().get(5, TimeUnit.SECONDS); cohort.commit().get(5, TimeUnit.SECONDS); @@ -874,10 +851,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { final MapEntryNode car1 = CarsModel.newCarEntry("optima", Uint64.valueOf(20000)); new WriteModification(CarsModel.newCarPath("optima"), car1).apply(modification); - ForwardedReadyTransaction forwardedReady = new ForwardedReadyTransaction(tx1, - DataStoreVersions.CURRENT_VERSION, new ReadWriteShardDataTreeTransaction( - Mockito.mock(ShardDataTreeTransactionParent.class), tx1, modification), true, - Optional.empty()); + ForwardedReadyTransaction forwardedReady = new ForwardedReadyTransaction(tx1, DataStoreVersions.CURRENT_VERSION, + new ReadWriteShardDataTreeTransaction(mock(ShardDataTreeTransactionParent.class), tx1, modification), + true, Optional.empty()); carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef()); Object resp = followerTestKit.expectMsgClass(Object.class); @@ -895,10 +871,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { MapEntryNode car2 = CarsModel.newCarEntry("sportage", Uint64.valueOf(30000)); new WriteModification(CarsModel.newCarPath("sportage"), car2).apply(modification); - forwardedReady = new ForwardedReadyTransaction(tx2, - DataStoreVersions.CURRENT_VERSION, new ReadWriteShardDataTreeTransaction( - Mockito.mock(ShardDataTreeTransactionParent.class), tx2, modification), false, - Optional.empty()); + forwardedReady = new ForwardedReadyTransaction(tx2, DataStoreVersions.CURRENT_VERSION, + new ReadWriteShardDataTreeTransaction(mock(ShardDataTreeTransactionParent.class), tx2, modification), + false, Optional.empty()); carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef()); resp = followerTestKit.expectMsgClass(Object.class); @@ -911,11 +886,10 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { ActorSelection txActor = leaderDistributedDataStore.getActorUtils().actorSelection( ((ReadyTransactionReply)resp).getCohortPath()); - final Supplier versionSupplier = Mockito.mock(Supplier.class); - Mockito.doReturn(DataStoreVersions.CURRENT_VERSION).when(versionSupplier).get(); final ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy( - leaderDistributedDataStore.getActorUtils(), Arrays.asList( - new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor), versionSupplier)), tx2); + leaderDistributedDataStore.getActorUtils(), List.of( + new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor), + () -> DataStoreVersions.CURRENT_VERSION)), tx2); cohort.canCommit().get(5, TimeUnit.SECONDS); cohort.preCommit().get(5, TimeUnit.SECONDS); cohort.commit().get(5, TimeUnit.SECONDS); @@ -926,7 +900,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @Test public void testTransactionForwardedToLeaderAfterRetry() throws Exception { // FIXME: remove when test passes also for ClientBackedDataStore - Assume.assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter)); + assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter)); + followerDatastoreContextBuilder.shardBatchedModificationCount(2); leaderDatastoreContextBuilder.shardBatchedModificationCount(2); initDatastoresWithCarsAndPeople("testTransactionForwardedToLeaderAfterRetry"); @@ -1038,7 +1013,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @Test public void testLeadershipTransferOnShutdown() throws Exception { // FIXME: remove when test passes also for ClientBackedDataStore - Assume.assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter)); + assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter)); + leaderDatastoreContextBuilder.shardBatchedModificationCount(1); followerDatastoreContextBuilder.shardElectionTimeoutFactor(10).customRaftPolicyImplementation(null); final String testName = "testLeadershipTransferOnShutdown"; @@ -1103,7 +1079,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @Test public void testTransactionWithIsolatedLeader() throws Exception { // FIXME: remove when test passes also for ClientBackedDataStore - Assume.assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter)); + assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter)); + // Set the isolated leader check interval high so we can control the switch to IsolatedLeader. leaderDatastoreContextBuilder.shardIsolatedLeaderCheckIntervalInMillis(10000000); final String testName = "testTransactionWithIsolatedLeader"; @@ -1136,12 +1113,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { MemberNode.verifyRaftState(leaderDistributedDataStore, "cars", raftState -> assertEquals("getRaftState", "IsolatedLeader", raftState.getRaftState())); - try { - leaderTestKit.doCommit(noShardLeaderWriteTx.ready()); - fail("Expected NoShardLeaderException"); - } catch (final ExecutionException e) { - assertEquals("getCause", NoShardLeaderException.class, Throwables.getRootCause(e).getClass()); - } + final var ex = assertThrows(ExecutionException.class, + () -> leaderTestKit.doCommit(noShardLeaderWriteTx.ready())); + assertEquals(NoShardLeaderException.class, Throwables.getRootCause(ex).getClass()); sendDatastoreContextUpdate(leaderDistributedDataStore, leaderDatastoreContextBuilder .shardElectionTimeoutFactor(100)); @@ -1177,17 +1151,13 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { rwTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); - try { - followerTestKit.doCommit(rwTx.ready()); - fail("Exception expected"); - } catch (final ExecutionException e) { - final String msg = "Unexpected exception: " + Throwables.getStackTraceAsString(e.getCause()); - if (DistributedDataStore.class.isAssignableFrom(testParameter)) { - assertTrue(msg, Throwables.getRootCause(e) instanceof NoShardLeaderException - || e.getCause() instanceof ShardLeaderNotRespondingException); - } else { - assertTrue(msg, Throwables.getRootCause(e) instanceof RequestTimeoutException); - } + final var ex = assertThrows(ExecutionException.class, () -> followerTestKit.doCommit(rwTx.ready())); + final String msg = "Unexpected exception: " + Throwables.getStackTraceAsString(ex.getCause()); + if (DistributedDataStore.class.isAssignableFrom(testParameter)) { + assertTrue(msg, Throwables.getRootCause(ex) instanceof NoShardLeaderException + || ex.getCause() instanceof ShardLeaderNotRespondingException); + } else { + assertThat(msg, Throwables.getRootCause(ex), instanceOf(RequestTimeoutException.class)); } } @@ -1216,16 +1186,12 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { rwTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); - try { - followerTestKit.doCommit(rwTx.ready()); - fail("Exception expected"); - } catch (final ExecutionException e) { - final String msg = "Unexpected exception: " + Throwables.getStackTraceAsString(e.getCause()); - if (DistributedDataStore.class.isAssignableFrom(testParameter)) { - assertTrue(msg, Throwables.getRootCause(e) instanceof NoShardLeaderException); - } else { - assertTrue(msg, Throwables.getRootCause(e) instanceof RequestTimeoutException); - } + final var ex = assertThrows(ExecutionException.class, () -> followerTestKit.doCommit(rwTx.ready())); + final String msg = "Unexpected exception: " + Throwables.getStackTraceAsString(ex.getCause()); + if (DistributedDataStore.class.isAssignableFrom(testParameter)) { + assertThat(msg, Throwables.getRootCause(ex), instanceOf(NoShardLeaderException.class)); + } else { + assertThat(msg, Throwables.getRootCause(ex), instanceOf(RequestTimeoutException.class)); } } @@ -1351,10 +1317,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { initDatastoresWithCars(testName); - final Optional readOptional = leaderDistributedDataStore.newReadOnlyTransaction().read( - CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS); - assertTrue("isPresent", readOptional.isPresent()); - assertEquals("Node", carsNode, readOptional.get()); + assertEquals(Optional.of(carsNode), leaderDistributedDataStore.newReadOnlyTransaction().read( + CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS)); verifySnapshot(InMemorySnapshotStore.waitForSavedSnapshot(leaderCarShardName, Snapshot.class), initialSnapshot, snapshotRoot); @@ -1366,7 +1330,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @Test public void testReadWriteMessageSlicing() throws Exception { // The slicing is only implemented for tell-based protocol - Assume.assumeTrue(ClientBackedDataStore.class.isAssignableFrom(testParameter)); + assumeTrue(ClientBackedDataStore.class.isAssignableFrom(testParameter)); leaderDatastoreContextBuilder.maximumMessageSliceSize(100); followerDatastoreContextBuilder.maximumMessageSliceSize(100); @@ -1446,10 +1410,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @Test public void testSnapshotOnRootOverwrite() throws Exception { - if (!DistributedDataStore.class.isAssignableFrom(testParameter)) { - // FIXME: ClientBackedDatastore does not have stable indexes/term, the snapshot index seems to fluctuate - return; - } + // FIXME: ClientBackedDatastore does not have stable indexes/term, the snapshot index seems to fluctuate + assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter)); final String testName = "testSnapshotOnRootOverwrite"; final String[] shards = {"cars", "default"}; @@ -1525,10 +1487,10 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { private static void sendDatastoreContextUpdate(final AbstractDataStore dataStore, final Builder builder) { final Builder newBuilder = DatastoreContext.newBuilderFrom(builder.build()); - final DatastoreContextFactory mockContextFactory = Mockito.mock(DatastoreContextFactory.class); + final DatastoreContextFactory mockContextFactory = mock(DatastoreContextFactory.class); final Answer answer = invocation -> newBuilder.build(); - Mockito.doAnswer(answer).when(mockContextFactory).getBaseDatastoreContext(); - Mockito.doAnswer(answer).when(mockContextFactory).getShardDatastoreContext(Mockito.anyString()); + doAnswer(answer).when(mockContextFactory).getBaseDatastoreContext(); + doAnswer(answer).when(mockContextFactory).getShardDatastoreContext(anyString()); dataStore.onDatastoreContextUpdated(mockContextFactory); } }