/*
 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
9 package org.opendaylight.controller.cluster.sharding;
11 import static org.junit.Assert.assertNotNull;
12 import static org.junit.Assert.assertTrue;
13 import static org.junit.Assert.fail;
14 import static org.mockito.Mockito.doReturn;
15 import static org.opendaylight.controller.cluster.datastore.IntegrationTestKit.findLocalShard;
16 import static org.opendaylight.controller.cluster.datastore.IntegrationTestKit.waitUntilShardIsDown;
18 import akka.actor.ActorRef;
19 import akka.actor.ActorSystem;
20 import akka.actor.Address;
21 import akka.actor.AddressFromURIString;
22 import akka.actor.PoisonPill;
23 import akka.cluster.Cluster;
24 import akka.cluster.ddata.DistributedData;
25 import akka.testkit.JavaTestKit;
26 import com.google.common.collect.Lists;
27 import com.typesafe.config.ConfigFactory;
28 import java.util.Collections;
29 import org.junit.After;
30 import org.junit.Assert;
31 import org.junit.Before;
32 import org.junit.Ignore;
33 import org.junit.Test;
34 import org.mockito.Mockito;
35 import org.opendaylight.controller.cluster.ActorSystemProvider;
36 import org.opendaylight.controller.cluster.datastore.AbstractTest;
37 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
38 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
39 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
40 import org.opendaylight.controller.cluster.datastore.IntegrationTestKit;
41 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
42 import org.opendaylight.controller.cluster.sharding.DistributedShardFactory.DistributedShardRegistration;
43 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
44 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
45 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
46 import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
47 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
48 import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
49 import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException;
50 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
51 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
52 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
53 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafNodeBuilder;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
57 @Ignore("Needs to have the configuration backend switched from distributed-data")
58 public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest {
60 private static final Logger LOG = LoggerFactory.getLogger(DistributedShardedDOMDataTreeRemotingTest.class);
62 private static final Address MEMBER_1_ADDRESS =
63 AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558");
65 private static final DOMDataTreeIdentifier TEST_ID =
66 new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH);
68 private ActorSystem leaderSystem;
69 private ActorSystem followerSystem;
72 private final Builder leaderDatastoreContextBuilder =
73 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5)
75 org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION);
77 private final DatastoreContext.Builder followerDatastoreContextBuilder =
78 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5)
80 org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION);
82 private DistributedDataStore followerDistributedDataStore;
83 private DistributedDataStore leaderDistributedDataStore;
84 private IntegrationTestKit followerTestKit;
85 private IntegrationTestKit leaderTestKit;
87 private DistributedShardedDOMDataTree leaderShardFactory;
88 private DistributedShardedDOMDataTree followerShardFactory;
90 private ActorSystemProvider leaderSystemProvider;
91 private ActorSystemProvider followerSystemProvider;
96 leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
97 Cluster.get(leaderSystem).join(MEMBER_1_ADDRESS);
99 followerSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
100 Cluster.get(followerSystem).join(MEMBER_1_ADDRESS);
102 leaderSystemProvider = Mockito.mock(ActorSystemProvider.class);
103 doReturn(leaderSystem).when(leaderSystemProvider).getActorSystem();
105 followerSystemProvider = Mockito.mock(ActorSystemProvider.class);
106 doReturn(followerSystem).when(followerSystemProvider).getActorSystem();
111 public void tearDown() {
112 if (followerDistributedDataStore != null) {
113 followerDistributedDataStore.close();
115 if (leaderDistributedDataStore != null) {
116 leaderDistributedDataStore.close();
119 DistributedData.get(leaderSystem).replicator().tell(PoisonPill.getInstance(), ActorRef.noSender());
120 DistributedData.get(followerSystem).replicator().tell(PoisonPill.getInstance(), ActorRef.noSender());
122 JavaTestKit.shutdownActorSystem(leaderSystem);
123 JavaTestKit.shutdownActorSystem(followerSystem);
126 private void initEmptyDatastores(final String type) {
127 leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder);
129 leaderDistributedDataStore =
130 leaderTestKit.setupDistributedDataStoreWithoutConfig(type, SchemaContextHelper.full());
132 followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder);
133 followerDistributedDataStore =
134 followerTestKit.setupDistributedDataStoreWithoutConfig(type, SchemaContextHelper.full());
136 leaderShardFactory = new DistributedShardedDOMDataTree(leaderSystemProvider,
137 leaderDistributedDataStore,
138 leaderDistributedDataStore);
140 followerShardFactory = new DistributedShardedDOMDataTree(followerSystemProvider,
141 followerDistributedDataStore,
142 followerDistributedDataStore);
144 leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
145 ClusterUtils.getCleanShardName(YangInstanceIdentifier.EMPTY));
149 public void testProducerRegistrations() throws Exception {
150 initEmptyDatastores("config");
152 leaderTestKit.waitForMembersUp("member-2");
154 final DistributedShardRegistration shardRegistration =
155 waitOnAsyncTask(leaderShardFactory.createDistributedShard(
156 TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
157 DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
159 leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
160 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
162 final ActorRef leaderShardManager = leaderDistributedDataStore.getActorContext().getShardManager();
164 assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
165 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())));
167 assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
168 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())));
170 final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
172 followerShardFactory.createProducer(Collections.singleton(TEST_ID));
173 fail("Producer should be already registered on the other node");
174 } catch (final IllegalArgumentException e) {
175 assertTrue(e.getMessage().contains("is attached to producer"));
180 final DOMDataTreeProducer followerProducer =
181 followerShardFactory.createProducer(Collections.singleton(TEST_ID));
183 leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
184 fail("Producer should be already registered on the other node");
185 } catch (final IllegalArgumentException e) {
186 assertTrue(e.getMessage().contains("is attached to producer"));
189 followerProducer.close();
190 // try to create a shard on an already registered prefix on follower
192 waitOnAsyncTask(followerShardFactory.createDistributedShard(
193 TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
194 DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
195 fail("This prefix already should have a shard registration that was forwarded from the other node");
196 } catch (final DOMDataTreeShardingConflictException e) {
197 assertTrue(e.getMessage().contains("is already occupied by another shard"));
202 public void testWriteIntoMultipleShards() throws Exception {
203 initEmptyDatastores("config");
205 leaderTestKit.waitForMembersUp("member-2");
207 LOG.debug("registering first shard");
208 final DistributedShardRegistration shardRegistration =
209 waitOnAsyncTask(leaderShardFactory.createDistributedShard(
210 TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
211 DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
214 leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
215 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
216 findLocalShard(followerDistributedDataStore.getActorContext(),
217 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
219 LOG.debug("Got after waiting for nonleader");
220 final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
222 final DOMDataTreeCursorAwareTransaction tx = producer.createTransaction(true);
223 final DOMDataTreeWriteCursor cursor = tx.createCursor(TEST_ID);
224 Assert.assertNotNull(cursor);
225 final YangInstanceIdentifier nameId =
226 YangInstanceIdentifier.builder(TestModel.TEST_PATH).node(TestModel.NAME_QNAME).build();
227 cursor.write(nameId.getLastPathArgument(),
228 ImmutableLeafNodeBuilder.<String>create().withNodeIdentifier(
229 new NodeIdentifier(TestModel.NAME_QNAME)).withValue("Test Value").build());
232 LOG.warn("Got to pre submit");
234 tx.submit().checkedGet();
238 public void testMultipleShardRegistrations() throws Exception {
239 initEmptyDatastores("config");
241 final DistributedShardRegistration reg1 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
242 TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
243 DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
245 final DistributedShardRegistration reg2 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
246 new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.OUTER_CONTAINER_PATH),
247 Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
248 DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
250 final DistributedShardRegistration reg3 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
251 new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.INNER_LIST_PATH),
252 Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
253 DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
255 final DistributedShardRegistration reg4 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
256 new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.JUNK_PATH),
257 Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
258 DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
260 leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
261 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
262 leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
263 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));
264 leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
265 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));
266 leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
267 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));
269 // check leader has local shards
270 assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
271 ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
273 assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
274 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH)));
276 assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
277 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH)));
279 assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
280 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH)));
282 // check follower has local shards
283 assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
284 ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
286 assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
287 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH)));
289 assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
290 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH)));
292 assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
293 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH)));
296 LOG.debug("Closing registrations");
303 waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
304 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
306 waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
307 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));
309 waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
310 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));
312 waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
313 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));
315 LOG.debug("All leader shards gone");
317 waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
318 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
320 waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
321 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));
323 waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
324 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));
326 waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
327 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));
329 LOG.debug("All follower shards gone");
333 public void testMultipleRegistrationsAtOnePrefix() throws Exception {
334 initEmptyDatastores("config");
336 for (int i = 0; i < 10; i++) {
337 LOG.debug("Round {}", i);
338 final DistributedShardRegistration reg1 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
339 TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
340 DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
342 leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
343 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
345 assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
346 ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
348 assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
349 ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
351 waitOnAsyncTask(reg1.close(), DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
353 waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
354 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
356 waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
357 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));