BUG 2138: Introduce prefix based shards into ShardManager
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / PrefixShardCreationTest.java
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/PrefixShardCreationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/PrefixShardCreationTest.java
new file mode 100644 (file)
index 0000000..2a3431f
--- /dev/null
@@ -0,0 +1,189 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.AddressFromURIString;
+import akka.actor.Status.Success;
+import akka.cluster.Cluster;
+import akka.dispatch.Dispatchers;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
+import com.google.common.collect.Lists;
+import com.typesafe.config.ConfigFactory;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.controller.cluster.datastore.Shard.Builder;
+import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
+import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider;
+import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.AddPrefixShardReplica;
+import org.opendaylight.controller.cluster.datastore.messages.CreatePrefixedShard;
+import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
+import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
+import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
+import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
+import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.cluster.datastore.shardmanager.ShardManagerTest.TestShardManager;
+import org.opendaylight.controller.cluster.datastore.shardstrategy.PrefixShardStrategy;
+import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests prefix shard creation in ShardManager.
+ */
+public class PrefixShardCreationTest extends AbstractShardManagerTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PrefixShardCreationTest.class);
+
+    private static final DOMDataTreeIdentifier TEST_ID =
+            new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH);
+
+    private static final MemberName MEMBER_2 = MemberName.forName("member-2");
+
+    @Test
+    public void testPrefixShardCreation() throws Exception {
+
+        LOG.info("testPrefixShardCreation starting");
+        new JavaTestKit(getSystem()) {
+            {
+                datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
+
+                final ActorRef shardManager = actorFactory.createActor(newShardMgrProps(
+                        new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
+
+                final SchemaContext schemaContext = TestModel.createTestContext();
+                shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
+
+                shardManager.tell(new FindLocalShard(
+                        ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
+                expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
+
+                final Builder builder = Shard.builder();
+
+                final CreatePrefixedShard createPrefixedShard = new CreatePrefixedShard(
+                        new PrefixShardConfiguration(TEST_ID,
+                                PrefixShardStrategy.NAME,
+                                Collections.singletonList(MEMBER_1)),
+                        datastoreContextBuilder.build(), builder);
+
+                shardManager.tell(createPrefixedShard, getRef());
+                expectMsgClass(duration("5 seconds"), Success.class);
+
+                shardManager.tell(new FindLocalShard(
+                        ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
+                expectMsgClass(duration("5 seconds"), LocalShardFound.class);
+            }
+        };
+    }
+
+    @Test
+    public void testPrefixShardReplicas() throws Exception {
+        LOG.info("testPrefixShardReplicas starting");
+        final String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
+
+        // Create ACtorSystem for member-1
+        final ActorSystem system1 = newActorSystem("Member1");
+        Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
+
+        final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
+                newTestShardMgrBuilder(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
+                        .waitTillReadyCountDownLatch(ready)
+                        .cluster(new ClusterWrapperImpl(system1))
+                        .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                shardManagerID);
+
+        // Create an ActorSystem ShardManager actor for member-2.
+
+        final ActorSystem system2 = newActorSystem("Member2");
+
+        Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
+
+        final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
+                newTestShardMgrBuilder()
+                        .configuration(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
+                        .waitTillReadyCountDownLatch(ready)
+                        .cluster(new ClusterWrapperImpl(system2)).props().withDispatcher(
+                        Dispatchers.DefaultDispatcherId()),
+                shardManagerID);
+
+        final JavaTestKit kit2 = new JavaTestKit(system2);
+
+        new JavaTestKit(system1) {
+            {
+                shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+                shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+
+                // check shard does not exist
+                shardManager1.tell(new FindLocalShard(
+                        ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
+                expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
+
+                shardManager2.tell(new FindLocalShard(
+                        ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), kit2.getRef());
+                kit2.expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
+
+                // create shard on node1
+                final Builder builder = Shard.builder();
+
+                final CreatePrefixedShard createPrefixedShard = new CreatePrefixedShard(
+                        new PrefixShardConfiguration(TEST_ID,
+                                PrefixShardStrategy.NAME,
+                                Lists.newArrayList(MEMBER_1, MEMBER_2)),
+                        datastoreContextBuilder.build(), builder);
+
+                shardManager1.tell(createPrefixedShard, getRef());
+                expectMsgClass(duration("5 seconds"), Success.class);
+
+                shardManager1.underlyingActor().waitForMemberUp();
+
+                LOG.info("changed leader state");
+
+                // check node2 cannot find it locally
+                shardManager1.tell(new FindLocalShard(
+                        ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
+                expectMsgClass(duration("5 seconds"), LocalShardFound.class);
+
+                shardManager2.tell(new FindLocalShard(
+                        ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), kit2.getRef());
+                kit2.expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
+
+                // but can remotely
+                shardManager2.tell(new FindPrimary(
+                        ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), kit2.getRef());
+                kit2.expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
+
+                // add replica and verify if succesful
+                shardManager2.tell(new AddPrefixShardReplica(TEST_ID.getRootIdentifier()), kit2.getRef());
+                kit2.expectMsgClass(duration("5 seconds"), Success.class);
+
+                // verify we have a replica on manager2 now
+                shardManager2.tell(new FindLocalShard(
+                        ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), kit2.getRef());
+                kit2.expectMsgClass(duration("5 seconds"), LocalShardFound.class);
+            }
+        };
+    }
+
+    private ActorSystem newActorSystem(final String config) {
+        final ActorSystem system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig(config));
+        actorSystems.add(system);
+        return system;
+    }
+}
\ No newline at end of file