BUG-2138: Listener support in shard frontend
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / sharding / DistributedShardedDOMDataTreeRemotingTest.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.sharding;
10
11 import static org.junit.Assert.assertNotNull;
12 import static org.junit.Assert.assertTrue;
13 import static org.junit.Assert.fail;
14 import static org.mockito.Mockito.doReturn;
15 import static org.opendaylight.controller.cluster.datastore.IntegrationTestKit.findLocalShard;
16 import static org.opendaylight.controller.cluster.datastore.IntegrationTestKit.waitUntilShardIsDown;
17
18 import akka.actor.ActorRef;
19 import akka.actor.ActorSystem;
20 import akka.actor.Address;
21 import akka.actor.AddressFromURIString;
22 import akka.actor.PoisonPill;
23 import akka.cluster.Cluster;
24 import akka.cluster.ddata.DistributedData;
25 import akka.testkit.JavaTestKit;
26 import com.google.common.collect.Lists;
27 import com.typesafe.config.ConfigFactory;
28 import java.util.Collections;
29 import org.junit.After;
30 import org.junit.Assert;
31 import org.junit.Before;
32 import org.junit.Ignore;
33 import org.junit.Test;
34 import org.mockito.Mockito;
35 import org.opendaylight.controller.cluster.ActorSystemProvider;
36 import org.opendaylight.controller.cluster.datastore.AbstractTest;
37 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
38 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
39 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
40 import org.opendaylight.controller.cluster.datastore.IntegrationTestKit;
41 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
42 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
43 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
44 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
45 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
46 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
47 import org.opendaylight.controller.cluster.sharding.DistributedShardFactory.DistributedShardRegistration;
48 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
49 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
50 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
51 import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
52 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
53 import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
54 import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException;
55 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
56 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
57 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
58 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafNodeBuilder;
59 import org.slf4j.Logger;
60 import org.slf4j.LoggerFactory;
61
62 @Ignore("Needs to have the configuration backend switched from distributed-data")
63 public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest {
64
65     private static final Logger LOG = LoggerFactory.getLogger(DistributedShardedDOMDataTreeRemotingTest.class);
66
67     private static final Address MEMBER_1_ADDRESS =
68             AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558");
69
70     private static final DOMDataTreeIdentifier TEST_ID =
71             new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH);
72
73     private ActorSystem leaderSystem;
74     private ActorSystem followerSystem;
75
76
77     private final Builder leaderDatastoreContextBuilder =
78             DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
79
80     private final DatastoreContext.Builder followerDatastoreContextBuilder =
81             DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5)
82                     .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
83
84     private DistributedDataStore followerDistributedDataStore;
85     private DistributedDataStore leaderDistributedDataStore;
86     private IntegrationTestKit followerTestKit;
87     private IntegrationTestKit leaderTestKit;
88
89     private DistributedShardedDOMDataTree leaderShardFactory;
90     private DistributedShardedDOMDataTree followerShardFactory;
91
92     private ActorSystemProvider leaderSystemProvider;
93     private ActorSystemProvider followerSystemProvider;
94
95     @Before
96     public void setUp() {
97
98         leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
99         Cluster.get(leaderSystem).join(MEMBER_1_ADDRESS);
100
101         followerSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
102         Cluster.get(followerSystem).join(MEMBER_1_ADDRESS);
103
104         leaderSystemProvider = Mockito.mock(ActorSystemProvider.class);
105         doReturn(leaderSystem).when(leaderSystemProvider).getActorSystem();
106
107         followerSystemProvider = Mockito.mock(ActorSystemProvider.class);
108         doReturn(followerSystem).when(followerSystemProvider).getActorSystem();
109
110     }
111
112     @After
113     public void tearDown() {
114         if (followerDistributedDataStore != null) {
115             followerDistributedDataStore.close();
116         }
117         if (leaderDistributedDataStore != null) {
118             leaderDistributedDataStore.close();
119         }
120
121         DistributedData.get(leaderSystem).replicator().tell(PoisonPill.getInstance(), ActorRef.noSender());
122         DistributedData.get(followerSystem).replicator().tell(PoisonPill.getInstance(), ActorRef.noSender());
123
124         JavaTestKit.shutdownActorSystem(leaderSystem);
125         JavaTestKit.shutdownActorSystem(followerSystem);
126     }
127
128     private void initEmptyDatastores(final String type) {
129         leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder);
130
131         leaderDistributedDataStore =
132                 leaderTestKit.setupDistributedDataStoreWithoutConfig(type, SchemaContextHelper.full());
133
134         followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder);
135         followerDistributedDataStore =
136                 followerTestKit.setupDistributedDataStoreWithoutConfig(type, SchemaContextHelper.full());
137
138         leaderShardFactory = new DistributedShardedDOMDataTree(leaderSystemProvider,
139                 leaderDistributedDataStore,
140                 leaderDistributedDataStore);
141
142         followerShardFactory = new DistributedShardedDOMDataTree(followerSystemProvider,
143                 followerDistributedDataStore,
144                 followerDistributedDataStore);
145
146         leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
147                 ClusterUtils.getCleanShardName(YangInstanceIdentifier.EMPTY));
148     }
149
150     @Test
151     @Ignore("Needs different shard creation handling due to replicas")
152     public void testProducerRegistrations() throws Exception {
153         initEmptyDatastores("config");
154
155         leaderTestKit.waitForMembersUp("member-2");
156
157         final DistributedShardRegistration shardRegistration =
158                 leaderShardFactory.createDistributedShard(
159                         TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
160
161         leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
162                 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
163
164         final ActorRef leaderShardManager = leaderDistributedDataStore.getActorContext().getShardManager();
165
166         assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
167                 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())));
168
169         assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
170                 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())));
171
172         final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
173         try {
174             followerShardFactory.createProducer(Collections.singleton(TEST_ID));
175             fail("Producer should be already registered on the other node");
176         } catch (final IllegalArgumentException e) {
177             assertTrue(e.getMessage().contains("is attached to producer"));
178         }
179
180         producer.close();
181
182         final DOMDataTreeProducer followerProducer =
183                 followerShardFactory.createProducer(Collections.singleton(TEST_ID));
184         try {
185             leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
186             fail("Producer should be already registered on the other node");
187         } catch (final IllegalArgumentException e) {
188             assertTrue(e.getMessage().contains("is attached to producer"));
189         }
190
191         followerProducer.close();
192         // try to create a shard on an already registered prefix on follower
193         try {
194             followerShardFactory.createDistributedShard(TEST_ID,
195                     Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
196             fail("This prefix already should have a shard registration that was forwarded from the other node");
197         } catch (final DOMDataTreeShardingConflictException e) {
198             assertTrue(e.getMessage().contains("is already occupied by shard"));
199         }
200     }
201
202     @Test
203     @Ignore("Needs different shard creation handling due to replicas")
204     public void testWriteIntoMultipleShards() throws Exception {
205         initEmptyDatastores("config");
206
207         leaderTestKit.waitForMembersUp("member-2");
208
209         LOG.warn("registering first shard");
210         final DistributedShardRegistration shardRegistration =
211                 leaderShardFactory.createDistributedShard(TEST_ID,
212                         Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
213
214         leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
215                 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
216         findLocalShard(followerDistributedDataStore.getActorContext(),
217                 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
218
219         LOG.warn("Got after waiting for nonleader");
220         final ActorRef leaderShardManager = leaderDistributedDataStore.getActorContext().getShardManager();
221
222         new JavaTestKit(leaderSystem) {
223             {
224                 leaderShardManager.tell(
225                         new FindLocalShard(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
226                 expectMsgClass(duration("5 seconds"), LocalShardFound.class);
227
228                 final ActorRef followerShardManager = followerDistributedDataStore.getActorContext().getShardManager();
229
230                 followerShardManager.tell(new FindLocalShard(
231                         ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), followerTestKit.getRef());
232                 followerTestKit.expectMsgClass(duration("5 seconds"), LocalShardFound.class);
233                 LOG.warn("Found follower shard");
234
235                 leaderDistributedDataStore.getActorContext().getShardManager().tell(
236                         new FindPrimary(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
237                 expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
238             }
239         };
240
241         final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
242
243         final DOMDataTreeCursorAwareTransaction tx = producer.createTransaction(true);
244         final DOMDataTreeWriteCursor cursor = tx.createCursor(TEST_ID);
245         Assert.assertNotNull(cursor);
246         final YangInstanceIdentifier nameId =
247                 YangInstanceIdentifier.builder(TestModel.TEST_PATH).node(TestModel.NAME_QNAME).build();
248         cursor.write(nameId.getLastPathArgument(),
249                 ImmutableLeafNodeBuilder.<String>create().withNodeIdentifier(
250                         new NodeIdentifier(TestModel.NAME_QNAME)).withValue("Test Value").build());
251
252         cursor.close();
253         LOG.warn("Got to pre submit");
254
255         tx.submit();
256     }
257
258     @Test
259     public void testMultipleShardRegistrations() throws Exception {
260         initEmptyDatastores("config");
261
262         final DistributedShardRegistration reg1 = leaderShardFactory
263                 .createDistributedShard(TEST_ID,
264                         Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
265
266         final DistributedShardRegistration reg2 = leaderShardFactory
267                 .createDistributedShard(
268                         new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.OUTER_CONTAINER_PATH),
269                         Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
270
271         final DistributedShardRegistration reg3 = leaderShardFactory
272                 .createDistributedShard(
273                         new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.INNER_LIST_PATH),
274                         Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
275
276         final DistributedShardRegistration reg4 = leaderShardFactory
277                 .createDistributedShard(
278                         new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.JUNK_PATH),
279                         Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
280
281         leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
282                 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
283         leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
284                 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));
285         leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
286                 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));
287         leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
288                 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));
289
290         // check leader has local shards
291         assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
292                 ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
293
294         assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
295                 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH)));
296
297         assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
298                 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH)));
299
300         assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
301                 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH)));
302
303         // check follower has local shards
304         assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
305                 ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
306
307         assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
308                 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH)));
309
310         assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
311                 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH)));
312
313         assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
314                 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH)));
315
316
317         LOG.debug("Closing registrations");
318
319         reg1.close();
320         reg2.close();
321         reg3.close();
322         reg4.close();
323
324         waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
325                 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
326
327         waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
328                 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));
329
330         waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
331                 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));
332
333         waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
334                 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));
335
336         LOG.debug("All leader shards gone");
337
338         waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
339                 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
340
341         waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
342                 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));
343
344         waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
345                 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));
346
347         waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
348                 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));
349
350         LOG.debug("All follower shards gone");
351     }
352
353     @Test
354     public void testMultipleRegistrationsAtOnePrefix() throws Exception {
355         initEmptyDatastores("config");
356
357         for (int i = 0; i < 10; i++) {
358             LOG.debug("Round {}", i);
359             final DistributedShardRegistration reg1 = leaderShardFactory
360                     .createDistributedShard(TEST_ID,
361                             Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
362
363             leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
364                     ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
365
366             assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
367                     ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
368
369             assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
370                     ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
371
372             reg1.close();
373
374             waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
375                     ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
376
377             waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
378                     ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
379         }
380     }
381 }