2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.sharding;
11 import static org.junit.Assert.assertNotNull;
12 import static org.junit.Assert.assertTrue;
13 import static org.junit.Assert.fail;
14 import static org.opendaylight.controller.cluster.datastore.IntegrationTestKit.findLocalShard;
15 import static org.opendaylight.controller.cluster.datastore.IntegrationTestKit.waitUntilShardIsDown;
17 import akka.actor.ActorRef;
18 import akka.actor.ActorSystem;
19 import akka.actor.Address;
20 import akka.actor.AddressFromURIString;
21 import akka.actor.PoisonPill;
22 import akka.cluster.Cluster;
23 import akka.cluster.ddata.DistributedData;
24 import akka.testkit.JavaTestKit;
25 import com.google.common.collect.Lists;
26 import com.typesafe.config.ConfigFactory;
27 import java.util.Collections;
28 import org.junit.After;
29 import org.junit.Assert;
30 import org.junit.Before;
31 import org.junit.Ignore;
32 import org.junit.Test;
33 import org.opendaylight.controller.cluster.datastore.AbstractTest;
34 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
35 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
36 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
37 import org.opendaylight.controller.cluster.datastore.IntegrationTestKit;
38 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
39 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
40 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
41 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
42 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
43 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
44 import org.opendaylight.controller.cluster.sharding.DistributedShardFactory.DistributedShardRegistration;
45 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
46 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
47 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
48 import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
49 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
50 import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
51 import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException;
52 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
53 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
54 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
55 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafNodeBuilder;
56 import org.slf4j.Logger;
57 import org.slf4j.LoggerFactory;
// NOTE(review): every line in this file carries its original line number fused at
// column 0 (extraction artifact); several original lines are missing throughout
// (annotations, try-openers, closing braces). Comments below describe only what
// the visible code shows.
//
// Integration test exercising prefix-based distributed shard creation, producer
// registration and shard teardown across a two-node Akka cluster: a leader
// ("Member1") and a follower ("Member2") actor system, each backed by its own
// DistributedDataStore and DistributedShardedDOMDataTree factory.
59 @Ignore("Needs to have the configuration backend switched from distributed-data")
60 public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest {
62 private static final Logger LOG = LoggerFactory.getLogger(DistributedShardedDOMDataTreeRemotingTest.class);
// Seed address both actor systems join, forming a single cluster (see setUp body).
64 private static final Address MEMBER_1_ADDRESS =
65 AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558");
// CONFIGURATION-datastore identifier rooted at TestModel.TEST_PATH; the prefix
// most tests create a distributed shard for.
67 private static final DOMDataTreeIdentifier TEST_ID =
68 new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH);
70 private ActorSystem leaderSystem;
71 private ActorSystem followerSystem;
// Leader uses default raft behaviour (short heartbeat, election factor 2).
74 private final Builder leaderDatastoreContextBuilder =
75 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
// Follower is configured with DisableElectionsRaftPolicy — per its name this
// keeps the follower from starting elections, so leadership stays on member-1
// (policy semantics defined elsewhere; TODO confirm against the policy class).
77 private final DatastoreContext.Builder followerDatastoreContextBuilder =
78 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5)
79 .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
// Per-member datastores, test kits and shard factories, populated by
// initEmptyDatastores() and torn down in tearDown().
81 private DistributedDataStore followerDistributedDataStore;
82 private DistributedDataStore leaderDistributedDataStore;
83 private IntegrationTestKit followerTestKit;
84 private IntegrationTestKit leaderTestKit;
86 private DistributedShardedDOMDataTree leaderShardFactory;
87 private DistributedShardedDOMDataTree followerShardFactory;
// setUp: creates the two "cluster-test" actor systems from the Member1/Member2
// config sections and joins BOTH to member-1's address, forming one cluster.
// NOTE(review): the enclosing method signature (presumably "@Before public void
// setUp()") is missing from this extraction — originals ~88-91 absent.
92 leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
93 Cluster.get(leaderSystem).join(MEMBER_1_ADDRESS);
95 followerSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
// Follower also joins member-1's address (the cluster seed), not its own.
96 Cluster.get(followerSystem).join(MEMBER_1_ADDRESS);
// Per-test cleanup: close both datastores (null-guarded in case a test failed
// before initEmptyDatastores completed), poison the distributed-data replicator
// actors, then shut down both actor systems.
// NOTE(review): the "@After" annotation and the closing braces of the two null
// guards (originals ~100, 104, 107) are missing from this extraction.
101 public void tearDown() {
102 if (followerDistributedDataStore != null) {
103 followerDistributedDataStore.close();
105 if (leaderDistributedDataStore != null) {
106 leaderDistributedDataStore.close();
// Kill the Akka Distributed Data replicators explicitly so ddata state does not
// leak across tests sharing a JVM.
109 DistributedData.get(leaderSystem).replicator().tell(PoisonPill.getInstance(), ActorRef.noSender());
110 DistributedData.get(followerSystem).replicator().tell(PoisonPill.getInstance(), ActorRef.noSender());
112 JavaTestKit.shutdownActorSystem(leaderSystem);
113 JavaTestKit.shutdownActorSystem(followerSystem);
// Builds an empty datastore of the given type ("config" in all callers here) on
// both members, wraps each in a DistributedShardedDOMDataTree shard factory,
// then blocks until the leader's default (root) shard has elected a leader.
// NOTE(review): closing brace of this method (original ~136) is missing from
// this extraction.
116 private void initEmptyDatastores(final String type) {
117 leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder);
119 leaderDistributedDataStore =
120 leaderTestKit.setupDistributedDataStoreWithoutConfig(type, SchemaContextHelper.full())
122 followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder);
123 followerDistributedDataStore =
124 followerTestKit.setupDistributedDataStoreWithoutConfig(type, SchemaContextHelper.full());
// The same datastore instance is passed twice — presumably once per datastore
// role expected by the constructor; TODO confirm against
// DistributedShardedDOMDataTree's constructor signature.
126 leaderShardFactory = new DistributedShardedDOMDataTree(leaderSystem,
127 leaderDistributedDataStore,
128 leaderDistributedDataStore);
130 followerShardFactory = new DistributedShardedDOMDataTree(followerSystem,
131 followerDistributedDataStore,
132 followerDistributedDataStore);
// Wait for leadership on the shard backing the empty (root) YangInstanceIdentifier.
134 leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
135 ClusterUtils.getCleanShardName(YangInstanceIdentifier.EMPTY));
// Verifies that producer registrations are propagated cluster-wide: once a
// producer is attached to TEST_ID on one node, creating another producer for
// the same prefix on the other node must fail, and a shard cannot be created
// over a prefix a shard already occupies.
// NOTE(review): the "@Test" annotation, the "try {" lines opening the three
// try/catch sections (originals ~161, ~172, ~181) and several closing braces
// are missing from this extraction.
139 @Ignore("Needs different shard creation handling due to replicas")
140 public void testProducerRegistrations() throws Exception {
141 initEmptyDatastores("config");
143 leaderTestKit.waitForMembersUp("member-2");
// Create the TEST_ID shard replicated on both members, then wait for leadership.
145 final DistributedShardRegistration shardRegistration =
146 leaderShardFactory.createDistributedShard(
147 TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
149 leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
150 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
152 final ActorRef leaderShardManager = leaderDistributedDataStore.getActorContext().getShardManager();
// The shard must be locally present on BOTH members (it is replicated).
154 assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
155 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())));
157 assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
158 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())));
// Attach a producer on the leader; the follower must then be refused one.
160 final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
162 followerShardFactory.createProducer(Collections.singleton(TEST_ID));
163 fail("Producer should be already registered on the other node");
164 } catch (final IllegalArgumentException e) {
165 assertTrue(e.getMessage().contains("is attached to producer"));
// Presumably the leader's producer is closed between these sections (originals
// ~166-168 missing) before the roles are reversed below — TODO confirm.
170 final DOMDataTreeProducer followerProducer =
171 followerShardFactory.createProducer(Collections.singleton(TEST_ID));
173 leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
174 fail("Producer should be already registered on the other node");
175 } catch (final IllegalArgumentException e) {
176 assertTrue(e.getMessage().contains("is attached to producer"));
179 followerProducer.close();
180 // try to create a shard on an already registered prefix on follower
182 followerShardFactory.createDistributedShard(TEST_ID,
183 Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
184 fail("This prefix already should have a shard registration that was forwarded from the other node");
185 } catch (final DOMDataTreeShardingConflictException e) {
186 assertTrue(e.getMessage().contains("is already occupied by shard"));
// Creates the TEST_ID shard on both members, confirms shard discovery through
// the shard-manager protocol on each side, then writes a leaf under TEST_PATH
// through a producer transaction cursor.
// NOTE(review): the "@Test" annotation, the JavaTestKit anonymous-class
// boilerplate around originals ~210-226, and the transaction submit/cursor-close
// tail after original ~241 are missing from this extraction.
191 @Ignore("Needs different shard creation handling due to replicas")
192 public void testWriteIntoMultipleShards() throws Exception {
193 initEmptyDatastores("config");
195 leaderTestKit.waitForMembersUp("member-2");
197 LOG.warn("registering first shard");
198 final DistributedShardRegistration shardRegistration =
199 leaderShardFactory.createDistributedShard(TEST_ID,
200 Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
202 leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
203 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
204 findLocalShard(followerDistributedDataStore.getActorContext(),
205 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
207 LOG.warn("Got after waiting for nonleader");
208 final ActorRef leaderShardManager = leaderDistributedDataStore.getActorContext().getShardManager();
// Ask both shard managers for the local shard (waitUntilInitialized=true) and
// expect LocalShardFound within 5 seconds on each member.
210 new JavaTestKit(leaderSystem) {
212 leaderShardManager.tell(
213 new FindLocalShard(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
214 expectMsgClass(duration("5 seconds"), LocalShardFound.class);
216 final ActorRef followerShardManager = followerDistributedDataStore.getActorContext().getShardManager();
218 followerShardManager.tell(new FindLocalShard(
219 ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), followerTestKit.getRef());
220 followerTestKit.expectMsgClass(duration("5 seconds"), LocalShardFound.class);
221 LOG.warn("Found follower shard");
// The primary for TEST_PATH must be local to the leader member.
223 leaderDistributedDataStore.getActorContext().getShardManager().tell(
224 new FindPrimary(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
225 expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
// Write a "name" leaf with value "Test Value" under TEST_PATH via an isolated
// (true) producer transaction.
229 final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
231 final DOMDataTreeCursorAwareTransaction tx = producer.createTransaction(true);
232 final DOMDataTreeWriteCursor cursor = tx.createCursor(TEST_ID);
233 Assert.assertNotNull(cursor);
234 final YangInstanceIdentifier nameId =
235 YangInstanceIdentifier.builder(TestModel.TEST_PATH).node(TestModel.NAME_QNAME).build();
236 cursor.write(nameId.getLastPathArgument(),
237 ImmutableLeafNodeBuilder.<String>create().withNodeIdentifier(
238 new NodeIdentifier(TestModel.NAME_QNAME)).withValue("Test Value").build());
241 LOG.warn("Got to pre submit");
// Registers four distributed shards (TEST, OUTER_CONTAINER, INNER_LIST, JUNK
// prefixes), verifies each becomes leader and is locally present on BOTH
// members, then closes the registrations and waits until every shard is gone
// from both members.
// NOTE(review): the "@Test" annotation, the four "regN.close()" calls implied
// by "Closing registrations" (originals ~306-310), and the method's closing
// brace are missing from this extraction.
247 public void testMultipleShardRegistrations() throws Exception {
248 initEmptyDatastores("config");
250 final DistributedShardRegistration reg1 = leaderShardFactory
251 .createDistributedShard(TEST_ID,
252 Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
254 final DistributedShardRegistration reg2 = leaderShardFactory
255 .createDistributedShard(
256 new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.OUTER_CONTAINER_PATH),
257 Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
259 final DistributedShardRegistration reg3 = leaderShardFactory
260 .createDistributedShard(
261 new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.INNER_LIST_PATH),
262 Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
264 final DistributedShardRegistration reg4 = leaderShardFactory
265 .createDistributedShard(
266 new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.JUNK_PATH),
267 Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
// Each of the four shards must elect a leader on the leader member.
269 leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
270 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
271 leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
272 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));
273 leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
274 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));
275 leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
276 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));
278 // check leader has local shards
279 assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
280 ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
282 assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
283 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH)));
285 assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
286 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH)));
288 assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
289 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH)));
291 // check follower has local shards
292 assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
293 ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
295 assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
296 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH)));
298 assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
299 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH)));
301 assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
302 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH)));
// Teardown phase: after closing the registrations (calls missing from this
// extraction), every shard must disappear from the leader, then the follower.
305 LOG.debug("Closing registrations");
312 waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
313 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
315 waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
316 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));
318 waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
319 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));
321 waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
322 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));
324 LOG.debug("All leader shards gone");
326 waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
327 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
329 waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
330 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));
332 waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
333 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));
335 waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
336 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));
338 LOG.debug("All follower shards gone");
// Stress-tests repeated create/close cycles on the SAME prefix (TEST_ID): ten
// rounds of shard creation, leadership wait, presence check on both members,
// then teardown and wait-for-down on both members.
// NOTE(review): the "@Test" annotation, the "reg1.close()" call implied between
// the presence checks and the wait-for-down calls (around original ~360), the
// loop's closing brace and the method/class closing braces run past or are
// missing from this extraction.
342 public void testMultipleRegistrationsAtOnePrefix() throws Exception {
343 initEmptyDatastores("config");
345 for (int i = 0; i < 10; i++) {
346 LOG.debug("Round {}", i);
347 final DistributedShardRegistration reg1 = leaderShardFactory
348 .createDistributedShard(TEST_ID,
349 Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
351 leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
352 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
// Shard must be locally present on both members before it is torn down.
354 assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
355 ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
357 assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
358 ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
362 waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
363 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
365 waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
366 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));