2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSystem;
13 import akka.actor.AddressFromURIString;
14 import akka.actor.Status.Success;
15 import akka.cluster.Cluster;
16 import akka.dispatch.Dispatchers;
17 import akka.testkit.JavaTestKit;
18 import akka.testkit.TestActorRef;
19 import com.google.common.collect.Lists;
20 import com.typesafe.config.ConfigFactory;
21 import java.util.Collections;
22 import java.util.concurrent.TimeUnit;
23 import org.junit.Test;
24 import org.opendaylight.controller.cluster.access.concepts.MemberName;
25 import org.opendaylight.controller.cluster.datastore.Shard.Builder;
26 import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
27 import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider;
28 import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
29 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
30 import org.opendaylight.controller.cluster.datastore.messages.AddPrefixShardReplica;
31 import org.opendaylight.controller.cluster.datastore.messages.CreatePrefixedShard;
32 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
33 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
34 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
35 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
36 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
37 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
38 import org.opendaylight.controller.cluster.datastore.shardmanager.ShardManagerTest.TestShardManager;
39 import org.opendaylight.controller.cluster.datastore.shardstrategy.PrefixShardStrategy;
40 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
41 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
42 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
43 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
44 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
49 * Tests prefix shard creation in ShardManager.
51 public class PrefixShardCreationTest extends AbstractShardManagerTest {
// Logger for this test class.
53 private static final Logger LOG = LoggerFactory.getLogger(PrefixShardCreationTest.class);
// Identifier of the prefix shard under test: TestModel.TEST_PATH in the CONFIGURATION datastore.
55 private static final DOMDataTreeIdentifier TEST_ID =
56 new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH);
// Second cluster member name; listed next to MEMBER_1 as a replica in testPrefixShardReplicas.
58 private static final MemberName MEMBER_2 = MemberName.forName("member-2");
// Verifies that a single ShardManager creates a prefix-based shard on request:
// the shard for TestModel.TEST_PATH is not found locally before a CreatePrefixedShard
// message is sent, and is found locally after the manager replies with Success.
61 public void testPrefixShardCreation() throws Exception {
63 LOG.info("testPrefixShardCreation starting");
64 new JavaTestKit(getSystem()) {
// Configure the shared datastore context: 1 minute shard initialization timeout, persistence on.
66 datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
// Start the shard manager with a configuration backed by EmptyModuleShardConfigProvider.
68 final ActorRef shardManager = actorFactory.createActor(newShardMgrProps(
69 new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
// Hand the test schema context to the shard manager; no reply is awaited for this message.
71 final SchemaContext schemaContext = TestModel.createTestContext();
72 shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
// Before creation: the shard named after TEST_PATH must not exist locally.
// NOTE(review): the boolean flag presumably means "wait until initialized" - confirm against FindLocalShard.
74 shardManager.tell(new FindLocalShard(
75 ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
76 expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
78 final Builder builder = Shard.builder();
// Request a prefix shard for TEST_ID using the prefix shard strategy, with MEMBER_1 as its only member.
80 final CreatePrefixedShard createPrefixedShard = new CreatePrefixedShard(
81 new PrefixShardConfiguration(TEST_ID,
82 PrefixShardStrategy.NAME,
83 Collections.singletonList(MEMBER_1)),
84 datastoreContextBuilder.build(), builder);
86 shardManager.tell(createPrefixedShard, getRef());
87 expectMsgClass(duration("5 seconds"), Success.class);
// After creation: the same lookup must now report a local shard.
89 shardManager.tell(new FindLocalShard(
90 ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
91 expectMsgClass(duration("5 seconds"), LocalShardFound.class);
// Verifies prefix shard replication across two shard managers, each running in its own
// ActorSystem joined to the same cluster address: the shard is created on node1, node2
// initially only sees a remote primary, and after AddPrefixShardReplica node2 also has
// a local shard.
97 public void testPrefixShardReplicas() throws Exception {
98 LOG.info("testPrefixShardReplicas starting");
// NOTE(review): shardManagerID is not referenced in the lines visible here; presumably it is the
// actor name passed as the last argument of the TestActorRef.create calls below - confirm.
99 final String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
101 // Create ActorSystem for member-1
102 final ActorSystem system1 = newActorSystem("Member1");
103 Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
// Shard manager for member-1, started with a configuration backed by EmptyModuleShardConfigProvider.
105 final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
106 newTestShardMgrBuilder(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
107 .waitTillReadyCountDownLatch(ready)
108 .cluster(new ClusterWrapperImpl(system1))
109 .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
112 // Create an ActorSystem and a ShardManager actor for member-2.
114 final ActorSystem system2 = newActorSystem("Member2");
// member-2 joins the same cluster address as member-1.
116 Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
118 final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
119 newTestShardMgrBuilder()
120 .configuration(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
121 .waitTillReadyCountDownLatch(ready)
122 .cluster(new ClusterWrapperImpl(system2)).props().withDispatcher(
123 Dispatchers.DefaultDispatcherId()),
// Separate test kit on system2: replies to messages sent to shardManager2 are received via kit2.
126 final JavaTestKit kit2 = new JavaTestKit(system2);
128 new JavaTestKit(system1) {
// Give both shard managers the test schema context.
130 shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
131 shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
133 // check that the shard does not exist yet on either node
134 shardManager1.tell(new FindLocalShard(
135 ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
136 expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
138 shardManager2.tell(new FindLocalShard(
139 ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), kit2.getRef());
140 kit2.expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
142 // create shard on node1; its configuration lists both MEMBER_1 and MEMBER_2
143 final Builder builder = Shard.builder();
145 final CreatePrefixedShard createPrefixedShard = new CreatePrefixedShard(
146 new PrefixShardConfiguration(TEST_ID,
147 PrefixShardStrategy.NAME,
148 Lists.newArrayList(MEMBER_1, MEMBER_2)),
149 datastoreContextBuilder.build(), builder);
151 shardManager1.tell(createPrefixedShard, getRef());
152 expectMsgClass(duration("5 seconds"), Success.class);
// Block until shardManager1's underlying actor reports a member-up event before continuing.
154 shardManager1.underlyingActor().waitForMemberUp();
156 LOG.info("changed leader state");
158 // check that node1 now finds the shard locally, while node2 still cannot
159 shardManager1.tell(new FindLocalShard(
160 ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
161 expectMsgClass(duration("5 seconds"), LocalShardFound.class);
163 shardManager2.tell(new FindLocalShard(
164 ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), kit2.getRef());
165 kit2.expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
// node2 has no local copy, but a primary lookup from node2 must resolve to a remote primary.
168 shardManager2.tell(new FindPrimary(
169 ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), kit2.getRef());
170 kit2.expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
172 // add replica and verify it is successful
173 shardManager2.tell(new AddPrefixShardReplica(TEST_ID.getRootIdentifier()), kit2.getRef());
174 kit2.expectMsgClass(duration("5 seconds"), Success.class);
176 // verify we have a replica on manager2 now
177 shardManager2.tell(new FindLocalShard(
178 ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), kit2.getRef());
179 kit2.expectMsgClass(duration("5 seconds"), LocalShardFound.class);
184 private ActorSystem newActorSystem(final String config) {
185 final ActorSystem system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig(config));
186 actorSystems.add(system);