2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.sharding;
11 import static org.junit.Assert.assertNotNull;
12 import static org.junit.Assert.assertTrue;
13 import static org.junit.Assert.fail;
14 import static org.mockito.Mockito.doReturn;
15 import static org.opendaylight.controller.cluster.datastore.IntegrationTestKit.findLocalShard;
16 import static org.opendaylight.controller.cluster.datastore.IntegrationTestKit.waitUntilShardIsDown;
18 import akka.actor.ActorRef;
19 import akka.actor.ActorSystem;
20 import akka.actor.Address;
21 import akka.actor.AddressFromURIString;
22 import akka.cluster.Cluster;
23 import akka.testkit.JavaTestKit;
24 import com.google.common.collect.Lists;
25 import com.typesafe.config.ConfigFactory;
26 import java.util.Collections;
27 import org.junit.After;
28 import org.junit.Assert;
29 import org.junit.Before;
30 import org.junit.Ignore;
31 import org.junit.Test;
32 import org.mockito.Mockito;
33 import org.opendaylight.controller.cluster.ActorSystemProvider;
34 import org.opendaylight.controller.cluster.datastore.AbstractTest;
35 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
36 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
37 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
38 import org.opendaylight.controller.cluster.datastore.IntegrationTestKit;
39 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
40 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
41 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
42 import org.opendaylight.controller.cluster.sharding.DistributedShardFactory.DistributedShardRegistration;
43 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
44 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
45 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
46 import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
47 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
48 import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
49 import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException;
50 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
51 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
52 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
53 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafNodeBuilder;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
57 @Ignore("https://bugs.opendaylight.org/show_bug.cgi?id=8301")
58 public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest {
60 private static final Logger LOG = LoggerFactory.getLogger(DistributedShardedDOMDataTreeRemotingTest.class);
62 private static final Address MEMBER_1_ADDRESS =
63 AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558");
65 private static final DOMDataTreeIdentifier TEST_ID =
66 new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH);
68 private static final String MODULE_SHARDS_CONFIG = "module-shards-cars-member-1-and-2.conf";
70 private ActorSystem leaderSystem;
71 private ActorSystem followerSystem;
74 private final Builder leaderDatastoreContextBuilder =
75 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5)
77 org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION);
79 private final DatastoreContext.Builder followerDatastoreContextBuilder =
80 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5)
82 org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION);
84 private DistributedDataStore leaderConfigDatastore;
85 private DistributedDataStore leaderOperDatastore;
87 private DistributedDataStore followerConfigDatastore;
88 private DistributedDataStore followerOperDatastore;
91 private IntegrationTestKit followerTestKit;
92 private IntegrationTestKit leaderTestKit;
93 private DistributedShardedDOMDataTree leaderShardFactory;
95 private DistributedShardedDOMDataTree followerShardFactory;
96 private ActorSystemProvider leaderSystemProvider;
97 private ActorSystemProvider followerSystemProvider;
100 public void setUp() {
101 InMemoryJournal.clear();
102 InMemorySnapshotStore.clear();
104 leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
105 Cluster.get(leaderSystem).join(MEMBER_1_ADDRESS);
107 followerSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
108 Cluster.get(followerSystem).join(MEMBER_1_ADDRESS);
110 leaderSystemProvider = Mockito.mock(ActorSystemProvider.class);
111 doReturn(leaderSystem).when(leaderSystemProvider).getActorSystem();
113 followerSystemProvider = Mockito.mock(ActorSystemProvider.class);
114 doReturn(followerSystem).when(followerSystemProvider).getActorSystem();
119 public void tearDown() {
120 if (leaderConfigDatastore != null) {
121 leaderConfigDatastore.close();
123 if (leaderOperDatastore != null) {
124 leaderOperDatastore.close();
127 if (followerConfigDatastore != null) {
128 followerConfigDatastore.close();
130 if (followerOperDatastore != null) {
131 followerOperDatastore.close();
134 JavaTestKit.shutdownActorSystem(leaderSystem, null, Boolean.TRUE);
135 JavaTestKit.shutdownActorSystem(followerSystem, null, Boolean.TRUE);
137 InMemoryJournal.clear();
138 InMemorySnapshotStore.clear();
141 private void initEmptyDatastores() throws Exception {
142 leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder);
144 leaderConfigDatastore = leaderTestKit.setupDistributedDataStore(
145 "config", MODULE_SHARDS_CONFIG, true,
146 SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext());
147 leaderOperDatastore = leaderTestKit.setupDistributedDataStore(
148 "operational", MODULE_SHARDS_CONFIG, true,
149 SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext());
151 leaderShardFactory = new DistributedShardedDOMDataTree(leaderSystemProvider,
153 leaderConfigDatastore);
155 followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder);
157 followerConfigDatastore = followerTestKit.setupDistributedDataStore(
158 "config", MODULE_SHARDS_CONFIG, true, SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext());
159 followerOperDatastore = followerTestKit.setupDistributedDataStore(
160 "operational", MODULE_SHARDS_CONFIG, true,
161 SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext());
163 followerShardFactory = new DistributedShardedDOMDataTree(followerSystemProvider,
164 followerOperDatastore,
165 followerConfigDatastore);
167 leaderShardFactory.init();
168 followerShardFactory.init();
170 leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
171 ClusterUtils.getCleanShardName(YangInstanceIdentifier.EMPTY));
173 leaderTestKit.waitUntilLeader(leaderOperDatastore.getActorContext(),
174 ClusterUtils.getCleanShardName(YangInstanceIdentifier.EMPTY));
179 public void testProducerRegistrations() throws Exception {
180 initEmptyDatastores();
182 leaderTestKit.waitForMembersUp("member-2");
184 final DistributedShardRegistration shardRegistration =
185 waitOnAsyncTask(leaderShardFactory.createDistributedShard(
186 TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
187 DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
189 leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
190 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
192 final ActorRef leaderShardManager = leaderConfigDatastore.getActorContext().getShardManager();
194 assertNotNull(findLocalShard(leaderConfigDatastore.getActorContext(),
195 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())));
197 assertNotNull(findLocalShard(followerConfigDatastore.getActorContext(),
198 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())));
200 final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
202 followerShardFactory.createProducer(Collections.singleton(TEST_ID));
203 fail("Producer should be already registered on the other node");
204 } catch (final IllegalArgumentException e) {
205 assertTrue(e.getMessage().contains("is attached to producer"));
210 final DOMDataTreeProducer followerProducer =
211 followerShardFactory.createProducer(Collections.singleton(TEST_ID));
213 leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
214 fail("Producer should be already registered on the other node");
215 } catch (final IllegalArgumentException e) {
216 assertTrue(e.getMessage().contains("is attached to producer"));
219 followerProducer.close();
220 // try to create a shard on an already registered prefix on follower
222 waitOnAsyncTask(followerShardFactory.createDistributedShard(
223 TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
224 DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
225 fail("This prefix already should have a shard registration that was forwarded from the other node");
226 } catch (final DOMDataTreeShardingConflictException e) {
227 assertTrue(e.getMessage().contains("is already occupied by another shard"));
230 shardRegistration.close().toCompletableFuture().get();
234 public void testWriteIntoMultipleShards() throws Exception {
235 initEmptyDatastores();
237 leaderTestKit.waitForMembersUp("member-2");
239 LOG.debug("registering first shard");
240 final DistributedShardRegistration shardRegistration =
241 waitOnAsyncTask(leaderShardFactory.createDistributedShard(
242 TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
243 DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
246 leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
247 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
248 findLocalShard(followerConfigDatastore.getActorContext(),
249 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
251 LOG.debug("Got after waiting for nonleader");
252 final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
254 final DOMDataTreeCursorAwareTransaction tx = producer.createTransaction(true);
255 final DOMDataTreeWriteCursor cursor = tx.createCursor(TEST_ID);
256 Assert.assertNotNull(cursor);
257 final YangInstanceIdentifier nameId =
258 YangInstanceIdentifier.builder(TestModel.TEST_PATH).node(TestModel.NAME_QNAME).build();
259 cursor.write(nameId.getLastPathArgument(),
260 ImmutableLeafNodeBuilder.<String>create().withNodeIdentifier(
261 new NodeIdentifier(TestModel.NAME_QNAME)).withValue("Test Value").build());
264 LOG.warn("Got to pre submit");
266 tx.submit().checkedGet();
268 shardRegistration.close().toCompletableFuture().get();
272 public void testMultipleShardRegistrations() throws Exception {
273 initEmptyDatastores();
275 final DistributedShardRegistration reg1 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
276 TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
277 DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
279 final DistributedShardRegistration reg2 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
280 new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.OUTER_CONTAINER_PATH),
281 Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
282 DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
284 final DistributedShardRegistration reg3 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
285 new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.INNER_LIST_PATH),
286 Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
287 DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
289 final DistributedShardRegistration reg4 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
290 new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.JUNK_PATH),
291 Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
292 DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
294 leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
295 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
296 leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
297 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));
298 leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
299 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));
300 leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
301 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));
303 // check leader has local shards
304 assertNotNull(findLocalShard(leaderConfigDatastore.getActorContext(),
305 ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
307 assertNotNull(findLocalShard(leaderConfigDatastore.getActorContext(),
308 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH)));
310 assertNotNull(findLocalShard(leaderConfigDatastore.getActorContext(),
311 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH)));
313 assertNotNull(findLocalShard(leaderConfigDatastore.getActorContext(),
314 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH)));
316 // check follower has local shards
317 assertNotNull(findLocalShard(followerConfigDatastore.getActorContext(),
318 ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
320 assertNotNull(findLocalShard(followerConfigDatastore.getActorContext(),
321 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH)));
323 assertNotNull(findLocalShard(followerConfigDatastore.getActorContext(),
324 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH)));
326 assertNotNull(findLocalShard(followerConfigDatastore.getActorContext(),
327 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH)));
330 LOG.debug("Closing registrations");
332 reg1.close().toCompletableFuture().get();
333 reg2.close().toCompletableFuture().get();
334 reg3.close().toCompletableFuture().get();
335 reg4.close().toCompletableFuture().get();
337 waitUntilShardIsDown(leaderConfigDatastore.getActorContext(),
338 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
340 waitUntilShardIsDown(leaderConfigDatastore.getActorContext(),
341 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));
343 waitUntilShardIsDown(leaderConfigDatastore.getActorContext(),
344 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));
346 waitUntilShardIsDown(leaderConfigDatastore.getActorContext(),
347 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));
349 LOG.debug("All leader shards gone");
351 waitUntilShardIsDown(followerConfigDatastore.getActorContext(),
352 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
354 waitUntilShardIsDown(followerConfigDatastore.getActorContext(),
355 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));
357 waitUntilShardIsDown(followerConfigDatastore.getActorContext(),
358 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));
360 waitUntilShardIsDown(followerConfigDatastore.getActorContext(),
361 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));
363 LOG.debug("All follower shards gone");
367 public void testMultipleRegistrationsAtOnePrefix() throws Exception {
368 initEmptyDatastores();
370 for (int i = 0; i < 10; i++) {
371 LOG.debug("Round {}", i);
372 final DistributedShardRegistration reg1 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
373 TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
374 DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
376 leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
377 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
379 assertNotNull(findLocalShard(leaderConfigDatastore.getActorContext(),
380 ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
382 assertNotNull(findLocalShard(followerConfigDatastore.getActorContext(),
383 ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
385 waitOnAsyncTask(reg1.close(), DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
387 waitUntilShardIsDown(leaderConfigDatastore.getActorContext(),
388 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
390 waitUntilShardIsDown(followerConfigDatastore.getActorContext(),
391 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));