BUG-2138: Make DistributedShardFactory return Futures.
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / sharding / DistributedShardedDOMDataTreeRemotingTest.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.sharding;
10
11 import static org.junit.Assert.assertNotNull;
12 import static org.junit.Assert.assertTrue;
13 import static org.junit.Assert.fail;
14 import static org.mockito.Mockito.doReturn;
15 import static org.opendaylight.controller.cluster.datastore.IntegrationTestKit.findLocalShard;
16 import static org.opendaylight.controller.cluster.datastore.IntegrationTestKit.waitUntilShardIsDown;
17
18 import akka.actor.ActorRef;
19 import akka.actor.ActorSystem;
20 import akka.actor.Address;
21 import akka.actor.AddressFromURIString;
22 import akka.actor.PoisonPill;
23 import akka.cluster.Cluster;
24 import akka.cluster.ddata.DistributedData;
25 import akka.testkit.JavaTestKit;
26 import com.google.common.collect.Lists;
27 import com.typesafe.config.ConfigFactory;
28 import java.util.Collections;
29 import org.junit.After;
30 import org.junit.Assert;
31 import org.junit.Before;
32 import org.junit.Ignore;
33 import org.junit.Test;
34 import org.mockito.Mockito;
35 import org.opendaylight.controller.cluster.ActorSystemProvider;
36 import org.opendaylight.controller.cluster.datastore.AbstractTest;
37 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
38 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
39 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
40 import org.opendaylight.controller.cluster.datastore.IntegrationTestKit;
41 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
42 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
43 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
44 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
45 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
46 import org.opendaylight.controller.cluster.sharding.DistributedShardFactory.DistributedShardRegistration;
47 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
48 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
49 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
50 import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
51 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
52 import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
53 import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException;
54 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
55 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
56 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
57 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafNodeBuilder;
58 import org.slf4j.Logger;
59 import org.slf4j.LoggerFactory;
60
61 @Ignore("Needs to have the configuration backend switched from distributed-data")
62 public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest {
63
64     private static final Logger LOG = LoggerFactory.getLogger(DistributedShardedDOMDataTreeRemotingTest.class);
65
66     private static final Address MEMBER_1_ADDRESS =
67             AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558");
68
69     private static final DOMDataTreeIdentifier TEST_ID =
70             new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH);
71
72     private ActorSystem leaderSystem;
73     private ActorSystem followerSystem;
74
75
76     private final Builder leaderDatastoreContextBuilder =
77             DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5)
78                     .logicalStoreType(
79                             org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION);
80
81     private final DatastoreContext.Builder followerDatastoreContextBuilder =
82             DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5)
83                     .logicalStoreType(
84                             org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION);
85
86     private DistributedDataStore followerDistributedDataStore;
87     private DistributedDataStore leaderDistributedDataStore;
88     private IntegrationTestKit followerTestKit;
89     private IntegrationTestKit leaderTestKit;
90
91     private DistributedShardedDOMDataTree leaderShardFactory;
92     private DistributedShardedDOMDataTree followerShardFactory;
93
94     private ActorSystemProvider leaderSystemProvider;
95     private ActorSystemProvider followerSystemProvider;
96
97     @Before
98     public void setUp() {
99
100         leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
101         Cluster.get(leaderSystem).join(MEMBER_1_ADDRESS);
102
103         followerSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
104         Cluster.get(followerSystem).join(MEMBER_1_ADDRESS);
105
106         leaderSystemProvider = Mockito.mock(ActorSystemProvider.class);
107         doReturn(leaderSystem).when(leaderSystemProvider).getActorSystem();
108
109         followerSystemProvider = Mockito.mock(ActorSystemProvider.class);
110         doReturn(followerSystem).when(followerSystemProvider).getActorSystem();
111
112     }
113
114     @After
115     public void tearDown() {
116         if (followerDistributedDataStore != null) {
117             followerDistributedDataStore.close();
118         }
119         if (leaderDistributedDataStore != null) {
120             leaderDistributedDataStore.close();
121         }
122
123         DistributedData.get(leaderSystem).replicator().tell(PoisonPill.getInstance(), ActorRef.noSender());
124         DistributedData.get(followerSystem).replicator().tell(PoisonPill.getInstance(), ActorRef.noSender());
125
126         JavaTestKit.shutdownActorSystem(leaderSystem);
127         JavaTestKit.shutdownActorSystem(followerSystem);
128     }
129
130     private void initEmptyDatastores(final String type) {
131         leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder);
132
133         leaderDistributedDataStore =
134                 leaderTestKit.setupDistributedDataStoreWithoutConfig(type, SchemaContextHelper.full());
135
136         followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder);
137         followerDistributedDataStore =
138                 followerTestKit.setupDistributedDataStoreWithoutConfig(type, SchemaContextHelper.full());
139
140         leaderShardFactory = new DistributedShardedDOMDataTree(leaderSystemProvider,
141                 leaderDistributedDataStore,
142                 leaderDistributedDataStore);
143
144         followerShardFactory = new DistributedShardedDOMDataTree(followerSystemProvider,
145                 followerDistributedDataStore,
146                 followerDistributedDataStore);
147
148         leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
149                 ClusterUtils.getCleanShardName(YangInstanceIdentifier.EMPTY));
150     }
151
152     @Test
153     @Ignore("Needs different shard creation handling due to replicas")
154     public void testProducerRegistrations() throws Exception {
155         initEmptyDatastores("config");
156
157         leaderTestKit.waitForMembersUp("member-2");
158
159         final DistributedShardRegistration shardRegistration =
160                 waitOnAsyncTask(leaderShardFactory.createDistributedShard(
161                         TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
162                         DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
163
164         leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
165                 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
166
167         final ActorRef leaderShardManager = leaderDistributedDataStore.getActorContext().getShardManager();
168
169         assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
170                 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())));
171
172         assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
173                 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())));
174
175         final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
176         try {
177             followerShardFactory.createProducer(Collections.singleton(TEST_ID));
178             fail("Producer should be already registered on the other node");
179         } catch (final IllegalArgumentException e) {
180             assertTrue(e.getMessage().contains("is attached to producer"));
181         }
182
183         producer.close();
184
185         final DOMDataTreeProducer followerProducer =
186                 followerShardFactory.createProducer(Collections.singleton(TEST_ID));
187         try {
188             leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
189             fail("Producer should be already registered on the other node");
190         } catch (final IllegalArgumentException e) {
191             assertTrue(e.getMessage().contains("is attached to producer"));
192         }
193
194         followerProducer.close();
195         // try to create a shard on an already registered prefix on follower
196         try {
197             followerShardFactory.createDistributedShard(TEST_ID,
198                     Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
199             fail("This prefix already should have a shard registration that was forwarded from the other node");
200         } catch (final DOMDataTreeShardingConflictException e) {
201             assertTrue(e.getMessage().contains("is already occupied by shard"));
202         }
203     }
204
205     @Test
206     @Ignore("Needs different shard creation handling due to replicas")
207     public void testWriteIntoMultipleShards() throws Exception {
208         initEmptyDatastores("config");
209
210         leaderTestKit.waitForMembersUp("member-2");
211
212         LOG.warn("registering first shard");
213         final DistributedShardRegistration shardRegistration =
214                 waitOnAsyncTask(leaderShardFactory.createDistributedShard(
215                         TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
216                         DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
217
218         leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
219                 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
220         findLocalShard(followerDistributedDataStore.getActorContext(),
221                 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
222
223         LOG.warn("Got after waiting for nonleader");
224         final ActorRef leaderShardManager = leaderDistributedDataStore.getActorContext().getShardManager();
225
226         new JavaTestKit(leaderSystem) {
227             {
228                 leaderShardManager.tell(
229                         new FindLocalShard(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
230                 expectMsgClass(duration("5 seconds"), LocalShardFound.class);
231
232                 final ActorRef followerShardManager = followerDistributedDataStore.getActorContext().getShardManager();
233
234                 followerShardManager.tell(new FindLocalShard(
235                         ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), followerTestKit.getRef());
236                 followerTestKit.expectMsgClass(duration("5 seconds"), LocalShardFound.class);
237                 LOG.warn("Found follower shard");
238
239                 leaderDistributedDataStore.getActorContext().getShardManager().tell(
240                         new FindPrimary(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
241                 expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
242             }
243         };
244
245         final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
246
247         final DOMDataTreeCursorAwareTransaction tx = producer.createTransaction(true);
248         final DOMDataTreeWriteCursor cursor = tx.createCursor(TEST_ID);
249         Assert.assertNotNull(cursor);
250         final YangInstanceIdentifier nameId =
251                 YangInstanceIdentifier.builder(TestModel.TEST_PATH).node(TestModel.NAME_QNAME).build();
252         cursor.write(nameId.getLastPathArgument(),
253                 ImmutableLeafNodeBuilder.<String>create().withNodeIdentifier(
254                         new NodeIdentifier(TestModel.NAME_QNAME)).withValue("Test Value").build());
255
256         cursor.close();
257         LOG.warn("Got to pre submit");
258
259         tx.submit();
260     }
261
262     @Test
263     public void testMultipleShardRegistrations() throws Exception {
264         initEmptyDatastores("config");
265
266         final DistributedShardRegistration reg1 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
267                 TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
268                 DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
269
270         final DistributedShardRegistration reg2 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
271                 new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.OUTER_CONTAINER_PATH),
272                 Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
273                 DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
274
275         final DistributedShardRegistration reg3 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
276                 new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.INNER_LIST_PATH),
277                 Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
278                 DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
279
280         final DistributedShardRegistration reg4 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
281                 new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.JUNK_PATH),
282                 Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
283                 DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
284
285         leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
286                 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
287         leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
288                 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));
289         leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
290                 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));
291         leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
292                 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));
293
294         // check leader has local shards
295         assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
296                 ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
297
298         assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
299                 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH)));
300
301         assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
302                 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH)));
303
304         assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
305                 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH)));
306
307         // check follower has local shards
308         assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
309                 ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
310
311         assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
312                 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH)));
313
314         assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
315                 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH)));
316
317         assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
318                 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH)));
319
320
321         LOG.debug("Closing registrations");
322
323         reg1.close();
324         reg2.close();
325         reg3.close();
326         reg4.close();
327
328         waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
329                 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
330
331         waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
332                 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));
333
334         waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
335                 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));
336
337         waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
338                 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));
339
340         LOG.debug("All leader shards gone");
341
342         waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
343                 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
344
345         waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
346                 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));
347
348         waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
349                 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));
350
351         waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
352                 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));
353
354         LOG.debug("All follower shards gone");
355     }
356
357     @Test
358     public void testMultipleRegistrationsAtOnePrefix() throws Exception {
359         initEmptyDatastores("config");
360
361         for (int i = 0; i < 10; i++) {
362             LOG.debug("Round {}", i);
363             final DistributedShardRegistration reg1 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
364                     TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
365                     DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
366
367             leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
368                     ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
369
370             assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
371                     ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
372
373             assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
374                     ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
375
376             waitOnAsyncTask(reg1.close(), DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
377
378             waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
379                     ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
380
381             waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
382                     ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
383         }
384     }
385 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.