2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.sharding;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.doReturn;
import static org.opendaylight.controller.cluster.datastore.IntegrationTestKit.findLocalShard;
import static org.opendaylight.controller.cluster.datastore.IntegrationTestKit.waitUntilShardIsDown;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.actor.AddressFromURIString;
import akka.cluster.Cluster;
import akka.testkit.JavaTestKit;
import com.google.common.collect.Lists;
import com.typesafe.config.ConfigFactory;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.opendaylight.controller.cluster.ActorSystemProvider;
import org.opendaylight.controller.cluster.datastore.AbstractTest;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
import org.opendaylight.controller.cluster.datastore.IntegrationTestKit;
import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.cluster.sharding.DistributedShardFactory.DistributedShardRegistration;
import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException;
import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafNodeBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Integration tests for {@code DistributedShardedDOMDataTree} running across a two-member
 * in-process Akka cluster (member-1 = leader system, member-2 = follower system).
 */
public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest {

    private static final Logger LOG = LoggerFactory.getLogger(DistributedShardedDOMDataTreeRemotingTest.class);

    // Seed address of member-1; both actor systems join this node in setUp() to form the cluster.
    private static final Address MEMBER_1_ADDRESS =
            AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558");

    // Prefix used by most tests: the TestModel "test" container in the CONFIGURATION datastore.
    private static final DOMDataTreeIdentifier TEST_ID =
            new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH);

    private static final String MODULE_SHARDS_CONFIG = "module-shards-default.conf";

    // Two in-process actor systems emulating a two-node cluster.
    private ActorSystem leaderSystem;
    private ActorSystem followerSystem;

    // Short heartbeat interval / election timeout keep shard leader election fast in tests.
    private final Builder leaderDatastoreContextBuilder =
            DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);

    private final DatastoreContext.Builder followerDatastoreContextBuilder =
            DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);

    // Config and operational datastores for each member, created by initEmptyDatastores()
    // and torn down (if non-null) in tearDown().
    private DistributedDataStore leaderConfigDatastore;
    private DistributedDataStore leaderOperDatastore;

    private DistributedDataStore followerConfigDatastore;
    private DistributedDataStore followerOperDatastore;

    private IntegrationTestKit followerTestKit;
    private IntegrationTestKit leaderTestKit;

    // Shard factories under test, one per member.
    private DistributedShardedDOMDataTree leaderShardFactory;
    private DistributedShardedDOMDataTree followerShardFactory;

    // Mocked providers returning the respective ActorSystem (see setUp()).
    private ActorSystemProvider leaderSystemProvider;
    private ActorSystemProvider followerSystemProvider;
98 InMemoryJournal.clear();
99 InMemorySnapshotStore.clear();
101 leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
102 Cluster.get(leaderSystem).join(MEMBER_1_ADDRESS);
104 followerSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
105 Cluster.get(followerSystem).join(MEMBER_1_ADDRESS);
107 leaderSystemProvider = Mockito.mock(ActorSystemProvider.class);
108 doReturn(leaderSystem).when(leaderSystemProvider).getActorSystem();
110 followerSystemProvider = Mockito.mock(ActorSystemProvider.class);
111 doReturn(followerSystem).when(followerSystemProvider).getActorSystem();
116 public void tearDown() {
117 if (leaderConfigDatastore != null) {
118 leaderConfigDatastore.close();
120 if (leaderOperDatastore != null) {
121 leaderOperDatastore.close();
124 if (followerConfigDatastore != null) {
125 followerConfigDatastore.close();
127 if (followerOperDatastore != null) {
128 followerOperDatastore.close();
131 JavaTestKit.shutdownActorSystem(leaderSystem, null, Boolean.TRUE);
132 JavaTestKit.shutdownActorSystem(followerSystem, null, Boolean.TRUE);
134 InMemoryJournal.clear();
135 InMemorySnapshotStore.clear();
138 private void initEmptyDatastores() throws Exception {
139 initEmptyDatastores(MODULE_SHARDS_CONFIG);
142 private void initEmptyDatastores(String moduleShardsConfig) throws Exception {
143 leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder);
145 leaderConfigDatastore = leaderTestKit.setupDistributedDataStore(
146 "config", moduleShardsConfig, true,
147 SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext());
148 leaderOperDatastore = leaderTestKit.setupDistributedDataStore(
149 "operational", moduleShardsConfig, true,
150 SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext());
152 leaderShardFactory = new DistributedShardedDOMDataTree(leaderSystemProvider,
154 leaderConfigDatastore);
156 followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder);
158 followerConfigDatastore = followerTestKit.setupDistributedDataStore(
159 "config", moduleShardsConfig, true, SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext());
160 followerOperDatastore = followerTestKit.setupDistributedDataStore(
161 "operational", moduleShardsConfig, true,
162 SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext());
164 followerShardFactory = new DistributedShardedDOMDataTree(followerSystemProvider,
165 followerOperDatastore,
166 followerConfigDatastore);
168 followerTestKit.waitForMembersUp("member-1");
170 LOG.info("Initializing leader DistributedShardedDOMDataTree");
171 leaderShardFactory.init();
173 leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
174 ClusterUtils.getCleanShardName(YangInstanceIdentifier.EMPTY));
176 leaderTestKit.waitUntilLeader(leaderOperDatastore.getActorContext(),
177 ClusterUtils.getCleanShardName(YangInstanceIdentifier.EMPTY));
179 LOG.info("Initializing follower DistributedShardedDOMDataTree");
180 followerShardFactory.init();
184 public void testProducerRegistrations() throws Exception {
185 LOG.info("testProducerRegistrations starting");
186 initEmptyDatastores();
188 leaderTestKit.waitForMembersUp("member-2");
190 // TODO refactor shard creation and verification to own method
191 final DistributedShardRegistration shardRegistration =
192 waitOnAsyncTask(leaderShardFactory.createDistributedShard(
193 TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
194 DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
196 leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
197 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
199 final ActorRef leaderShardManager = leaderConfigDatastore.getActorContext().getShardManager();
201 assertNotNull(findLocalShard(leaderConfigDatastore.getActorContext(),
202 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())));
204 assertNotNull(findLocalShard(followerConfigDatastore.getActorContext(),
205 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())));
207 final Set<String> peers = new HashSet<>();
208 IntegrationTestKit.verifyShardState(leaderConfigDatastore,
209 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()), onDemandShardState ->
210 peers.addAll(onDemandShardState.getPeerAddresses().values()));
211 assertEquals(peers.size(), 1);
213 final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
215 followerShardFactory.createProducer(Collections.singleton(TEST_ID));
216 fail("Producer should be already registered on the other node");
217 } catch (final IllegalArgumentException e) {
218 assertTrue(e.getMessage().contains("is attached to producer"));
223 final DOMDataTreeProducer followerProducer =
224 followerShardFactory.createProducer(Collections.singleton(TEST_ID));
226 leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
227 fail("Producer should be already registered on the other node");
228 } catch (final IllegalArgumentException e) {
229 assertTrue(e.getMessage().contains("is attached to producer"));
232 followerProducer.close();
233 // try to create a shard on an already registered prefix on follower
235 waitOnAsyncTask(followerShardFactory.createDistributedShard(
236 TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
237 DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
238 fail("This prefix already should have a shard registration that was forwarded from the other node");
239 } catch (final DOMDataTreeShardingConflictException e) {
240 assertTrue(e.getMessage().contains("is already occupied by another shard"));
243 shardRegistration.close().toCompletableFuture().get();
245 LOG.info("testProducerRegistrations ending");
249 public void testWriteIntoMultipleShards() throws Exception {
250 LOG.info("testWriteIntoMultipleShards starting");
251 initEmptyDatastores();
253 leaderTestKit.waitForMembersUp("member-2");
255 LOG.debug("registering first shard");
256 final DistributedShardRegistration shardRegistration =
257 waitOnAsyncTask(leaderShardFactory.createDistributedShard(
258 TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
259 DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
262 leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
263 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
264 findLocalShard(followerConfigDatastore.getActorContext(),
265 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
267 final Set<String> peers = new HashSet<>();
268 IntegrationTestKit.verifyShardState(leaderConfigDatastore,
269 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()), onDemandShardState ->
270 peers.addAll(onDemandShardState.getPeerAddresses().values()));
271 assertEquals(peers.size(), 1);
273 LOG.debug("Got after waiting for nonleader");
274 final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
276 final DOMDataTreeCursorAwareTransaction tx = producer.createTransaction(true);
277 final DOMDataTreeWriteCursor cursor = tx.createCursor(TEST_ID);
278 Assert.assertNotNull(cursor);
279 final YangInstanceIdentifier nameId =
280 YangInstanceIdentifier.builder(TestModel.TEST_PATH).node(TestModel.NAME_QNAME).build();
281 cursor.write(nameId.getLastPathArgument(),
282 ImmutableLeafNodeBuilder.<String>create().withNodeIdentifier(
283 new NodeIdentifier(TestModel.NAME_QNAME)).withValue("Test Value").build());
286 LOG.warn("Got to pre submit");
288 tx.submit().checkedGet();
290 shardRegistration.close().toCompletableFuture().get();
292 LOG.info("testWriteIntoMultipleShards ending");
296 public void testMultipleShardRegistrations() throws Exception {
297 LOG.info("testMultipleShardRegistrations starting");
298 initEmptyDatastores();
300 final DistributedShardRegistration reg1 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
301 TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
302 DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
304 final DistributedShardRegistration reg2 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
305 new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.OUTER_CONTAINER_PATH),
306 Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
307 DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
309 final DistributedShardRegistration reg3 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
310 new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.INNER_LIST_PATH),
311 Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
312 DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
314 final DistributedShardRegistration reg4 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
315 new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.JUNK_PATH),
316 Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
317 DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
319 leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
320 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
321 leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
322 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));
323 leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
324 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));
325 leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
326 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));
328 // check leader has local shards
329 assertNotNull(findLocalShard(leaderConfigDatastore.getActorContext(),
330 ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
332 assertNotNull(findLocalShard(leaderConfigDatastore.getActorContext(),
333 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH)));
335 assertNotNull(findLocalShard(leaderConfigDatastore.getActorContext(),
336 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH)));
338 assertNotNull(findLocalShard(leaderConfigDatastore.getActorContext(),
339 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH)));
341 // check follower has local shards
342 assertNotNull(findLocalShard(followerConfigDatastore.getActorContext(),
343 ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
345 assertNotNull(findLocalShard(followerConfigDatastore.getActorContext(),
346 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH)));
348 assertNotNull(findLocalShard(followerConfigDatastore.getActorContext(),
349 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH)));
351 assertNotNull(findLocalShard(followerConfigDatastore.getActorContext(),
352 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH)));
354 LOG.debug("Closing registrations");
356 reg1.close().toCompletableFuture().get();
357 reg2.close().toCompletableFuture().get();
358 reg3.close().toCompletableFuture().get();
359 reg4.close().toCompletableFuture().get();
361 waitUntilShardIsDown(leaderConfigDatastore.getActorContext(),
362 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
364 waitUntilShardIsDown(leaderConfigDatastore.getActorContext(),
365 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));
367 waitUntilShardIsDown(leaderConfigDatastore.getActorContext(),
368 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));
370 waitUntilShardIsDown(leaderConfigDatastore.getActorContext(),
371 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));
373 LOG.debug("All leader shards gone");
375 waitUntilShardIsDown(followerConfigDatastore.getActorContext(),
376 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
378 waitUntilShardIsDown(followerConfigDatastore.getActorContext(),
379 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));
381 waitUntilShardIsDown(followerConfigDatastore.getActorContext(),
382 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));
384 waitUntilShardIsDown(followerConfigDatastore.getActorContext(),
385 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));
387 LOG.debug("All follower shards gone");
388 LOG.info("testMultipleShardRegistrations ending");
392 public void testMultipleRegistrationsAtOnePrefix() throws Exception {
393 LOG.info("testMultipleRegistrationsAtOnePrefix starting");
394 initEmptyDatastores();
396 for (int i = 0; i < 5; i++) {
397 LOG.info("Round {}", i);
398 final DistributedShardRegistration reg1 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
399 TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
400 DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
402 leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
403 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
405 assertNotNull(findLocalShard(leaderConfigDatastore.getActorContext(),
406 ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
408 assertNotNull(findLocalShard(followerConfigDatastore.getActorContext(),
409 ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
412 final Set<String> peers = new HashSet<>();
413 IntegrationTestKit.verifyShardState(leaderConfigDatastore,
414 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()), onDemandShardState ->
415 peers.addAll(onDemandShardState.getPeerAddresses().values()));
416 assertEquals(peers.size(), 1);
418 waitOnAsyncTask(reg1.close(), DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
420 waitUntilShardIsDown(leaderConfigDatastore.getActorContext(),
421 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
423 waitUntilShardIsDown(followerConfigDatastore.getActorContext(),
424 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
427 LOG.info("testMultipleRegistrationsAtOnePrefix ending");
431 public void testInitialBootstrappingWithNoModuleShards() throws Exception {
432 LOG.info("testInitialBootstrappingWithNoModuleShards starting");
433 initEmptyDatastores("module-shards-default-member-1.conf");
435 // We just verify the DistributedShardedDOMDataTree initialized without error.