BUG-2138: Create DistributedShardFrontend
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / sharding / DistributedShardedDOMDataTreeRemotingTest.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.sharding;
10
11 import static org.junit.Assert.assertNotNull;
12 import static org.junit.Assert.assertTrue;
13 import static org.junit.Assert.fail;
14 import static org.opendaylight.controller.cluster.datastore.IntegrationTestKit.findLocalShard;
15 import static org.opendaylight.controller.cluster.datastore.IntegrationTestKit.waitUntilShardIsDown;
16
17 import akka.actor.ActorRef;
18 import akka.actor.ActorSystem;
19 import akka.actor.Address;
20 import akka.actor.AddressFromURIString;
21 import akka.actor.PoisonPill;
22 import akka.cluster.Cluster;
23 import akka.cluster.ddata.DistributedData;
24 import akka.testkit.JavaTestKit;
25 import com.google.common.collect.Lists;
26 import com.typesafe.config.ConfigFactory;
27 import java.util.Collections;
28 import org.junit.After;
29 import org.junit.Assert;
30 import org.junit.Before;
31 import org.junit.Ignore;
32 import org.junit.Test;
33 import org.opendaylight.controller.cluster.datastore.AbstractTest;
34 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
35 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
36 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
37 import org.opendaylight.controller.cluster.datastore.IntegrationTestKit;
38 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
39 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
40 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
41 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
42 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
43 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
44 import org.opendaylight.controller.cluster.sharding.DistributedShardFactory.DistributedShardRegistration;
45 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
46 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
47 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
48 import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
49 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
50 import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
51 import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException;
52 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
53 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
54 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
55 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafNodeBuilder;
56 import org.slf4j.Logger;
57 import org.slf4j.LoggerFactory;
58
59 @Ignore("Needs to have the configuration backend switched from distributed-data")
60 public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest {
61
62     private static final Logger LOG = LoggerFactory.getLogger(DistributedShardedDOMDataTreeRemotingTest.class);
63
64     private static final Address MEMBER_1_ADDRESS =
65             AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558");
66
67     private static final DOMDataTreeIdentifier TEST_ID =
68             new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH);
69
70     private ActorSystem leaderSystem;
71     private ActorSystem followerSystem;
72
73
74     private final Builder leaderDatastoreContextBuilder =
75             DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
76
77     private final DatastoreContext.Builder followerDatastoreContextBuilder =
78             DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5)
79                     .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
80
81     private DistributedDataStore followerDistributedDataStore;
82     private DistributedDataStore leaderDistributedDataStore;
83     private IntegrationTestKit followerTestKit;
84     private IntegrationTestKit leaderTestKit;
85
86     private DistributedShardedDOMDataTree leaderShardFactory;
87     private DistributedShardedDOMDataTree followerShardFactory;
88
89     @Before
90     public void setUp() {
91
92         leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
93         Cluster.get(leaderSystem).join(MEMBER_1_ADDRESS);
94
95         followerSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
96         Cluster.get(followerSystem).join(MEMBER_1_ADDRESS);
97
98     }
99
100     @After
101     public void tearDown() {
102         if (followerDistributedDataStore != null) {
103             followerDistributedDataStore.close();
104         }
105         if (leaderDistributedDataStore != null) {
106             leaderDistributedDataStore.close();
107         }
108
109         DistributedData.get(leaderSystem).replicator().tell(PoisonPill.getInstance(), ActorRef.noSender());
110         DistributedData.get(followerSystem).replicator().tell(PoisonPill.getInstance(), ActorRef.noSender());
111
112         JavaTestKit.shutdownActorSystem(leaderSystem);
113         JavaTestKit.shutdownActorSystem(followerSystem);
114     }
115
116     private void initEmptyDatastores(final String type) {
117         leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder);
118
119         leaderDistributedDataStore =
120                 leaderTestKit.setupDistributedDataStoreWithoutConfig(type, SchemaContextHelper.full());
121
122         followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder);
123         followerDistributedDataStore =
124                 followerTestKit.setupDistributedDataStoreWithoutConfig(type, SchemaContextHelper.full());
125
126         leaderShardFactory = new DistributedShardedDOMDataTree(leaderSystem,
127                 leaderDistributedDataStore,
128                 leaderDistributedDataStore);
129
130         followerShardFactory = new DistributedShardedDOMDataTree(followerSystem,
131                 followerDistributedDataStore,
132                 followerDistributedDataStore);
133
134         leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
135                 ClusterUtils.getCleanShardName(YangInstanceIdentifier.EMPTY));
136     }
137
138     @Test
139     @Ignore("Needs different shard creation handling due to replicas")
140     public void testProducerRegistrations() throws Exception {
141         initEmptyDatastores("config");
142
143         leaderTestKit.waitForMembersUp("member-2");
144
145         final DistributedShardRegistration shardRegistration =
146                 leaderShardFactory.createDistributedShard(
147                         TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
148
149         leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
150                 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
151
152         final ActorRef leaderShardManager = leaderDistributedDataStore.getActorContext().getShardManager();
153
154         assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
155                 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())));
156
157         assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
158                 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())));
159
160         final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
161         try {
162             followerShardFactory.createProducer(Collections.singleton(TEST_ID));
163             fail("Producer should be already registered on the other node");
164         } catch (final IllegalArgumentException e) {
165             assertTrue(e.getMessage().contains("is attached to producer"));
166         }
167
168         producer.close();
169
170         final DOMDataTreeProducer followerProducer =
171                 followerShardFactory.createProducer(Collections.singleton(TEST_ID));
172         try {
173             leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
174             fail("Producer should be already registered on the other node");
175         } catch (final IllegalArgumentException e) {
176             assertTrue(e.getMessage().contains("is attached to producer"));
177         }
178
179         followerProducer.close();
180         // try to create a shard on an already registered prefix on follower
181         try {
182             followerShardFactory.createDistributedShard(TEST_ID,
183                     Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
184             fail("This prefix already should have a shard registration that was forwarded from the other node");
185         } catch (final DOMDataTreeShardingConflictException e) {
186             assertTrue(e.getMessage().contains("is already occupied by shard"));
187         }
188     }
189
190     @Test
191     @Ignore("Needs different shard creation handling due to replicas")
192     public void testWriteIntoMultipleShards() throws Exception {
193         initEmptyDatastores("config");
194
195         leaderTestKit.waitForMembersUp("member-2");
196
197         LOG.warn("registering first shard");
198         final DistributedShardRegistration shardRegistration =
199                 leaderShardFactory.createDistributedShard(TEST_ID,
200                         Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
201
202         leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
203                 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
204         findLocalShard(followerDistributedDataStore.getActorContext(),
205                 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
206
207         LOG.warn("Got after waiting for nonleader");
208         final ActorRef leaderShardManager = leaderDistributedDataStore.getActorContext().getShardManager();
209
210         new JavaTestKit(leaderSystem) {
211             {
212                 leaderShardManager.tell(
213                         new FindLocalShard(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
214                 expectMsgClass(duration("5 seconds"), LocalShardFound.class);
215
216                 final ActorRef followerShardManager = followerDistributedDataStore.getActorContext().getShardManager();
217
218                 followerShardManager.tell(new FindLocalShard(
219                         ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), followerTestKit.getRef());
220                 followerTestKit.expectMsgClass(duration("5 seconds"), LocalShardFound.class);
221                 LOG.warn("Found follower shard");
222
223                 leaderDistributedDataStore.getActorContext().getShardManager().tell(
224                         new FindPrimary(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
225                 expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
226             }
227         };
228
229         final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
230
231         final DOMDataTreeCursorAwareTransaction tx = producer.createTransaction(true);
232         final DOMDataTreeWriteCursor cursor = tx.createCursor(TEST_ID);
233         Assert.assertNotNull(cursor);
234         final YangInstanceIdentifier nameId =
235                 YangInstanceIdentifier.builder(TestModel.TEST_PATH).node(TestModel.NAME_QNAME).build();
236         cursor.write(nameId.getLastPathArgument(),
237                 ImmutableLeafNodeBuilder.<String>create().withNodeIdentifier(
238                         new NodeIdentifier(TestModel.NAME_QNAME)).withValue("Test Value").build());
239
240         cursor.close();
241         LOG.warn("Got to pre submit");
242
243         tx.submit();
244     }
245
246     @Test
247     public void testMultipleShardRegistrations() throws Exception {
248         initEmptyDatastores("config");
249
250         final DistributedShardRegistration reg1 = leaderShardFactory
251                 .createDistributedShard(TEST_ID,
252                         Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
253
254         final DistributedShardRegistration reg2 = leaderShardFactory
255                 .createDistributedShard(
256                         new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.OUTER_CONTAINER_PATH),
257                         Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
258
259         final DistributedShardRegistration reg3 = leaderShardFactory
260                 .createDistributedShard(
261                         new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.INNER_LIST_PATH),
262                         Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
263
264         final DistributedShardRegistration reg4 = leaderShardFactory
265                 .createDistributedShard(
266                         new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.JUNK_PATH),
267                         Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
268
269         leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
270                 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
271         leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
272                 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));
273         leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
274                 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));
275         leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
276                 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));
277
278         // check leader has local shards
279         assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
280                 ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
281
282         assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
283                 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH)));
284
285         assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
286                 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH)));
287
288         assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
289                 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH)));
290
291         // check follower has local shards
292         assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
293                 ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
294
295         assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
296                 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH)));
297
298         assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
299                 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH)));
300
301         assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
302                 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH)));
303
304
305         LOG.debug("Closing registrations");
306
307         reg1.close();
308         reg2.close();
309         reg3.close();
310         reg4.close();
311
312         waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
313                 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
314
315         waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
316                 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));
317
318         waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
319                 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));
320
321         waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
322                 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));
323
324         LOG.debug("All leader shards gone");
325
326         waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
327                 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
328
329         waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
330                 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));
331
332         waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
333                 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));
334
335         waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
336                 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));
337
338         LOG.debug("All follower shards gone");
339     }
340
341     @Test
342     public void testMultipleRegistrationsAtOnePrefix() throws Exception {
343         initEmptyDatastores("config");
344
345         for (int i = 0; i < 10; i++) {
346             LOG.debug("Round {}", i);
347             final DistributedShardRegistration reg1 = leaderShardFactory
348                     .createDistributedShard(TEST_ID,
349                             Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
350
351             leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
352                     ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
353
354             assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
355                     ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
356
357             assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
358                     ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
359
360             reg1.close();
361
362             waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
363                     ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
364
365             waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
366                     ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
367         }
368     }
369 }