Adjust to mdsal DOM read/exists FluentFuture change
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / IntegrationTestKit.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertNotNull;
12 import static org.junit.Assert.fail;
13
14 import akka.actor.ActorRef;
15 import akka.actor.ActorSystem;
16 import akka.cluster.Cluster;
17 import akka.cluster.ClusterEvent.CurrentClusterState;
18 import akka.cluster.Member;
19 import akka.cluster.MemberStatus;
20 import com.google.common.base.Stopwatch;
21 import com.google.common.collect.Sets;
22 import com.google.common.util.concurrent.ListenableFuture;
23 import com.google.common.util.concurrent.Uninterruptibles;
24 import java.lang.reflect.Constructor;
25 import java.util.Optional;
26 import java.util.Set;
27 import java.util.concurrent.Callable;
28 import java.util.concurrent.TimeUnit;
29 import java.util.function.Consumer;
30 import org.mockito.Mockito;
31 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
32 import org.opendaylight.controller.cluster.datastore.config.Configuration;
33 import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
34 import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider;
35 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
36 import org.opendaylight.controller.cluster.datastore.messages.OnDemandShardState;
37 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
38 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
39 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
40 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
41 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
42 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
43 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
44 import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain;
45 import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
46 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
47 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
48 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
51 import scala.concurrent.Await;
52 import scala.concurrent.Future;
53 import scala.concurrent.duration.Duration;
54
55 public class IntegrationTestKit extends ShardTestKit {
56
57     private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestKit.class);
58
59     protected DatastoreContext.Builder datastoreContextBuilder;
60     protected DatastoreSnapshot restoreFromSnapshot;
61     private final int commitTimeout;
62
63     public IntegrationTestKit(final ActorSystem actorSystem, final Builder datastoreContextBuilder) {
64         this(actorSystem, datastoreContextBuilder, 7);
65     }
66
67     public IntegrationTestKit(final ActorSystem actorSystem, final Builder datastoreContextBuilder,
68             final int commitTimeout) {
69         super(actorSystem);
70         this.datastoreContextBuilder = datastoreContextBuilder;
71         this.commitTimeout = commitTimeout;
72     }
73
74     public DatastoreContext.Builder getDatastoreContextBuilder() {
75         return datastoreContextBuilder;
76     }
77
78     public DistributedDataStore setupDistributedDataStore(final String typeName, final String moduleShardsConfig,
79                                                           final boolean waitUntilLeader,
80                                                           final SchemaContext schemaContext) throws Exception {
81         return setupDistributedDataStore(typeName, moduleShardsConfig, "modules.conf", waitUntilLeader, schemaContext);
82     }
83
84     public DistributedDataStore setupDistributedDataStore(final String typeName, final String moduleShardsConfig,
85                                                           final String modulesConfig,
86                                                           final boolean waitUntilLeader,
87                                                           final SchemaContext schemaContext,
88                                                           final String... shardNames) throws Exception {
89         return (DistributedDataStore) setupAbstractDataStore(DistributedDataStore.class, typeName, moduleShardsConfig,
90                 modulesConfig, waitUntilLeader, schemaContext, shardNames);
91     }
92
93     public AbstractDataStore setupAbstractDataStore(final Class<? extends AbstractDataStore> implementation,
94                                                     final String typeName, final String... shardNames)
95             throws Exception {
96         return setupAbstractDataStore(implementation, typeName, "module-shards.conf", true,
97                 SchemaContextHelper.full(), shardNames);
98     }
99
100     public AbstractDataStore setupAbstractDataStore(final Class<? extends AbstractDataStore> implementation,
101                                                     final String typeName, final boolean waitUntilLeader,
102                                                     final String... shardNames) throws Exception {
103         return setupAbstractDataStore(implementation, typeName, "module-shards.conf", waitUntilLeader,
104                 SchemaContextHelper.full(), shardNames);
105     }
106
107     public AbstractDataStore setupAbstractDataStore(final Class<? extends AbstractDataStore> implementation,
108                                                     final String typeName, final String moduleShardsConfig,
109                                                     final boolean waitUntilLeader, final String... shardNames)
110             throws Exception {
111         return setupAbstractDataStore(implementation, typeName, moduleShardsConfig, waitUntilLeader,
112                 SchemaContextHelper.full(), shardNames);
113     }
114
115     public AbstractDataStore setupAbstractDataStore(final Class<? extends AbstractDataStore> implementation,
116                                                     final String typeName, final String moduleShardsConfig,
117                                                     final boolean waitUntilLeader,
118                                                     final SchemaContext schemaContext,
119                                                     final String... shardNames) throws Exception {
120         return setupAbstractDataStore(implementation, typeName, moduleShardsConfig, "modules.conf", waitUntilLeader,
121                 schemaContext, shardNames);
122     }
123
124     private AbstractDataStore setupAbstractDataStore(final Class<? extends AbstractDataStore> implementation,
125                                                      final String typeName, final String moduleShardsConfig,
126                                                      final String modulesConfig, final boolean waitUntilLeader,
127                                                      final SchemaContext schemaContext, final String... shardNames)
128             throws Exception {
129         final ClusterWrapper cluster = new ClusterWrapperImpl(getSystem());
130         final Configuration config = new ConfigurationImpl(moduleShardsConfig, modulesConfig);
131
132         setDataStoreName(typeName);
133
134         final DatastoreContext datastoreContext = datastoreContextBuilder.build();
135         final DatastoreContextFactory mockContextFactory = Mockito.mock(DatastoreContextFactory.class);
136         Mockito.doReturn(datastoreContext).when(mockContextFactory).getBaseDatastoreContext();
137         Mockito.doReturn(datastoreContext).when(mockContextFactory).getShardDatastoreContext(Mockito.anyString());
138
139         final Constructor<? extends AbstractDataStore> constructor = implementation.getDeclaredConstructor(
140                 ActorSystem.class, ClusterWrapper.class, Configuration.class,
141                 DatastoreContextFactory.class, DatastoreSnapshot.class);
142
143         final AbstractDataStore dataStore = constructor.newInstance(getSystem(), cluster, config, mockContextFactory,
144             restoreFromSnapshot);
145
146         dataStore.onGlobalContextUpdated(schemaContext);
147
148         if (waitUntilLeader) {
149             waitUntilLeader(dataStore.getActorContext(), shardNames);
150         }
151
152         datastoreContextBuilder = DatastoreContext.newBuilderFrom(datastoreContext);
153         return dataStore;
154     }
155
156     private void setDataStoreName(final String typeName) {
157         if ("config".equals(typeName)) {
158             datastoreContextBuilder.logicalStoreType(LogicalDatastoreType.CONFIGURATION);
159         } else if ("operational".equals(typeName)) {
160             datastoreContextBuilder.logicalStoreType(LogicalDatastoreType.OPERATIONAL);
161         } else {
162             datastoreContextBuilder.dataStoreName(typeName);
163         }
164     }
165
166     public DistributedDataStore setupDistributedDataStoreWithoutConfig(final String typeName,
167                                                                        final SchemaContext schemaContext) {
168         final ClusterWrapper cluster = new ClusterWrapperImpl(getSystem());
169         final ConfigurationImpl configuration = new ConfigurationImpl(new EmptyModuleShardConfigProvider());
170
171         setDataStoreName(typeName);
172
173         final DatastoreContext datastoreContext = getDatastoreContextBuilder().build();
174
175         final DatastoreContextFactory mockContextFactory = Mockito.mock(DatastoreContextFactory.class);
176         Mockito.doReturn(datastoreContext).when(mockContextFactory).getBaseDatastoreContext();
177         Mockito.doReturn(datastoreContext).when(mockContextFactory).getShardDatastoreContext(Mockito.anyString());
178
179         final DistributedDataStore dataStore = new DistributedDataStore(getSystem(), cluster,
180                 configuration, mockContextFactory, restoreFromSnapshot);
181
182         dataStore.onGlobalContextUpdated(schemaContext);
183
184         datastoreContextBuilder = DatastoreContext.newBuilderFrom(datastoreContext);
185         return dataStore;
186     }
187
188     public DistributedDataStore setupDistributedDataStoreWithoutConfig(final String typeName,
189                                                                        final SchemaContext schemaContext,
190                                                                        final LogicalDatastoreType storeType) {
191         final ClusterWrapper cluster = new ClusterWrapperImpl(getSystem());
192         final ConfigurationImpl configuration = new ConfigurationImpl(new EmptyModuleShardConfigProvider());
193
194         setDataStoreName(typeName);
195
196         final DatastoreContext datastoreContext =
197                 getDatastoreContextBuilder().logicalStoreType(storeType).build();
198
199         final DatastoreContextFactory mockContextFactory = Mockito.mock(DatastoreContextFactory.class);
200         Mockito.doReturn(datastoreContext).when(mockContextFactory).getBaseDatastoreContext();
201         Mockito.doReturn(datastoreContext).when(mockContextFactory).getShardDatastoreContext(Mockito.anyString());
202
203         final DistributedDataStore dataStore = new DistributedDataStore(getSystem(), cluster,
204                 configuration, mockContextFactory, restoreFromSnapshot);
205
206         dataStore.onGlobalContextUpdated(schemaContext);
207
208         datastoreContextBuilder = DatastoreContext.newBuilderFrom(datastoreContext);
209         return dataStore;
210     }
211
212     public void waitUntilLeader(final ActorContext actorContext, final String... shardNames) {
213         for (String shardName: shardNames) {
214             ActorRef shard = findLocalShard(actorContext, shardName);
215
216             assertNotNull("Shard was not created for " + shardName, shard);
217
218             waitUntilLeader(shard);
219         }
220     }
221
222     public void waitUntilNoLeader(final ActorContext actorContext, final String... shardNames) {
223         for (String shardName: shardNames) {
224             ActorRef shard = findLocalShard(actorContext, shardName);
225             assertNotNull("No local shard found for " + shardName, shard);
226
227             waitUntilNoLeader(shard);
228         }
229     }
230
231     public void waitForMembersUp(final String... otherMembers) {
232         Set<String> otherMembersSet = Sets.newHashSet(otherMembers);
233         Stopwatch sw = Stopwatch.createStarted();
234         while (sw.elapsed(TimeUnit.SECONDS) <= 10) {
235             CurrentClusterState state = Cluster.get(getSystem()).state();
236             for (Member m: state.getMembers()) {
237                 if (m.status() == MemberStatus.up() && otherMembersSet.remove(m.getRoles().iterator().next())
238                         && otherMembersSet.isEmpty()) {
239                     return;
240                 }
241             }
242
243             Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
244         }
245
246         fail("Member(s) " + otherMembersSet + " are not Up");
247     }
248
249     public static ActorRef findLocalShard(final ActorContext actorContext, final String shardName) {
250         ActorRef shard = null;
251         for (int i = 0; i < 20 * 5 && shard == null; i++) {
252             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
253             com.google.common.base.Optional<ActorRef> shardReply = actorContext.findLocalShard(shardName);
254             if (shardReply.isPresent()) {
255                 shard = shardReply.get();
256             }
257         }
258         return shard;
259     }
260
261     public static void waitUntilShardIsDown(final ActorContext actorContext, final String shardName) {
262         for (int i = 0; i < 20 * 5 ; i++) {
263             LOG.debug("Waiting for shard down {}", shardName);
264             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
265             com.google.common.base.Optional<ActorRef> shardReply = actorContext.findLocalShard(shardName);
266             if (!shardReply.isPresent()) {
267                 return;
268             }
269         }
270
271         throw new IllegalStateException("Shard[" + shardName + " did not shutdown in time");
272     }
273
274     public static void verifyShardStats(final AbstractDataStore datastore, final String shardName,
275             final ShardStatsVerifier verifier) throws Exception {
276         ActorContext actorContext = datastore.getActorContext();
277
278         Future<ActorRef> future = actorContext.findLocalShardAsync(shardName);
279         ActorRef shardActor = Await.result(future, Duration.create(10, TimeUnit.SECONDS));
280
281         AssertionError lastError = null;
282         Stopwatch sw = Stopwatch.createStarted();
283         while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
284             ShardStats shardStats = (ShardStats)actorContext
285                     .executeOperation(shardActor, Shard.GET_SHARD_MBEAN_MESSAGE);
286
287             try {
288                 verifier.verify(shardStats);
289                 return;
290             } catch (AssertionError e) {
291                 lastError = e;
292                 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
293             }
294         }
295
296         throw lastError;
297     }
298
299     public static void verifyShardState(final AbstractDataStore datastore, final String shardName,
300             final Consumer<OnDemandShardState> verifier) throws Exception {
301         ActorContext actorContext = datastore.getActorContext();
302
303         Future<ActorRef> future = actorContext.findLocalShardAsync(shardName);
304         ActorRef shardActor = Await.result(future, Duration.create(10, TimeUnit.SECONDS));
305
306         AssertionError lastError = null;
307         Stopwatch sw = Stopwatch.createStarted();
308         while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
309             OnDemandShardState shardState = (OnDemandShardState)actorContext
310                     .executeOperation(shardActor, GetOnDemandRaftState.INSTANCE);
311
312             try {
313                 verifier.accept(shardState);
314                 return;
315             } catch (AssertionError e) {
316                 lastError = e;
317                 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
318             }
319         }
320
321         throw lastError;
322     }
323
324     void testWriteTransaction(final AbstractDataStore dataStore, final YangInstanceIdentifier nodePath,
325             final NormalizedNode<?, ?> nodeToWrite) throws Exception {
326
327         // 1. Create a write-only Tx
328
329         DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
330         assertNotNull("newWriteOnlyTransaction returned null", writeTx);
331
332         // 2. Write some data
333
334         writeTx.write(nodePath, nodeToWrite);
335
336         // 3. Ready the Tx for commit
337
338         DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
339
340         // 4. Commit the Tx
341
342         doCommit(cohort);
343
344         // 5. Verify the data in the store
345
346         DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
347
348         Optional<NormalizedNode<?, ?>> optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
349         assertEquals("isPresent", true, optional.isPresent());
350         assertEquals("Data node", nodeToWrite, optional.get());
351     }
352
353     public void doCommit(final DOMStoreThreePhaseCommitCohort cohort) throws Exception {
354         Boolean canCommit = cohort.canCommit().get(commitTimeout, TimeUnit.SECONDS);
355         assertEquals("canCommit", true, canCommit);
356         cohort.preCommit().get(5, TimeUnit.SECONDS);
357         cohort.commit().get(5, TimeUnit.SECONDS);
358     }
359
360     void doCommit(final ListenableFuture<Boolean> canCommitFuture, final DOMStoreThreePhaseCommitCohort cohort)
361             throws Exception {
362         Boolean canCommit = canCommitFuture.get(commitTimeout, TimeUnit.SECONDS);
363         assertEquals("canCommit", true, canCommit);
364         cohort.preCommit().get(5, TimeUnit.SECONDS);
365         cohort.commit().get(5, TimeUnit.SECONDS);
366     }
367
368     @SuppressWarnings("checkstyle:IllegalCatch")
369     void assertExceptionOnCall(final Callable<Void> callable, final Class<? extends Exception> expType)
370             throws Exception {
371         try {
372             callable.call();
373             fail("Expected " + expType.getSimpleName());
374         } catch (Exception e) {
375             assertEquals("Exception type", expType, e.getClass());
376         }
377     }
378
379     void assertExceptionOnTxChainCreates(final DOMStoreTransactionChain txChain,
380             final Class<? extends Exception> expType) throws Exception {
381         assertExceptionOnCall(() -> {
382             txChain.newWriteOnlyTransaction();
383             return null;
384         }, expType);
385
386         assertExceptionOnCall(() -> {
387             txChain.newReadWriteTransaction();
388             return null;
389         }, expType);
390
391         assertExceptionOnCall(() -> {
392             txChain.newReadOnlyTransaction();
393             return null;
394         }, expType);
395     }
396
397     public interface ShardStatsVerifier {
398         void verify(ShardStats stats);
399     }
400 }