Improve segmented journal actor metrics
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / IntegrationTestKit.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertNotNull;
12 import static org.junit.Assert.assertThrows;
13 import static org.junit.Assert.fail;
14 import static org.mockito.ArgumentMatchers.anyString;
15 import static org.mockito.Mockito.doReturn;
16 import static org.mockito.Mockito.mock;
17
18 import akka.actor.ActorRef;
19 import akka.actor.ActorSystem;
20 import akka.cluster.Cluster;
21 import akka.cluster.ClusterEvent.CurrentClusterState;
22 import akka.cluster.Member;
23 import akka.cluster.MemberStatus;
24 import com.google.common.base.Stopwatch;
25 import com.google.common.collect.Sets;
26 import com.google.common.util.concurrent.ListenableFuture;
27 import com.google.common.util.concurrent.Uninterruptibles;
28 import java.lang.reflect.Constructor;
29 import java.util.Optional;
30 import java.util.Set;
31 import java.util.concurrent.TimeUnit;
32 import java.util.function.Consumer;
33 import org.opendaylight.controller.cluster.databroker.ClientBackedDataStore;
34 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
35 import org.opendaylight.controller.cluster.datastore.config.Configuration;
36 import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
37 import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider;
38 import org.opendaylight.controller.cluster.datastore.messages.OnDemandShardState;
39 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
40 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
41 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
42 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
43 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
44 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
45 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
46 import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain;
47 import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
48 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
49 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
50 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
53 import scala.concurrent.Await;
54 import scala.concurrent.Future;
55 import scala.concurrent.duration.FiniteDuration;
56
57 public class IntegrationTestKit extends ShardTestKit {
58
59     private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestKit.class);
60
61     protected DatastoreContext.Builder datastoreContextBuilder;
62     protected DatastoreSnapshot restoreFromSnapshot;
63     private final int commitTimeout;
64
65     public IntegrationTestKit(final ActorSystem actorSystem, final Builder datastoreContextBuilder) {
66         this(actorSystem, datastoreContextBuilder, 7);
67     }
68
69     public IntegrationTestKit(final ActorSystem actorSystem, final Builder datastoreContextBuilder,
70             final int commitTimeout) {
71         super(actorSystem);
72         this.datastoreContextBuilder = datastoreContextBuilder;
73         this.commitTimeout = commitTimeout;
74     }
75
76     public DatastoreContext.Builder getDatastoreContextBuilder() {
77         return datastoreContextBuilder;
78     }
79
80     @Deprecated(since = "7.0.0", forRemoval = true)
81     public DistributedDataStore setupDistributedDataStore(final String typeName, final String moduleShardsConfig,
82                                                           final boolean waitUntilLeader,
83                                                           final EffectiveModelContext schemaContext) throws Exception {
84         return setupDistributedDataStore(typeName, moduleShardsConfig, "modules.conf", waitUntilLeader, schemaContext);
85     }
86
87     @Deprecated(since = "7.0.0", forRemoval = true)
88     public DistributedDataStore setupDistributedDataStore(final String typeName, final String moduleShardsConfig,
89                                                           final String modulesConfig,
90                                                           final boolean waitUntilLeader,
91                                                           final EffectiveModelContext schemaContext,
92                                                           final String... shardNames) throws Exception {
93         return (DistributedDataStore) setupAbstractDataStore(DistributedDataStore.class, typeName, moduleShardsConfig,
94                 modulesConfig, waitUntilLeader, schemaContext, shardNames);
95     }
96
97     public AbstractDataStore setupAbstractDataStore(final Class<? extends AbstractDataStore> implementation,
98                                                     final String typeName, final String... shardNames)
99             throws Exception {
100         return setupAbstractDataStore(implementation, typeName, "module-shards.conf", true,
101                 SchemaContextHelper.full(), shardNames);
102     }
103
104     public AbstractDataStore setupAbstractDataStore(final Class<? extends AbstractDataStore> implementation,
105                                                     final String typeName, final boolean waitUntilLeader,
106                                                     final String... shardNames) throws Exception {
107         return setupAbstractDataStore(implementation, typeName, "module-shards.conf", waitUntilLeader,
108                 SchemaContextHelper.full(), shardNames);
109     }
110
111     public AbstractDataStore setupAbstractDataStore(final Class<? extends AbstractDataStore> implementation,
112                                                     final String typeName, final String moduleShardsConfig,
113                                                     final boolean waitUntilLeader, final String... shardNames)
114             throws Exception {
115         return setupAbstractDataStore(implementation, typeName, moduleShardsConfig, waitUntilLeader,
116                 SchemaContextHelper.full(), shardNames);
117     }
118
119     public AbstractDataStore setupAbstractDataStore(final Class<? extends AbstractDataStore> implementation,
120                                                     final String typeName, final String moduleShardsConfig,
121                                                     final boolean waitUntilLeader,
122                                                     final EffectiveModelContext schemaContext,
123                                                     final String... shardNames) throws Exception {
124         return setupAbstractDataStore(implementation, typeName, moduleShardsConfig, "modules.conf", waitUntilLeader,
125                 schemaContext, shardNames);
126     }
127
128     private AbstractDataStore setupAbstractDataStore(final Class<? extends AbstractDataStore> implementation,
129                                                      final String typeName, final String moduleShardsConfig,
130                                                      final String modulesConfig, final boolean waitUntilLeader,
131                                                      final EffectiveModelContext schemaContext,
132                                                      final String... shardNames)
133             throws Exception {
134         final ClusterWrapper cluster = new ClusterWrapperImpl(getSystem());
135         final Configuration config = new ConfigurationImpl(moduleShardsConfig, modulesConfig);
136
137         setDataStoreName(typeName);
138
139         // Make sure we set up datastore context correctly
140         datastoreContextBuilder.useTellBasedProtocol(ClientBackedDataStore.class.isAssignableFrom(implementation));
141
142         final DatastoreContext datastoreContext = datastoreContextBuilder.build();
143         final DatastoreContextFactory mockContextFactory = mock(DatastoreContextFactory.class);
144         doReturn(datastoreContext).when(mockContextFactory).getBaseDatastoreContext();
145         doReturn(datastoreContext).when(mockContextFactory).getShardDatastoreContext(anyString());
146
147         final Constructor<? extends AbstractDataStore> constructor = implementation.getDeclaredConstructor(
148                 ActorSystem.class, ClusterWrapper.class, Configuration.class,
149                 DatastoreContextFactory.class, DatastoreSnapshot.class);
150
151         final AbstractDataStore dataStore = constructor.newInstance(getSystem(), cluster, config, mockContextFactory,
152             restoreFromSnapshot);
153
154         dataStore.onModelContextUpdated(schemaContext);
155
156         if (waitUntilLeader) {
157             waitUntilLeader(dataStore.getActorUtils(), shardNames);
158         }
159
160         datastoreContextBuilder = DatastoreContext.newBuilderFrom(datastoreContext);
161         return dataStore;
162     }
163
164     private void setDataStoreName(final String typeName) {
165         if ("config".equals(typeName)) {
166             datastoreContextBuilder.logicalStoreType(LogicalDatastoreType.CONFIGURATION);
167         } else if ("operational".equals(typeName)) {
168             datastoreContextBuilder.logicalStoreType(LogicalDatastoreType.OPERATIONAL);
169         } else {
170             datastoreContextBuilder.dataStoreName(typeName);
171         }
172     }
173
174     @Deprecated(since = "7.0.0", forRemoval = true)
175     public DistributedDataStore setupDistributedDataStoreWithoutConfig(final String typeName,
176                                                                        final EffectiveModelContext schemaContext) {
177         final ClusterWrapper cluster = new ClusterWrapperImpl(getSystem());
178         final ConfigurationImpl configuration = new ConfigurationImpl(new EmptyModuleShardConfigProvider());
179
180         setDataStoreName(typeName);
181
182         final DatastoreContext datastoreContext = getDatastoreContextBuilder().build();
183
184         final DatastoreContextFactory mockContextFactory = mock(DatastoreContextFactory.class);
185         doReturn(datastoreContext).when(mockContextFactory).getBaseDatastoreContext();
186         doReturn(datastoreContext).when(mockContextFactory).getShardDatastoreContext(anyString());
187
188         final DistributedDataStore dataStore = new DistributedDataStore(getSystem(), cluster,
189                 configuration, mockContextFactory, restoreFromSnapshot);
190
191         dataStore.onModelContextUpdated(schemaContext);
192
193         datastoreContextBuilder = DatastoreContext.newBuilderFrom(datastoreContext);
194         return dataStore;
195     }
196
197     @Deprecated(since = "7.0.0", forRemoval = true)
198     public DistributedDataStore setupDistributedDataStoreWithoutConfig(final String typeName,
199                                                                        final EffectiveModelContext schemaContext,
200                                                                        final LogicalDatastoreType storeType) {
201         final ClusterWrapper cluster = new ClusterWrapperImpl(getSystem());
202         final ConfigurationImpl configuration = new ConfigurationImpl(new EmptyModuleShardConfigProvider());
203
204         setDataStoreName(typeName);
205
206         final DatastoreContext datastoreContext =
207                 getDatastoreContextBuilder().logicalStoreType(storeType).build();
208
209         final DatastoreContextFactory mockContextFactory = mock(DatastoreContextFactory.class);
210         doReturn(datastoreContext).when(mockContextFactory).getBaseDatastoreContext();
211         doReturn(datastoreContext).when(mockContextFactory).getShardDatastoreContext(anyString());
212
213         final DistributedDataStore dataStore = new DistributedDataStore(getSystem(), cluster,
214                 configuration, mockContextFactory, restoreFromSnapshot);
215
216         dataStore.onModelContextUpdated(schemaContext);
217
218         datastoreContextBuilder = DatastoreContext.newBuilderFrom(datastoreContext);
219         return dataStore;
220     }
221
222     public void waitUntilLeader(final ActorUtils actorUtils, final String... shardNames) {
223         for (String shardName: shardNames) {
224             ActorRef shard = findLocalShard(actorUtils, shardName);
225
226             assertNotNull("Shard was not created for " + shardName, shard);
227
228             waitUntilLeader(shard);
229         }
230     }
231
232     public void waitUntilNoLeader(final ActorUtils actorUtils, final String... shardNames) {
233         for (String shardName: shardNames) {
234             ActorRef shard = findLocalShard(actorUtils, shardName);
235             assertNotNull("No local shard found for " + shardName, shard);
236
237             waitUntilNoLeader(shard);
238         }
239     }
240
241     public void waitForMembersUp(final String... otherMembers) {
242         Set<String> otherMembersSet = Sets.newHashSet(otherMembers);
243         Stopwatch sw = Stopwatch.createStarted();
244         while (sw.elapsed(TimeUnit.SECONDS) <= 10) {
245             CurrentClusterState state = Cluster.get(getSystem()).state();
246             for (Member m: state.getMembers()) {
247                 if (m.status() == MemberStatus.up() && otherMembersSet.remove(m.getRoles().iterator().next())
248                         && otherMembersSet.isEmpty()) {
249                     return;
250                 }
251             }
252
253             Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
254         }
255
256         fail("Member(s) " + otherMembersSet + " are not Up");
257     }
258
259     public static ActorRef findLocalShard(final ActorUtils actorUtils, final String shardName) {
260         ActorRef shard = null;
261         for (int i = 0; i < 20 * 5 && shard == null; i++) {
262             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
263             Optional<ActorRef> shardReply = actorUtils.findLocalShard(shardName);
264             if (shardReply.isPresent()) {
265                 shard = shardReply.get();
266             }
267         }
268         return shard;
269     }
270
271     public static void waitUntilShardIsDown(final ActorUtils actorUtils, final String shardName) {
272         for (int i = 0; i < 20 * 5 ; i++) {
273             LOG.debug("Waiting for shard down {}", shardName);
274             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
275             Optional<ActorRef> shardReply = actorUtils.findLocalShard(shardName);
276             if (!shardReply.isPresent()) {
277                 return;
278             }
279         }
280
281         throw new IllegalStateException("Shard[" + shardName + " did not shutdown in time");
282     }
283
284     public static void verifyShardStats(final AbstractDataStore datastore, final String shardName,
285             final ShardStatsVerifier verifier) throws Exception {
286         ActorUtils actorUtils = datastore.getActorUtils();
287
288         Future<ActorRef> future = actorUtils.findLocalShardAsync(shardName);
289         ActorRef shardActor = Await.result(future, FiniteDuration.create(10, TimeUnit.SECONDS));
290
291         AssertionError lastError = null;
292         Stopwatch sw = Stopwatch.createStarted();
293         while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
294             ShardStats shardStats = (ShardStats)actorUtils
295                     .executeOperation(shardActor, Shard.GET_SHARD_MBEAN_MESSAGE);
296
297             try {
298                 verifier.verify(shardStats);
299                 return;
300             } catch (AssertionError e) {
301                 lastError = e;
302                 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
303             }
304         }
305
306         throw lastError;
307     }
308
309     public static void verifyShardState(final AbstractDataStore datastore, final String shardName,
310             final Consumer<OnDemandShardState> verifier) throws Exception {
311         ActorUtils actorUtils = datastore.getActorUtils();
312
313         Future<ActorRef> future = actorUtils.findLocalShardAsync(shardName);
314         ActorRef shardActor = Await.result(future, FiniteDuration.create(10, TimeUnit.SECONDS));
315
316         AssertionError lastError = null;
317         Stopwatch sw = Stopwatch.createStarted();
318         while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
319             OnDemandShardState shardState = (OnDemandShardState)actorUtils
320                     .executeOperation(shardActor, GetOnDemandRaftState.INSTANCE);
321
322             try {
323                 verifier.accept(shardState);
324                 return;
325             } catch (AssertionError e) {
326                 lastError = e;
327                 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
328             }
329         }
330
331         throw lastError;
332     }
333
334     void testWriteTransaction(final AbstractDataStore dataStore, final YangInstanceIdentifier nodePath,
335             final NormalizedNode nodeToWrite) throws Exception {
336
337         // 1. Create a write-only Tx
338
339         DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
340         assertNotNull("newWriteOnlyTransaction returned null", writeTx);
341
342         // 2. Write some data
343
344         writeTx.write(nodePath, nodeToWrite);
345
346         // 3. Ready the Tx for commit
347
348         DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
349
350         // 4. Commit the Tx
351
352         doCommit(cohort);
353
354         // 5. Verify the data in the store
355
356         DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
357         assertEquals(Optional.of(nodeToWrite), readTx.read(nodePath).get(5, TimeUnit.SECONDS));
358     }
359
360     public void doCommit(final DOMStoreThreePhaseCommitCohort cohort) throws Exception {
361         Boolean canCommit = cohort.canCommit().get(commitTimeout, TimeUnit.SECONDS);
362         assertEquals("canCommit", Boolean.TRUE, canCommit);
363         cohort.preCommit().get(5, TimeUnit.SECONDS);
364         cohort.commit().get(5, TimeUnit.SECONDS);
365     }
366
367     void doCommit(final ListenableFuture<Boolean> canCommitFuture, final DOMStoreThreePhaseCommitCohort cohort)
368             throws Exception {
369         Boolean canCommit = canCommitFuture.get(commitTimeout, TimeUnit.SECONDS);
370         assertEquals("canCommit", Boolean.TRUE, canCommit);
371         cohort.preCommit().get(5, TimeUnit.SECONDS);
372         cohort.commit().get(5, TimeUnit.SECONDS);
373     }
374
375     void assertExceptionOnTxChainCreates(final DOMStoreTransactionChain txChain,
376             final Class<? extends Exception> expType) {
377         assertThrows(expType, () -> txChain.newWriteOnlyTransaction());
378         assertThrows(expType, () -> txChain.newReadWriteTransaction());
379         assertThrows(expType, () -> txChain.newReadOnlyTransaction());
380     }
381
382     public interface ShardStatsVerifier {
383         void verify(ShardStats stats);
384     }
385 }