BUG-2138: DistributedShardListeners support for nested shards
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeRemotingTest.java
index eb109f1e0a667945299f4bbfd232be609e5fa987..8564e2d90859098b87a2a1ad4f2fe5880da2f150 100644
@@ -11,6 +11,7 @@ package org.opendaylight.controller.cluster.sharding;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doReturn;
 import static org.opendaylight.controller.cluster.datastore.IntegrationTestKit.findLocalShard;
 import static org.opendaylight.controller.cluster.datastore.IntegrationTestKit.waitUntilShardIsDown;
 
@@ -30,17 +31,14 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.opendaylight.controller.cluster.ActorSystemProvider;
 import org.opendaylight.controller.cluster.datastore.AbstractTest;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
 import org.opendaylight.controller.cluster.datastore.IntegrationTestKit;
-import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
-import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
-import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
-import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
-import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
 import org.opendaylight.controller.cluster.sharding.DistributedShardFactory.DistributedShardRegistration;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
@@ -72,11 +70,14 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest {
 
 
     private final Builder leaderDatastoreContextBuilder =
-            DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
+            DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5)
+                    .logicalStoreType(
+                            org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION);
 
     private final DatastoreContext.Builder followerDatastoreContextBuilder =
             DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5)
-                    .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
+                    .logicalStoreType(
+                            org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION);
 
     private DistributedDataStore followerDistributedDataStore;
     private DistributedDataStore leaderDistributedDataStore;
@@ -86,6 +87,9 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest {
     private DistributedShardedDOMDataTree leaderShardFactory;
     private DistributedShardedDOMDataTree followerShardFactory;
 
+    private ActorSystemProvider leaderSystemProvider;
+    private ActorSystemProvider followerSystemProvider;
+
     @Before
     public void setUp() {
 
@@ -95,6 +99,12 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest {
         followerSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
         Cluster.get(followerSystem).join(MEMBER_1_ADDRESS);
 
+        leaderSystemProvider = Mockito.mock(ActorSystemProvider.class);
+        doReturn(leaderSystem).when(leaderSystemProvider).getActorSystem();
+
+        followerSystemProvider = Mockito.mock(ActorSystemProvider.class);
+        doReturn(followerSystem).when(followerSystemProvider).getActorSystem();
+
     }
 
     @After
@@ -123,11 +133,11 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest {
         followerDistributedDataStore =
                 followerTestKit.setupDistributedDataStoreWithoutConfig(type, SchemaContextHelper.full());
 
-        leaderShardFactory = new DistributedShardedDOMDataTree(leaderSystem,
+        leaderShardFactory = new DistributedShardedDOMDataTree(leaderSystemProvider,
                 leaderDistributedDataStore,
                 leaderDistributedDataStore);
 
-        followerShardFactory = new DistributedShardedDOMDataTree(followerSystem,
+        followerShardFactory = new DistributedShardedDOMDataTree(followerSystemProvider,
                 followerDistributedDataStore,
                 followerDistributedDataStore);
 
@@ -136,15 +146,15 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest {
     }
 
     @Test
-    @Ignore("Needs different shard creation handling due to replicas")
     public void testProducerRegistrations() throws Exception {
         initEmptyDatastores("config");
 
         leaderTestKit.waitForMembersUp("member-2");
 
         final DistributedShardRegistration shardRegistration =
-                leaderShardFactory.createDistributedShard(
-                        TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
+                waitOnAsyncTask(leaderShardFactory.createDistributedShard(
+                        TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
+                        DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
 
         leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
                 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
@@ -179,53 +189,34 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest {
         followerProducer.close();
         // try to create a shard on an already registered prefix on follower
         try {
-            followerShardFactory.createDistributedShard(TEST_ID,
-                    Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
+            waitOnAsyncTask(followerShardFactory.createDistributedShard(
+                    TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
+                    DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
             fail("This prefix already should have a shard registration that was forwarded from the other node");
         } catch (final DOMDataTreeShardingConflictException e) {
-            assertTrue(e.getMessage().contains("is already occupied by shard"));
+            assertTrue(e.getMessage().contains("is already occupied by another shard"));
         }
     }
 
     @Test
-    @Ignore("Needs different shard creation handling due to replicas")
     public void testWriteIntoMultipleShards() throws Exception {
         initEmptyDatastores("config");
 
         leaderTestKit.waitForMembersUp("member-2");
 
-        LOG.warn("registering first shard");
+        LOG.debug("registering first shard");
         final DistributedShardRegistration shardRegistration =
-                leaderShardFactory.createDistributedShard(TEST_ID,
-                        Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
+                waitOnAsyncTask(leaderShardFactory.createDistributedShard(
+                        TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
+                        DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
+
 
         leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
                 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
         findLocalShard(followerDistributedDataStore.getActorContext(),
                 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
 
-        LOG.warn("Got after waiting for nonleader");
-        final ActorRef leaderShardManager = leaderDistributedDataStore.getActorContext().getShardManager();
-
-        new JavaTestKit(leaderSystem) {
-            {
-                leaderShardManager.tell(
-                        new FindLocalShard(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
-                expectMsgClass(duration("5 seconds"), LocalShardFound.class);
-
-                final ActorRef followerShardManager = followerDistributedDataStore.getActorContext().getShardManager();
-
-                followerShardManager.tell(new FindLocalShard(
-                        ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), followerTestKit.getRef());
-                followerTestKit.expectMsgClass(duration("5 seconds"), LocalShardFound.class);
-                LOG.warn("Found follower shard");
-
-                leaderDistributedDataStore.getActorContext().getShardManager().tell(
-                        new FindPrimary(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
-                expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
-            }
-        };
-
+        LOG.debug("Got after waiting for nonleader");
         final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
 
         final DOMDataTreeCursorAwareTransaction tx = producer.createTransaction(true);
@@ -240,31 +231,31 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest {
         cursor.close();
         LOG.warn("Got to pre submit");
 
-        tx.submit();
+        tx.submit().checkedGet();
     }
 
     @Test
     public void testMultipleShardRegistrations() throws Exception {
         initEmptyDatastores("config");
 
-        final DistributedShardRegistration reg1 = leaderShardFactory
-                .createDistributedShard(TEST_ID,
-                        Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
+        final DistributedShardRegistration reg1 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
+                TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
+                DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
 
-        final DistributedShardRegistration reg2 = leaderShardFactory
-                .createDistributedShard(
-                        new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.OUTER_CONTAINER_PATH),
-                        Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
+        final DistributedShardRegistration reg2 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
+                new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.OUTER_CONTAINER_PATH),
+                Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
+                DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
 
-        final DistributedShardRegistration reg3 = leaderShardFactory
-                .createDistributedShard(
-                        new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.INNER_LIST_PATH),
-                        Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
+        final DistributedShardRegistration reg3 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
+                new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.INNER_LIST_PATH),
+                Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
+                DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
 
-        final DistributedShardRegistration reg4 = leaderShardFactory
-                .createDistributedShard(
-                        new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.JUNK_PATH),
-                        Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
+        final DistributedShardRegistration reg4 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
+                new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.JUNK_PATH),
+                Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
+                DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
 
         leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
                 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
@@ -344,9 +335,9 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest {
 
         for (int i = 0; i < 10; i++) {
             LOG.debug("Round {}", i);
-            final DistributedShardRegistration reg1 = leaderShardFactory
-                    .createDistributedShard(TEST_ID,
-                            Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
+            final DistributedShardRegistration reg1 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
+                    TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
+                    DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
 
             leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
                     ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
@@ -357,7 +348,7 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest {
             assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
                     ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
 
-            reg1.close();
+            waitOnAsyncTask(reg1.close(), DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
 
             waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
                     ClusterUtils.getCleanShardName(TestModel.TEST_PATH));