/*
 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
9 package org.opendaylight.controller.cluster.sharding;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.doReturn;
import static org.opendaylight.controller.cluster.datastore.IntegrationTestKit.findLocalShard;
import static org.opendaylight.controller.cluster.datastore.IntegrationTestKit.waitUntilShardIsDown;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.actor.AddressFromURIString;
import akka.cluster.Cluster;
import akka.testkit.javadsl.TestKit;
import com.google.common.collect.Lists;
import com.typesafe.config.ConfigFactory;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.opendaylight.controller.cluster.ActorSystemProvider;
import org.opendaylight.controller.cluster.datastore.AbstractTest;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
import org.opendaylight.controller.cluster.datastore.IntegrationTestKit;
import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException;
import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafNodeBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
58 public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest {
60 private static final Logger LOG = LoggerFactory.getLogger(DistributedShardedDOMDataTreeRemotingTest.class);
62 private static final Address MEMBER_1_ADDRESS =
63 AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558");
65 private static final DOMDataTreeIdentifier TEST_ID =
66 new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH);
68 private static final String MODULE_SHARDS_CONFIG = "module-shards-default.conf";
70 private ActorSystem leaderSystem;
71 private ActorSystem followerSystem;
74 private final Builder leaderDatastoreContextBuilder =
75 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);
77 private final DatastoreContext.Builder followerDatastoreContextBuilder =
78 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);
80 private DistributedDataStore leaderConfigDatastore;
81 private DistributedDataStore leaderOperDatastore;
83 private DistributedDataStore followerConfigDatastore;
84 private DistributedDataStore followerOperDatastore;
87 private IntegrationTestKit followerTestKit;
88 private IntegrationTestKit leaderTestKit;
89 private DistributedShardedDOMDataTree leaderShardFactory;
91 private DistributedShardedDOMDataTree followerShardFactory;
92 private ActorSystemProvider leaderSystemProvider;
93 private ActorSystemProvider followerSystemProvider;
97 InMemoryJournal.clear();
98 InMemorySnapshotStore.clear();
100 leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
101 Cluster.get(leaderSystem).join(MEMBER_1_ADDRESS);
103 followerSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
104 Cluster.get(followerSystem).join(MEMBER_1_ADDRESS);
106 leaderSystemProvider = Mockito.mock(ActorSystemProvider.class);
107 doReturn(leaderSystem).when(leaderSystemProvider).getActorSystem();
109 followerSystemProvider = Mockito.mock(ActorSystemProvider.class);
110 doReturn(followerSystem).when(followerSystemProvider).getActorSystem();
115 public void tearDown() {
116 if (leaderConfigDatastore != null) {
117 leaderConfigDatastore.close();
119 if (leaderOperDatastore != null) {
120 leaderOperDatastore.close();
123 if (followerConfigDatastore != null) {
124 followerConfigDatastore.close();
126 if (followerOperDatastore != null) {
127 followerOperDatastore.close();
130 TestKit.shutdownActorSystem(leaderSystem, true);
131 TestKit.shutdownActorSystem(followerSystem, true);
133 InMemoryJournal.clear();
134 InMemorySnapshotStore.clear();
137 private void initEmptyDatastores() throws Exception {
138 initEmptyDatastores(MODULE_SHARDS_CONFIG);
141 private void initEmptyDatastores(final String moduleShardsConfig) throws Exception {
142 leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder);
144 leaderConfigDatastore = leaderTestKit.setupDistributedDataStore(
145 "config", moduleShardsConfig, true,
146 SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext());
147 leaderOperDatastore = leaderTestKit.setupDistributedDataStore(
148 "operational", moduleShardsConfig, true,
149 SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext());
151 leaderShardFactory = new DistributedShardedDOMDataTree(leaderSystemProvider,
153 leaderConfigDatastore);
155 followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder);
157 followerConfigDatastore = followerTestKit.setupDistributedDataStore(
158 "config", moduleShardsConfig, true, SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext());
159 followerOperDatastore = followerTestKit.setupDistributedDataStore(
160 "operational", moduleShardsConfig, true,
161 SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext());
163 followerShardFactory = new DistributedShardedDOMDataTree(followerSystemProvider,
164 followerOperDatastore,
165 followerConfigDatastore);
167 followerTestKit.waitForMembersUp("member-1");
169 LOG.info("Initializing leader DistributedShardedDOMDataTree");
170 leaderShardFactory.init();
172 leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorUtils(),
173 ClusterUtils.getCleanShardName(YangInstanceIdentifier.empty()));
175 leaderTestKit.waitUntilLeader(leaderOperDatastore.getActorUtils(),
176 ClusterUtils.getCleanShardName(YangInstanceIdentifier.empty()));
178 LOG.info("Initializing follower DistributedShardedDOMDataTree");
179 followerShardFactory.init();
183 public void testProducerRegistrations() throws Exception {
184 LOG.info("testProducerRegistrations starting");
185 initEmptyDatastores();
187 leaderTestKit.waitForMembersUp("member-2");
189 // TODO refactor shard creation and verification to own method
190 final DistributedShardRegistration shardRegistration =
191 waitOnAsyncTask(leaderShardFactory.createDistributedShard(
192 TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
193 DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
195 leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorUtils(),
196 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
198 final ActorRef leaderShardManager = leaderConfigDatastore.getActorUtils().getShardManager();
200 assertNotNull(findLocalShard(leaderConfigDatastore.getActorUtils(),
201 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())));
203 assertNotNull(findLocalShard(followerConfigDatastore.getActorUtils(),
204 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())));
206 final Set<String> peers = new HashSet<>();
207 IntegrationTestKit.verifyShardState(leaderConfigDatastore,
208 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()), onDemandShardState ->
209 peers.addAll(onDemandShardState.getPeerAddresses().values()));
210 assertEquals(peers.size(), 1);
212 final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
214 followerShardFactory.createProducer(Collections.singleton(TEST_ID));
215 fail("Producer should be already registered on the other node");
216 } catch (final IllegalArgumentException e) {
217 assertTrue(e.getMessage().contains("is attached to producer"));
222 final DOMDataTreeProducer followerProducer =
223 followerShardFactory.createProducer(Collections.singleton(TEST_ID));
225 leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
226 fail("Producer should be already registered on the other node");
227 } catch (final IllegalArgumentException e) {
228 assertTrue(e.getMessage().contains("is attached to producer"));
231 followerProducer.close();
232 // try to create a shard on an already registered prefix on follower
234 waitOnAsyncTask(followerShardFactory.createDistributedShard(
235 TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
236 DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
237 fail("This prefix already should have a shard registration that was forwarded from the other node");
238 } catch (final DOMDataTreeShardingConflictException e) {
239 assertTrue(e.getMessage().contains("is already occupied by another shard"));
242 shardRegistration.close().toCompletableFuture().get();
244 LOG.info("testProducerRegistrations ending");
248 public void testWriteIntoMultipleShards() throws Exception {
249 LOG.info("testWriteIntoMultipleShards starting");
250 initEmptyDatastores();
252 leaderTestKit.waitForMembersUp("member-2");
254 LOG.debug("registering first shard");
255 final DistributedShardRegistration shardRegistration =
256 waitOnAsyncTask(leaderShardFactory.createDistributedShard(
257 TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
258 DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
261 leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorUtils(),
262 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
263 findLocalShard(followerConfigDatastore.getActorUtils(),
264 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
266 final Set<String> peers = new HashSet<>();
267 IntegrationTestKit.verifyShardState(leaderConfigDatastore,
268 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()), onDemandShardState ->
269 peers.addAll(onDemandShardState.getPeerAddresses().values()));
270 assertEquals(peers.size(), 1);
272 LOG.debug("Got after waiting for nonleader");
273 final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
275 final DOMDataTreeCursorAwareTransaction tx = producer.createTransaction(true);
276 final DOMDataTreeWriteCursor cursor = tx.createCursor(TEST_ID);
277 Assert.assertNotNull(cursor);
278 final YangInstanceIdentifier nameId =
279 YangInstanceIdentifier.builder(TestModel.TEST_PATH).node(TestModel.NAME_QNAME).build();
280 cursor.write(nameId.getLastPathArgument(),
281 ImmutableLeafNodeBuilder.<String>create().withNodeIdentifier(
282 new NodeIdentifier(TestModel.NAME_QNAME)).withValue("Test Value").build());
285 LOG.warn("Got to pre submit");
289 shardRegistration.close().toCompletableFuture().get();
291 LOG.info("testWriteIntoMultipleShards ending");
295 public void testMultipleShardRegistrations() throws Exception {
296 LOG.info("testMultipleShardRegistrations starting");
297 initEmptyDatastores();
299 final DistributedShardRegistration reg1 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
300 TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
301 DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
303 final DistributedShardRegistration reg2 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
304 new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.OUTER_CONTAINER_PATH),
305 Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
306 DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
308 final DistributedShardRegistration reg3 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
309 new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.INNER_LIST_PATH),
310 Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
311 DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
313 final DistributedShardRegistration reg4 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
314 new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.JUNK_PATH),
315 Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
316 DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
318 leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorUtils(),
319 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
320 leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorUtils(),
321 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));
322 leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorUtils(),
323 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));
324 leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorUtils(),
325 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));
327 // check leader has local shards
328 assertNotNull(findLocalShard(leaderConfigDatastore.getActorUtils(),
329 ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
331 assertNotNull(findLocalShard(leaderConfigDatastore.getActorUtils(),
332 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH)));
334 assertNotNull(findLocalShard(leaderConfigDatastore.getActorUtils(),
335 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH)));
337 assertNotNull(findLocalShard(leaderConfigDatastore.getActorUtils(),
338 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH)));
340 // check follower has local shards
341 assertNotNull(findLocalShard(followerConfigDatastore.getActorUtils(),
342 ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
344 assertNotNull(findLocalShard(followerConfigDatastore.getActorUtils(),
345 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH)));
347 assertNotNull(findLocalShard(followerConfigDatastore.getActorUtils(),
348 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH)));
350 assertNotNull(findLocalShard(followerConfigDatastore.getActorUtils(),
351 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH)));
353 LOG.debug("Closing registrations");
355 reg1.close().toCompletableFuture().get();
356 reg2.close().toCompletableFuture().get();
357 reg3.close().toCompletableFuture().get();
358 reg4.close().toCompletableFuture().get();
360 waitUntilShardIsDown(leaderConfigDatastore.getActorUtils(),
361 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
363 waitUntilShardIsDown(leaderConfigDatastore.getActorUtils(),
364 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));
366 waitUntilShardIsDown(leaderConfigDatastore.getActorUtils(),
367 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));
369 waitUntilShardIsDown(leaderConfigDatastore.getActorUtils(),
370 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));
372 LOG.debug("All leader shards gone");
374 waitUntilShardIsDown(followerConfigDatastore.getActorUtils(),
375 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
377 waitUntilShardIsDown(followerConfigDatastore.getActorUtils(),
378 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));
380 waitUntilShardIsDown(followerConfigDatastore.getActorUtils(),
381 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));
383 waitUntilShardIsDown(followerConfigDatastore.getActorUtils(),
384 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));
386 LOG.debug("All follower shards gone");
387 LOG.info("testMultipleShardRegistrations ending");
391 public void testMultipleRegistrationsAtOnePrefix() throws Exception {
392 LOG.info("testMultipleRegistrationsAtOnePrefix starting");
393 initEmptyDatastores();
395 for (int i = 0; i < 5; i++) {
396 LOG.info("Round {}", i);
397 final DistributedShardRegistration reg1 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
398 TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
399 DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
401 leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorUtils(),
402 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
404 assertNotNull(findLocalShard(leaderConfigDatastore.getActorUtils(),
405 ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
407 assertNotNull(findLocalShard(followerConfigDatastore.getActorUtils(),
408 ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
411 final Set<String> peers = new HashSet<>();
412 IntegrationTestKit.verifyShardState(leaderConfigDatastore,
413 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()), onDemandShardState ->
414 peers.addAll(onDemandShardState.getPeerAddresses().values()));
415 assertEquals(peers.size(), 1);
417 waitOnAsyncTask(reg1.close(), DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
419 waitUntilShardIsDown(leaderConfigDatastore.getActorUtils(),
420 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
422 waitUntilShardIsDown(followerConfigDatastore.getActorUtils(),
423 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
426 LOG.info("testMultipleRegistrationsAtOnePrefix ending");
430 public void testInitialBootstrappingWithNoModuleShards() throws Exception {
431 LOG.info("testInitialBootstrappingWithNoModuleShards starting");
432 initEmptyDatastores("module-shards-default-member-1.conf");
434 // We just verify the DistributedShardedDOMDataTree initialized without error.