2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.sharding;
11 import static org.junit.Assert.assertNotNull;
12 import static org.junit.Assert.assertTrue;
13 import static org.junit.Assert.fail;
14 import static org.mockito.Mockito.doReturn;
15 import static org.opendaylight.controller.cluster.datastore.IntegrationTestKit.findLocalShard;
16 import static org.opendaylight.controller.cluster.datastore.IntegrationTestKit.waitUntilShardIsDown;
18 import akka.actor.ActorRef;
19 import akka.actor.ActorSystem;
20 import akka.actor.Address;
21 import akka.actor.AddressFromURIString;
22 import akka.cluster.Cluster;
23 import akka.testkit.JavaTestKit;
24 import com.google.common.collect.Lists;
25 import com.typesafe.config.ConfigFactory;
26 import java.util.Collections;
27 import org.junit.After;
28 import org.junit.Assert;
29 import org.junit.Before;
30 import org.junit.Test;
31 import org.mockito.Mockito;
32 import org.opendaylight.controller.cluster.ActorSystemProvider;
33 import org.opendaylight.controller.cluster.datastore.AbstractTest;
34 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
35 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
36 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
37 import org.opendaylight.controller.cluster.datastore.IntegrationTestKit;
38 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
39 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
40 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
41 import org.opendaylight.controller.cluster.sharding.DistributedShardFactory.DistributedShardRegistration;
42 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
43 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
44 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
45 import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
46 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
47 import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
48 import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException;
49 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
50 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
51 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
52 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafNodeBuilder;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
56 public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest {
58 private static final Logger LOG = LoggerFactory.getLogger(DistributedShardedDOMDataTreeRemotingTest.class);
60 private static final Address MEMBER_1_ADDRESS =
61 AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558");
63 private static final DOMDataTreeIdentifier TEST_ID =
64 new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH);
66 private static final String MODULE_SHARDS_CONFIG = "module-shards-cars-member-1-and-2.conf";
68 private ActorSystem leaderSystem;
69 private ActorSystem followerSystem;
72 private final Builder leaderDatastoreContextBuilder =
73 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);
75 private final DatastoreContext.Builder followerDatastoreContextBuilder =
76 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);
78 private DistributedDataStore leaderConfigDatastore;
79 private DistributedDataStore leaderOperDatastore;
81 private DistributedDataStore followerConfigDatastore;
82 private DistributedDataStore followerOperDatastore;
85 private IntegrationTestKit followerTestKit;
86 private IntegrationTestKit leaderTestKit;
87 private DistributedShardedDOMDataTree leaderShardFactory;
89 private DistributedShardedDOMDataTree followerShardFactory;
90 private ActorSystemProvider leaderSystemProvider;
91 private ActorSystemProvider followerSystemProvider;
95 InMemoryJournal.clear();
96 InMemorySnapshotStore.clear();
98 leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
99 Cluster.get(leaderSystem).join(MEMBER_1_ADDRESS);
101 followerSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
102 Cluster.get(followerSystem).join(MEMBER_1_ADDRESS);
104 leaderSystemProvider = Mockito.mock(ActorSystemProvider.class);
105 doReturn(leaderSystem).when(leaderSystemProvider).getActorSystem();
107 followerSystemProvider = Mockito.mock(ActorSystemProvider.class);
108 doReturn(followerSystem).when(followerSystemProvider).getActorSystem();
113 public void tearDown() {
114 if (leaderConfigDatastore != null) {
115 leaderConfigDatastore.close();
117 if (leaderOperDatastore != null) {
118 leaderOperDatastore.close();
121 if (followerConfigDatastore != null) {
122 followerConfigDatastore.close();
124 if (followerOperDatastore != null) {
125 followerOperDatastore.close();
128 JavaTestKit.shutdownActorSystem(leaderSystem, null, Boolean.TRUE);
129 JavaTestKit.shutdownActorSystem(followerSystem, null, Boolean.TRUE);
131 InMemoryJournal.clear();
132 InMemorySnapshotStore.clear();
135 private void initEmptyDatastores() throws Exception {
136 leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder);
138 leaderConfigDatastore = leaderTestKit.setupDistributedDataStore(
139 "config", MODULE_SHARDS_CONFIG, true,
140 SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext());
141 leaderOperDatastore = leaderTestKit.setupDistributedDataStore(
142 "operational", MODULE_SHARDS_CONFIG, true,
143 SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext());
145 leaderShardFactory = new DistributedShardedDOMDataTree(leaderSystemProvider,
147 leaderConfigDatastore);
149 followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder);
151 followerConfigDatastore = followerTestKit.setupDistributedDataStore(
152 "config", MODULE_SHARDS_CONFIG, true, SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext());
153 followerOperDatastore = followerTestKit.setupDistributedDataStore(
154 "operational", MODULE_SHARDS_CONFIG, true,
155 SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext());
157 followerShardFactory = new DistributedShardedDOMDataTree(followerSystemProvider,
158 followerOperDatastore,
159 followerConfigDatastore);
161 followerTestKit.waitForMembersUp("member-1");
163 leaderShardFactory.init();
164 followerShardFactory.init();
166 leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
167 ClusterUtils.getCleanShardName(YangInstanceIdentifier.EMPTY));
169 leaderTestKit.waitUntilLeader(leaderOperDatastore.getActorContext(),
170 ClusterUtils.getCleanShardName(YangInstanceIdentifier.EMPTY));
175 public void testProducerRegistrations() throws Exception {
176 initEmptyDatastores();
178 leaderTestKit.waitForMembersUp("member-2");
180 final DistributedShardRegistration shardRegistration =
181 waitOnAsyncTask(leaderShardFactory.createDistributedShard(
182 TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
183 DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
185 leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
186 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
188 final ActorRef leaderShardManager = leaderConfigDatastore.getActorContext().getShardManager();
190 assertNotNull(findLocalShard(leaderConfigDatastore.getActorContext(),
191 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())));
193 assertNotNull(findLocalShard(followerConfigDatastore.getActorContext(),
194 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())));
196 final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
198 followerShardFactory.createProducer(Collections.singleton(TEST_ID));
199 fail("Producer should be already registered on the other node");
200 } catch (final IllegalArgumentException e) {
201 assertTrue(e.getMessage().contains("is attached to producer"));
206 final DOMDataTreeProducer followerProducer =
207 followerShardFactory.createProducer(Collections.singleton(TEST_ID));
209 leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
210 fail("Producer should be already registered on the other node");
211 } catch (final IllegalArgumentException e) {
212 assertTrue(e.getMessage().contains("is attached to producer"));
215 followerProducer.close();
216 // try to create a shard on an already registered prefix on follower
218 waitOnAsyncTask(followerShardFactory.createDistributedShard(
219 TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
220 DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
221 fail("This prefix already should have a shard registration that was forwarded from the other node");
222 } catch (final DOMDataTreeShardingConflictException e) {
223 assertTrue(e.getMessage().contains("is already occupied by another shard"));
226 shardRegistration.close().toCompletableFuture().get();
230 public void testWriteIntoMultipleShards() throws Exception {
231 initEmptyDatastores();
233 leaderTestKit.waitForMembersUp("member-2");
235 LOG.debug("registering first shard");
236 final DistributedShardRegistration shardRegistration =
237 waitOnAsyncTask(leaderShardFactory.createDistributedShard(
238 TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
239 DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
242 leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
243 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
244 findLocalShard(followerConfigDatastore.getActorContext(),
245 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
247 LOG.debug("Got after waiting for nonleader");
248 final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
250 final DOMDataTreeCursorAwareTransaction tx = producer.createTransaction(true);
251 final DOMDataTreeWriteCursor cursor = tx.createCursor(TEST_ID);
252 Assert.assertNotNull(cursor);
253 final YangInstanceIdentifier nameId =
254 YangInstanceIdentifier.builder(TestModel.TEST_PATH).node(TestModel.NAME_QNAME).build();
255 cursor.write(nameId.getLastPathArgument(),
256 ImmutableLeafNodeBuilder.<String>create().withNodeIdentifier(
257 new NodeIdentifier(TestModel.NAME_QNAME)).withValue("Test Value").build());
260 LOG.warn("Got to pre submit");
262 tx.submit().checkedGet();
264 shardRegistration.close().toCompletableFuture().get();
268 public void testMultipleShardRegistrations() throws Exception {
269 initEmptyDatastores();
271 final DistributedShardRegistration reg1 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
272 TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
273 DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
275 final DistributedShardRegistration reg2 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
276 new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.OUTER_CONTAINER_PATH),
277 Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
278 DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
280 final DistributedShardRegistration reg3 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
281 new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.INNER_LIST_PATH),
282 Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
283 DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
285 final DistributedShardRegistration reg4 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
286 new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.JUNK_PATH),
287 Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
288 DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
290 leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
291 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
292 leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
293 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));
294 leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
295 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));
296 leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
297 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));
299 // check leader has local shards
300 assertNotNull(findLocalShard(leaderConfigDatastore.getActorContext(),
301 ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
303 assertNotNull(findLocalShard(leaderConfigDatastore.getActorContext(),
304 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH)));
306 assertNotNull(findLocalShard(leaderConfigDatastore.getActorContext(),
307 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH)));
309 assertNotNull(findLocalShard(leaderConfigDatastore.getActorContext(),
310 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH)));
312 // check follower has local shards
313 assertNotNull(findLocalShard(followerConfigDatastore.getActorContext(),
314 ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
316 assertNotNull(findLocalShard(followerConfigDatastore.getActorContext(),
317 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH)));
319 assertNotNull(findLocalShard(followerConfigDatastore.getActorContext(),
320 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH)));
322 assertNotNull(findLocalShard(followerConfigDatastore.getActorContext(),
323 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH)));
326 LOG.debug("Closing registrations");
328 reg1.close().toCompletableFuture().get();
329 reg2.close().toCompletableFuture().get();
330 reg3.close().toCompletableFuture().get();
331 reg4.close().toCompletableFuture().get();
333 waitUntilShardIsDown(leaderConfigDatastore.getActorContext(),
334 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
336 waitUntilShardIsDown(leaderConfigDatastore.getActorContext(),
337 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));
339 waitUntilShardIsDown(leaderConfigDatastore.getActorContext(),
340 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));
342 waitUntilShardIsDown(leaderConfigDatastore.getActorContext(),
343 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));
345 LOG.debug("All leader shards gone");
347 waitUntilShardIsDown(followerConfigDatastore.getActorContext(),
348 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
350 waitUntilShardIsDown(followerConfigDatastore.getActorContext(),
351 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));
353 waitUntilShardIsDown(followerConfigDatastore.getActorContext(),
354 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));
356 waitUntilShardIsDown(followerConfigDatastore.getActorContext(),
357 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));
359 LOG.debug("All follower shards gone");
363 public void testMultipleRegistrationsAtOnePrefix() throws Exception {
364 initEmptyDatastores();
366 for (int i = 0; i < 10; i++) {
367 LOG.debug("Round {}", i);
368 final DistributedShardRegistration reg1 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
369 TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
370 DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
372 leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
373 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
375 assertNotNull(findLocalShard(leaderConfigDatastore.getActorContext(),
376 ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
378 assertNotNull(findLocalShard(followerConfigDatastore.getActorContext(),
379 ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
381 waitOnAsyncTask(reg1.close(), DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
383 waitUntilShardIsDown(leaderConfigDatastore.getActorContext(),
384 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
386 waitUntilShardIsDown(followerConfigDatastore.getActorContext(),
387 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));