2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.sharding;
11 import static org.junit.Assert.assertNotNull;
12 import static org.junit.Assert.assertTrue;
13 import static org.junit.Assert.fail;
14 import static org.mockito.Mockito.doReturn;
15 import static org.opendaylight.controller.cluster.datastore.IntegrationTestKit.findLocalShard;
16 import static org.opendaylight.controller.cluster.datastore.IntegrationTestKit.waitUntilShardIsDown;
18 import akka.actor.ActorRef;
19 import akka.actor.ActorSystem;
20 import akka.actor.Address;
21 import akka.actor.AddressFromURIString;
22 import akka.actor.PoisonPill;
23 import akka.cluster.Cluster;
24 import akka.cluster.ddata.DistributedData;
25 import akka.testkit.JavaTestKit;
26 import com.google.common.collect.Lists;
27 import com.typesafe.config.ConfigFactory;
28 import java.util.Collections;
29 import org.junit.After;
30 import org.junit.Assert;
31 import org.junit.Before;
32 import org.junit.Ignore;
33 import org.junit.Test;
34 import org.mockito.Mockito;
35 import org.opendaylight.controller.cluster.ActorSystemProvider;
36 import org.opendaylight.controller.cluster.datastore.AbstractTest;
37 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
38 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
39 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
40 import org.opendaylight.controller.cluster.datastore.IntegrationTestKit;
41 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
42 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
43 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
44 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
45 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
46 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
47 import org.opendaylight.controller.cluster.sharding.DistributedShardFactory.DistributedShardRegistration;
48 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
49 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
50 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
51 import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
52 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
53 import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
54 import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException;
55 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
56 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
57 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
58 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafNodeBuilder;
59 import org.slf4j.Logger;
60 import org.slf4j.LoggerFactory;
62 @Ignore("Needs to have the configuration backend switched from distributed-data")
63 public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest {
65 private static final Logger LOG = LoggerFactory.getLogger(DistributedShardedDOMDataTreeRemotingTest.class);
67 private static final Address MEMBER_1_ADDRESS =
68 AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558");
70 private static final DOMDataTreeIdentifier TEST_ID =
71 new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH);
73 private ActorSystem leaderSystem;
74 private ActorSystem followerSystem;
77 private final Builder leaderDatastoreContextBuilder =
78 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
80 private final DatastoreContext.Builder followerDatastoreContextBuilder =
81 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5)
82 .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
84 private DistributedDataStore followerDistributedDataStore;
85 private DistributedDataStore leaderDistributedDataStore;
86 private IntegrationTestKit followerTestKit;
87 private IntegrationTestKit leaderTestKit;
89 private DistributedShardedDOMDataTree leaderShardFactory;
90 private DistributedShardedDOMDataTree followerShardFactory;
92 private ActorSystemProvider leaderSystemProvider;
93 private ActorSystemProvider followerSystemProvider;
98 leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
99 Cluster.get(leaderSystem).join(MEMBER_1_ADDRESS);
101 followerSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
102 Cluster.get(followerSystem).join(MEMBER_1_ADDRESS);
104 leaderSystemProvider = Mockito.mock(ActorSystemProvider.class);
105 doReturn(leaderSystem).when(leaderSystemProvider).getActorSystem();
107 followerSystemProvider = Mockito.mock(ActorSystemProvider.class);
108 doReturn(followerSystem).when(followerSystemProvider).getActorSystem();
113 public void tearDown() {
114 if (followerDistributedDataStore != null) {
115 followerDistributedDataStore.close();
117 if (leaderDistributedDataStore != null) {
118 leaderDistributedDataStore.close();
121 DistributedData.get(leaderSystem).replicator().tell(PoisonPill.getInstance(), ActorRef.noSender());
122 DistributedData.get(followerSystem).replicator().tell(PoisonPill.getInstance(), ActorRef.noSender());
124 JavaTestKit.shutdownActorSystem(leaderSystem);
125 JavaTestKit.shutdownActorSystem(followerSystem);
128 private void initEmptyDatastores(final String type) {
129 leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder);
131 leaderDistributedDataStore =
132 leaderTestKit.setupDistributedDataStoreWithoutConfig(type, SchemaContextHelper.full());
134 followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder);
135 followerDistributedDataStore =
136 followerTestKit.setupDistributedDataStoreWithoutConfig(type, SchemaContextHelper.full());
138 leaderShardFactory = new DistributedShardedDOMDataTree(leaderSystemProvider,
139 leaderDistributedDataStore,
140 leaderDistributedDataStore);
142 followerShardFactory = new DistributedShardedDOMDataTree(followerSystemProvider,
143 followerDistributedDataStore,
144 followerDistributedDataStore);
146 leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
147 ClusterUtils.getCleanShardName(YangInstanceIdentifier.EMPTY));
151 @Ignore("Needs different shard creation handling due to replicas")
152 public void testProducerRegistrations() throws Exception {
153 initEmptyDatastores("config");
155 leaderTestKit.waitForMembersUp("member-2");
157 final DistributedShardRegistration shardRegistration =
158 leaderShardFactory.createDistributedShard(
159 TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
161 leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
162 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
164 final ActorRef leaderShardManager = leaderDistributedDataStore.getActorContext().getShardManager();
166 assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
167 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())));
169 assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
170 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())));
172 final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
174 followerShardFactory.createProducer(Collections.singleton(TEST_ID));
175 fail("Producer should be already registered on the other node");
176 } catch (final IllegalArgumentException e) {
177 assertTrue(e.getMessage().contains("is attached to producer"));
182 final DOMDataTreeProducer followerProducer =
183 followerShardFactory.createProducer(Collections.singleton(TEST_ID));
185 leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
186 fail("Producer should be already registered on the other node");
187 } catch (final IllegalArgumentException e) {
188 assertTrue(e.getMessage().contains("is attached to producer"));
191 followerProducer.close();
192 // try to create a shard on an already registered prefix on follower
194 followerShardFactory.createDistributedShard(TEST_ID,
195 Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
196 fail("This prefix already should have a shard registration that was forwarded from the other node");
197 } catch (final DOMDataTreeShardingConflictException e) {
198 assertTrue(e.getMessage().contains("is already occupied by shard"));
203 @Ignore("Needs different shard creation handling due to replicas")
204 public void testWriteIntoMultipleShards() throws Exception {
205 initEmptyDatastores("config");
207 leaderTestKit.waitForMembersUp("member-2");
209 LOG.warn("registering first shard");
210 final DistributedShardRegistration shardRegistration =
211 leaderShardFactory.createDistributedShard(TEST_ID,
212 Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
214 leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
215 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
216 findLocalShard(followerDistributedDataStore.getActorContext(),
217 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
219 LOG.warn("Got after waiting for nonleader");
220 final ActorRef leaderShardManager = leaderDistributedDataStore.getActorContext().getShardManager();
222 new JavaTestKit(leaderSystem) {
224 leaderShardManager.tell(
225 new FindLocalShard(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
226 expectMsgClass(duration("5 seconds"), LocalShardFound.class);
228 final ActorRef followerShardManager = followerDistributedDataStore.getActorContext().getShardManager();
230 followerShardManager.tell(new FindLocalShard(
231 ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), followerTestKit.getRef());
232 followerTestKit.expectMsgClass(duration("5 seconds"), LocalShardFound.class);
233 LOG.warn("Found follower shard");
235 leaderDistributedDataStore.getActorContext().getShardManager().tell(
236 new FindPrimary(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
237 expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
241 final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
243 final DOMDataTreeCursorAwareTransaction tx = producer.createTransaction(true);
244 final DOMDataTreeWriteCursor cursor = tx.createCursor(TEST_ID);
245 Assert.assertNotNull(cursor);
246 final YangInstanceIdentifier nameId =
247 YangInstanceIdentifier.builder(TestModel.TEST_PATH).node(TestModel.NAME_QNAME).build();
248 cursor.write(nameId.getLastPathArgument(),
249 ImmutableLeafNodeBuilder.<String>create().withNodeIdentifier(
250 new NodeIdentifier(TestModel.NAME_QNAME)).withValue("Test Value").build());
253 LOG.warn("Got to pre submit");
259 public void testMultipleShardRegistrations() throws Exception {
260 initEmptyDatastores("config");
262 final DistributedShardRegistration reg1 = leaderShardFactory
263 .createDistributedShard(TEST_ID,
264 Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
266 final DistributedShardRegistration reg2 = leaderShardFactory
267 .createDistributedShard(
268 new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.OUTER_CONTAINER_PATH),
269 Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
271 final DistributedShardRegistration reg3 = leaderShardFactory
272 .createDistributedShard(
273 new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.INNER_LIST_PATH),
274 Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
276 final DistributedShardRegistration reg4 = leaderShardFactory
277 .createDistributedShard(
278 new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.JUNK_PATH),
279 Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
281 leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
282 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
283 leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
284 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));
285 leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
286 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));
287 leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
288 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));
290 // check leader has local shards
291 assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
292 ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
294 assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
295 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH)));
297 assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
298 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH)));
300 assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
301 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH)));
303 // check follower has local shards
304 assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
305 ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
307 assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
308 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH)));
310 assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
311 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH)));
313 assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
314 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH)));
317 LOG.debug("Closing registrations");
324 waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
325 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
327 waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
328 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));
330 waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
331 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));
333 waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
334 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));
336 LOG.debug("All leader shards gone");
338 waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
339 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
341 waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
342 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));
344 waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
345 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));
347 waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
348 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));
350 LOG.debug("All follower shards gone");
354 public void testMultipleRegistrationsAtOnePrefix() throws Exception {
355 initEmptyDatastores("config");
357 for (int i = 0; i < 10; i++) {
358 LOG.debug("Round {}", i);
359 final DistributedShardRegistration reg1 = leaderShardFactory
360 .createDistributedShard(TEST_ID,
361 Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
363 leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
364 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
366 assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
367 ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
369 assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
370 ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
374 waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
375 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
377 waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
378 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));