Use Akka artery for remote transport
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / PrefixShardCreationTest.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSystem;
13 import akka.actor.AddressFromURIString;
14 import akka.actor.Status.Success;
15 import akka.cluster.Cluster;
16 import akka.dispatch.Dispatchers;
17 import akka.testkit.JavaTestKit;
18 import akka.testkit.TestActorRef;
19 import com.google.common.collect.Lists;
20 import com.typesafe.config.ConfigFactory;
21 import java.util.Collections;
22 import java.util.concurrent.TimeUnit;
23 import org.junit.Test;
24 import org.opendaylight.controller.cluster.access.concepts.MemberName;
25 import org.opendaylight.controller.cluster.datastore.Shard.Builder;
26 import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
27 import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider;
28 import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
29 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
30 import org.opendaylight.controller.cluster.datastore.messages.AddPrefixShardReplica;
31 import org.opendaylight.controller.cluster.datastore.messages.CreatePrefixedShard;
32 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
33 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
34 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
35 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
36 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
37 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
38 import org.opendaylight.controller.cluster.datastore.shardmanager.ShardManagerTest.TestShardManager;
39 import org.opendaylight.controller.cluster.datastore.shardstrategy.PrefixShardStrategy;
40 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
41 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
42 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
43 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
44 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
47
48 /**
49  * Tests prefix shard creation in ShardManager.
50  */
51 public class PrefixShardCreationTest extends AbstractShardManagerTest {
52
53     private static final Logger LOG = LoggerFactory.getLogger(PrefixShardCreationTest.class);
54
55     private static final DOMDataTreeIdentifier TEST_ID =
56             new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH);
57
58     private static final MemberName MEMBER_2 = MemberName.forName("member-2");
59
60     @Test
61     public void testPrefixShardCreation() throws Exception {
62
63         LOG.info("testPrefixShardCreation starting");
64         new JavaTestKit(getSystem()) {
65             {
66                 datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
67
68                 final ActorRef shardManager = actorFactory.createActor(newShardMgrProps(
69                         new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
70
71                 final SchemaContext schemaContext = TestModel.createTestContext();
72                 shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
73
74                 shardManager.tell(new FindLocalShard(
75                         ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
76                 expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
77
78                 final Builder builder = Shard.builder();
79
80                 final CreatePrefixedShard createPrefixedShard = new CreatePrefixedShard(
81                         new PrefixShardConfiguration(TEST_ID,
82                                 PrefixShardStrategy.NAME,
83                                 Collections.singletonList(MEMBER_1)),
84                         datastoreContextBuilder.build(), builder);
85
86                 shardManager.tell(createPrefixedShard, getRef());
87                 expectMsgClass(duration("5 seconds"), Success.class);
88
89                 shardManager.tell(new FindLocalShard(
90                         ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
91                 expectMsgClass(duration("5 seconds"), LocalShardFound.class);
92             }
93         };
94     }
95
96     @Test
97     public void testPrefixShardReplicas() throws Exception {
98         LOG.info("testPrefixShardReplicas starting");
99         final String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
100
101         // Create ACtorSystem for member-1
102         final ActorSystem system1 = newActorSystem("Member1");
103         Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
104
105         final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
106                 newTestShardMgrBuilder(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
107                         .waitTillReadyCountDownLatch(ready)
108                         .cluster(new ClusterWrapperImpl(system1))
109                         .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
110                 shardManagerID);
111
112         // Create an ActorSystem ShardManager actor for member-2.
113
114         final ActorSystem system2 = newActorSystem("Member2");
115
116         Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
117
118         final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
119                 newTestShardMgrBuilder()
120                         .configuration(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
121                         .waitTillReadyCountDownLatch(ready)
122                         .cluster(new ClusterWrapperImpl(system2)).props().withDispatcher(
123                         Dispatchers.DefaultDispatcherId()),
124                 shardManagerID);
125
126         final JavaTestKit kit2 = new JavaTestKit(system2);
127
128         new JavaTestKit(system1) {
129             {
130                 shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
131                 shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
132
133                 // check shard does not exist
134                 shardManager1.tell(new FindLocalShard(
135                         ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
136                 expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
137
138                 shardManager2.tell(new FindLocalShard(
139                         ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), kit2.getRef());
140                 kit2.expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
141
142                 // create shard on node1
143                 final Builder builder = Shard.builder();
144
145                 final CreatePrefixedShard createPrefixedShard = new CreatePrefixedShard(
146                         new PrefixShardConfiguration(TEST_ID,
147                                 PrefixShardStrategy.NAME,
148                                 Lists.newArrayList(MEMBER_1, MEMBER_2)),
149                         datastoreContextBuilder.build(), builder);
150
151                 shardManager1.tell(createPrefixedShard, getRef());
152                 expectMsgClass(duration("5 seconds"), Success.class);
153
154                 shardManager1.underlyingActor().waitForMemberUp();
155
156                 LOG.info("changed leader state");
157
158                 // check node2 cannot find it locally
159                 shardManager1.tell(new FindLocalShard(
160                         ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
161                 expectMsgClass(duration("5 seconds"), LocalShardFound.class);
162
163                 shardManager2.tell(new FindLocalShard(
164                         ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), kit2.getRef());
165                 kit2.expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
166
167                 // but can remotely
168                 shardManager2.tell(new FindPrimary(
169                         ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), kit2.getRef());
170                 kit2.expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
171
172                 // add replica and verify if succesful
173                 shardManager2.tell(new AddPrefixShardReplica(TEST_ID.getRootIdentifier()), kit2.getRef());
174                 kit2.expectMsgClass(duration("5 seconds"), Success.class);
175
176                 // verify we have a replica on manager2 now
177                 shardManager2.tell(new FindLocalShard(
178                         ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), kit2.getRef());
179                 kit2.expectMsgClass(duration("5 seconds"), LocalShardFound.class);
180             }
181         };
182     }
183
184     private ActorSystem newActorSystem(final String config) {
185         final ActorSystem system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig(config));
186         actorSystems.add(system);
187         return system;
188     }
189 }