X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fsharding%2FDistributedShardedDOMDataTreeRemotingTest.java;h=b2ef45a3dd357234fe2a7348e121a528e356d8c4;hp=27c5c49d70ec114bbb5b18414bb85728aa7a885f;hb=def2aa2710cabf4d1867e8ce5dd847d380ef9393;hpb=7204c455a1636a7fc89bcd28fe9e9000eaa81b3b diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeRemotingTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeRemotingTest.java index 27c5c49d70..b2ef45a3dd 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeRemotingTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeRemotingTest.java @@ -8,6 +8,7 @@ package org.opendaylight.controller.cluster.sharding; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -19,17 +20,16 @@ import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Address; import akka.actor.AddressFromURIString; -import akka.actor.PoisonPill; import akka.cluster.Cluster; -import akka.cluster.ddata.DistributedData; import akka.testkit.JavaTestKit; import com.google.common.collect.Lists; import com.typesafe.config.ConfigFactory; import java.util.Collections; +import java.util.HashSet; +import java.util.Set; import org.junit.After; import org.junit.Assert; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import org.mockito.Mockito; import org.opendaylight.controller.cluster.ActorSystemProvider; @@ -38,11 +38,9 @@ import org.opendaylight.controller.cluster.datastore.DatastoreContext; import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder; import org.opendaylight.controller.cluster.datastore.DistributedDataStore; import org.opendaylight.controller.cluster.datastore.IntegrationTestKit; -import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; -import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; -import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound; -import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils; +import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; +import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; import org.opendaylight.controller.cluster.sharding.DistributedShardFactory.DistributedShardRegistration; import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; @@ -58,7 +56,6 @@ import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLe import org.slf4j.Logger; import org.slf4j.LoggerFactory; -@Ignore("Needs to have the configuration backend switched from distributed-data") public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest { private static final Logger LOG = LoggerFactory.getLogger(DistributedShardedDOMDataTreeRemotingTest.class); @@ -69,33 +66,37 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest { private static final DOMDataTreeIdentifier TEST_ID = new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH); + private static final String MODULE_SHARDS_CONFIG = "module-shards-default.conf"; + private ActorSystem leaderSystem; private ActorSystem followerSystem; private final Builder leaderDatastoreContextBuilder = - DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5) - .logicalStoreType( - org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION); + DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5); private final DatastoreContext.Builder followerDatastoreContextBuilder = - DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5) - .logicalStoreType( - org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION); + DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5); + + private DistributedDataStore leaderConfigDatastore; + private DistributedDataStore leaderOperDatastore; + + private DistributedDataStore followerConfigDatastore; + private DistributedDataStore followerOperDatastore; + - private DistributedDataStore followerDistributedDataStore; - private DistributedDataStore leaderDistributedDataStore; private IntegrationTestKit followerTestKit; private IntegrationTestKit leaderTestKit; - private DistributedShardedDOMDataTree leaderShardFactory; - private DistributedShardedDOMDataTree followerShardFactory; + private DistributedShardedDOMDataTree followerShardFactory; private ActorSystemProvider leaderSystemProvider; private ActorSystemProvider followerSystemProvider; @Before public void setUp() { + InMemoryJournal.clear(); + InMemorySnapshotStore.clear(); leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1")); Cluster.get(leaderSystem).join(MEMBER_1_ADDRESS); @@ -113,65 +114,95 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest { @After public void tearDown() { - if (followerDistributedDataStore != null) { - followerDistributedDataStore.close(); + if (leaderConfigDatastore != null) { + leaderConfigDatastore.close(); } - if (leaderDistributedDataStore != null) { - leaderDistributedDataStore.close(); + if (leaderOperDatastore != null) { + leaderOperDatastore.close(); } - DistributedData.get(leaderSystem).replicator().tell(PoisonPill.getInstance(), ActorRef.noSender()); - DistributedData.get(followerSystem).replicator().tell(PoisonPill.getInstance(), ActorRef.noSender()); + if (followerConfigDatastore != null) { + followerConfigDatastore.close(); + } + if (followerOperDatastore != null) { + followerOperDatastore.close(); + } - JavaTestKit.shutdownActorSystem(leaderSystem); - JavaTestKit.shutdownActorSystem(followerSystem); + JavaTestKit.shutdownActorSystem(leaderSystem, null, Boolean.TRUE); + JavaTestKit.shutdownActorSystem(followerSystem, null, Boolean.TRUE); + + InMemoryJournal.clear(); + InMemorySnapshotStore.clear(); } - private void initEmptyDatastores(final String type) { + private void initEmptyDatastores() throws Exception { leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder); - leaderDistributedDataStore = - leaderTestKit.setupDistributedDataStoreWithoutConfig(type, SchemaContextHelper.full()); + leaderConfigDatastore = leaderTestKit.setupDistributedDataStore( + "config", MODULE_SHARDS_CONFIG, true, + SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext()); + leaderOperDatastore = leaderTestKit.setupDistributedDataStore( + "operational", MODULE_SHARDS_CONFIG, true, + SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext()); + + leaderShardFactory = new DistributedShardedDOMDataTree(leaderSystemProvider, + leaderOperDatastore, + leaderConfigDatastore); followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder); - followerDistributedDataStore = - followerTestKit.setupDistributedDataStoreWithoutConfig(type, SchemaContextHelper.full()); - leaderShardFactory = new DistributedShardedDOMDataTree(leaderSystemProvider, - leaderDistributedDataStore, - leaderDistributedDataStore); + followerConfigDatastore = followerTestKit.setupDistributedDataStore( + "config", MODULE_SHARDS_CONFIG, true, SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext()); + followerOperDatastore = followerTestKit.setupDistributedDataStore( + "operational", MODULE_SHARDS_CONFIG, true, + SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext()); followerShardFactory = new DistributedShardedDOMDataTree(followerSystemProvider, - followerDistributedDataStore, - followerDistributedDataStore); + followerOperDatastore, + followerConfigDatastore); + + followerTestKit.waitForMembersUp("member-1"); + + leaderShardFactory.init(); + followerShardFactory.init(); + + leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(), + ClusterUtils.getCleanShardName(YangInstanceIdentifier.EMPTY)); - leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), + leaderTestKit.waitUntilLeader(leaderOperDatastore.getActorContext(), ClusterUtils.getCleanShardName(YangInstanceIdentifier.EMPTY)); } @Test - @Ignore("Needs different shard creation handling due to replicas") public void testProducerRegistrations() throws Exception { - initEmptyDatastores("config"); + LOG.info("testProducerRegistrations starting"); + initEmptyDatastores(); leaderTestKit.waitForMembersUp("member-2"); + // TODO refactor shard creation and verification to own method final DistributedShardRegistration shardRegistration = waitOnAsyncTask(leaderShardFactory.createDistributedShard( TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)), DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION); - leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), + leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(), ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())); - final ActorRef leaderShardManager = leaderDistributedDataStore.getActorContext().getShardManager(); + final ActorRef leaderShardManager = leaderConfigDatastore.getActorContext().getShardManager(); - assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(), + assertNotNull(findLocalShard(leaderConfigDatastore.getActorContext(), ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()))); - assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(), + assertNotNull(findLocalShard(followerConfigDatastore.getActorContext(), ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()))); + final Set peers = new HashSet<>(); + IntegrationTestKit.verifyShardState(leaderConfigDatastore, + ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()), onDemandShardState -> + peers.addAll(onDemandShardState.getPeerAddresses().values())); + assertEquals(peers.size(), 1); + final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(TEST_ID)); try { followerShardFactory.createProducer(Collections.singleton(TEST_ID)); @@ -194,54 +225,45 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest { followerProducer.close(); // try to create a shard on an already registered prefix on follower try { - followerShardFactory.createDistributedShard(TEST_ID, - Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)); + waitOnAsyncTask(followerShardFactory.createDistributedShard( + TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)), + DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION); fail("This prefix already should have a shard registration that was forwarded from the other node"); } catch (final DOMDataTreeShardingConflictException e) { - assertTrue(e.getMessage().contains("is already occupied by shard")); + assertTrue(e.getMessage().contains("is already occupied by another shard")); } + + shardRegistration.close().toCompletableFuture().get(); + + LOG.info("testProducerRegistrations ending"); } @Test - @Ignore("Needs different shard creation handling due to replicas") public void testWriteIntoMultipleShards() throws Exception { - initEmptyDatastores("config"); + LOG.info("testWriteIntoMultipleShards starting"); + initEmptyDatastores(); leaderTestKit.waitForMembersUp("member-2"); - LOG.warn("registering first shard"); + LOG.debug("registering first shard"); final DistributedShardRegistration shardRegistration = waitOnAsyncTask(leaderShardFactory.createDistributedShard( TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)), DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION); - leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), + + leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(), ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())); - findLocalShard(followerDistributedDataStore.getActorContext(), + findLocalShard(followerConfigDatastore.getActorContext(), ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())); - LOG.warn("Got after waiting for nonleader"); - final ActorRef leaderShardManager = leaderDistributedDataStore.getActorContext().getShardManager(); - - new JavaTestKit(leaderSystem) { - { - leaderShardManager.tell( - new FindLocalShard(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef()); - expectMsgClass(duration("5 seconds"), LocalShardFound.class); - - final ActorRef followerShardManager = followerDistributedDataStore.getActorContext().getShardManager(); - - followerShardManager.tell(new FindLocalShard( - ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), followerTestKit.getRef()); - followerTestKit.expectMsgClass(duration("5 seconds"), LocalShardFound.class); - LOG.warn("Found follower shard"); - - leaderDistributedDataStore.getActorContext().getShardManager().tell( - new FindPrimary(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef()); - expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class); - } - }; + final Set peers = new HashSet<>(); + IntegrationTestKit.verifyShardState(leaderConfigDatastore, + ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()), onDemandShardState -> + peers.addAll(onDemandShardState.getPeerAddresses().values())); + assertEquals(peers.size(), 1); + LOG.debug("Got after waiting for nonleader"); final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(TEST_ID)); final DOMDataTreeCursorAwareTransaction tx = producer.createTransaction(true); @@ -256,12 +278,17 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest { cursor.close(); LOG.warn("Got to pre submit"); - tx.submit(); + tx.submit().checkedGet(); + + shardRegistration.close().toCompletableFuture().get(); + + LOG.info("testWriteIntoMultipleShards ending"); } @Test public void testMultipleShardRegistrations() throws Exception { - initEmptyDatastores("config"); + LOG.info("testMultipleShardRegistrations starting"); + initEmptyDatastores(); final DistributedShardRegistration reg1 = waitOnAsyncTask(leaderShardFactory.createDistributedShard( TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)), @@ -282,81 +309,82 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest { Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)), DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION); - leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), + leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(), ClusterUtils.getCleanShardName(TestModel.TEST_PATH)); - leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), + leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(), ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH)); - leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), + leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(), ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH)); - leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), + leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(), ClusterUtils.getCleanShardName(TestModel.JUNK_PATH)); // check leader has local shards - assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(), + assertNotNull(findLocalShard(leaderConfigDatastore.getActorContext(), ClusterUtils.getCleanShardName(TestModel.TEST_PATH))); - assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(), + assertNotNull(findLocalShard(leaderConfigDatastore.getActorContext(), ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH))); - assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(), + assertNotNull(findLocalShard(leaderConfigDatastore.getActorContext(), ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH))); - assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(), + assertNotNull(findLocalShard(leaderConfigDatastore.getActorContext(), ClusterUtils.getCleanShardName(TestModel.JUNK_PATH))); // check follower has local shards - assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(), + assertNotNull(findLocalShard(followerConfigDatastore.getActorContext(), ClusterUtils.getCleanShardName(TestModel.TEST_PATH))); - assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(), + assertNotNull(findLocalShard(followerConfigDatastore.getActorContext(), ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH))); - assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(), + assertNotNull(findLocalShard(followerConfigDatastore.getActorContext(), ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH))); - assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(), + assertNotNull(findLocalShard(followerConfigDatastore.getActorContext(), ClusterUtils.getCleanShardName(TestModel.JUNK_PATH))); - LOG.debug("Closing registrations"); - reg1.close(); - reg2.close(); - reg3.close(); - reg4.close(); + reg1.close().toCompletableFuture().get(); + reg2.close().toCompletableFuture().get(); + reg3.close().toCompletableFuture().get(); + reg4.close().toCompletableFuture().get(); - waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(), + waitUntilShardIsDown(leaderConfigDatastore.getActorContext(), ClusterUtils.getCleanShardName(TestModel.TEST_PATH)); - waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(), + waitUntilShardIsDown(leaderConfigDatastore.getActorContext(), ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH)); - waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(), + waitUntilShardIsDown(leaderConfigDatastore.getActorContext(), ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH)); - waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(), + waitUntilShardIsDown(leaderConfigDatastore.getActorContext(), ClusterUtils.getCleanShardName(TestModel.JUNK_PATH)); LOG.debug("All leader shards gone"); - waitUntilShardIsDown(followerDistributedDataStore.getActorContext(), + waitUntilShardIsDown(followerConfigDatastore.getActorContext(), ClusterUtils.getCleanShardName(TestModel.TEST_PATH)); - waitUntilShardIsDown(followerDistributedDataStore.getActorContext(), + waitUntilShardIsDown(followerConfigDatastore.getActorContext(), ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH)); - waitUntilShardIsDown(followerDistributedDataStore.getActorContext(), + waitUntilShardIsDown(followerConfigDatastore.getActorContext(), ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH)); - waitUntilShardIsDown(followerDistributedDataStore.getActorContext(), + waitUntilShardIsDown(followerConfigDatastore.getActorContext(), ClusterUtils.getCleanShardName(TestModel.JUNK_PATH)); LOG.debug("All follower shards gone"); + LOG.info("testMultipleShardRegistrations ending"); } @Test public void testMultipleRegistrationsAtOnePrefix() throws Exception { - initEmptyDatastores("config"); + LOG.info("testMultipleRegistrationsAtOnePrefix starting"); + initEmptyDatastores(); for (int i = 0; i < 10; i++) { LOG.debug("Round {}", i); @@ -364,22 +392,31 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest { TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)), DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION); - leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), + leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(), ClusterUtils.getCleanShardName(TestModel.TEST_PATH)); - assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(), + assertNotNull(findLocalShard(leaderConfigDatastore.getActorContext(), ClusterUtils.getCleanShardName(TestModel.TEST_PATH))); - assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(), + assertNotNull(findLocalShard(followerConfigDatastore.getActorContext(), ClusterUtils.getCleanShardName(TestModel.TEST_PATH))); + + final Set peers = new HashSet<>(); + IntegrationTestKit.verifyShardState(leaderConfigDatastore, + ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()), onDemandShardState -> + peers.addAll(onDemandShardState.getPeerAddresses().values())); + assertEquals(peers.size(), 1); + waitOnAsyncTask(reg1.close(), DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION); - waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(), + waitUntilShardIsDown(leaderConfigDatastore.getActorContext(), ClusterUtils.getCleanShardName(TestModel.TEST_PATH)); - waitUntilShardIsDown(followerDistributedDataStore.getActorContext(), + waitUntilShardIsDown(followerConfigDatastore.getActorContext(), ClusterUtils.getCleanShardName(TestModel.TEST_PATH)); } + + LOG.info("testMultipleRegistrationsAtOnePrefix ending"); } }