e827cdcaf94659d6006207e85385c9059bdac3e7
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / IntegrationTestKit.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertNotNull;
12 import static org.junit.Assert.assertTrue;
13 import static org.junit.Assert.fail;
14
15 import akka.actor.ActorRef;
16 import akka.actor.ActorSystem;
17 import akka.cluster.Cluster;
18 import akka.cluster.ClusterEvent.CurrentClusterState;
19 import akka.cluster.Member;
20 import akka.cluster.MemberStatus;
21 import com.google.common.base.Stopwatch;
22 import com.google.common.collect.Sets;
23 import com.google.common.util.concurrent.ListenableFuture;
24 import com.google.common.util.concurrent.Uninterruptibles;
25 import java.lang.reflect.Constructor;
26 import java.util.Optional;
27 import java.util.Set;
28 import java.util.concurrent.Callable;
29 import java.util.concurrent.TimeUnit;
30 import java.util.function.Consumer;
31 import org.mockito.Mockito;
32 import org.opendaylight.controller.cluster.databroker.ClientBackedDataStore;
33 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
34 import org.opendaylight.controller.cluster.datastore.config.Configuration;
35 import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
36 import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider;
37 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
38 import org.opendaylight.controller.cluster.datastore.messages.OnDemandShardState;
39 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
40 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
41 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
42 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
43 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
44 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
45 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
46 import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain;
47 import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
48 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
49 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
50 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
53 import scala.concurrent.Await;
54 import scala.concurrent.Future;
55 import scala.concurrent.duration.FiniteDuration;
56
57 public class IntegrationTestKit extends ShardTestKit {
58
59     private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestKit.class);
60
61     protected DatastoreContext.Builder datastoreContextBuilder;
62     protected DatastoreSnapshot restoreFromSnapshot;
63     private final int commitTimeout;
64
65     public IntegrationTestKit(final ActorSystem actorSystem, final Builder datastoreContextBuilder) {
66         this(actorSystem, datastoreContextBuilder, 7);
67     }
68
69     public IntegrationTestKit(final ActorSystem actorSystem, final Builder datastoreContextBuilder,
70             final int commitTimeout) {
71         super(actorSystem);
72         this.datastoreContextBuilder = datastoreContextBuilder;
73         this.commitTimeout = commitTimeout;
74     }
75
76     public DatastoreContext.Builder getDatastoreContextBuilder() {
77         return datastoreContextBuilder;
78     }
79
80     public DistributedDataStore setupDistributedDataStore(final String typeName, final String moduleShardsConfig,
81                                                           final boolean waitUntilLeader,
82                                                           final EffectiveModelContext schemaContext) throws Exception {
83         return setupDistributedDataStore(typeName, moduleShardsConfig, "modules.conf", waitUntilLeader, schemaContext);
84     }
85
86     public DistributedDataStore setupDistributedDataStore(final String typeName, final String moduleShardsConfig,
87                                                           final String modulesConfig,
88                                                           final boolean waitUntilLeader,
89                                                           final EffectiveModelContext schemaContext,
90                                                           final String... shardNames) throws Exception {
91         return (DistributedDataStore) setupAbstractDataStore(DistributedDataStore.class, typeName, moduleShardsConfig,
92                 modulesConfig, waitUntilLeader, schemaContext, shardNames);
93     }
94
95     public AbstractDataStore setupAbstractDataStore(final Class<? extends AbstractDataStore> implementation,
96                                                     final String typeName, final String... shardNames)
97             throws Exception {
98         return setupAbstractDataStore(implementation, typeName, "module-shards.conf", true,
99                 SchemaContextHelper.full(), shardNames);
100     }
101
102     public AbstractDataStore setupAbstractDataStore(final Class<? extends AbstractDataStore> implementation,
103                                                     final String typeName, final boolean waitUntilLeader,
104                                                     final String... shardNames) throws Exception {
105         return setupAbstractDataStore(implementation, typeName, "module-shards.conf", waitUntilLeader,
106                 SchemaContextHelper.full(), shardNames);
107     }
108
109     public AbstractDataStore setupAbstractDataStore(final Class<? extends AbstractDataStore> implementation,
110                                                     final String typeName, final String moduleShardsConfig,
111                                                     final boolean waitUntilLeader, final String... shardNames)
112             throws Exception {
113         return setupAbstractDataStore(implementation, typeName, moduleShardsConfig, waitUntilLeader,
114                 SchemaContextHelper.full(), shardNames);
115     }
116
117     public AbstractDataStore setupAbstractDataStore(final Class<? extends AbstractDataStore> implementation,
118                                                     final String typeName, final String moduleShardsConfig,
119                                                     final boolean waitUntilLeader,
120                                                     final EffectiveModelContext schemaContext,
121                                                     final String... shardNames) throws Exception {
122         return setupAbstractDataStore(implementation, typeName, moduleShardsConfig, "modules.conf", waitUntilLeader,
123                 schemaContext, shardNames);
124     }
125
126     private AbstractDataStore setupAbstractDataStore(final Class<? extends AbstractDataStore> implementation,
127                                                      final String typeName, final String moduleShardsConfig,
128                                                      final String modulesConfig, final boolean waitUntilLeader,
129                                                      final EffectiveModelContext schemaContext,
130                                                      final String... shardNames)
131             throws Exception {
132         final ClusterWrapper cluster = new ClusterWrapperImpl(getSystem());
133         final Configuration config = new ConfigurationImpl(moduleShardsConfig, modulesConfig);
134
135         setDataStoreName(typeName);
136
137         // Make sure we set up datastore context correctly
138         datastoreContextBuilder.useTellBasedProtocol(ClientBackedDataStore.class.isAssignableFrom(implementation));
139
140         final DatastoreContext datastoreContext = datastoreContextBuilder.build();
141         final DatastoreContextFactory mockContextFactory = Mockito.mock(DatastoreContextFactory.class);
142         Mockito.doReturn(datastoreContext).when(mockContextFactory).getBaseDatastoreContext();
143         Mockito.doReturn(datastoreContext).when(mockContextFactory).getShardDatastoreContext(Mockito.anyString());
144
145         final Constructor<? extends AbstractDataStore> constructor = implementation.getDeclaredConstructor(
146                 ActorSystem.class, ClusterWrapper.class, Configuration.class,
147                 DatastoreContextFactory.class, DatastoreSnapshot.class);
148
149         final AbstractDataStore dataStore = constructor.newInstance(getSystem(), cluster, config, mockContextFactory,
150             restoreFromSnapshot);
151
152         dataStore.onModelContextUpdated(schemaContext);
153
154         if (waitUntilLeader) {
155             waitUntilLeader(dataStore.getActorUtils(), shardNames);
156         }
157
158         datastoreContextBuilder = DatastoreContext.newBuilderFrom(datastoreContext);
159         return dataStore;
160     }
161
162     private void setDataStoreName(final String typeName) {
163         if ("config".equals(typeName)) {
164             datastoreContextBuilder.logicalStoreType(LogicalDatastoreType.CONFIGURATION);
165         } else if ("operational".equals(typeName)) {
166             datastoreContextBuilder.logicalStoreType(LogicalDatastoreType.OPERATIONAL);
167         } else {
168             datastoreContextBuilder.dataStoreName(typeName);
169         }
170     }
171
172     public DistributedDataStore setupDistributedDataStoreWithoutConfig(final String typeName,
173                                                                        final EffectiveModelContext schemaContext) {
174         final ClusterWrapper cluster = new ClusterWrapperImpl(getSystem());
175         final ConfigurationImpl configuration = new ConfigurationImpl(new EmptyModuleShardConfigProvider());
176
177         setDataStoreName(typeName);
178
179         final DatastoreContext datastoreContext = getDatastoreContextBuilder().build();
180
181         final DatastoreContextFactory mockContextFactory = Mockito.mock(DatastoreContextFactory.class);
182         Mockito.doReturn(datastoreContext).when(mockContextFactory).getBaseDatastoreContext();
183         Mockito.doReturn(datastoreContext).when(mockContextFactory).getShardDatastoreContext(Mockito.anyString());
184
185         final DistributedDataStore dataStore = new DistributedDataStore(getSystem(), cluster,
186                 configuration, mockContextFactory, restoreFromSnapshot);
187
188         dataStore.onModelContextUpdated(schemaContext);
189
190         datastoreContextBuilder = DatastoreContext.newBuilderFrom(datastoreContext);
191         return dataStore;
192     }
193
194     public DistributedDataStore setupDistributedDataStoreWithoutConfig(final String typeName,
195                                                                        final EffectiveModelContext schemaContext,
196                                                                        final LogicalDatastoreType storeType) {
197         final ClusterWrapper cluster = new ClusterWrapperImpl(getSystem());
198         final ConfigurationImpl configuration = new ConfigurationImpl(new EmptyModuleShardConfigProvider());
199
200         setDataStoreName(typeName);
201
202         final DatastoreContext datastoreContext =
203                 getDatastoreContextBuilder().logicalStoreType(storeType).build();
204
205         final DatastoreContextFactory mockContextFactory = Mockito.mock(DatastoreContextFactory.class);
206         Mockito.doReturn(datastoreContext).when(mockContextFactory).getBaseDatastoreContext();
207         Mockito.doReturn(datastoreContext).when(mockContextFactory).getShardDatastoreContext(Mockito.anyString());
208
209         final DistributedDataStore dataStore = new DistributedDataStore(getSystem(), cluster,
210                 configuration, mockContextFactory, restoreFromSnapshot);
211
212         dataStore.onModelContextUpdated(schemaContext);
213
214         datastoreContextBuilder = DatastoreContext.newBuilderFrom(datastoreContext);
215         return dataStore;
216     }
217
218     public void waitUntilLeader(final ActorUtils actorUtils, final String... shardNames) {
219         for (String shardName: shardNames) {
220             ActorRef shard = findLocalShard(actorUtils, shardName);
221
222             assertNotNull("Shard was not created for " + shardName, shard);
223
224             waitUntilLeader(shard);
225         }
226     }
227
228     public void waitUntilNoLeader(final ActorUtils actorUtils, final String... shardNames) {
229         for (String shardName: shardNames) {
230             ActorRef shard = findLocalShard(actorUtils, shardName);
231             assertNotNull("No local shard found for " + shardName, shard);
232
233             waitUntilNoLeader(shard);
234         }
235     }
236
237     public void waitForMembersUp(final String... otherMembers) {
238         Set<String> otherMembersSet = Sets.newHashSet(otherMembers);
239         Stopwatch sw = Stopwatch.createStarted();
240         while (sw.elapsed(TimeUnit.SECONDS) <= 10) {
241             CurrentClusterState state = Cluster.get(getSystem()).state();
242             for (Member m: state.getMembers()) {
243                 if (m.status() == MemberStatus.up() && otherMembersSet.remove(m.getRoles().iterator().next())
244                         && otherMembersSet.isEmpty()) {
245                     return;
246                 }
247             }
248
249             Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
250         }
251
252         fail("Member(s) " + otherMembersSet + " are not Up");
253     }
254
255     public static ActorRef findLocalShard(final ActorUtils actorUtils, final String shardName) {
256         ActorRef shard = null;
257         for (int i = 0; i < 20 * 5 && shard == null; i++) {
258             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
259             Optional<ActorRef> shardReply = actorUtils.findLocalShard(shardName);
260             if (shardReply.isPresent()) {
261                 shard = shardReply.get();
262             }
263         }
264         return shard;
265     }
266
267     public static void waitUntilShardIsDown(final ActorUtils actorUtils, final String shardName) {
268         for (int i = 0; i < 20 * 5 ; i++) {
269             LOG.debug("Waiting for shard down {}", shardName);
270             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
271             Optional<ActorRef> shardReply = actorUtils.findLocalShard(shardName);
272             if (!shardReply.isPresent()) {
273                 return;
274             }
275         }
276
277         throw new IllegalStateException("Shard[" + shardName + " did not shutdown in time");
278     }
279
280     public static void verifyShardStats(final AbstractDataStore datastore, final String shardName,
281             final ShardStatsVerifier verifier) throws Exception {
282         ActorUtils actorUtils = datastore.getActorUtils();
283
284         Future<ActorRef> future = actorUtils.findLocalShardAsync(shardName);
285         ActorRef shardActor = Await.result(future, FiniteDuration.create(10, TimeUnit.SECONDS));
286
287         AssertionError lastError = null;
288         Stopwatch sw = Stopwatch.createStarted();
289         while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
290             ShardStats shardStats = (ShardStats)actorUtils
291                     .executeOperation(shardActor, Shard.GET_SHARD_MBEAN_MESSAGE);
292
293             try {
294                 verifier.verify(shardStats);
295                 return;
296             } catch (AssertionError e) {
297                 lastError = e;
298                 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
299             }
300         }
301
302         throw lastError;
303     }
304
305     public static void verifyShardState(final AbstractDataStore datastore, final String shardName,
306             final Consumer<OnDemandShardState> verifier) throws Exception {
307         ActorUtils actorUtils = datastore.getActorUtils();
308
309         Future<ActorRef> future = actorUtils.findLocalShardAsync(shardName);
310         ActorRef shardActor = Await.result(future, FiniteDuration.create(10, TimeUnit.SECONDS));
311
312         AssertionError lastError = null;
313         Stopwatch sw = Stopwatch.createStarted();
314         while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
315             OnDemandShardState shardState = (OnDemandShardState)actorUtils
316                     .executeOperation(shardActor, GetOnDemandRaftState.INSTANCE);
317
318             try {
319                 verifier.accept(shardState);
320                 return;
321             } catch (AssertionError e) {
322                 lastError = e;
323                 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
324             }
325         }
326
327         throw lastError;
328     }
329
330     void testWriteTransaction(final AbstractDataStore dataStore, final YangInstanceIdentifier nodePath,
331             final NormalizedNode<?, ?> nodeToWrite) throws Exception {
332
333         // 1. Create a write-only Tx
334
335         DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
336         assertNotNull("newWriteOnlyTransaction returned null", writeTx);
337
338         // 2. Write some data
339
340         writeTx.write(nodePath, nodeToWrite);
341
342         // 3. Ready the Tx for commit
343
344         DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
345
346         // 4. Commit the Tx
347
348         doCommit(cohort);
349
350         // 5. Verify the data in the store
351
352         DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
353
354         Optional<NormalizedNode<?, ?>> optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
355         assertTrue("isPresent", optional.isPresent());
356         assertEquals("Data node", nodeToWrite, optional.get());
357     }
358
359     public void doCommit(final DOMStoreThreePhaseCommitCohort cohort) throws Exception {
360         Boolean canCommit = cohort.canCommit().get(commitTimeout, TimeUnit.SECONDS);
361         assertEquals("canCommit", Boolean.TRUE, canCommit);
362         cohort.preCommit().get(5, TimeUnit.SECONDS);
363         cohort.commit().get(5, TimeUnit.SECONDS);
364     }
365
366     void doCommit(final ListenableFuture<Boolean> canCommitFuture, final DOMStoreThreePhaseCommitCohort cohort)
367             throws Exception {
368         Boolean canCommit = canCommitFuture.get(commitTimeout, TimeUnit.SECONDS);
369         assertEquals("canCommit", Boolean.TRUE, canCommit);
370         cohort.preCommit().get(5, TimeUnit.SECONDS);
371         cohort.commit().get(5, TimeUnit.SECONDS);
372     }
373
374     @SuppressWarnings("checkstyle:IllegalCatch")
375     void assertExceptionOnCall(final Callable<Void> callable, final Class<? extends Exception> expType) {
376         try {
377             callable.call();
378             fail("Expected " + expType.getSimpleName());
379         } catch (Exception e) {
380             assertEquals("Exception type", expType, e.getClass());
381         }
382     }
383
384     void assertExceptionOnTxChainCreates(final DOMStoreTransactionChain txChain,
385             final Class<? extends Exception> expType) {
386         assertExceptionOnCall(() -> {
387             txChain.newWriteOnlyTransaction();
388             return null;
389         }, expType);
390
391         assertExceptionOnCall(() -> {
392             txChain.newReadWriteTransaction();
393             return null;
394         }, expType);
395
396         assertExceptionOnCall(() -> {
397             txChain.newReadOnlyTransaction();
398             return null;
399         }, expType);
400     }
401
402     public interface ShardStatsVerifier {
403         void verify(ShardStats stats);
404     }
405 }