e381152db221db1f11e4a5ba5eff61f7ec415cc2
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / IntegrationTestKit.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertNotNull;
12 import static org.junit.Assert.assertThrows;
13 import static org.junit.Assert.fail;
14 import static org.mockito.ArgumentMatchers.anyString;
15 import static org.mockito.Mockito.doReturn;
16 import static org.mockito.Mockito.mock;
17
18 import akka.actor.ActorRef;
19 import akka.actor.ActorSystem;
20 import akka.cluster.Cluster;
21 import akka.cluster.ClusterEvent.CurrentClusterState;
22 import akka.cluster.Member;
23 import akka.cluster.MemberStatus;
24 import com.google.common.base.Stopwatch;
25 import com.google.common.collect.Sets;
26 import com.google.common.util.concurrent.ListenableFuture;
27 import com.google.common.util.concurrent.Uninterruptibles;
28 import java.lang.reflect.Constructor;
29 import java.util.Optional;
30 import java.util.Set;
31 import java.util.concurrent.TimeUnit;
32 import java.util.function.Consumer;
33 import org.opendaylight.controller.cluster.databroker.ClientBackedDataStore;
34 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
35 import org.opendaylight.controller.cluster.datastore.config.Configuration;
36 import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
37 import org.opendaylight.controller.cluster.datastore.messages.OnDemandShardState;
38 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
39 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
40 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
41 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
42 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
43 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
44 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
45 import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain;
46 import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
47 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
48 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
49 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
52 import scala.concurrent.Await;
53 import scala.concurrent.Future;
54 import scala.concurrent.duration.FiniteDuration;
55
56 public class IntegrationTestKit extends ShardTestKit {
57
58     private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestKit.class);
59
60     protected DatastoreContext.Builder datastoreContextBuilder;
61     protected DatastoreSnapshot restoreFromSnapshot;
62     private final int commitTimeout;
63
64     public IntegrationTestKit(final ActorSystem actorSystem, final Builder datastoreContextBuilder) {
65         this(actorSystem, datastoreContextBuilder, 7);
66     }
67
68     public IntegrationTestKit(final ActorSystem actorSystem, final Builder datastoreContextBuilder,
69             final int commitTimeout) {
70         super(actorSystem);
71         this.datastoreContextBuilder = datastoreContextBuilder;
72         this.commitTimeout = commitTimeout;
73     }
74
75     public DatastoreContext.Builder getDatastoreContextBuilder() {
76         return datastoreContextBuilder;
77     }
78
79     @Deprecated(since = "7.0.0", forRemoval = true)
80     public DistributedDataStore setupDistributedDataStore(final String typeName, final String moduleShardsConfig,
81                                                           final boolean waitUntilLeader,
82                                                           final EffectiveModelContext schemaContext) throws Exception {
83         return setupDistributedDataStore(typeName, moduleShardsConfig, "modules.conf", waitUntilLeader, schemaContext);
84     }
85
86     @Deprecated(since = "7.0.0", forRemoval = true)
87     public DistributedDataStore setupDistributedDataStore(final String typeName, final String moduleShardsConfig,
88                                                           final String modulesConfig,
89                                                           final boolean waitUntilLeader,
90                                                           final EffectiveModelContext schemaContext,
91                                                           final String... shardNames) throws Exception {
92         return (DistributedDataStore) setupAbstractDataStore(DistributedDataStore.class, typeName, moduleShardsConfig,
93                 modulesConfig, waitUntilLeader, schemaContext, shardNames);
94     }
95
96     public AbstractDataStore setupAbstractDataStore(final Class<? extends AbstractDataStore> implementation,
97                                                     final String typeName, final String... shardNames)
98             throws Exception {
99         return setupAbstractDataStore(implementation, typeName, "module-shards.conf", true,
100                 SchemaContextHelper.full(), shardNames);
101     }
102
103     public AbstractDataStore setupAbstractDataStore(final Class<? extends AbstractDataStore> implementation,
104                                                     final String typeName, final boolean waitUntilLeader,
105                                                     final String... shardNames) throws Exception {
106         return setupAbstractDataStore(implementation, typeName, "module-shards.conf", waitUntilLeader,
107                 SchemaContextHelper.full(), shardNames);
108     }
109
110     public AbstractDataStore setupAbstractDataStore(final Class<? extends AbstractDataStore> implementation,
111                                                     final String typeName, final String moduleShardsConfig,
112                                                     final boolean waitUntilLeader, final String... shardNames)
113             throws Exception {
114         return setupAbstractDataStore(implementation, typeName, moduleShardsConfig, waitUntilLeader,
115                 SchemaContextHelper.full(), shardNames);
116     }
117
118     public AbstractDataStore setupAbstractDataStore(final Class<? extends AbstractDataStore> implementation,
119                                                     final String typeName, final String moduleShardsConfig,
120                                                     final boolean waitUntilLeader,
121                                                     final EffectiveModelContext schemaContext,
122                                                     final String... shardNames) throws Exception {
123         return setupAbstractDataStore(implementation, typeName, moduleShardsConfig, "modules.conf", waitUntilLeader,
124                 schemaContext, shardNames);
125     }
126
127     private AbstractDataStore setupAbstractDataStore(final Class<? extends AbstractDataStore> implementation,
128                                                      final String typeName, final String moduleShardsConfig,
129                                                      final String modulesConfig, final boolean waitUntilLeader,
130                                                      final EffectiveModelContext schemaContext,
131                                                      final String... shardNames)
132             throws Exception {
133         final ClusterWrapper cluster = new ClusterWrapperImpl(getSystem());
134         final Configuration config = new ConfigurationImpl(moduleShardsConfig, modulesConfig);
135
136         setDataStoreName(typeName);
137
138         // Make sure we set up datastore context correctly
139         datastoreContextBuilder.useTellBasedProtocol(ClientBackedDataStore.class.isAssignableFrom(implementation));
140
141         final DatastoreContext datastoreContext = datastoreContextBuilder.build();
142         final DatastoreContextFactory mockContextFactory = mock(DatastoreContextFactory.class);
143         doReturn(datastoreContext).when(mockContextFactory).getBaseDatastoreContext();
144         doReturn(datastoreContext).when(mockContextFactory).getShardDatastoreContext(anyString());
145
146         final Constructor<? extends AbstractDataStore> constructor = implementation.getDeclaredConstructor(
147                 ActorSystem.class, ClusterWrapper.class, Configuration.class,
148                 DatastoreContextFactory.class, DatastoreSnapshot.class);
149
150         final AbstractDataStore dataStore = constructor.newInstance(getSystem(), cluster, config, mockContextFactory,
151             restoreFromSnapshot);
152
153         dataStore.onModelContextUpdated(schemaContext);
154
155         if (waitUntilLeader) {
156             waitUntilLeader(dataStore.getActorUtils(), shardNames);
157         }
158
159         datastoreContextBuilder = DatastoreContext.newBuilderFrom(datastoreContext);
160         return dataStore;
161     }
162
163     private void setDataStoreName(final String typeName) {
164         if ("config".equals(typeName)) {
165             datastoreContextBuilder.logicalStoreType(LogicalDatastoreType.CONFIGURATION);
166         } else if ("operational".equals(typeName)) {
167             datastoreContextBuilder.logicalStoreType(LogicalDatastoreType.OPERATIONAL);
168         } else {
169             datastoreContextBuilder.dataStoreName(typeName);
170         }
171     }
172
173     public void waitUntilLeader(final ActorUtils actorUtils, final String... shardNames) {
174         for (String shardName: shardNames) {
175             ActorRef shard = findLocalShard(actorUtils, shardName);
176
177             assertNotNull("Shard was not created for " + shardName, shard);
178
179             waitUntilLeader(shard);
180         }
181     }
182
183     public void waitUntilNoLeader(final ActorUtils actorUtils, final String... shardNames) {
184         for (String shardName: shardNames) {
185             ActorRef shard = findLocalShard(actorUtils, shardName);
186             assertNotNull("No local shard found for " + shardName, shard);
187
188             waitUntilNoLeader(shard);
189         }
190     }
191
192     public void waitForMembersUp(final String... otherMembers) {
193         Set<String> otherMembersSet = Sets.newHashSet(otherMembers);
194         Stopwatch sw = Stopwatch.createStarted();
195         while (sw.elapsed(TimeUnit.SECONDS) <= 10) {
196             CurrentClusterState state = Cluster.get(getSystem()).state();
197             for (Member m: state.getMembers()) {
198                 if (m.status() == MemberStatus.up() && otherMembersSet.remove(m.getRoles().iterator().next())
199                         && otherMembersSet.isEmpty()) {
200                     return;
201                 }
202             }
203
204             Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
205         }
206
207         fail("Member(s) " + otherMembersSet + " are not Up");
208     }
209
210     public static ActorRef findLocalShard(final ActorUtils actorUtils, final String shardName) {
211         for (int i = 0; i < 20 * 5; i++) {
212             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
213             Optional<ActorRef> shardReply = actorUtils.findLocalShard(shardName);
214             if (shardReply.isPresent()) {
215                 return shardReply.orElseThrow();
216             }
217         }
218         return null;
219     }
220
221     public static void waitUntilShardIsDown(final ActorUtils actorUtils, final String shardName) {
222         for (int i = 0; i < 20 * 5 ; i++) {
223             LOG.debug("Waiting for shard down {}", shardName);
224             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
225             Optional<ActorRef> shardReply = actorUtils.findLocalShard(shardName);
226             if (!shardReply.isPresent()) {
227                 return;
228             }
229         }
230
231         throw new IllegalStateException("Shard[" + shardName + " did not shutdown in time");
232     }
233
234     public static void verifyShardStats(final AbstractDataStore datastore, final String shardName,
235             final ShardStatsVerifier verifier) throws Exception {
236         ActorUtils actorUtils = datastore.getActorUtils();
237
238         Future<ActorRef> future = actorUtils.findLocalShardAsync(shardName);
239         ActorRef shardActor = Await.result(future, FiniteDuration.create(10, TimeUnit.SECONDS));
240
241         AssertionError lastError = null;
242         Stopwatch sw = Stopwatch.createStarted();
243         while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
244             ShardStats shardStats = (ShardStats)actorUtils
245                     .executeOperation(shardActor, Shard.GET_SHARD_MBEAN_MESSAGE);
246
247             try {
248                 verifier.verify(shardStats);
249                 return;
250             } catch (AssertionError e) {
251                 lastError = e;
252                 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
253             }
254         }
255
256         throw lastError;
257     }
258
259     public static void verifyShardState(final AbstractDataStore datastore, final String shardName,
260             final Consumer<OnDemandShardState> verifier) throws Exception {
261         ActorUtils actorUtils = datastore.getActorUtils();
262
263         Future<ActorRef> future = actorUtils.findLocalShardAsync(shardName);
264         ActorRef shardActor = Await.result(future, FiniteDuration.create(10, TimeUnit.SECONDS));
265
266         AssertionError lastError = null;
267         Stopwatch sw = Stopwatch.createStarted();
268         while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
269             OnDemandShardState shardState = (OnDemandShardState)actorUtils
270                     .executeOperation(shardActor, GetOnDemandRaftState.INSTANCE);
271
272             try {
273                 verifier.accept(shardState);
274                 return;
275             } catch (AssertionError e) {
276                 lastError = e;
277                 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
278             }
279         }
280
281         throw lastError;
282     }
283
284     void testWriteTransaction(final AbstractDataStore dataStore, final YangInstanceIdentifier nodePath,
285             final NormalizedNode nodeToWrite) throws Exception {
286
287         // 1. Create a write-only Tx
288
289         DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
290         assertNotNull("newWriteOnlyTransaction returned null", writeTx);
291
292         // 2. Write some data
293
294         writeTx.write(nodePath, nodeToWrite);
295
296         // 3. Ready the Tx for commit
297
298         DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
299
300         // 4. Commit the Tx
301
302         doCommit(cohort);
303
304         // 5. Verify the data in the store
305
306         DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
307         assertEquals(Optional.of(nodeToWrite), readTx.read(nodePath).get(5, TimeUnit.SECONDS));
308     }
309
310     public void doCommit(final DOMStoreThreePhaseCommitCohort cohort) throws Exception {
311         Boolean canCommit = cohort.canCommit().get(commitTimeout, TimeUnit.SECONDS);
312         assertEquals("canCommit", Boolean.TRUE, canCommit);
313         cohort.preCommit().get(5, TimeUnit.SECONDS);
314         cohort.commit().get(5, TimeUnit.SECONDS);
315     }
316
317     void doCommit(final ListenableFuture<Boolean> canCommitFuture, final DOMStoreThreePhaseCommitCohort cohort)
318             throws Exception {
319         Boolean canCommit = canCommitFuture.get(commitTimeout, TimeUnit.SECONDS);
320         assertEquals("canCommit", Boolean.TRUE, canCommit);
321         cohort.preCommit().get(5, TimeUnit.SECONDS);
322         cohort.commit().get(5, TimeUnit.SECONDS);
323     }
324
325     void assertExceptionOnTxChainCreates(final DOMStoreTransactionChain txChain,
326             final Class<? extends Exception> expType) {
327         assertThrows(expType, () -> txChain.newWriteOnlyTransaction());
328         assertThrows(expType, () -> txChain.newReadWriteTransaction());
329         assertThrows(expType, () -> txChain.newReadOnlyTransaction());
330     }
331
332     public interface ShardStatsVerifier {
333         void verify(ShardStats stats);
334     }
335 }