Bump upstreams
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / IntegrationTestKit.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertNotNull;
12 import static org.junit.Assert.assertThrows;
13 import static org.junit.Assert.fail;
14 import static org.mockito.ArgumentMatchers.anyString;
15 import static org.mockito.Mockito.doReturn;
16 import static org.mockito.Mockito.mock;
17
18 import akka.actor.ActorRef;
19 import akka.actor.ActorSystem;
20 import akka.cluster.Cluster;
21 import akka.cluster.ClusterEvent.CurrentClusterState;
22 import akka.cluster.Member;
23 import akka.cluster.MemberStatus;
24 import com.google.common.base.Stopwatch;
25 import com.google.common.collect.Sets;
26 import com.google.common.util.concurrent.ListenableFuture;
27 import com.google.common.util.concurrent.Uninterruptibles;
28 import java.lang.reflect.Constructor;
29 import java.util.Optional;
30 import java.util.Set;
31 import java.util.concurrent.TimeUnit;
32 import java.util.function.Consumer;
33 import org.opendaylight.controller.cluster.databroker.ClientBackedDataStore;
34 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
35 import org.opendaylight.controller.cluster.datastore.config.Configuration;
36 import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
37 import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider;
38 import org.opendaylight.controller.cluster.datastore.messages.OnDemandShardState;
39 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
40 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
41 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
42 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
43 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
44 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
45 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
46 import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain;
47 import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
48 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
49 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
50 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
53 import scala.concurrent.Await;
54 import scala.concurrent.Future;
55 import scala.concurrent.duration.FiniteDuration;
56
57 public class IntegrationTestKit extends ShardTestKit {
58
59     private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestKit.class);
60
61     protected DatastoreContext.Builder datastoreContextBuilder;
62     protected DatastoreSnapshot restoreFromSnapshot;
63     private final int commitTimeout;
64
65     public IntegrationTestKit(final ActorSystem actorSystem, final Builder datastoreContextBuilder) {
66         this(actorSystem, datastoreContextBuilder, 7);
67     }
68
69     public IntegrationTestKit(final ActorSystem actorSystem, final Builder datastoreContextBuilder,
70             final int commitTimeout) {
71         super(actorSystem);
72         this.datastoreContextBuilder = datastoreContextBuilder;
73         this.commitTimeout = commitTimeout;
74     }
75
76     public DatastoreContext.Builder getDatastoreContextBuilder() {
77         return datastoreContextBuilder;
78     }
79
80     @Deprecated(since = "7.0.0", forRemoval = true)
81     public DistributedDataStore setupDistributedDataStore(final String typeName, final String moduleShardsConfig,
82                                                           final boolean waitUntilLeader,
83                                                           final EffectiveModelContext schemaContext) throws Exception {
84         return setupDistributedDataStore(typeName, moduleShardsConfig, "modules.conf", waitUntilLeader, schemaContext);
85     }
86
87     @Deprecated(since = "7.0.0", forRemoval = true)
88     public DistributedDataStore setupDistributedDataStore(final String typeName, final String moduleShardsConfig,
89                                                           final String modulesConfig,
90                                                           final boolean waitUntilLeader,
91                                                           final EffectiveModelContext schemaContext,
92                                                           final String... shardNames) throws Exception {
93         return (DistributedDataStore) setupAbstractDataStore(DistributedDataStore.class, typeName, moduleShardsConfig,
94                 modulesConfig, waitUntilLeader, schemaContext, shardNames);
95     }
96
97     public AbstractDataStore setupAbstractDataStore(final Class<? extends AbstractDataStore> implementation,
98                                                     final String typeName, final String... shardNames)
99             throws Exception {
100         return setupAbstractDataStore(implementation, typeName, "module-shards.conf", true,
101                 SchemaContextHelper.full(), shardNames);
102     }
103
104     public AbstractDataStore setupAbstractDataStore(final Class<? extends AbstractDataStore> implementation,
105                                                     final String typeName, final boolean waitUntilLeader,
106                                                     final String... shardNames) throws Exception {
107         return setupAbstractDataStore(implementation, typeName, "module-shards.conf", waitUntilLeader,
108                 SchemaContextHelper.full(), shardNames);
109     }
110
111     public AbstractDataStore setupAbstractDataStore(final Class<? extends AbstractDataStore> implementation,
112                                                     final String typeName, final String moduleShardsConfig,
113                                                     final boolean waitUntilLeader, final String... shardNames)
114             throws Exception {
115         return setupAbstractDataStore(implementation, typeName, moduleShardsConfig, waitUntilLeader,
116                 SchemaContextHelper.full(), shardNames);
117     }
118
119     public AbstractDataStore setupAbstractDataStore(final Class<? extends AbstractDataStore> implementation,
120                                                     final String typeName, final String moduleShardsConfig,
121                                                     final boolean waitUntilLeader,
122                                                     final EffectiveModelContext schemaContext,
123                                                     final String... shardNames) throws Exception {
124         return setupAbstractDataStore(implementation, typeName, moduleShardsConfig, "modules.conf", waitUntilLeader,
125                 schemaContext, shardNames);
126     }
127
128     private AbstractDataStore setupAbstractDataStore(final Class<? extends AbstractDataStore> implementation,
129                                                      final String typeName, final String moduleShardsConfig,
130                                                      final String modulesConfig, final boolean waitUntilLeader,
131                                                      final EffectiveModelContext schemaContext,
132                                                      final String... shardNames)
133             throws Exception {
134         final ClusterWrapper cluster = new ClusterWrapperImpl(getSystem());
135         final Configuration config = new ConfigurationImpl(moduleShardsConfig, modulesConfig);
136
137         setDataStoreName(typeName);
138
139         // Make sure we set up datastore context correctly
140         datastoreContextBuilder.useTellBasedProtocol(ClientBackedDataStore.class.isAssignableFrom(implementation));
141
142         final DatastoreContext datastoreContext = datastoreContextBuilder.build();
143         final DatastoreContextFactory mockContextFactory = mock(DatastoreContextFactory.class);
144         doReturn(datastoreContext).when(mockContextFactory).getBaseDatastoreContext();
145         doReturn(datastoreContext).when(mockContextFactory).getShardDatastoreContext(anyString());
146
147         final Constructor<? extends AbstractDataStore> constructor = implementation.getDeclaredConstructor(
148                 ActorSystem.class, ClusterWrapper.class, Configuration.class,
149                 DatastoreContextFactory.class, DatastoreSnapshot.class);
150
151         final AbstractDataStore dataStore = constructor.newInstance(getSystem(), cluster, config, mockContextFactory,
152             restoreFromSnapshot);
153
154         dataStore.onModelContextUpdated(schemaContext);
155
156         if (waitUntilLeader) {
157             waitUntilLeader(dataStore.getActorUtils(), shardNames);
158         }
159
160         datastoreContextBuilder = DatastoreContext.newBuilderFrom(datastoreContext);
161         return dataStore;
162     }
163
164     private void setDataStoreName(final String typeName) {
165         if ("config".equals(typeName)) {
166             datastoreContextBuilder.logicalStoreType(LogicalDatastoreType.CONFIGURATION);
167         } else if ("operational".equals(typeName)) {
168             datastoreContextBuilder.logicalStoreType(LogicalDatastoreType.OPERATIONAL);
169         } else {
170             datastoreContextBuilder.dataStoreName(typeName);
171         }
172     }
173
174     @Deprecated(since = "7.0.0", forRemoval = true)
175     public DistributedDataStore setupDistributedDataStoreWithoutConfig(final String typeName,
176                                                                        final EffectiveModelContext schemaContext) {
177         final ClusterWrapper cluster = new ClusterWrapperImpl(getSystem());
178         final ConfigurationImpl configuration = new ConfigurationImpl(new EmptyModuleShardConfigProvider());
179
180         setDataStoreName(typeName);
181
182         final DatastoreContext datastoreContext = getDatastoreContextBuilder().build();
183
184         final DatastoreContextFactory mockContextFactory = mock(DatastoreContextFactory.class);
185         doReturn(datastoreContext).when(mockContextFactory).getBaseDatastoreContext();
186         doReturn(datastoreContext).when(mockContextFactory).getShardDatastoreContext(anyString());
187
188         final DistributedDataStore dataStore = new DistributedDataStore(getSystem(), cluster,
189                 configuration, mockContextFactory, restoreFromSnapshot);
190
191         dataStore.onModelContextUpdated(schemaContext);
192
193         datastoreContextBuilder = DatastoreContext.newBuilderFrom(datastoreContext);
194         return dataStore;
195     }
196
197     @Deprecated(since = "7.0.0", forRemoval = true)
198     public DistributedDataStore setupDistributedDataStoreWithoutConfig(final String typeName,
199                                                                        final EffectiveModelContext schemaContext,
200                                                                        final LogicalDatastoreType storeType) {
201         final ClusterWrapper cluster = new ClusterWrapperImpl(getSystem());
202         final ConfigurationImpl configuration = new ConfigurationImpl(new EmptyModuleShardConfigProvider());
203
204         setDataStoreName(typeName);
205
206         final DatastoreContext datastoreContext =
207                 getDatastoreContextBuilder().logicalStoreType(storeType).build();
208
209         final DatastoreContextFactory mockContextFactory = mock(DatastoreContextFactory.class);
210         doReturn(datastoreContext).when(mockContextFactory).getBaseDatastoreContext();
211         doReturn(datastoreContext).when(mockContextFactory).getShardDatastoreContext(anyString());
212
213         final DistributedDataStore dataStore = new DistributedDataStore(getSystem(), cluster,
214                 configuration, mockContextFactory, restoreFromSnapshot);
215
216         dataStore.onModelContextUpdated(schemaContext);
217
218         datastoreContextBuilder = DatastoreContext.newBuilderFrom(datastoreContext);
219         return dataStore;
220     }
221
222     public void waitUntilLeader(final ActorUtils actorUtils, final String... shardNames) {
223         for (String shardName: shardNames) {
224             ActorRef shard = findLocalShard(actorUtils, shardName);
225
226             assertNotNull("Shard was not created for " + shardName, shard);
227
228             waitUntilLeader(shard);
229         }
230     }
231
232     public void waitUntilNoLeader(final ActorUtils actorUtils, final String... shardNames) {
233         for (String shardName: shardNames) {
234             ActorRef shard = findLocalShard(actorUtils, shardName);
235             assertNotNull("No local shard found for " + shardName, shard);
236
237             waitUntilNoLeader(shard);
238         }
239     }
240
241     public void waitForMembersUp(final String... otherMembers) {
242         Set<String> otherMembersSet = Sets.newHashSet(otherMembers);
243         Stopwatch sw = Stopwatch.createStarted();
244         while (sw.elapsed(TimeUnit.SECONDS) <= 10) {
245             CurrentClusterState state = Cluster.get(getSystem()).state();
246             for (Member m: state.getMembers()) {
247                 if (m.status() == MemberStatus.up() && otherMembersSet.remove(m.getRoles().iterator().next())
248                         && otherMembersSet.isEmpty()) {
249                     return;
250                 }
251             }
252
253             Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
254         }
255
256         fail("Member(s) " + otherMembersSet + " are not Up");
257     }
258
259     public static ActorRef findLocalShard(final ActorUtils actorUtils, final String shardName) {
260         for (int i = 0; i < 20 * 5; i++) {
261             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
262             Optional<ActorRef> shardReply = actorUtils.findLocalShard(shardName);
263             if (shardReply.isPresent()) {
264                 return shardReply.orElseThrow();
265             }
266         }
267         return null;
268     }
269
270     public static void waitUntilShardIsDown(final ActorUtils actorUtils, final String shardName) {
271         for (int i = 0; i < 20 * 5 ; i++) {
272             LOG.debug("Waiting for shard down {}", shardName);
273             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
274             Optional<ActorRef> shardReply = actorUtils.findLocalShard(shardName);
275             if (!shardReply.isPresent()) {
276                 return;
277             }
278         }
279
280         throw new IllegalStateException("Shard[" + shardName + " did not shutdown in time");
281     }
282
283     public static void verifyShardStats(final AbstractDataStore datastore, final String shardName,
284             final ShardStatsVerifier verifier) throws Exception {
285         ActorUtils actorUtils = datastore.getActorUtils();
286
287         Future<ActorRef> future = actorUtils.findLocalShardAsync(shardName);
288         ActorRef shardActor = Await.result(future, FiniteDuration.create(10, TimeUnit.SECONDS));
289
290         AssertionError lastError = null;
291         Stopwatch sw = Stopwatch.createStarted();
292         while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
293             ShardStats shardStats = (ShardStats)actorUtils
294                     .executeOperation(shardActor, Shard.GET_SHARD_MBEAN_MESSAGE);
295
296             try {
297                 verifier.verify(shardStats);
298                 return;
299             } catch (AssertionError e) {
300                 lastError = e;
301                 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
302             }
303         }
304
305         throw lastError;
306     }
307
308     public static void verifyShardState(final AbstractDataStore datastore, final String shardName,
309             final Consumer<OnDemandShardState> verifier) throws Exception {
310         ActorUtils actorUtils = datastore.getActorUtils();
311
312         Future<ActorRef> future = actorUtils.findLocalShardAsync(shardName);
313         ActorRef shardActor = Await.result(future, FiniteDuration.create(10, TimeUnit.SECONDS));
314
315         AssertionError lastError = null;
316         Stopwatch sw = Stopwatch.createStarted();
317         while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
318             OnDemandShardState shardState = (OnDemandShardState)actorUtils
319                     .executeOperation(shardActor, GetOnDemandRaftState.INSTANCE);
320
321             try {
322                 verifier.accept(shardState);
323                 return;
324             } catch (AssertionError e) {
325                 lastError = e;
326                 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
327             }
328         }
329
330         throw lastError;
331     }
332
333     void testWriteTransaction(final AbstractDataStore dataStore, final YangInstanceIdentifier nodePath,
334             final NormalizedNode nodeToWrite) throws Exception {
335
336         // 1. Create a write-only Tx
337
338         DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
339         assertNotNull("newWriteOnlyTransaction returned null", writeTx);
340
341         // 2. Write some data
342
343         writeTx.write(nodePath, nodeToWrite);
344
345         // 3. Ready the Tx for commit
346
347         DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
348
349         // 4. Commit the Tx
350
351         doCommit(cohort);
352
353         // 5. Verify the data in the store
354
355         DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
356         assertEquals(Optional.of(nodeToWrite), readTx.read(nodePath).get(5, TimeUnit.SECONDS));
357     }
358
359     public void doCommit(final DOMStoreThreePhaseCommitCohort cohort) throws Exception {
360         Boolean canCommit = cohort.canCommit().get(commitTimeout, TimeUnit.SECONDS);
361         assertEquals("canCommit", Boolean.TRUE, canCommit);
362         cohort.preCommit().get(5, TimeUnit.SECONDS);
363         cohort.commit().get(5, TimeUnit.SECONDS);
364     }
365
366     void doCommit(final ListenableFuture<Boolean> canCommitFuture, final DOMStoreThreePhaseCommitCohort cohort)
367             throws Exception {
368         Boolean canCommit = canCommitFuture.get(commitTimeout, TimeUnit.SECONDS);
369         assertEquals("canCommit", Boolean.TRUE, canCommit);
370         cohort.preCommit().get(5, TimeUnit.SECONDS);
371         cohort.commit().get(5, TimeUnit.SECONDS);
372     }
373
374     void assertExceptionOnTxChainCreates(final DOMStoreTransactionChain txChain,
375             final Class<? extends Exception> expType) {
376         assertThrows(expType, () -> txChain.newWriteOnlyTransaction());
377         assertThrows(expType, () -> txChain.newReadWriteTransaction());
378         assertThrows(expType, () -> txChain.newReadOnlyTransaction());
379     }
380
381     public interface ShardStatsVerifier {
382         void verify(ShardStats stats);
383     }
384 }