X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FDistributedDataStoreRemotingIntegrationTest.java;h=323e9d6c5a8b59f7fc29094b01d51c6ee732d5dc;hb=HEAD;hp=60d03a7e69d0dedea952f5b5bbe577399617cb18;hpb=f37b08069ae01215fb72e8d85db045c34c0ba6c2;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java index 60d03a7e69..91c00f7eb1 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java @@ -12,13 +12,14 @@ import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.junit.Assume.assumeTrue; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.timeout; @@ -37,6 +38,7 @@ import akka.testkit.javadsl.TestKit; import com.google.common.base.Stopwatch; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.Uninterruptibles; @@ -46,6 +48,7 @@ import java.util.Collection; import java.util.Collections; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -70,8 +73,6 @@ import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder; import org.opendaylight.controller.cluster.datastore.TestShard.RequestFrontendMetadata; import org.opendaylight.controller.cluster.datastore.TestShard.StartDropMessages; import org.opendaylight.controller.cluster.datastore.TestShard.StopDropMessages; -import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; -import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException; import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction; import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree; @@ -100,10 +101,10 @@ import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel; import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.mdsal.common.api.LogicalDatastoreType; +import org.opendaylight.mdsal.common.api.OptimisticLockFailedException; import org.opendaylight.mdsal.common.api.TransactionCommitFailedException; import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction; import org.opendaylight.mdsal.dom.api.DOMTransactionChain; -import org.opendaylight.mdsal.dom.api.DOMTransactionChainListener; import org.opendaylight.mdsal.dom.spi.store.DOMStore; import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction; import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction; @@ -112,17 +113,17 @@ import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain; import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction; import org.opendaylight.yangtools.yang.common.Uint64; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.data.api.schema.SystemMapNode; -import org.opendaylight.yangtools.yang.data.api.schema.builder.CollectionNodeBuilder; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; +import org.opendaylight.yangtools.yang.data.impl.schema.Builders; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; -import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; -import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory; +import org.opendaylight.yangtools.yang.data.tree.api.ConflictingModificationAppliedException; +import org.opendaylight.yangtools.yang.data.tree.api.DataTree; +import org.opendaylight.yangtools.yang.data.tree.api.DataTreeConfiguration; +import org.opendaylight.yangtools.yang.data.tree.api.DataTreeModification; +import org.opendaylight.yangtools.yang.data.tree.impl.di.InMemoryDataTreeFactory; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.collection.Set; import scala.concurrent.Await; @@ -140,12 +141,12 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @Parameters(name = "{0}") public static Collection data() { return Arrays.asList(new Object[][] { - { TestDistributedDataStore.class, 7}, { TestClientBackedDataStore.class, 12 } + { TestClientBackedDataStore.class, 12 } }); } @Parameter(0) - public Class testParameter; + public Class testParameter; @Parameter(1) public int commitTimeout; @@ -175,8 +176,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { private final TransactionIdentifier tx1 = nextTransactionId(); private final TransactionIdentifier tx2 = nextTransactionId(); - private AbstractDataStore followerDistributedDataStore; - private AbstractDataStore leaderDistributedDataStore; + private ClientBackedDataStore followerDistributedDataStore; + private ClientBackedDataStore leaderDistributedDataStore; private IntegrationTestKit followerTestKit; private IntegrationTestKit leaderTestKit; @@ -198,7 +199,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @After public void tearDown() { if (followerDistributedDataStore != null) { - leaderDistributedDataStore.close(); + followerDistributedDataStore.close(); } if (leaderDistributedDataStore != null) { leaderDistributedDataStore.close(); @@ -231,11 +232,11 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { throws Exception { leaderTestKit = new IntegrationTestKit(leaderSystem, leaderBuilder, commitTimeout); - leaderDistributedDataStore = leaderTestKit.setupAbstractDataStore( - testParameter, type, moduleShardsConfig, false, shards); + leaderDistributedDataStore = leaderTestKit.setupDataStore(testParameter, type, moduleShardsConfig, false, + shards); followerTestKit = new IntegrationTestKit(followerSystem, followerBuilder, commitTimeout); - followerDistributedDataStore = followerTestKit.setupAbstractDataStore( + followerDistributedDataStore = followerTestKit.setupDataStore( testParameter, type, moduleShardsConfig, false, shards); leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorUtils(), shards); @@ -246,16 +247,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { private static void verifyCars(final DOMStoreReadTransaction readTx, final MapEntryNode... entries) throws Exception { - final Optional optional = readTx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS); - assertTrue("isPresent", optional.isPresent()); - - final CollectionNodeBuilder listBuilder = ImmutableNodes.mapNodeBuilder( - CarsModel.CAR_QNAME); - for (final NormalizedNode entry: entries) { - listBuilder.withChild((MapEntryNode) entry); - } - - assertEquals("Car list node", listBuilder.build(), optional.get()); + assertEquals("Car list node", + Optional.of(ImmutableNodes.mapNodeBuilder(CarsModel.CAR_QNAME).withValue(Arrays.asList(entries)).build()), + readTx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS)); } private static void verifyNode(final DOMStoreReadTransaction readTx, final YangInstanceIdentifier path, @@ -347,9 +341,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { final ActorSystem newSystem = newActorSystem("reinstated-member2", "Member2"); - try (AbstractDataStore member2Datastore = new IntegrationTestKit(newSystem, leaderDatastoreContextBuilder, - commitTimeout) - .setupAbstractDataStore(testParameter, testName, "module-shards-member2", true, CARS)) { + try (var member2Datastore = new IntegrationTestKit(newSystem, leaderDatastoreContextBuilder, commitTimeout) + .setupDataStore(testParameter, testName, "module-shards-member2", true, CARS)) { verifyCars(member2Datastore.newReadOnlyTransaction(), car2); } } @@ -378,105 +371,67 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { // wait to let the shard catch up with purged await("Range set leak test").atMost(5, TimeUnit.SECONDS) - .pollInterval(500, TimeUnit.MILLISECONDS) - .untilAsserted(() -> { - final var localShard = leaderDistributedDataStore.getActorUtils().findLocalShard("cars") - .orElseThrow(); - final var frontendMetadata = - (FrontendShardDataTreeSnapshotMetadata) leaderDistributedDataStore.getActorUtils() - .executeOperation(localShard, new RequestFrontendMetadata()); - - final var clientMeta = frontendMetadata.getClients().get(0); - if (leaderDistributedDataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) { - assertTellClientMetadata(clientMeta, numCars * 2); - } else { - assertAskClientMetadata(clientMeta); - } - }); + .pollInterval(500, TimeUnit.MILLISECONDS) + .untilAsserted(() -> { + final var localShard = leaderDistributedDataStore.getActorUtils().findLocalShard("cars").orElseThrow(); + final var frontendMetadata = + (FrontendShardDataTreeSnapshotMetadata) leaderDistributedDataStore.getActorUtils() + .executeOperation(localShard, new RequestFrontendMetadata()); + + assertClientMetadata(frontendMetadata.getClients().get(0), numCars * 2); + }); try (var tx = txChain.newReadOnlyTransaction()) { - final var body = tx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS).orElseThrow().body(); - assertThat(body, instanceOf(Collection.class)); + final var body = assertInstanceOf(Collection.class, + tx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS).orElseThrow().body()); assertEquals(numCars, ((Collection) body).size()); } } - private void assertAskClientMetadata(final FrontendClientMetadata clientMeta) { - // ask based should track no metadata - assertEquals(List.of(), clientMeta.getCurrentHistories()); - } - - private void assertTellClientMetadata(final FrontendClientMetadata clientMeta, final long lastPurged) { + private static void assertClientMetadata(final FrontendClientMetadata clientMeta, final long lastPurged) { final var iterator = clientMeta.getCurrentHistories().iterator(); var metadata = iterator.next(); while (iterator.hasNext() && metadata.getHistoryId() != 1) { metadata = iterator.next(); } - // FIXME: CONTROLLER-1991: remove this assumption - assumeTrue(false); - assertEquals(UnsignedLongBitmap.of(), metadata.getClosedTransactions()); assertEquals("[[0.." + lastPurged + "]]", metadata.getPurgedTransactions().ranges().toString()); } @Test public void testCloseTransactionMetadataLeak() throws Exception { - // FIXME: CONTROLLER-2016: ask-based frontend triggers this: - // - // java.lang.IllegalStateException: Previous transaction - // member-2-datastore-testCloseTransactionMetadataLeak-fe-0-chn-1-txn-1-0 is not ready yet - // at org.opendaylight.controller.cluster.datastore.TransactionChainProxy$Allocated.checkReady() - // at org.opendaylight.controller.cluster.datastore.TransactionChainProxy.newReadOnlyTransaction() - assumeTrue(testParameter.isAssignableFrom(ClientBackedDataStore.class)); - initDatastoresWithCars("testCloseTransactionMetadataLeak"); - final DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain(); + final var txChain = followerDistributedDataStore.createTransactionChain(); - DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); + var writeTx = txChain.newWriteOnlyTransaction(); writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); followerTestKit.doCommit(writeTx.ready()); int numCars = 5; for (int i = 0; i < numCars; i++) { - writeTx = txChain.newWriteOnlyTransaction(); - writeTx.close(); + try (var tx = txChain.newWriteOnlyTransaction()) { + // Empty on purpose + } try (var tx = txChain.newReadOnlyTransaction()) { tx.read(CarsModel.BASE_PATH).get(); } } - writeTx = txChain.newWriteOnlyTransaction(); - writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); - writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); - followerTestKit.doCommit(writeTx.ready()); - // wait to let the shard catch up with purged - await("Close transaction purge leak test.").atMost(5, TimeUnit.SECONDS) - .pollInterval(500, TimeUnit.MILLISECONDS) - .untilAsserted(() -> { - final var localShard = leaderDistributedDataStore.getActorUtils().findLocalShard("cars") - .orElseThrow(); - final var frontendMetadata = - (FrontendShardDataTreeSnapshotMetadata) leaderDistributedDataStore.getActorUtils() - .executeOperation(localShard, new RequestFrontendMetadata()); - - final var clientMeta = frontendMetadata.getClients().get(0); - if (leaderDistributedDataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) { - assertTellClientMetadata(clientMeta, numCars * 2 + 1); - } else { - assertAskClientMetadata(clientMeta); - } - }); - - try (var tx = txChain.newReadOnlyTransaction()) { - final var body = tx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS).orElseThrow().body(); - assertThat(body, instanceOf(Collection.class)); - assertEquals(numCars, ((Collection) body).size()); - } + await("wait for purges to settle").atMost(5, TimeUnit.SECONDS) + .pollInterval(500, TimeUnit.MILLISECONDS) + .untilAsserted(() -> { + final var localShard = leaderDistributedDataStore.getActorUtils().findLocalShard("cars").orElseThrow(); + final var frontendMetadata = + (FrontendShardDataTreeSnapshotMetadata) leaderDistributedDataStore.getActorUtils() + .executeOperation(localShard, new RequestFrontendMetadata()); + + assertClientMetadata(frontendMetadata.getClients().get(0), numCars * 2); + }); } @Test @@ -658,22 +613,21 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { LogicalDatastoreType.CONFIGURATION, followerDistributedDataStore).build(), MoreExecutors.directExecutor()); - final DOMTransactionChainListener listener = mock(DOMTransactionChainListener.class); - final DOMTransactionChain txChain = broker.createTransactionChain(listener); + final var listener = mock(FutureCallback.class); + final DOMTransactionChain txChain = broker.createTransactionChain(); + txChain.addCallback(listener); final DOMDataTreeWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); - final ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier( - new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)) - .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build(); - - writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData); + writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, Builders.containerBuilder() + .withNodeIdentifier(new NodeIdentifier(CarsModel.BASE_QNAME)) + .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")) + .build()); - final var ex = assertThrows(ExecutionException.class, () -> writeTx.commit().get(5, TimeUnit.SECONDS)) - .getCause(); - assertThat(ex, instanceOf(TransactionCommitFailedException.class)); + final var ex = assertThrows(ExecutionException.class, () -> writeTx.commit().get(5, TimeUnit.SECONDS)); + assertInstanceOf(TransactionCommitFailedException.class, ex.getCause()); - verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx), any(Throwable.class)); + verify(listener, timeout(5000)).onFailure(any()); txChain.close(); broker.close(); @@ -683,34 +637,32 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { public void testChainedTransactionFailureWithMultipleShards() throws Exception { initDatastoresWithCarsAndPeople("testChainedTransactionFailureWithMultipleShards"); - final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker( - ImmutableMap.builder().put( - LogicalDatastoreType.CONFIGURATION, followerDistributedDataStore).build(), - MoreExecutors.directExecutor()); + try (var broker = new ConcurrentDOMDataBroker( + Map.of(LogicalDatastoreType.CONFIGURATION, followerDistributedDataStore), MoreExecutors.directExecutor())) { - final DOMTransactionChainListener listener = mock(DOMTransactionChainListener.class); - final DOMTransactionChain txChain = broker.createTransactionChain(listener); + final var listener = mock(FutureCallback.class); + final DOMTransactionChain txChain = broker.createTransactionChain(); + txChain.addCallback(listener); - final DOMDataTreeWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); - - writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH, PeopleModel.emptyContainer()); + final DOMDataTreeWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); - final ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier( - new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)) - .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build(); + writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH, PeopleModel.emptyContainer()); - // Note that merge will validate the data and fail but put succeeds b/c deep validation is not - // done for put for performance reasons. - writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData); + // Note that merge will validate the data and fail but put succeeds b/c deep validation is not + // done for put for performance reasons. + writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, Builders.containerBuilder() + .withNodeIdentifier(new NodeIdentifier(CarsModel.BASE_QNAME)) + .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")) + .build()); - final var ex = assertThrows(ExecutionException.class, () -> writeTx.commit().get(5, TimeUnit.SECONDS)) - .getCause(); - assertThat(ex, instanceOf(TransactionCommitFailedException.class)); + final var ex = assertThrows(ExecutionException.class, () -> writeTx.commit().get(5, TimeUnit.SECONDS)) + .getCause(); + assertThat(ex, instanceOf(TransactionCommitFailedException.class)); - verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx), any(Throwable.class)); + verify(listener, timeout(5000)).onFailure(any()); - txChain.close(); - broker.close(); + txChain.close(); + } } @Test @@ -750,9 +702,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5); IntegrationTestKit newMember1TestKit = new IntegrationTestKit(leaderSystem, newMember1Builder, commitTimeout); - try (AbstractDataStore ds = - newMember1TestKit.setupAbstractDataStore( - testParameter, testName, MODULE_SHARDS_CARS_ONLY_1_2, false, CARS)) { + try (var ds = newMember1TestKit.setupDataStore(testParameter, testName, MODULE_SHARDS_CARS_ONLY_1_2, false, + CARS)) { followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorUtils(), CARS); @@ -770,7 +721,6 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { } } - @SuppressWarnings("unchecked") @Test public void testReadyLocalTransactionForwardedToLeader() throws Exception { initDatastoresWithCars("testReadyLocalTransactionForwardedToLeader"); @@ -795,7 +745,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(tx1 , modification, true, Optional.empty()); - carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef()); + carsFollowerShard.orElseThrow().tell(readyLocal, followerTestKit.getRef()); Object resp = followerTestKit.expectMsgClass(Object.class); if (resp instanceof akka.actor.Status.Failure) { throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause()); @@ -814,7 +764,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { readyLocal = new ReadyLocalTransaction(tx2 , modification, false, Optional.empty()); - carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef()); + carsFollowerShard.orElseThrow().tell(readyLocal, followerTestKit.getRef()); resp = followerTestKit.expectMsgClass(Object.class); if (resp instanceof akka.actor.Status.Failure) { throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause()); @@ -835,7 +785,6 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2); } - @SuppressWarnings("unchecked") @Test public void testForwardedReadyTransactionForwardedToLeader() throws Exception { initDatastoresWithCars("testForwardedReadyTransactionForwardedToLeader"); @@ -845,7 +794,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { followerDistributedDataStore.getActorUtils().findLocalShard("cars"); assertTrue("Cars follower shard found", carsFollowerShard.isPresent()); - carsFollowerShard.get().tell(GetShardDataTree.INSTANCE, followerTestKit.getRef()); + carsFollowerShard.orElseThrow().tell(GetShardDataTree.INSTANCE, followerTestKit.getRef()); final DataTree dataTree = followerTestKit.expectMsgClass(DataTree.class); // Send a tx with immediate commit. @@ -861,7 +810,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { new ReadWriteShardDataTreeTransaction(mock(ShardDataTreeTransactionParent.class), tx1, modification), true, Optional.empty()); - carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef()); + carsFollowerShard.orElseThrow().tell(forwardedReady, followerTestKit.getRef()); Object resp = followerTestKit.expectMsgClass(Object.class); if (resp instanceof akka.actor.Status.Failure) { throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause()); @@ -881,7 +830,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { new ReadWriteShardDataTreeTransaction(mock(ShardDataTreeTransactionParent.class), tx2, modification), false, Optional.empty()); - carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef()); + carsFollowerShard.orElseThrow().tell(forwardedReady, followerTestKit.getRef()); resp = followerTestKit.expectMsgClass(Object.class); if (resp instanceof akka.actor.Status.Failure) { throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause()); @@ -909,6 +858,10 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { leaderDatastoreContextBuilder.shardBatchedModificationCount(2); initDatastoresWithCarsAndPeople("testTransactionForwardedToLeaderAfterRetry"); + // Verify backend statistics on start + verifyCarsReadWriteTransactions(leaderDistributedDataStore, 0); + verifyCarsReadWriteTransactions(followerDistributedDataStore, 0); + // Do an initial write to get the primary shard info cached. final DOMStoreWriteTransaction initialWriteTx = followerDistributedDataStore.newWriteOnlyTransaction(); @@ -948,10 +901,13 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { writeTx2.write(PeopleModel.PERSON_LIST_PATH, people); final DOMStoreThreePhaseCommitCohort writeTx2Cohort = writeTx2.ready(); + // At this point only leader should see the transactions + verifyCarsReadWriteTransactions(leaderDistributedDataStore, 2); + verifyCarsReadWriteTransactions(followerDistributedDataStore, 0); + // Prepare another WO that writes to a single shard and thus will be directly committed on ready. This - // tx writes 5 cars so 2 BatchedModidifications messages will be sent initially and cached in the - // leader shard (with shardBatchedModificationCount set to 2). The 3rd BatchedModidifications will be - // sent on ready. + // tx writes 5 cars so 2 BatchedModifications messages will be sent initially and cached in the leader shard + // (with shardBatchedModificationCount set to 2). The 3rd BatchedModifications will be sent on ready. final DOMStoreWriteTransaction writeTx3 = followerDistributedDataStore.newWriteOnlyTransaction(); for (int i = 1; i <= 5; i++, carIndex++) { @@ -959,25 +915,27 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { writeTx3.write(CarsModel.newCarPath("car" + carIndex), cars.getLast()); } - // Prepare another WO that writes to a single shard. This will send a single BatchedModidifications - // message on ready. + // Prepare another WO that writes to a single shard. This will send a single BatchedModifications message + // on ready. final DOMStoreWriteTransaction writeTx4 = followerDistributedDataStore.newWriteOnlyTransaction(); cars.add(CarsModel.newCarEntry("car" + carIndex, Uint64.valueOf(carIndex))); writeTx4.write(CarsModel.newCarPath("car" + carIndex), cars.getLast()); carIndex++; - // Prepare a RW tx that will create a tx actor and send a ForwardedReadyTransaciton message to the - // leader shard on ready. + // Prepare a RW tx that will create a tx actor and send a ForwardedReadyTransaction message to the leader shard + // on ready. final DOMStoreReadWriteTransaction readWriteTx = followerDistributedDataStore.newReadWriteTransaction(); cars.add(CarsModel.newCarEntry("car" + carIndex, Uint64.valueOf(carIndex))); - readWriteTx.write(CarsModel.newCarPath("car" + carIndex), cars.getLast()); + final YangInstanceIdentifier carPath = CarsModel.newCarPath("car" + carIndex); + readWriteTx.write(carPath, cars.getLast()); - // FIXME: CONTROLLER-2017: ClientBackedDataStore reports only 4 transactions - assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter)); - IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", - stats -> assertEquals("getReadWriteTransactionCount", 5, stats.getReadWriteTransactionCount())); + // There is a difference here between implementations: tell-based protocol enforces batching on per-transaction + // level whereas ask-based protocol has a global limit towards a shard -- and hence flushes out last two + // transactions eagerly. + verifyCarsReadWriteTransactions(leaderDistributedDataStore, 3); + verifyCarsReadWriteTransactions(followerDistributedDataStore, 0); // Disable elections on the leader so it switches to follower. @@ -1010,16 +968,24 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { followerTestKit.doCommit(writeTx4Cohort); followerTestKit.doCommit(rwTxCohort); + // At this point everything is committed and the follower datastore should see 5 transactions, but leader should + // only see the initial transactions + verifyCarsReadWriteTransactions(leaderDistributedDataStore, 3); + verifyCarsReadWriteTransactions(followerDistributedDataStore, 5); + DOMStoreReadTransaction readTx = leaderDistributedDataStore.newReadOnlyTransaction(); verifyCars(readTx, cars.toArray(new MapEntryNode[cars.size()])); verifyNode(readTx, PeopleModel.PERSON_LIST_PATH, people); } + private static void verifyCarsReadWriteTransactions(final ClientBackedDataStore datastore, final int expected) + throws Exception { + IntegrationTestKit.verifyShardStats(datastore, "cars", + stats -> assertEquals("getReadWriteTransactionCount", expected, stats.getReadWriteTransactionCount())); + } + @Test public void testLeadershipTransferOnShutdown() throws Exception { - // FIXME: remove when test passes also for ClientBackedDataStore - assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter)); - leaderDatastoreContextBuilder.shardBatchedModificationCount(1); followerDatastoreContextBuilder.shardElectionTimeoutFactor(10).customRaftPolicyImplementation(null); final String testName = "testLeadershipTransferOnShutdown"; @@ -1028,8 +994,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { final IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System, DatastoreContext.newBuilderFrom(followerDatastoreContextBuilder.build()).operationTimeoutInMillis(500), commitTimeout); - try (AbstractDataStore follower2DistributedDataStore = follower2TestKit.setupAbstractDataStore( - testParameter, testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, false)) { + try (var follower2DistributedDataStore = follower2TestKit.setupDataStore(testParameter, testName, + MODULE_SHARDS_CARS_PEOPLE_1_2_3, false)) { followerTestKit.waitForMembersUp("member-3"); follower2TestKit.waitForMembersUp("member-1", "member-2"); @@ -1042,16 +1008,18 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer()); final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready(); - IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", - stats -> assertEquals("getTxCohortCacheSize", 1, stats.getTxCohortCacheSize())); + // FIXME: this assertion should be made in an explicit Shard test + // IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", + // stats -> assertEquals("getTxCohortCacheSize", 1, stats.getTxCohortCacheSize())); writeTx = followerDistributedDataStore.newWriteOnlyTransaction(); final MapEntryNode car = CarsModel.newCarEntry("optima", Uint64.valueOf(20000)); writeTx.write(CarsModel.newCarPath("optima"), car); final DOMStoreThreePhaseCommitCohort cohort2 = writeTx.ready(); - IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", - stats -> assertEquals("getTxCohortCacheSize", 2, stats.getTxCohortCacheSize())); + // FIXME: this assertion should be made in an explicit Shard test + // IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", + // stats -> assertEquals("getTxCohortCacheSize", 2, stats.getTxCohortCacheSize())); // Gracefully stop the leader via a Shutdown message. @@ -1083,9 +1051,6 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @Test public void testTransactionWithIsolatedLeader() throws Exception { - // FIXME: CONTROLLER-2018: remove when test passes also for ClientBackedDataStore - assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter)); - // Set the isolated leader check interval high so we can control the switch to IsolatedLeader. leaderDatastoreContextBuilder.shardIsolatedLeaderCheckIntervalInMillis(10000000); final String testName = "testTransactionWithIsolatedLeader"; @@ -1118,20 +1083,33 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { MemberNode.verifyRaftState(leaderDistributedDataStore, "cars", raftState -> assertEquals("getRaftState", "IsolatedLeader", raftState.getRaftState())); - final var ex = assertThrows(ExecutionException.class, - () -> leaderTestKit.doCommit(noShardLeaderWriteTx.ready())); - assertEquals(NoShardLeaderException.class, Throwables.getRootCause(ex).getClass()); + final var noShardLeaderCohort = noShardLeaderWriteTx.ready(); + // tell-based canCommit() does not have a real timeout and hence continues + final var canCommit = noShardLeaderCohort.canCommit(); + Uninterruptibles.sleepUninterruptibly(commitTimeout, TimeUnit.SECONDS); + assertFalse(canCommit.isDone()); sendDatastoreContextUpdate(leaderDistributedDataStore, leaderDatastoreContextBuilder .shardElectionTimeoutFactor(100)); final DOMStoreThreePhaseCommitCohort successTxCohort = successWriteTx.ready(); - followerDistributedDataStore = followerTestKit.setupAbstractDataStore( - testParameter, testName, MODULE_SHARDS_CARS_ONLY_1_2, false, CARS); + followerDistributedDataStore = followerTestKit.setupDataStore(testParameter, testName, + MODULE_SHARDS_CARS_ONLY_1_2, false, CARS); leaderTestKit.doCommit(preIsolatedLeaderTxCohort); leaderTestKit.doCommit(successTxCohort); + + // continuation of canCommit(): readied transaction will complete commit, but will report an OLFE + final var ex = assertThrows(ExecutionException.class, + () -> canCommit.get(commitTimeout, TimeUnit.SECONDS)).getCause(); + assertThat(ex, instanceOf(OptimisticLockFailedException.class)); + assertEquals("Optimistic lock failed for path " + CarsModel.BASE_PATH, ex.getMessage()); + final var cause = ex.getCause(); + assertThat(cause, instanceOf(ConflictingModificationAppliedException.class)); + final var cmae = (ConflictingModificationAppliedException) cause; + assertEquals("Node was created by other transaction.", cmae.getMessage()); + assertEquals(CarsModel.BASE_PATH, cmae.getPath()); } @Test @@ -1157,13 +1135,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { rwTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); final var ex = assertThrows(ExecutionException.class, () -> followerTestKit.doCommit(rwTx.ready())); - final String msg = "Unexpected exception: " + Throwables.getStackTraceAsString(ex.getCause()); - if (DistributedDataStore.class.isAssignableFrom(testParameter)) { - assertTrue(msg, Throwables.getRootCause(ex) instanceof NoShardLeaderException - || ex.getCause() instanceof ShardLeaderNotRespondingException); - } else { - assertThat(msg, Throwables.getRootCause(ex), instanceOf(RequestTimeoutException.class)); - } + assertThat("Unexpected exception: " + Throwables.getStackTraceAsString(ex.getCause()), + Throwables.getRootCause(ex), instanceOf(RequestTimeoutException.class)); } @Test @@ -1192,12 +1165,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { rwTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); final var ex = assertThrows(ExecutionException.class, () -> followerTestKit.doCommit(rwTx.ready())); - final String msg = "Unexpected exception: " + Throwables.getStackTraceAsString(ex.getCause()); - if (DistributedDataStore.class.isAssignableFrom(testParameter)) { - assertThat(msg, Throwables.getRootCause(ex), instanceOf(NoShardLeaderException.class)); - } else { - assertThat(msg, Throwables.getRootCause(ex), instanceOf(RequestTimeoutException.class)); - } + assertThat("Unexpected exception: " + Throwables.getStackTraceAsString(ex.getCause()), + Throwables.getRootCause(ex), instanceOf(RequestTimeoutException.class)); } @Test @@ -1211,9 +1180,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { final IntegrationTestKit follower2TestKit = new IntegrationTestKit( follower2System, follower2DatastoreContextBuilder, commitTimeout); - try (AbstractDataStore ds = - follower2TestKit.setupAbstractDataStore( - testParameter, testName, MODULE_SHARDS_CARS_1_2_3, false, CARS)) { + try (var ds = follower2TestKit.setupDataStore(testParameter, testName, MODULE_SHARDS_CARS_1_2_3, false, CARS)) { followerTestKit.waitForMembersUp("member-1", "member-3"); follower2TestKit.waitForMembersUp("member-1", "member-2"); @@ -1250,9 +1217,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { final IntegrationTestKit follower2TestKit = new IntegrationTestKit( follower2System, follower2DatastoreContextBuilder, commitTimeout); - final AbstractDataStore ds2 = - follower2TestKit.setupAbstractDataStore( - testParameter, testName, MODULE_SHARDS_CARS_1_2_3, false, CARS); + final var ds2 = follower2TestKit.setupDataStore(testParameter, testName, MODULE_SHARDS_CARS_1_2_3, false, CARS); followerTestKit.waitForMembersUp("member-1", "member-3"); follower2TestKit.waitForMembersUp("member-1", "member-2"); @@ -1260,7 +1225,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { // behavior is controlled by akka.coordinated-shutdown.run-by-actor-system-terminate configuration option TestKit.shutdownActorSystem(follower2System, true); - ActorRef cars = leaderDistributedDataStore.getActorUtils().findLocalShard("cars").get(); + ActorRef cars = leaderDistributedDataStore.getActorUtils().findLocalShard("cars").orElseThrow(); final OnDemandRaftState initialState = (OnDemandRaftState) leaderDistributedDataStore.getActorUtils() .executeOperation(cars, GetOnDemandRaftState.INSTANCE); @@ -1275,7 +1240,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { await().atMost(10, TimeUnit.SECONDS) .until(() -> containsUnreachable(followerCluster, follower2Member)); - ActorRef followerCars = followerDistributedDataStore.getActorUtils().findLocalShard("cars").get(); + ActorRef followerCars = followerDistributedDataStore.getActorUtils().findLocalShard("cars").orElseThrow(); // to simulate a follower not being able to receive messages, but still being able to send messages and becoming // candidate, we can just send a couple of RequestVotes to both leader and follower. @@ -1318,7 +1283,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", Uint64.valueOf(20000)))); AbstractShardTest.writeToStore(tree, CarsModel.BASE_PATH, carsNode); - final NormalizedNode snapshotRoot = AbstractShardTest.readStore(tree, YangInstanceIdentifier.empty()); + final NormalizedNode snapshotRoot = AbstractShardTest.readStore(tree, YangInstanceIdentifier.of()); final Snapshot initialSnapshot = Snapshot.create( new ShardSnapshotState(new MetadataShardDataTreeSnapshot(snapshotRoot)), Collections.emptyList(), 5, 1, 5, 1, 1, null, null); @@ -1341,9 +1306,6 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @Test public void testReadWriteMessageSlicing() throws Exception { - // The slicing is only implemented for tell-based protocol - assumeTrue(ClientBackedDataStore.class.isAssignableFrom(testParameter)); - leaderDatastoreContextBuilder.maximumMessageSliceSize(100); followerDatastoreContextBuilder.maximumMessageSliceSize(100); initDatastoresWithCars("testLargeReadReplySlicing"); @@ -1373,15 +1335,15 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { initialWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); leaderTestKit.doCommit(initialWriteTx.ready()); - try (AbstractDataStore follower2DistributedDataStore = follower2TestKit.setupAbstractDataStore( - testParameter, testName, MODULE_SHARDS_CARS_1_2_3, false)) { + try (var follower2DistributedDataStore = follower2TestKit.setupDataStore(testParameter, testName, + MODULE_SHARDS_CARS_1_2_3, false)) { final ActorRef member3Cars = ((LocalShardStore) follower2DistributedDataStore).getLocalShards() .getLocalShards().get("cars").getActor(); final ActorRef member2Cars = ((LocalShardStore)followerDistributedDataStore).getLocalShards() .getLocalShards().get("cars").getActor(); - member2Cars.tell(new StartDropMessages(AppendEntries.class), null); - member3Cars.tell(new StartDropMessages(AppendEntries.class), null); + member2Cars.tell(new StartDropMessages<>(AppendEntries.class), null); + member3Cars.tell(new StartDropMessages<>(AppendEntries.class), null); final DOMStoreWriteTransaction newTx = leaderDistributedDataStore.newWriteOnlyTransaction(); newTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); @@ -1409,8 +1371,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { "member-3-shard-cars-testRaftCallbackDuringLeadershipDrop", -1, -1), member3Cars); - member2Cars.tell(new StopDropMessages(AppendEntries.class), null); - member3Cars.tell(new StopDropMessages(AppendEntries.class), null); + member2Cars.tell(new StopDropMessages<>(AppendEntries.class), null); + member3Cars.tell(new StopDropMessages<>(AppendEntries.class), null); await("Is tx stuck in COMMIT_PENDING") .atMost(10, TimeUnit.SECONDS).untilAtomic(submitDone, equalTo(true)); @@ -1422,23 +1384,22 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @Test public void testSnapshotOnRootOverwrite() throws Exception { - // FIXME: ClientBackedDatastore does not have stable indexes/term, the snapshot index seems to fluctuate - assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter)); - - final String testName = "testSnapshotOnRootOverwrite"; - final String[] shards = {"cars", "default"}; - initDatastores(testName, "module-shards-default-cars-member1-and-2.conf", shards, - leaderDatastoreContextBuilder.snapshotOnRootOverwrite(true), - followerDatastoreContextBuilder.snapshotOnRootOverwrite(true)); + initDatastores("testSnapshotOnRootOverwrite", "module-shards-default-cars-member1-and-2.conf", + new String[] {"cars", "default"}, + leaderDatastoreContextBuilder.snapshotOnRootOverwrite(true), + followerDatastoreContextBuilder.snapshotOnRootOverwrite(true)); leaderTestKit.waitForMembersUp("member-2"); - final ContainerNode rootNode = ImmutableContainerNodeBuilder.create() - .withNodeIdentifier(YangInstanceIdentifier.NodeIdentifier.create(SchemaContext.NAME)) + final ContainerNode rootNode = Builders.containerBuilder() + .withNodeIdentifier(NodeIdentifier.create(SchemaContext.NAME)) .withChild(CarsModel.create()) .build(); - leaderTestKit.testWriteTransaction(leaderDistributedDataStore, YangInstanceIdentifier.empty(), rootNode); + leaderTestKit.testWriteTransaction(leaderDistributedDataStore, YangInstanceIdentifier.of(), rootNode); + // FIXME: CONTROLLER-2020: ClientBackedDatastore does not have stable indexes/term, + // the snapshot index seems to fluctuate + assumeTrue(false); IntegrationTestKit.verifyShardState(leaderDistributedDataStore, "cars", state -> assertEquals(1, state.getSnapshotIndex())); @@ -1464,7 +1425,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { verifySnapshot("member-2-shard-cars-testSnapshotOnRootOverwrite", 1); // root overwrite so expect a snapshot - leaderTestKit.testWriteTransaction(leaderDistributedDataStore, YangInstanceIdentifier.empty(), rootNode); + leaderTestKit.testWriteTransaction(leaderDistributedDataStore, YangInstanceIdentifier.of(), rootNode); // this was a real snapshot so everything should be in it(1(DisableTrackingPayload) + 1 + 10 + 1) IntegrationTestKit.verifyShardState(leaderDistributedDataStore, "cars", @@ -1476,7 +1437,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { verifySnapshot("member-2-shard-cars-testSnapshotOnRootOverwrite", 12); } - private void verifySnapshot(final String persistenceId, final long lastAppliedIndex) { + private static void verifySnapshot(final String persistenceId, final long lastAppliedIndex) { await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> { List snap = InMemorySnapshotStore.getSnapshots(persistenceId, Snapshot.class); assertEquals(1, snap.size()); @@ -1494,10 +1455,10 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { assertEquals("Snapshot state type", ShardSnapshotState.class, actual.getState().getClass()); MetadataShardDataTreeSnapshot shardSnapshot = (MetadataShardDataTreeSnapshot) ((ShardSnapshotState)actual.getState()).getSnapshot(); - assertEquals("Snapshot root node", expRoot, shardSnapshot.getRootNode().get()); + assertEquals("Snapshot root node", expRoot, shardSnapshot.getRootNode().orElseThrow()); } - private static void sendDatastoreContextUpdate(final AbstractDataStore dataStore, final Builder builder) { + private static void sendDatastoreContextUpdate(final ClientBackedDataStore dataStore, final Builder builder) { final Builder newBuilder = DatastoreContext.newBuilderFrom(builder.build()); final DatastoreContextFactory mockContextFactory = mock(DatastoreContextFactory.class); final Answer answer = invocation -> newBuilder.build();