X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FDistributedDataStoreRemotingIntegrationTest.java;h=da219faaf1cdd893fc67bf687145b7798849faee;hp=8637a5f17d4a1e2f6cd0e805d3bde082bc5f3248;hb=e9efe27538adb5ae575f77fda90f147d46341801;hpb=bb248f15d352cdd69e53ff7756fcb2c62cdc3eac diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java index 8637a5f17d..da219faaf1 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java @@ -32,7 +32,6 @@ import akka.testkit.javadsl.TestKit; import com.google.common.base.Stopwatch; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Range; import com.google.common.primitives.UnsignedLong; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; @@ -45,7 +44,6 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Optional; -import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -115,13 +113,13 @@ import org.opendaylight.yangtools.yang.common.Uint64; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; -import org.opendaylight.yangtools.yang.data.api.schema.MapNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.SystemMapNode; +import org.opendaylight.yangtools.yang.data.api.schema.builder.CollectionNodeBuilder; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; -import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder; import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory; import org.opendaylight.yangtools.yang.model.api.SchemaContext; @@ -204,9 +202,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { leaderDistributedDataStore.close(); } - TestKit.shutdownActorSystem(leaderSystem); - TestKit.shutdownActorSystem(followerSystem); - TestKit.shutdownActorSystem(follower2System); + TestKit.shutdownActorSystem(leaderSystem, true); + TestKit.shutdownActorSystem(followerSystem, true); + TestKit.shutdownActorSystem(follower2System,true); InMemoryJournal.clear(); InMemorySnapshotStore.clear(); @@ -227,7 +225,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { } private void initDatastores(final String type, final String moduleShardsConfig, final String[] shards, - DatastoreContext.Builder leaderBuilder, DatastoreContext.Builder followerBuilder) throws Exception { + final DatastoreContext.Builder leaderBuilder, final DatastoreContext.Builder followerBuilder) + throws Exception { leaderTestKit = new IntegrationTestKit(leaderSystem, leaderBuilder, commitTimeout); leaderDistributedDataStore = leaderTestKit.setupAbstractDataStore( @@ -245,12 +244,12 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { private static void verifyCars(final DOMStoreReadTransaction readTx, final MapEntryNode... entries) throws Exception { - final Optional> optional = readTx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS); + final Optional optional = readTx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS); assertTrue("isPresent", optional.isPresent()); - final CollectionNodeBuilder listBuilder = ImmutableNodes.mapNodeBuilder( + final CollectionNodeBuilder listBuilder = ImmutableNodes.mapNodeBuilder( CarsModel.CAR_QNAME); - for (final NormalizedNode entry: entries) { + for (final NormalizedNode entry: entries) { listBuilder.withChild((MapEntryNode) entry); } @@ -258,8 +257,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { } private static void verifyNode(final DOMStoreReadTransaction readTx, final YangInstanceIdentifier path, - final NormalizedNode expNode) throws Exception { - final Optional> optional = readTx.read(path).get(5, TimeUnit.SECONDS); + final NormalizedNode expNode) throws Exception { + final Optional optional = readTx.read(path).get(5, TimeUnit.SECONDS); assertTrue("isPresent", optional.isPresent()); assertEquals("Data node", expNode, optional.get()); } @@ -401,18 +400,22 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { } assertEquals(0, metadata.getClosedTransactions().size()); - assertEquals(Range.closedOpen(UnsignedLong.valueOf(0), UnsignedLong.valueOf(11)), - metadata.getPurgedTransactions().asRanges().iterator().next()); + + final var purgedRanges = metadata.getPurgedTransactions().ranges(); + assertEquals(1, purgedRanges.size()); + final var purgedRange = purgedRanges.first(); + assertEquals(UnsignedLong.ZERO, purgedRange.lower()); + assertEquals(UnsignedLong.valueOf(10), purgedRange.upper()); } else { // ask based should track no metadata assertTrue(frontendMetadata.getClients().get(0).getCurrentHistories().isEmpty()); } }); - final Optional> optional = txChain.newReadOnlyTransaction() + final Optional optional = txChain.newReadOnlyTransaction() .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS); assertTrue("isPresent", optional.isPresent()); - assertEquals("# cars", numCars, ((Collection) optional.get().getValue()).size()); + assertEquals("# cars", numCars, ((Collection) optional.get().body()).size()); } @Test @@ -465,20 +468,18 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { metadata = iterator.next(); } - Set> ranges = metadata.getPurgedTransactions().asRanges(); - assertEquals(0, metadata.getClosedTransactions().size()); - assertEquals(1, ranges.size()); + assertEquals(1, metadata.getPurgedTransactions().size()); } else { // ask based should track no metadata assertTrue(frontendMetadata.getClients().get(0).getCurrentHistories().isEmpty()); } }); - final Optional> optional = txChain.newReadOnlyTransaction() + final Optional optional = txChain.newReadOnlyTransaction() .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS); assertTrue("isPresent", optional.isPresent()); - assertEquals("# cars", numCars, ((Collection) optional.get().getValue()).size()); + assertEquals("# cars", numCars, ((Collection) optional.get().body()).size()); } @Test @@ -515,11 +516,11 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { assertNotNull("newWriteOnlyTransaction returned null", writeTx); final YangInstanceIdentifier carsPath = CarsModel.BASE_PATH; - final NormalizedNode carsNode = CarsModel.emptyContainer(); + final NormalizedNode carsNode = CarsModel.emptyContainer(); writeTx.write(carsPath, carsNode); final YangInstanceIdentifier peoplePath = PeopleModel.BASE_PATH; - final NormalizedNode peopleNode = PeopleModel.emptyContainer(); + final NormalizedNode peopleNode = PeopleModel.emptyContainer(); writeTx.write(peoplePath, peopleNode); followerTestKit.doCommit(writeTx.ready()); @@ -538,11 +539,11 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { assertNotNull("newReadWriteTransaction returned null", rwTx); final YangInstanceIdentifier carsPath = CarsModel.BASE_PATH; - final NormalizedNode carsNode = CarsModel.emptyContainer(); + final NormalizedNode carsNode = CarsModel.emptyContainer(); rwTx.write(carsPath, carsNode); final YangInstanceIdentifier peoplePath = PeopleModel.BASE_PATH; - final NormalizedNode peopleNode = PeopleModel.emptyContainer(); + final NormalizedNode peopleNode = PeopleModel.emptyContainer(); rwTx.write(peoplePath, peopleNode); followerTestKit.doCommit(rwTx.ready()); @@ -629,7 +630,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { final YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack"); readWriteTx.merge(personPath, person); - Optional> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS); + Optional optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS); assertTrue("isPresent", optional.isPresent()); assertEquals("Data node", car, optional.get()); @@ -964,7 +965,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { cars.add(CarsModel.newCarEntry("car" + carIndex, Uint64.valueOf(carIndex))); writeTx2.write(CarsModel.newCarPath("car" + carIndex), cars.getLast()); carIndex++; - NormalizedNode people = ImmutableNodes.mapNodeBuilder(PeopleModel.PERSON_QNAME) + NormalizedNode people = ImmutableNodes.mapNodeBuilder(PeopleModel.PERSON_QNAME) .withChild(PeopleModel.newPersonEntry("Dude")).build(); writeTx2.write(PeopleModel.PERSON_LIST_PATH, people); final DOMStoreThreePhaseCommitCohort writeTx2Cohort = writeTx2.ready(); @@ -1285,10 +1286,11 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { followerTestKit.waitForMembersUp("member-1", "member-3"); follower2TestKit.waitForMembersUp("member-1", "member-2"); - TestKit.shutdownActorSystem(follower2System); + // behavior is controlled by akka.coordinated-shutdown.run-by-actor-system-terminate configuration option + TestKit.shutdownActorSystem(follower2System, true); ActorRef cars = leaderDistributedDataStore.getActorUtils().findLocalShard("cars").get(); - OnDemandRaftState initialState = (OnDemandRaftState) leaderDistributedDataStore.getActorUtils() + final OnDemandRaftState initialState = (OnDemandRaftState) leaderDistributedDataStore.getActorUtils() .executeOperation(cars, GetOnDemandRaftState.INSTANCE); Cluster leaderCluster = Cluster.get(leaderSystem); @@ -1338,7 +1340,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", Uint64.valueOf(20000)))); AbstractShardTest.writeToStore(tree, CarsModel.BASE_PATH, carsNode); - final NormalizedNode snapshotRoot = AbstractShardTest.readStore(tree, YangInstanceIdentifier.empty()); + final NormalizedNode snapshotRoot = AbstractShardTest.readStore(tree, YangInstanceIdentifier.empty()); final Snapshot initialSnapshot = Snapshot.create( new ShardSnapshotState(new MetadataShardDataTreeSnapshot(snapshotRoot)), Collections.emptyList(), 5, 1, 5, 1, 1, null, null); @@ -1349,7 +1351,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { initDatastoresWithCars(testName); - final Optional> readOptional = leaderDistributedDataStore.newReadOnlyTransaction().read( + final Optional readOptional = leaderDistributedDataStore.newReadOnlyTransaction().read( CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS); assertTrue("isPresent", readOptional.isPresent()); assertEquals("Node", carsNode, readOptional.get()); @@ -1372,7 +1374,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { final DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction(); - final NormalizedNode carsNode = CarsModel.create(); + final NormalizedNode carsNode = CarsModel.create(); rwTx.write(CarsModel.BASE_PATH, carsNode); verifyNode(rwTx, CarsModel.BASE_PATH, carsNode); @@ -1443,7 +1445,6 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { } @Test - @Ignore("Writes to root node are not split into shards") public void testSnapshotOnRootOverwrite() throws Exception { if (!DistributedDataStore.class.isAssignableFrom(testParameter)) { // FIXME: ClientBackedDatastore does not have stable indexes/term, the snapshot index seems to fluctuate @@ -1451,27 +1452,27 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { } final String testName = "testSnapshotOnRootOverwrite"; - String[] shards = {"cars", "default"}; - initDatastores(testName, "module-shards-default-cars-member1.conf", shards, + final String[] shards = {"cars", "default"}; + initDatastores(testName, "module-shards-default-cars-member1-and-2.conf", shards, leaderDatastoreContextBuilder.snapshotOnRootOverwrite(true), followerDatastoreContextBuilder.snapshotOnRootOverwrite(true)); leaderTestKit.waitForMembersUp("member-2"); - ContainerNode rootNode = ImmutableContainerNodeBuilder.create() + final ContainerNode rootNode = ImmutableContainerNodeBuilder.create() .withNodeIdentifier(YangInstanceIdentifier.NodeIdentifier.create(SchemaContext.NAME)) - .withChild((ContainerNode) CarsModel.create()) + .withChild(CarsModel.create()) .build(); leaderTestKit.testWriteTransaction(leaderDistributedDataStore, YangInstanceIdentifier.empty(), rootNode); IntegrationTestKit.verifyShardState(leaderDistributedDataStore, "cars", - state -> assertEquals(0, state.getSnapshotIndex())); + state -> assertEquals(1, state.getSnapshotIndex())); IntegrationTestKit.verifyShardState(followerDistributedDataStore, "cars", - state -> assertEquals(0, state.getSnapshotIndex())); + state -> assertEquals(1, state.getSnapshotIndex())); - verifySnapshot("member-1-shard-cars-testSnapshotOnRootOverwrite", 0); - verifySnapshot("member-2-shard-cars-testSnapshotOnRootOverwrite", 0); + verifySnapshot("member-1-shard-cars-testSnapshotOnRootOverwrite", 1); + verifySnapshot("member-2-shard-cars-testSnapshotOnRootOverwrite", 1); for (int i = 0; i < 10; i++) { leaderTestKit.testWriteTransaction(leaderDistributedDataStore, CarsModel.newCarPath("car " + i), @@ -1480,28 +1481,28 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { // fake snapshot causes the snapshotIndex to move IntegrationTestKit.verifyShardState(leaderDistributedDataStore, "cars", - state -> assertEquals(9, state.getSnapshotIndex())); + state -> assertEquals(10, state.getSnapshotIndex())); IntegrationTestKit.verifyShardState(followerDistributedDataStore, "cars", - state -> assertEquals(9, state.getSnapshotIndex())); + state -> assertEquals(10, state.getSnapshotIndex())); - // however the real snapshot still has not changed and was taken at index 0 - verifySnapshot("member-1-shard-cars-testSnapshotOnRootOverwrite", 0); - verifySnapshot("member-2-shard-cars-testSnapshotOnRootOverwrite", 0); + // however the real snapshot still has not changed and was taken at index 1 + verifySnapshot("member-1-shard-cars-testSnapshotOnRootOverwrite", 1); + verifySnapshot("member-2-shard-cars-testSnapshotOnRootOverwrite", 1); // root overwrite so expect a snapshot leaderTestKit.testWriteTransaction(leaderDistributedDataStore, YangInstanceIdentifier.empty(), rootNode); - // this was a real snapshot so everything should be in it(1 + 10 + 1) + // this was a real snapshot so everything should be in it(1(DisableTrackingPayload) + 1 + 10 + 1) IntegrationTestKit.verifyShardState(leaderDistributedDataStore, "cars", - state -> assertEquals(11, state.getSnapshotIndex())); + state -> assertEquals(12, state.getSnapshotIndex())); IntegrationTestKit.verifyShardState(followerDistributedDataStore, "cars", - state -> assertEquals(11, state.getSnapshotIndex())); + state -> assertEquals(12, state.getSnapshotIndex())); - verifySnapshot("member-1-shard-cars-testSnapshotOnRootOverwrite", 11); - verifySnapshot("member-2-shard-cars-testSnapshotOnRootOverwrite", 11); + verifySnapshot("member-1-shard-cars-testSnapshotOnRootOverwrite", 12); + verifySnapshot("member-2-shard-cars-testSnapshotOnRootOverwrite", 12); } - private void verifySnapshot(String persistenceId, long lastAppliedIndex) { + private void verifySnapshot(final String persistenceId, final long lastAppliedIndex) { await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> { List snap = InMemorySnapshotStore.getSnapshots(persistenceId, Snapshot.class); assertEquals(1, snap.size()); @@ -1511,7 +1512,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { } private static void verifySnapshot(final Snapshot actual, final Snapshot expected, - final NormalizedNode expRoot) { + final NormalizedNode expRoot) { assertEquals("Snapshot getLastAppliedTerm", expected.getLastAppliedTerm(), actual.getLastAppliedTerm()); assertEquals("Snapshot getLastAppliedIndex", expected.getLastAppliedIndex(), actual.getLastAppliedIndex()); assertEquals("Snapshot getLastTerm", expected.getLastTerm(), actual.getLastTerm());