X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fsharding%2FDistributedShardedDOMDataTreeRemotingTest.java;h=ca8af73cb0d8087b7722652d8c3f800fa562cc57;hp=8564e2d90859098b87a2a1ad4f2fe5880da2f150;hb=2ab5805594a7507fa658d6cf93f72374701b582f;hpb=20f8f30f4bbf1e982672c1f883a6a18b0e4539de

diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeRemotingTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeRemotingTest.java
index 8564e2d908..ca8af73cb0 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeRemotingTest.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeRemotingTest.java
@@ -8,6 +8,7 @@
 
 package org.opendaylight.controller.cluster.sharding;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -19,13 +20,13 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.Address;
 import akka.actor.AddressFromURIString;
-import akka.actor.PoisonPill;
 import akka.cluster.Cluster;
-import akka.cluster.ddata.DistributedData;
 import akka.testkit.JavaTestKit;
 import com.google.common.collect.Lists;
 import com.typesafe.config.ConfigFactory;
 import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -39,6 +40,8 @@ import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
 import org.opendaylight.controller.cluster.datastore.IntegrationTestKit;
 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
+import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
+import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.cluster.sharding.DistributedShardFactory.DistributedShardRegistration;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
@@ -54,7 +57,6 @@ import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLe
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-@Ignore("Needs to have the configuration backend switched from distributed-data")
 public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest {
 
     private static final Logger LOG = LoggerFactory.getLogger(DistributedShardedDOMDataTreeRemotingTest.class);
@@ -65,33 +67,37 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest {
     private static final DOMDataTreeIdentifier TEST_ID =
             new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH);
 
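+    // Shard configuration now comes from a static module-shards file rather
+    // than the akka distributed-data based configuration backend.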
+    private static final String MODULE_SHARDS_CONFIG = "module-shards-default.conf";
+
     private ActorSystem leaderSystem;
     private ActorSystem followerSystem;
 
     private final Builder leaderDatastoreContextBuilder =
-            DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5)
-                    .logicalStoreType(
-                            org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION);
+            DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);
 
     private final DatastoreContext.Builder followerDatastoreContextBuilder =
-            DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5)
-                    .logicalStoreType(
-                            org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION);
+            DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);
+
+    private DistributedDataStore leaderConfigDatastore;
+    private DistributedDataStore leaderOperDatastore;
+
+    private DistributedDataStore followerConfigDatastore;
+    private DistributedDataStore followerOperDatastore;
+
-    private DistributedDataStore followerDistributedDataStore;
-    private DistributedDataStore leaderDistributedDataStore;
     private IntegrationTestKit followerTestKit;
     private IntegrationTestKit leaderTestKit;
-
     private DistributedShardedDOMDataTree leaderShardFactory;
-    private DistributedShardedDOMDataTree followerShardFactory;
+    private DistributedShardedDOMDataTree followerShardFactory;
 
     private ActorSystemProvider leaderSystemProvider;
     private ActorSystemProvider followerSystemProvider;
 
     @Before
     public void setUp() {
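+        // Clear the in-memory journal and snapshot store so no persisted
+        // shard state leaks from one test into the next.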
+        InMemoryJournal.clear();
+        InMemorySnapshotStore.clear();
 
         leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
         Cluster.get(leaderSystem).join(MEMBER_1_ADDRESS);
 
@@ -109,64 +115,95 @@
 
     @After
     public void tearDown() {
-        if (followerDistributedDataStore != null) {
-            followerDistributedDataStore.close();
+        if (leaderConfigDatastore != null) {
+            leaderConfigDatastore.close();
         }
 
-        if (leaderDistributedDataStore != null) {
-            leaderDistributedDataStore.close();
+        if (leaderOperDatastore != null) {
+            leaderOperDatastore.close();
         }
 
-        DistributedData.get(leaderSystem).replicator().tell(PoisonPill.getInstance(), ActorRef.noSender());
-        DistributedData.get(followerSystem).replicator().tell(PoisonPill.getInstance(), ActorRef.noSender());
+        if (followerConfigDatastore != null) {
+            followerConfigDatastore.close();
+        }
+        if (followerOperDatastore != null) {
+            followerOperDatastore.close();
+        }
 
-        JavaTestKit.shutdownActorSystem(leaderSystem);
-        JavaTestKit.shutdownActorSystem(followerSystem);
+        JavaTestKit.shutdownActorSystem(leaderSystem, null, Boolean.TRUE);
+        JavaTestKit.shutdownActorSystem(followerSystem, null, Boolean.TRUE);
+
+        InMemoryJournal.clear();
+        InMemorySnapshotStore.clear();
     }
 
-    private void initEmptyDatastores(final String type) {
+    private void initEmptyDatastores() throws Exception {
         leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder);
 
-        leaderDistributedDataStore =
-                leaderTestKit.setupDistributedDataStoreWithoutConfig(type, SchemaContextHelper.full());
+        leaderConfigDatastore = leaderTestKit.setupDistributedDataStore(
+                "config", MODULE_SHARDS_CONFIG, true,
+                SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext());
+        leaderOperDatastore = leaderTestKit.setupDistributedDataStore(
+                "operational", MODULE_SHARDS_CONFIG, true,
+                SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext());
+
+        leaderShardFactory = new DistributedShardedDOMDataTree(leaderSystemProvider,
+                leaderOperDatastore,
+                leaderConfigDatastore);
 
         followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder);
-        followerDistributedDataStore =
-                followerTestKit.setupDistributedDataStoreWithoutConfig(type, SchemaContextHelper.full());
 
-        leaderShardFactory = new DistributedShardedDOMDataTree(leaderSystemProvider,
-                leaderDistributedDataStore,
-                leaderDistributedDataStore);
+        followerConfigDatastore = followerTestKit.setupDistributedDataStore(
+                "config", MODULE_SHARDS_CONFIG, true, SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext());
+        followerOperDatastore = followerTestKit.setupDistributedDataStore(
+                "operational", MODULE_SHARDS_CONFIG, true,
+                SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext());
 
         followerShardFactory = new DistributedShardedDOMDataTree(followerSystemProvider,
-                followerDistributedDataStore,
-                followerDistributedDataStore);
+                followerOperDatastore,
+                followerConfigDatastore);
+
+        followerTestKit.waitForMembersUp("member-1");
 
-        leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+        leaderShardFactory.init();
+        followerShardFactory.init();
+
+        leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
+                ClusterUtils.getCleanShardName(YangInstanceIdentifier.EMPTY));
+
+        leaderTestKit.waitUntilLeader(leaderOperDatastore.getActorContext(),
                 ClusterUtils.getCleanShardName(YangInstanceIdentifier.EMPTY));
     }
 
     @Test
     public void testProducerRegistrations() throws Exception {
-        initEmptyDatastores("config");
+        LOG.info("testProducerRegistrations starting");
+        initEmptyDatastores();
 
         leaderTestKit.waitForMembersUp("member-2");
 
+        // TODO refactor shard creation and verification to own method
         final DistributedShardRegistration shardRegistration =
                 waitOnAsyncTask(leaderShardFactory.createDistributedShard(
                         TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
                         DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
 
-        leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+        leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
                 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
 
-        final ActorRef leaderShardManager = leaderDistributedDataStore.getActorContext().getShardManager();
+        final ActorRef leaderShardManager = leaderConfigDatastore.getActorContext().getShardManager();
 
-        assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
+        assertNotNull(findLocalShard(leaderConfigDatastore.getActorContext(),
                 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())));
 
-        assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
+        assertNotNull(findLocalShard(followerConfigDatastore.getActorContext(),
                 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())));
 
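+        // The shard was created with two replicas, so the leader should see
+        // exactly one remote peer.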
LOG.info("testProducerRegistrations ending"); } @Test public void testWriteIntoMultipleShards() throws Exception { - initEmptyDatastores("config"); + LOG.info("testWriteIntoMultipleShards starting"); + initEmptyDatastores(); leaderTestKit.waitForMembersUp("member-2"); @@ -211,11 +253,17 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest { DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION); - leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), + leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(), ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())); - findLocalShard(followerDistributedDataStore.getActorContext(), + findLocalShard(followerConfigDatastore.getActorContext(), ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())); + final Set peers = new HashSet<>(); + IntegrationTestKit.verifyShardState(leaderConfigDatastore, + ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()), onDemandShardState -> + peers.addAll(onDemandShardState.getPeerAddresses().values())); + assertEquals(peers.size(), 1); + LOG.debug("Got after waiting for nonleader"); final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(TEST_ID)); @@ -232,11 +280,16 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest { LOG.warn("Got to pre submit"); tx.submit().checkedGet(); + + shardRegistration.close().toCompletableFuture().get(); + + LOG.info("testWriteIntoMultipleShards ending"); } @Test public void testMultipleShardRegistrations() throws Exception { - initEmptyDatastores("config"); + LOG.info("testMultipleShardRegistrations starting"); + initEmptyDatastores(); final DistributedShardRegistration reg1 = waitOnAsyncTask(leaderShardFactory.createDistributedShard( TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)), @@ -257,81 +310,83 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest { Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)), DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION); - leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), + leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(), ClusterUtils.getCleanShardName(TestModel.TEST_PATH)); - leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), + leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(), ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH)); - leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), + leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(), ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH)); - leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), + leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(), ClusterUtils.getCleanShardName(TestModel.JUNK_PATH)); // check leader has local shards - assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(), + assertNotNull(findLocalShard(leaderConfigDatastore.getActorContext(), ClusterUtils.getCleanShardName(TestModel.TEST_PATH))); - assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(), + assertNotNull(findLocalShard(leaderConfigDatastore.getActorContext(), ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH))); - assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(), + 
+        shardRegistration.close().toCompletableFuture().get();
+
+        LOG.info("testWriteIntoMultipleShards ending");
     }
 
     @Test
     public void testMultipleShardRegistrations() throws Exception {
-        initEmptyDatastores("config");
+        LOG.info("testMultipleShardRegistrations starting");
+        initEmptyDatastores();
 
         final DistributedShardRegistration reg1 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
                 TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
                 DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
 
@@ -257,81 +310,83 @@
                 Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
                 DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
 
-        leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+        leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
                 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
-        leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+        leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
                 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));
-        leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+        leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
                 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));
-        leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+        leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
                 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));
 
         // check leader has local shards
-        assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
+        assertNotNull(findLocalShard(leaderConfigDatastore.getActorContext(),
                 ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
-        assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
+        assertNotNull(findLocalShard(leaderConfigDatastore.getActorContext(),
                 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH)));
-        assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
+        assertNotNull(findLocalShard(leaderConfigDatastore.getActorContext(),
                 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH)));
-        assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
+        assertNotNull(findLocalShard(leaderConfigDatastore.getActorContext(),
                 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH)));
 
         // check follower has local shards
-        assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
+        assertNotNull(findLocalShard(followerConfigDatastore.getActorContext(),
                 ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
-        assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
+        assertNotNull(findLocalShard(followerConfigDatastore.getActorContext(),
                 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH)));
-        assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
+        assertNotNull(findLocalShard(followerConfigDatastore.getActorContext(),
                 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH)));
-        assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
+        assertNotNull(findLocalShard(followerConfigDatastore.getActorContext(),
                 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH)));
 
-        LOG.debug("Closing registrations");
-        reg1.close();
-        reg2.close();
-        reg3.close();
-        reg4.close();
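+        // Block on each close so all four shards are really gone before
+        // checking that they have been removed from both members.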
+        reg1.close().toCompletableFuture().get();
+        reg2.close().toCompletableFuture().get();
+        reg3.close().toCompletableFuture().get();
+        reg4.close().toCompletableFuture().get();
 
-        waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
+        waitUntilShardIsDown(leaderConfigDatastore.getActorContext(),
                 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
 
-        waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
+        waitUntilShardIsDown(leaderConfigDatastore.getActorContext(),
                 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));
 
-        waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
+        waitUntilShardIsDown(leaderConfigDatastore.getActorContext(),
                 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));
 
-        waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
+        waitUntilShardIsDown(leaderConfigDatastore.getActorContext(),
                 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));
 
         LOG.debug("All leader shards gone");
 
-        waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
+        waitUntilShardIsDown(followerConfigDatastore.getActorContext(),
                 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
 
-        waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
+        waitUntilShardIsDown(followerConfigDatastore.getActorContext(),
                 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));
 
-        waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
+        waitUntilShardIsDown(followerConfigDatastore.getActorContext(),
                 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));
 
-        waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
+        waitUntilShardIsDown(followerConfigDatastore.getActorContext(),
                 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));
 
         LOG.debug("All follower shards gone");
+        LOG.info("testMultipleShardRegistrations ending");
     }
 
     @Test
+    @Ignore
     public void testMultipleRegistrationsAtOnePrefix() throws Exception {
-        initEmptyDatastores("config");
+        LOG.info("testMultipleRegistrationsAtOnePrefix starting");
+        initEmptyDatastores();
 
         for (int i = 0; i < 10; i++) {
             LOG.debug("Round {}", i);
@@ -339,22 +394,31 @@
                     TEST_ID,
                     Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
                     DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
 
-            leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+            leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
                     ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
 
-            assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
+            assertNotNull(findLocalShard(leaderConfigDatastore.getActorContext(),
                     ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
 
-            assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
+            assertNotNull(findLocalShard(followerConfigDatastore.getActorContext(),
                     ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
+
+            final Set<String> peers = new HashSet<>();
+            IntegrationTestKit.verifyShardState(leaderConfigDatastore,
+                    ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()), onDemandShardState ->
+                            peers.addAll(onDemandShardState.getPeerAddresses().values()));
+            assertEquals(peers.size(), 1);
 
             waitOnAsyncTask(reg1.close(), DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
 
-            waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
+            waitUntilShardIsDown(leaderConfigDatastore.getActorContext(),
                     ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
 
-            waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
+            waitUntilShardIsDown(followerConfigDatastore.getActorContext(),
                     ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
         }
+
+        LOG.info("testMultipleRegistrationsAtOnePrefix ending");
     }
 }