Split DistributedShardRegistration into its own file
controller.git: opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeRemotingTest.java
/*
 * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */

package org.opendaylight.controller.cluster.sharding;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.doReturn;
import static org.opendaylight.controller.cluster.datastore.IntegrationTestKit.findLocalShard;
import static org.opendaylight.controller.cluster.datastore.IntegrationTestKit.waitUntilShardIsDown;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.actor.AddressFromURIString;
import akka.cluster.Cluster;
import akka.testkit.javadsl.TestKit;
import com.google.common.collect.Lists;
import com.typesafe.config.ConfigFactory;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.opendaylight.controller.cluster.ActorSystemProvider;
import org.opendaylight.controller.cluster.datastore.AbstractTest;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
import org.opendaylight.controller.cluster.datastore.IntegrationTestKit;
import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException;
import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafNodeBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

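/**
 * End-to-end tests for {@link DistributedShardedDOMDataTree} spanning two actor systems,
 * "member-1" acting as the shard leader and "member-2" as a follower, joined into a single
 * cluster. The tests cover remote shard creation, producer exclusivity across nodes,
 * writes through a cursor-aware transaction and shard teardown.
 */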
public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest {

    private static final Logger LOG = LoggerFactory.getLogger(DistributedShardedDOMDataTreeRemotingTest.class);

    private static final Address MEMBER_1_ADDRESS =
            AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558");

    private static final DOMDataTreeIdentifier TEST_ID =
            new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH);

    private static final String MODULE_SHARDS_CONFIG = "module-shards-default.conf";

    private ActorSystem leaderSystem;
    private ActorSystem followerSystem;


    private final Builder leaderDatastoreContextBuilder =
            DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);

    private final DatastoreContext.Builder followerDatastoreContextBuilder =
            DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);

    private DistributedDataStore leaderConfigDatastore;
    private DistributedDataStore leaderOperDatastore;

    private DistributedDataStore followerConfigDatastore;
    private DistributedDataStore followerOperDatastore;


    private IntegrationTestKit followerTestKit;
    private IntegrationTestKit leaderTestKit;
    private DistributedShardedDOMDataTree leaderShardFactory;

    private DistributedShardedDOMDataTree followerShardFactory;
    private ActorSystemProvider leaderSystemProvider;
    private ActorSystemProvider followerSystemProvider;

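    /**
     * Creates the two actor systems and joins both to the cluster seeded at member-1.
     * The {@link ActorSystemProvider} instances are mocked so the shard factories can be
     * wired directly to these test actor systems.
     */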
    @Before
    public void setUp() {
        InMemoryJournal.clear();
        InMemorySnapshotStore.clear();

        leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
        Cluster.get(leaderSystem).join(MEMBER_1_ADDRESS);

        followerSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
        Cluster.get(followerSystem).join(MEMBER_1_ADDRESS);

        leaderSystemProvider = Mockito.mock(ActorSystemProvider.class);
        doReturn(leaderSystem).when(leaderSystemProvider).getActorSystem();

        followerSystemProvider = Mockito.mock(ActorSystemProvider.class);
        doReturn(followerSystem).when(followerSystemProvider).getActorSystem();

    }

    @After
    public void tearDown() {
        if (leaderConfigDatastore != null) {
            leaderConfigDatastore.close();
        }
        if (leaderOperDatastore != null) {
            leaderOperDatastore.close();
        }

        if (followerConfigDatastore != null) {
            followerConfigDatastore.close();
        }
        if (followerOperDatastore != null) {
            followerOperDatastore.close();
        }

        TestKit.shutdownActorSystem(leaderSystem, true);
        TestKit.shutdownActorSystem(followerSystem, true);

        InMemoryJournal.clear();
        InMemorySnapshotStore.clear();
    }

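    /**
     * Brings up config and operational datastores on both members from the supplied
     * module-shards configuration (the no-argument variant defaults to
     * {@link #MODULE_SHARDS_CONFIG}), wires a DistributedShardedDOMDataTree on each side
     * and waits until the leader owns the default root shards before initializing the
     * follower side.
     */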
    private void initEmptyDatastores() throws Exception {
        initEmptyDatastores(MODULE_SHARDS_CONFIG);
    }

    private void initEmptyDatastores(final String moduleShardsConfig) throws Exception {
        leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder);

        leaderConfigDatastore = leaderTestKit.setupDistributedDataStore(
                "config", moduleShardsConfig, true,
                SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext());
        leaderOperDatastore = leaderTestKit.setupDistributedDataStore(
                "operational", moduleShardsConfig, true,
                SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext());

        leaderShardFactory = new DistributedShardedDOMDataTree(leaderSystemProvider,
                leaderOperDatastore,
                leaderConfigDatastore);

        followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder);

        followerConfigDatastore = followerTestKit.setupDistributedDataStore(
                "config", moduleShardsConfig, true, SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext());
        followerOperDatastore = followerTestKit.setupDistributedDataStore(
                "operational", moduleShardsConfig, true,
                SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext());

        followerShardFactory = new DistributedShardedDOMDataTree(followerSystemProvider,
                followerOperDatastore,
                followerConfigDatastore);

        followerTestKit.waitForMembersUp("member-1");

        LOG.info("Initializing leader DistributedShardedDOMDataTree");
        leaderShardFactory.init();

        leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorUtils(),
                ClusterUtils.getCleanShardName(YangInstanceIdentifier.empty()));

        leaderTestKit.waitUntilLeader(leaderOperDatastore.getActorUtils(),
                ClusterUtils.getCleanShardName(YangInstanceIdentifier.empty()));

        LOG.info("Initializing follower DistributedShardedDOMDataTree");
        followerShardFactory.init();
    }

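    /**
     * Creates a shard for {@link #TEST_ID} replicated on both members and verifies that a
     * producer opened on one node blocks producer creation for the same prefix on the other
     * node, and that registering a second shard on an already-sharded prefix fails with a
     * sharding conflict.
     */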
    @Test
    public void testProducerRegistrations() throws Exception {
        LOG.info("testProducerRegistrations starting");
        initEmptyDatastores();

        leaderTestKit.waitForMembersUp("member-2");

        // TODO refactor shard creation and verification to own method
        final DistributedShardRegistration shardRegistration =
                waitOnAsyncTask(leaderShardFactory.createDistributedShard(
                        TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
                        DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);

        leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorUtils(),
                ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));

        final ActorRef leaderShardManager = leaderConfigDatastore.getActorUtils().getShardManager();

        assertNotNull(findLocalShard(leaderConfigDatastore.getActorUtils(),
                ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())));

        assertNotNull(findLocalShard(followerConfigDatastore.getActorUtils(),
                ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())));

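        // The shard is replicated on both members, so the leader's shard should report
        // exactly one peer (the follower's replica).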
        final Set<String> peers = new HashSet<>();
        IntegrationTestKit.verifyShardState(leaderConfigDatastore,
                ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()), onDemandShardState ->
                        peers.addAll(onDemandShardState.getPeerAddresses().values()));
        assertEquals(1, peers.size());

        final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
        try {
            followerShardFactory.createProducer(Collections.singleton(TEST_ID));
            fail("Producer should already be registered on the other node");
        } catch (final IllegalArgumentException e) {
            assertTrue(e.getMessage().contains("is attached to producer"));
        }

        producer.close();

        final DOMDataTreeProducer followerProducer =
                followerShardFactory.createProducer(Collections.singleton(TEST_ID));
        try {
            leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
            fail("Producer should already be registered on the other node");
        } catch (final IllegalArgumentException e) {
            assertTrue(e.getMessage().contains("is attached to producer"));
        }

        followerProducer.close();
        // Try to create a shard on a prefix that is already registered on the follower
        try {
            waitOnAsyncTask(followerShardFactory.createDistributedShard(
                    TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
                    DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
            fail("This prefix should already have a shard registration that was forwarded from the other node");
        } catch (final DOMDataTreeShardingConflictException e) {
            assertTrue(e.getMessage().contains("is already occupied by another shard"));
        }

        shardRegistration.close().toCompletableFuture().get();

        LOG.info("testProducerRegistrations ending");
    }

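    /**
     * Creates a replicated shard for {@link #TEST_ID}, writes a leaf under the shard root
     * through a cursor-aware transaction on the leader and commits it, then tears the shard
     * down again.
     */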
    @Test
    public void testWriteIntoMultipleShards() throws Exception {
        LOG.info("testWriteIntoMultipleShards starting");
        initEmptyDatastores();

        leaderTestKit.waitForMembersUp("member-2");

        LOG.debug("registering first shard");
        final DistributedShardRegistration shardRegistration =
                waitOnAsyncTask(leaderShardFactory.createDistributedShard(
                        TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
                        DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);


        leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorUtils(),
                ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
        assertNotNull(findLocalShard(followerConfigDatastore.getActorUtils(),
                ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())));

        final Set<String> peers = new HashSet<>();
        IntegrationTestKit.verifyShardState(leaderConfigDatastore,
                ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()), onDemandShardState ->
                        peers.addAll(onDemandShardState.getPeerAddresses().values()));
        assertEquals(1, peers.size());

        LOG.debug("Done waiting for the follower shard");
        final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(TEST_ID));

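        // Write TestModel.NAME_QNAME under the shard root through a cursor-aware
        // transaction and commit it synchronously.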
        final DOMDataTreeCursorAwareTransaction tx = producer.createTransaction(true);
        final DOMDataTreeWriteCursor cursor = tx.createCursor(TEST_ID);
        Assert.assertNotNull(cursor);
        final YangInstanceIdentifier nameId =
                YangInstanceIdentifier.builder(TestModel.TEST_PATH).node(TestModel.NAME_QNAME).build();
        cursor.write(nameId.getLastPathArgument(),
                ImmutableLeafNodeBuilder.<String>create().withNodeIdentifier(
                        new NodeIdentifier(TestModel.NAME_QNAME)).withValue("Test Value").build());

        cursor.close();
        LOG.debug("Submitting transaction");

        tx.commit().get();

        shardRegistration.close().toCompletableFuture().get();

        LOG.info("testWriteIntoMultipleShards ending");
    }

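    /**
     * Registers shards for four different prefixes (test, outer-container, inner-list and
     * junk), verifies that both members host a local replica of each, then closes the
     * registrations and waits for every shard to shut down on both members.
     */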
    @Test
    public void testMultipleShardRegistrations() throws Exception {
        LOG.info("testMultipleShardRegistrations starting");
        initEmptyDatastores();

        final DistributedShardRegistration reg1 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
                TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
                DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);

        final DistributedShardRegistration reg2 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
                new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.OUTER_CONTAINER_PATH),
                Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
                DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);

        final DistributedShardRegistration reg3 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
                new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.INNER_LIST_PATH),
                Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
                DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);

        final DistributedShardRegistration reg4 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
                new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.JUNK_PATH),
                Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
                DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);

        leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorUtils(),
                ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
        leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorUtils(),
                ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));
        leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorUtils(),
                ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));
        leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorUtils(),
                ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));

        // check leader has local shards
        assertNotNull(findLocalShard(leaderConfigDatastore.getActorUtils(),
                ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));

        assertNotNull(findLocalShard(leaderConfigDatastore.getActorUtils(),
                ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH)));

        assertNotNull(findLocalShard(leaderConfigDatastore.getActorUtils(),
                ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH)));

        assertNotNull(findLocalShard(leaderConfigDatastore.getActorUtils(),
                ClusterUtils.getCleanShardName(TestModel.JUNK_PATH)));

        // check follower has local shards
        assertNotNull(findLocalShard(followerConfigDatastore.getActorUtils(),
                ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));

        assertNotNull(findLocalShard(followerConfigDatastore.getActorUtils(),
                ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH)));

        assertNotNull(findLocalShard(followerConfigDatastore.getActorUtils(),
                ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH)));

        assertNotNull(findLocalShard(followerConfigDatastore.getActorUtils(),
                ClusterUtils.getCleanShardName(TestModel.JUNK_PATH)));

        LOG.debug("Closing registrations");

        reg1.close().toCompletableFuture().get();
        reg2.close().toCompletableFuture().get();
        reg3.close().toCompletableFuture().get();
        reg4.close().toCompletableFuture().get();

        waitUntilShardIsDown(leaderConfigDatastore.getActorUtils(),
                ClusterUtils.getCleanShardName(TestModel.TEST_PATH));

        waitUntilShardIsDown(leaderConfigDatastore.getActorUtils(),
                ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));

        waitUntilShardIsDown(leaderConfigDatastore.getActorUtils(),
                ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));

        waitUntilShardIsDown(leaderConfigDatastore.getActorUtils(),
                ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));

        LOG.debug("All leader shards gone");

        waitUntilShardIsDown(followerConfigDatastore.getActorUtils(),
                ClusterUtils.getCleanShardName(TestModel.TEST_PATH));

        waitUntilShardIsDown(followerConfigDatastore.getActorUtils(),
                ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));

        waitUntilShardIsDown(followerConfigDatastore.getActorUtils(),
                ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));

        waitUntilShardIsDown(followerConfigDatastore.getActorUtils(),
                ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));

        LOG.debug("All follower shards gone");
        LOG.info("testMultipleShardRegistrations ending");
    }

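    /**
     * Repeatedly registers and closes a shard at the same prefix, verifying after each
     * round that both members host a replica and that the shard fully shuts down again
     * before the next registration.
     */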
    @Test
    public void testMultipleRegistrationsAtOnePrefix() throws Exception {
        LOG.info("testMultipleRegistrationsAtOnePrefix starting");
        initEmptyDatastores();

        for (int i = 0; i < 5; i++) {
            LOG.info("Round {}", i);
            final DistributedShardRegistration reg1 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
                    TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
                    DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);

            leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorUtils(),
                    ClusterUtils.getCleanShardName(TestModel.TEST_PATH));

            assertNotNull(findLocalShard(leaderConfigDatastore.getActorUtils(),
                    ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));

            assertNotNull(findLocalShard(followerConfigDatastore.getActorUtils(),
                    ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));


            final Set<String> peers = new HashSet<>();
            IntegrationTestKit.verifyShardState(leaderConfigDatastore,
                    ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()), onDemandShardState ->
                            peers.addAll(onDemandShardState.getPeerAddresses().values()));
            assertEquals(1, peers.size());

            waitOnAsyncTask(reg1.close(), DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);

            waitUntilShardIsDown(leaderConfigDatastore.getActorUtils(),
                    ClusterUtils.getCleanShardName(TestModel.TEST_PATH));

            waitUntilShardIsDown(followerConfigDatastore.getActorUtils(),
                    ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
        }

        LOG.info("testMultipleRegistrationsAtOnePrefix ending");
    }

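    /**
     * Boots both members from the "module-shards-default-member-1.conf" configuration and
     * verifies that DistributedShardedDOMDataTree initialization completes without error.
     */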
    @Test
    public void testInitialBootstrappingWithNoModuleShards() throws Exception {
        LOG.info("testInitialBootstrappingWithNoModuleShards starting");
        initEmptyDatastores("module-shards-default-member-1.conf");

        // We just verify the DistributedShardedDOMDataTree initialized without error.
    }
}