2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.sharding;
11 import static org.junit.Assert.assertNotNull;
12 import static org.junit.Assert.assertTrue;
13 import static org.junit.Assert.fail;
14 import static org.mockito.Mockito.doReturn;
15 import static org.opendaylight.controller.cluster.datastore.IntegrationTestKit.findLocalShard;
16 import static org.opendaylight.controller.cluster.datastore.IntegrationTestKit.waitUntilShardIsDown;
18 import akka.actor.ActorRef;
19 import akka.actor.ActorSystem;
20 import akka.actor.Address;
21 import akka.actor.AddressFromURIString;
22 import akka.actor.PoisonPill;
23 import akka.cluster.Cluster;
24 import akka.cluster.ddata.DistributedData;
25 import akka.testkit.JavaTestKit;
26 import com.google.common.collect.Lists;
27 import com.typesafe.config.ConfigFactory;
28 import java.util.Collections;
29 import org.junit.After;
30 import org.junit.Assert;
31 import org.junit.Before;
32 import org.junit.Ignore;
33 import org.junit.Test;
34 import org.mockito.Mockito;
35 import org.opendaylight.controller.cluster.ActorSystemProvider;
36 import org.opendaylight.controller.cluster.datastore.AbstractTest;
37 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
38 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
39 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
40 import org.opendaylight.controller.cluster.datastore.IntegrationTestKit;
41 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
42 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
43 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
44 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
45 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
46 import org.opendaylight.controller.cluster.sharding.DistributedShardFactory.DistributedShardRegistration;
47 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
48 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
49 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
50 import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
51 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
52 import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
53 import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException;
54 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
55 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
56 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
57 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafNodeBuilder;
58 import org.slf4j.Logger;
59 import org.slf4j.LoggerFactory;
61 @Ignore("Needs to have the configuration backend switched from distributed-data")
62 public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest {
    // Test logger used to trace cluster membership and shard lifecycle events.
    private static final Logger LOG = LoggerFactory.getLogger(DistributedShardedDOMDataTreeRemotingTest.class);

    // Seed address of member 1; both actor systems join the cluster through it.
    private static final Address MEMBER_1_ADDRESS =
            AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558");

    // Prefix under test: the CONFIGURATION subtree rooted at TestModel.TEST_PATH.
    private static final DOMDataTreeIdentifier TEST_ID =
            new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH);

    // One actor system per simulated cluster member.
    private ActorSystem leaderSystem;
    private ActorSystem followerSystem;

    // NOTE(review): both builder chains below appear to be missing a
    // ".logicalStoreType(" continuation line (lost in extraction) — the bare
    // LogicalDatastoreType.CONFIGURATION argument line is orphaned. Confirm against VCS.
    private final Builder leaderDatastoreContextBuilder =
            DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5)
            org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION);

    private final DatastoreContext.Builder followerDatastoreContextBuilder =
            DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5)
            org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION);

    // Per-member datastores and integration test kits, created in initEmptyDatastores().
    private DistributedDataStore followerDistributedDataStore;
    private DistributedDataStore leaderDistributedDataStore;
    private IntegrationTestKit followerTestKit;
    private IntegrationTestKit leaderTestKit;

    // The sharding frontends under test, one per member.
    private DistributedShardedDOMDataTree leaderShardFactory;
    private DistributedShardedDOMDataTree followerShardFactory;

    // Mockito mocks that simply hand back the raw ActorSystems above.
    private ActorSystemProvider leaderSystemProvider;
    private ActorSystemProvider followerSystemProvider;
        // NOTE(review): the "@Before public void setUp()" header for this body was
        // lost in extraction — confirm against VCS.
        // Start two actor systems from the "Member1"/"Member2" configs and have both
        // join the cluster via member 1's seed address.
        leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
        Cluster.get(leaderSystem).join(MEMBER_1_ADDRESS);

        followerSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
        Cluster.get(followerSystem).join(MEMBER_1_ADDRESS);

        // Stub the providers so production code asking for an ActorSystem gets ours.
        leaderSystemProvider = Mockito.mock(ActorSystemProvider.class);
        doReturn(leaderSystem).when(leaderSystemProvider).getActorSystem();

        followerSystemProvider = Mockito.mock(ActorSystemProvider.class);
        doReturn(followerSystem).when(followerSystemProvider).getActorSystem();
115 public void tearDown() {
116 if (followerDistributedDataStore != null) {
117 followerDistributedDataStore.close();
119 if (leaderDistributedDataStore != null) {
120 leaderDistributedDataStore.close();
123 DistributedData.get(leaderSystem).replicator().tell(PoisonPill.getInstance(), ActorRef.noSender());
124 DistributedData.get(followerSystem).replicator().tell(PoisonPill.getInstance(), ActorRef.noSender());
126 JavaTestKit.shutdownActorSystem(leaderSystem);
127 JavaTestKit.shutdownActorSystem(followerSystem);
130 private void initEmptyDatastores(final String type) {
131 leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder);
133 leaderDistributedDataStore =
134 leaderTestKit.setupDistributedDataStoreWithoutConfig(type, SchemaContextHelper.full());
136 followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder);
137 followerDistributedDataStore =
138 followerTestKit.setupDistributedDataStoreWithoutConfig(type, SchemaContextHelper.full());
140 leaderShardFactory = new DistributedShardedDOMDataTree(leaderSystemProvider,
141 leaderDistributedDataStore,
142 leaderDistributedDataStore);
144 followerShardFactory = new DistributedShardedDOMDataTree(followerSystemProvider,
145 followerDistributedDataStore,
146 followerDistributedDataStore);
148 leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
149 ClusterUtils.getCleanShardName(YangInstanceIdentifier.EMPTY));
153 @Ignore("Needs different shard creation handling due to replicas")
154 public void testProducerRegistrations() throws Exception {
155 initEmptyDatastores("config");
157 leaderTestKit.waitForMembersUp("member-2");
159 final DistributedShardRegistration shardRegistration =
160 waitOnAsyncTask(leaderShardFactory.createDistributedShard(
161 TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
162 DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
164 leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
165 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
167 final ActorRef leaderShardManager = leaderDistributedDataStore.getActorContext().getShardManager();
169 assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
170 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())));
172 assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
173 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())));
175 final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
177 followerShardFactory.createProducer(Collections.singleton(TEST_ID));
178 fail("Producer should be already registered on the other node");
179 } catch (final IllegalArgumentException e) {
180 assertTrue(e.getMessage().contains("is attached to producer"));
185 final DOMDataTreeProducer followerProducer =
186 followerShardFactory.createProducer(Collections.singleton(TEST_ID));
188 leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
189 fail("Producer should be already registered on the other node");
190 } catch (final IllegalArgumentException e) {
191 assertTrue(e.getMessage().contains("is attached to producer"));
194 followerProducer.close();
195 // try to create a shard on an already registered prefix on follower
197 followerShardFactory.createDistributedShard(TEST_ID,
198 Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
199 fail("This prefix already should have a shard registration that was forwarded from the other node");
200 } catch (final DOMDataTreeShardingConflictException e) {
201 assertTrue(e.getMessage().contains("is already occupied by shard"));
    // NOTE(review): several structural lines of this method appear lost in
    // extraction — the @Test annotation, the JavaTestKit double-brace initializer
    // body opener, its closing "}};" and the method tail (presumably a
    // tx.submit()/commit plus the closing brace). Confirm against VCS before use.
    @Ignore("Needs different shard creation handling due to replicas")
    public void testWriteIntoMultipleShards() throws Exception {
        initEmptyDatastores("config");

        leaderTestKit.waitForMembersUp("member-2");

        LOG.warn("registering first shard");
        // Create the TEST_ID shard replicated on both members.
        final DistributedShardRegistration shardRegistration =
                waitOnAsyncTask(leaderShardFactory.createDistributedShard(
                        TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
                        DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);

        leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
                ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
        findLocalShard(followerDistributedDataStore.getActorContext(),
                ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));

        LOG.warn("Got after waiting for nonleader");
        final ActorRef leaderShardManager = leaderDistributedDataStore.getActorContext().getShardManager();

        // Verify shard discovery through the shard managers of both members.
        new JavaTestKit(leaderSystem) {
                leaderShardManager.tell(
                        new FindLocalShard(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
                expectMsgClass(duration("5 seconds"), LocalShardFound.class);

                final ActorRef followerShardManager = followerDistributedDataStore.getActorContext().getShardManager();

                followerShardManager.tell(new FindLocalShard(
                        ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), followerTestKit.getRef());
                followerTestKit.expectMsgClass(duration("5 seconds"), LocalShardFound.class);
                LOG.warn("Found follower shard");

                // The leader member must also be the primary for the shard.
                leaderDistributedDataStore.getActorContext().getShardManager().tell(
                        new FindPrimary(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
                expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);

        // Write a leaf under TEST_PATH through a producer transaction cursor.
        final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(TEST_ID));

        final DOMDataTreeCursorAwareTransaction tx = producer.createTransaction(true);
        final DOMDataTreeWriteCursor cursor = tx.createCursor(TEST_ID);
        Assert.assertNotNull(cursor);
        final YangInstanceIdentifier nameId =
                YangInstanceIdentifier.builder(TestModel.TEST_PATH).node(TestModel.NAME_QNAME).build();
        cursor.write(nameId.getLastPathArgument(),
                ImmutableLeafNodeBuilder.<String>create().withNodeIdentifier(
                        new NodeIdentifier(TestModel.NAME_QNAME)).withValue("Test Value").build());

        LOG.warn("Got to pre submit");
263 public void testMultipleShardRegistrations() throws Exception {
264 initEmptyDatastores("config");
266 final DistributedShardRegistration reg1 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
267 TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
268 DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
270 final DistributedShardRegistration reg2 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
271 new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.OUTER_CONTAINER_PATH),
272 Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
273 DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
275 final DistributedShardRegistration reg3 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
276 new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.INNER_LIST_PATH),
277 Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
278 DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
280 final DistributedShardRegistration reg4 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
281 new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.JUNK_PATH),
282 Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
283 DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
285 leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
286 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
287 leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
288 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));
289 leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
290 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));
291 leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
292 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));
294 // check leader has local shards
295 assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
296 ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
298 assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
299 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH)));
301 assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
302 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH)));
304 assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
305 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH)));
307 // check follower has local shards
308 assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
309 ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
311 assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
312 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH)));
314 assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
315 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH)));
317 assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
318 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH)));
321 LOG.debug("Closing registrations");
328 waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
329 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
331 waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
332 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));
334 waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
335 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));
337 waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
338 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));
340 LOG.debug("All leader shards gone");
342 waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
343 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
345 waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
346 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));
348 waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
349 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));
351 waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
352 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));
354 LOG.debug("All follower shards gone");
358 public void testMultipleRegistrationsAtOnePrefix() throws Exception {
359 initEmptyDatastores("config");
361 for (int i = 0; i < 10; i++) {
362 LOG.debug("Round {}", i);
363 final DistributedShardRegistration reg1 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
364 TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
365 DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
367 leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
368 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
370 assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
371 ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
373 assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
374 ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
376 waitOnAsyncTask(reg1.close(), DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
378 waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
379 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
381 waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
382 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));