/*
 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
package org.opendaylight.controller.cluster.sharding;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.doReturn;
import static org.opendaylight.controller.cluster.datastore.IntegrationTestKit.findLocalShard;
import static org.opendaylight.controller.cluster.datastore.IntegrationTestKit.waitUntilShardIsDown;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.actor.AddressFromURIString;
import akka.cluster.Cluster;
import akka.testkit.JavaTestKit;
import com.google.common.collect.Lists;
import com.typesafe.config.ConfigFactory;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.Mockito;
import org.opendaylight.controller.cluster.ActorSystemProvider;
import org.opendaylight.controller.cluster.datastore.AbstractTest;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
import org.opendaylight.controller.cluster.datastore.IntegrationTestKit;
import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.cluster.sharding.DistributedShardFactory.DistributedShardRegistration;
import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException;
import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafNodeBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
60 public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest {
62 private static final Logger LOG = LoggerFactory.getLogger(DistributedShardedDOMDataTreeRemotingTest.class);
64 private static final Address MEMBER_1_ADDRESS =
65 AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558");
67 private static final DOMDataTreeIdentifier TEST_ID =
68 new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH);
70 private static final String MODULE_SHARDS_CONFIG = "module-shards-default.conf";
72 private ActorSystem leaderSystem;
73 private ActorSystem followerSystem;
76 private final Builder leaderDatastoreContextBuilder =
77 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);
79 private final DatastoreContext.Builder followerDatastoreContextBuilder =
80 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);
82 private DistributedDataStore leaderConfigDatastore;
83 private DistributedDataStore leaderOperDatastore;
85 private DistributedDataStore followerConfigDatastore;
86 private DistributedDataStore followerOperDatastore;
89 private IntegrationTestKit followerTestKit;
90 private IntegrationTestKit leaderTestKit;
91 private DistributedShardedDOMDataTree leaderShardFactory;
93 private DistributedShardedDOMDataTree followerShardFactory;
94 private ActorSystemProvider leaderSystemProvider;
95 private ActorSystemProvider followerSystemProvider;
99 InMemoryJournal.clear();
100 InMemorySnapshotStore.clear();
102 leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
103 Cluster.get(leaderSystem).join(MEMBER_1_ADDRESS);
105 followerSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
106 Cluster.get(followerSystem).join(MEMBER_1_ADDRESS);
108 leaderSystemProvider = Mockito.mock(ActorSystemProvider.class);
109 doReturn(leaderSystem).when(leaderSystemProvider).getActorSystem();
111 followerSystemProvider = Mockito.mock(ActorSystemProvider.class);
112 doReturn(followerSystem).when(followerSystemProvider).getActorSystem();
117 public void tearDown() {
118 if (leaderConfigDatastore != null) {
119 leaderConfigDatastore.close();
121 if (leaderOperDatastore != null) {
122 leaderOperDatastore.close();
125 if (followerConfigDatastore != null) {
126 followerConfigDatastore.close();
128 if (followerOperDatastore != null) {
129 followerOperDatastore.close();
132 JavaTestKit.shutdownActorSystem(leaderSystem, null, Boolean.TRUE);
133 JavaTestKit.shutdownActorSystem(followerSystem, null, Boolean.TRUE);
135 InMemoryJournal.clear();
136 InMemorySnapshotStore.clear();
139 private void initEmptyDatastores() throws Exception {
140 leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder);
142 leaderConfigDatastore = leaderTestKit.setupDistributedDataStore(
143 "config", MODULE_SHARDS_CONFIG, true,
144 SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext());
145 leaderOperDatastore = leaderTestKit.setupDistributedDataStore(
146 "operational", MODULE_SHARDS_CONFIG, true,
147 SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext());
149 leaderShardFactory = new DistributedShardedDOMDataTree(leaderSystemProvider,
151 leaderConfigDatastore);
153 followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder);
155 followerConfigDatastore = followerTestKit.setupDistributedDataStore(
156 "config", MODULE_SHARDS_CONFIG, true, SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext());
157 followerOperDatastore = followerTestKit.setupDistributedDataStore(
158 "operational", MODULE_SHARDS_CONFIG, true,
159 SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext());
161 followerShardFactory = new DistributedShardedDOMDataTree(followerSystemProvider,
162 followerOperDatastore,
163 followerConfigDatastore);
165 followerTestKit.waitForMembersUp("member-1");
167 leaderShardFactory.init();
168 followerShardFactory.init();
170 leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
171 ClusterUtils.getCleanShardName(YangInstanceIdentifier.EMPTY));
173 leaderTestKit.waitUntilLeader(leaderOperDatastore.getActorContext(),
174 ClusterUtils.getCleanShardName(YangInstanceIdentifier.EMPTY));
178 public void testProducerRegistrations() throws Exception {
179 LOG.info("testProducerRegistrations starting");
180 initEmptyDatastores();
182 leaderTestKit.waitForMembersUp("member-2");
184 // TODO refactor shard creation and verification to own method
185 final DistributedShardRegistration shardRegistration =
186 waitOnAsyncTask(leaderShardFactory.createDistributedShard(
187 TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
188 DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
190 leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
191 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
193 final ActorRef leaderShardManager = leaderConfigDatastore.getActorContext().getShardManager();
195 assertNotNull(findLocalShard(leaderConfigDatastore.getActorContext(),
196 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())));
198 assertNotNull(findLocalShard(followerConfigDatastore.getActorContext(),
199 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())));
201 final Set<String> peers = new HashSet<>();
202 IntegrationTestKit.verifyShardState(leaderConfigDatastore,
203 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()), onDemandShardState ->
204 peers.addAll(onDemandShardState.getPeerAddresses().values()));
205 assertEquals(peers.size(), 1);
207 final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
209 followerShardFactory.createProducer(Collections.singleton(TEST_ID));
210 fail("Producer should be already registered on the other node");
211 } catch (final IllegalArgumentException e) {
212 assertTrue(e.getMessage().contains("is attached to producer"));
217 final DOMDataTreeProducer followerProducer =
218 followerShardFactory.createProducer(Collections.singleton(TEST_ID));
220 leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
221 fail("Producer should be already registered on the other node");
222 } catch (final IllegalArgumentException e) {
223 assertTrue(e.getMessage().contains("is attached to producer"));
226 followerProducer.close();
227 // try to create a shard on an already registered prefix on follower
229 waitOnAsyncTask(followerShardFactory.createDistributedShard(
230 TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
231 DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
232 fail("This prefix already should have a shard registration that was forwarded from the other node");
233 } catch (final DOMDataTreeShardingConflictException e) {
234 assertTrue(e.getMessage().contains("is already occupied by another shard"));
237 shardRegistration.close().toCompletableFuture().get();
239 LOG.info("testProducerRegistrations ending");
243 public void testWriteIntoMultipleShards() throws Exception {
244 LOG.info("testWriteIntoMultipleShards starting");
245 initEmptyDatastores();
247 leaderTestKit.waitForMembersUp("member-2");
249 LOG.debug("registering first shard");
250 final DistributedShardRegistration shardRegistration =
251 waitOnAsyncTask(leaderShardFactory.createDistributedShard(
252 TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
253 DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
256 leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
257 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
258 findLocalShard(followerConfigDatastore.getActorContext(),
259 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
261 final Set<String> peers = new HashSet<>();
262 IntegrationTestKit.verifyShardState(leaderConfigDatastore,
263 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()), onDemandShardState ->
264 peers.addAll(onDemandShardState.getPeerAddresses().values()));
265 assertEquals(peers.size(), 1);
267 LOG.debug("Got after waiting for nonleader");
268 final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
270 final DOMDataTreeCursorAwareTransaction tx = producer.createTransaction(true);
271 final DOMDataTreeWriteCursor cursor = tx.createCursor(TEST_ID);
272 Assert.assertNotNull(cursor);
273 final YangInstanceIdentifier nameId =
274 YangInstanceIdentifier.builder(TestModel.TEST_PATH).node(TestModel.NAME_QNAME).build();
275 cursor.write(nameId.getLastPathArgument(),
276 ImmutableLeafNodeBuilder.<String>create().withNodeIdentifier(
277 new NodeIdentifier(TestModel.NAME_QNAME)).withValue("Test Value").build());
280 LOG.warn("Got to pre submit");
282 tx.submit().checkedGet();
284 shardRegistration.close().toCompletableFuture().get();
286 LOG.info("testWriteIntoMultipleShards ending");
290 public void testMultipleShardRegistrations() throws Exception {
291 LOG.info("testMultipleShardRegistrations starting");
292 initEmptyDatastores();
294 final DistributedShardRegistration reg1 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
295 TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
296 DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
298 final DistributedShardRegistration reg2 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
299 new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.OUTER_CONTAINER_PATH),
300 Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
301 DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
303 final DistributedShardRegistration reg3 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
304 new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.INNER_LIST_PATH),
305 Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
306 DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
308 final DistributedShardRegistration reg4 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
309 new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.JUNK_PATH),
310 Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
311 DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
313 leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
314 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
315 leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
316 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));
317 leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
318 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));
319 leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
320 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));
322 // check leader has local shards
323 assertNotNull(findLocalShard(leaderConfigDatastore.getActorContext(),
324 ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
326 assertNotNull(findLocalShard(leaderConfigDatastore.getActorContext(),
327 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH)));
329 assertNotNull(findLocalShard(leaderConfigDatastore.getActorContext(),
330 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH)));
332 assertNotNull(findLocalShard(leaderConfigDatastore.getActorContext(),
333 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH)));
335 // check follower has local shards
336 assertNotNull(findLocalShard(followerConfigDatastore.getActorContext(),
337 ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
339 assertNotNull(findLocalShard(followerConfigDatastore.getActorContext(),
340 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH)));
342 assertNotNull(findLocalShard(followerConfigDatastore.getActorContext(),
343 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH)));
345 assertNotNull(findLocalShard(followerConfigDatastore.getActorContext(),
346 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH)));
348 LOG.debug("Closing registrations");
350 reg1.close().toCompletableFuture().get();
351 reg2.close().toCompletableFuture().get();
352 reg3.close().toCompletableFuture().get();
353 reg4.close().toCompletableFuture().get();
355 waitUntilShardIsDown(leaderConfigDatastore.getActorContext(),
356 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
358 waitUntilShardIsDown(leaderConfigDatastore.getActorContext(),
359 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));
361 waitUntilShardIsDown(leaderConfigDatastore.getActorContext(),
362 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));
364 waitUntilShardIsDown(leaderConfigDatastore.getActorContext(),
365 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));
367 LOG.debug("All leader shards gone");
369 waitUntilShardIsDown(followerConfigDatastore.getActorContext(),
370 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
372 waitUntilShardIsDown(followerConfigDatastore.getActorContext(),
373 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));
375 waitUntilShardIsDown(followerConfigDatastore.getActorContext(),
376 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));
378 waitUntilShardIsDown(followerConfigDatastore.getActorContext(),
379 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));
381 LOG.debug("All follower shards gone");
382 LOG.info("testMultipleShardRegistrations ending");
387 public void testMultipleRegistrationsAtOnePrefix() throws Exception {
388 LOG.info("testMultipleRegistrationsAtOnePrefix starting");
389 initEmptyDatastores();
391 for (int i = 0; i < 10; i++) {
392 LOG.debug("Round {}", i);
393 final DistributedShardRegistration reg1 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
394 TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
395 DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
397 leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
398 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
400 assertNotNull(findLocalShard(leaderConfigDatastore.getActorContext(),
401 ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
403 assertNotNull(findLocalShard(followerConfigDatastore.getActorContext(),
404 ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
407 final Set<String> peers = new HashSet<>();
408 IntegrationTestKit.verifyShardState(leaderConfigDatastore,
409 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()), onDemandShardState ->
410 peers.addAll(onDemandShardState.getPeerAddresses().values()));
411 assertEquals(peers.size(), 1);
413 waitOnAsyncTask(reg1.close(), DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
415 waitUntilShardIsDown(leaderConfigDatastore.getActorContext(),
416 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
418 waitUntilShardIsDown(followerConfigDatastore.getActorContext(),
419 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
422 LOG.info("testMultipleRegistrationsAtOnePrefix ending");