2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.sharding;
11 import static org.junit.Assert.assertTrue;
12 import static org.junit.Assert.fail;
14 import akka.actor.ActorRef;
15 import akka.actor.ActorSystem;
16 import akka.actor.Address;
17 import akka.actor.AddressFromURIString;
18 import akka.cluster.Cluster;
19 import akka.testkit.JavaTestKit;
20 import com.google.common.collect.Lists;
21 import com.typesafe.config.ConfigFactory;
22 import java.util.Collections;
23 import org.junit.After;
24 import org.junit.Assert;
25 import org.junit.Before;
26 import org.junit.Ignore;
27 import org.junit.Test;
28 import org.mockito.Mockito;
29 import org.opendaylight.controller.cluster.datastore.AbstractTest;
30 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
31 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
32 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
33 import org.opendaylight.controller.cluster.datastore.IntegrationTestKit;
34 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
35 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
36 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
37 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
38 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
39 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
40 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
41 import org.opendaylight.controller.cluster.sharding.DistributedShardFactory.DistributedShardRegistration;
42 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
43 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
44 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
45 import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
46 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
47 import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
48 import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException;
49 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
50 import org.opendaylight.mdsal.dom.broker.ShardedDOMDataTree;
51 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
52 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
53 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafNodeBuilder;
55 public class DistributedShardedDOMDataTreeTest extends AbstractTest {
57 private static final Address MEMBER_1_ADDRESS =
58 AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
60 private static final DOMDataTreeIdentifier TEST_ID =
61 new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH);
63 private ShardedDOMDataTree shardedDOMDataTree = new ShardedDOMDataTree();
65 private ActorSystem leaderSystem;
66 private ActorSystem followerSystem;
69 private final Builder leaderDatastoreContextBuilder =
70 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
72 private final DatastoreContext.Builder followerDatastoreContextBuilder =
73 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5)
74 .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
76 private DistributedDataStore followerDistributedDataStore;
77 private DistributedDataStore leaderDistributedDataStore;
78 private IntegrationTestKit followerTestKit;
79 private IntegrationTestKit leaderTestKit;
81 private DistributedShardedDOMDataTree leaderShardFactory;
82 private DistributedShardedDOMDataTree followerShardFactory;
86 shardedDOMDataTree = new ShardedDOMDataTree();
88 leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
89 Cluster.get(leaderSystem).join(MEMBER_1_ADDRESS);
91 followerSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
92 Cluster.get(followerSystem).join(MEMBER_1_ADDRESS);
96 public void tearDown() {
97 if (followerDistributedDataStore != null) {
98 leaderDistributedDataStore.close();
100 if (leaderDistributedDataStore != null) {
101 leaderDistributedDataStore.close();
104 JavaTestKit.shutdownActorSystem(leaderSystem);
105 JavaTestKit.shutdownActorSystem(followerSystem);
108 private void initEmptyDatastore(final String type) {
109 leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder);
111 leaderDistributedDataStore =
112 leaderTestKit.setupDistributedDataStoreWithoutConfig(type, SchemaContextHelper.full());
114 followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder);
115 followerDistributedDataStore =
116 followerTestKit.setupDistributedDataStoreWithoutConfig(type, SchemaContextHelper.full());
118 leaderShardFactory = new DistributedShardedDOMDataTree(leaderSystem,
119 Mockito.mock(DistributedDataStore.class),
120 leaderDistributedDataStore);
122 followerShardFactory = new DistributedShardedDOMDataTree(followerSystem,
123 Mockito.mock(DistributedDataStore.class),
124 followerDistributedDataStore);
128 public void testProducerRegistrations() throws Exception {
129 initEmptyDatastore("config");
131 final DistributedShardRegistration shardRegistration =
132 leaderShardFactory.createDistributedShard(TEST_ID,
133 Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
135 leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
136 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
138 final ActorRef leaderShardManager = leaderDistributedDataStore.getActorContext().getShardManager();
140 leaderShardManager.tell(
141 new FindLocalShard(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), leaderTestKit.getRef());
142 leaderTestKit.expectMsgClass(JavaTestKit.duration("10 seconds"), LocalShardFound.class);
144 IntegrationTestKit.findLocalShard(followerDistributedDataStore.getActorContext(),
145 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
147 leaderShardManager.tell(
148 new FindPrimary(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), leaderTestKit.getRef());
149 leaderTestKit.expectMsgClass(JavaTestKit.duration("10 seconds"), LocalPrimaryShardFound.class);
151 final ActorRef followerShardManager = followerDistributedDataStore.getActorContext().getShardManager();
152 followerShardManager.tell(
153 new FindPrimary(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), followerTestKit.getRef());
154 followerTestKit.expectMsgClass(JavaTestKit.duration("10 seconds"), RemotePrimaryShardFound.class);
156 final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
158 followerShardFactory.createProducer(Collections.singleton(TEST_ID));
159 fail("Producer should be already registered on the other node");
160 } catch (final IllegalArgumentException e) {
161 assertTrue(e.getMessage().contains("is attached to producer"));
166 final DOMDataTreeProducer followerProducer =
167 followerShardFactory.createProducer(Collections.singleton(TEST_ID));
169 leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
170 fail("Producer should be already registered on the other node");
171 } catch (final IllegalArgumentException e) {
172 assertTrue(e.getMessage().contains("is attached to producer"));
175 followerProducer.close();
176 // try to create a shard on an already registered prefix on follower
178 followerShardFactory.createDistributedShard(TEST_ID,
179 Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
180 fail("This prefix already should have a shard registration that was forwarded from the other node");
181 } catch (final DOMDataTreeShardingConflictException e) {
182 assertTrue(e.getMessage().contains("is already occupied by shard"));
187 @Ignore("Needs some other stuff related to 5280")
188 public void testWriteIntoMultipleShards() throws Exception {
189 initEmptyDatastore("config");
191 final DistributedShardRegistration shardRegistration =
192 leaderShardFactory.createDistributedShard(
193 TEST_ID,Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
195 leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
196 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
198 final ActorRef leaderShardManager = leaderDistributedDataStore.getActorContext().getShardManager();
200 new JavaTestKit(leaderSystem) {
202 leaderShardManager.tell(
203 new FindLocalShard(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
204 expectMsgClass(duration("5 seconds"), LocalShardFound.class);
206 final ActorRef followerShardManager = followerDistributedDataStore.getActorContext().getShardManager();
208 followerShardManager.tell(
209 new FindLocalShard(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
210 expectMsgClass(duration("5 seconds"), LocalShardFound.class);
212 leaderDistributedDataStore.getActorContext().getShardManager().tell(
213 new FindPrimary(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
214 expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
218 final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
220 final DOMDataTreeCursorAwareTransaction tx = producer.createTransaction(true);
221 final DOMDataTreeWriteCursor cursor = tx.createCursor(TEST_ID);
222 Assert.assertNotNull(cursor);
223 final YangInstanceIdentifier nameId =
224 YangInstanceIdentifier.builder(TestModel.TEST_PATH).node(TestModel.NAME_QNAME).build();
225 cursor.write(nameId.getLastPathArgument(),
226 ImmutableLeafNodeBuilder.<String>create()
227 .withNodeIdentifier(new NodeIdentifier(TestModel.NAME_QNAME)).withValue("Test Value").build());