Rework CDS commit cohort impl to handle yang lists
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / IntegrationTestKit.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertNotNull;
12 import static org.junit.Assert.fail;
13
14 import akka.actor.ActorRef;
15 import akka.actor.ActorSystem;
16 import akka.cluster.Cluster;
17 import akka.cluster.ClusterEvent.CurrentClusterState;
18 import akka.cluster.Member;
19 import akka.cluster.MemberStatus;
20 import com.google.common.base.Optional;
21 import com.google.common.base.Stopwatch;
22 import com.google.common.collect.Sets;
23 import com.google.common.util.concurrent.ListenableFuture;
24 import com.google.common.util.concurrent.Uninterruptibles;
25 import java.util.Set;
26 import java.util.concurrent.Callable;
27 import java.util.concurrent.TimeUnit;
28 import java.util.function.Consumer;
29 import org.mockito.Mockito;
30 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
31 import org.opendaylight.controller.cluster.datastore.config.Configuration;
32 import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
33 import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider;
34 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
35 import org.opendaylight.controller.cluster.datastore.messages.OnDemandShardState;
36 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
37 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
38 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
39 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
40 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
41 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
42 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
43 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
44 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
45 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
46 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
49 import scala.concurrent.Await;
50 import scala.concurrent.Future;
51 import scala.concurrent.duration.Duration;
52
53 public class IntegrationTestKit extends ShardTestKit {
54
55     private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestKit.class);
56
57     protected DatastoreContext.Builder datastoreContextBuilder;
58     protected DatastoreSnapshot restoreFromSnapshot;
59
60     public IntegrationTestKit(final ActorSystem actorSystem, final Builder datastoreContextBuilder) {
61         super(actorSystem);
62         this.datastoreContextBuilder = datastoreContextBuilder;
63     }
64
65     public DatastoreContext.Builder getDatastoreContextBuilder() {
66         return datastoreContextBuilder;
67     }
68
69     public AbstractDataStore setupDistributedDataStore(final String typeName, final String... shardNames) {
70         return setupDistributedDataStore(typeName, "module-shards.conf", true, SchemaContextHelper.full(), shardNames);
71     }
72
73     public AbstractDataStore setupDistributedDataStore(final String typeName, final boolean waitUntilLeader,
74             final String... shardNames) {
75         return setupDistributedDataStore(typeName, "module-shards.conf", waitUntilLeader,
76                 SchemaContextHelper.full(), shardNames);
77     }
78
79     public AbstractDataStore setupDistributedDataStore(final String typeName, final String moduleShardsConfig,
80             final boolean waitUntilLeader, final String... shardNames) {
81         return setupDistributedDataStore(typeName, moduleShardsConfig, waitUntilLeader,
82                 SchemaContextHelper.full(), shardNames);
83     }
84
85     public AbstractDataStore setupDistributedDataStore(final String typeName, final String moduleShardsConfig,
86             final boolean waitUntilLeader, final SchemaContext schemaContext, final String... shardNames) {
87         final ClusterWrapper cluster = new ClusterWrapperImpl(getSystem());
88         final Configuration config = new ConfigurationImpl(moduleShardsConfig, "modules.conf");
89
90         datastoreContextBuilder.dataStoreName(typeName);
91
92         DatastoreContext datastoreContext = datastoreContextBuilder.build();
93         DatastoreContextFactory mockContextFactory = Mockito.mock(DatastoreContextFactory.class);
94         Mockito.doReturn(datastoreContext).when(mockContextFactory).getBaseDatastoreContext();
95         Mockito.doReturn(datastoreContext).when(mockContextFactory).getShardDatastoreContext(Mockito.anyString());
96
97         AbstractDataStore dataStore = new DistributedDataStore(getSystem(), cluster, config, mockContextFactory,
98                 restoreFromSnapshot);
99
100         dataStore.onGlobalContextUpdated(schemaContext);
101
102         if (waitUntilLeader) {
103             waitUntilLeader(dataStore.getActorContext(), shardNames);
104         }
105
106         datastoreContextBuilder = DatastoreContext.newBuilderFrom(datastoreContext);
107         return dataStore;
108     }
109
110     public DistributedDataStore setupDistributedDataStoreWithoutConfig(final String typeName,
111                                                                        final SchemaContext schemaContext) {
112         final ClusterWrapper cluster = new ClusterWrapperImpl(getSystem());
113         final ConfigurationImpl configuration = new ConfigurationImpl(new EmptyModuleShardConfigProvider());
114
115         getDatastoreContextBuilder().dataStoreName(typeName);
116
117         final DatastoreContext datastoreContext = getDatastoreContextBuilder().build();
118
119         final DatastoreContextFactory mockContextFactory = Mockito.mock(DatastoreContextFactory.class);
120         Mockito.doReturn(datastoreContext).when(mockContextFactory).getBaseDatastoreContext();
121         Mockito.doReturn(datastoreContext).when(mockContextFactory).getShardDatastoreContext(Mockito.anyString());
122
123         final DistributedDataStore dataStore = new DistributedDataStore(getSystem(), cluster,
124                 configuration, mockContextFactory, restoreFromSnapshot);
125
126         dataStore.onGlobalContextUpdated(schemaContext);
127
128         datastoreContextBuilder = DatastoreContext.newBuilderFrom(datastoreContext);
129         return dataStore;
130     }
131
132     public void waitUntilLeader(final ActorContext actorContext, final String... shardNames) {
133         for (String shardName: shardNames) {
134             ActorRef shard = findLocalShard(actorContext, shardName);
135
136             assertNotNull("Shard was not created for " + shardName, shard);
137
138             waitUntilLeader(shard);
139         }
140     }
141
142     public void waitUntilNoLeader(final ActorContext actorContext, final String... shardNames) {
143         for (String shardName: shardNames) {
144             ActorRef shard = findLocalShard(actorContext, shardName);
145             assertNotNull("No local shard found for " + shardName, shard);
146
147             waitUntilNoLeader(shard);
148         }
149     }
150
151     public void waitForMembersUp(final String... otherMembers) {
152         Set<String> otherMembersSet = Sets.newHashSet(otherMembers);
153         Stopwatch sw = Stopwatch.createStarted();
154         while (sw.elapsed(TimeUnit.SECONDS) <= 10) {
155             CurrentClusterState state = Cluster.get(getSystem()).state();
156             for (Member m: state.getMembers()) {
157                 if (m.status() == MemberStatus.up() && otherMembersSet.remove(m.getRoles().iterator().next())
158                         && otherMembersSet.isEmpty()) {
159                     return;
160                 }
161             }
162
163             Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
164         }
165
166         fail("Member(s) " + otherMembersSet + " are not Up");
167     }
168
169     public static ActorRef findLocalShard(final ActorContext actorContext, final String shardName) {
170         ActorRef shard = null;
171         for (int i = 0; i < 20 * 5 && shard == null; i++) {
172             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
173             Optional<ActorRef> shardReply = actorContext.findLocalShard(shardName);
174             if (shardReply.isPresent()) {
175                 shard = shardReply.get();
176             }
177         }
178         return shard;
179     }
180
181     public static void waitUntilShardIsDown(ActorContext actorContext, String shardName) {
182         for (int i = 0; i < 20 * 5 ; i++) {
183             LOG.debug("Waiting for shard down {}", shardName);
184             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
185             Optional<ActorRef> shardReply = actorContext.findLocalShard(shardName);
186             if (!shardReply.isPresent()) {
187                 return;
188             }
189         }
190
191         throw new IllegalStateException("Shard[" + shardName + " did not shutdown in time");
192     }
193
194     public static void verifyShardStats(final AbstractDataStore datastore, final String shardName,
195             final ShardStatsVerifier verifier) throws Exception {
196         ActorContext actorContext = datastore.getActorContext();
197
198         Future<ActorRef> future = actorContext.findLocalShardAsync(shardName);
199         ActorRef shardActor = Await.result(future, Duration.create(10, TimeUnit.SECONDS));
200
201         AssertionError lastError = null;
202         Stopwatch sw = Stopwatch.createStarted();
203         while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
204             ShardStats shardStats = (ShardStats)actorContext
205                     .executeOperation(shardActor, Shard.GET_SHARD_MBEAN_MESSAGE);
206
207             try {
208                 verifier.verify(shardStats);
209                 return;
210             } catch (AssertionError e) {
211                 lastError = e;
212                 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
213             }
214         }
215
216         throw lastError;
217     }
218
219     public static void verifyShardState(final AbstractDataStore datastore, final String shardName,
220             final Consumer<OnDemandShardState> verifier) throws Exception {
221         ActorContext actorContext = datastore.getActorContext();
222
223         Future<ActorRef> future = actorContext.findLocalShardAsync(shardName);
224         ActorRef shardActor = Await.result(future, Duration.create(10, TimeUnit.SECONDS));
225
226         AssertionError lastError = null;
227         Stopwatch sw = Stopwatch.createStarted();
228         while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
229             OnDemandShardState shardState = (OnDemandShardState)actorContext
230                     .executeOperation(shardActor, GetOnDemandRaftState.INSTANCE);
231
232             try {
233                 verifier.accept(shardState);
234                 return;
235             } catch (AssertionError e) {
236                 lastError = e;
237                 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
238             }
239         }
240
241         throw lastError;
242     }
243
244     void testWriteTransaction(final AbstractDataStore dataStore, final YangInstanceIdentifier nodePath,
245             final NormalizedNode<?, ?> nodeToWrite) throws Exception {
246
247         // 1. Create a write-only Tx
248
249         DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
250         assertNotNull("newWriteOnlyTransaction returned null", writeTx);
251
252         // 2. Write some data
253
254         writeTx.write(nodePath, nodeToWrite);
255
256         // 3. Ready the Tx for commit
257
258         DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
259
260         // 4. Commit the Tx
261
262         doCommit(cohort);
263
264         // 5. Verify the data in the store
265
266         DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
267
268         Optional<NormalizedNode<?, ?>> optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
269         assertEquals("isPresent", true, optional.isPresent());
270         assertEquals("Data node", nodeToWrite, optional.get());
271     }
272
273     public void doCommit(final DOMStoreThreePhaseCommitCohort cohort) throws Exception {
274         Boolean canCommit = cohort.canCommit().get(7, TimeUnit.SECONDS);
275         assertEquals("canCommit", true, canCommit);
276         cohort.preCommit().get(5, TimeUnit.SECONDS);
277         cohort.commit().get(5, TimeUnit.SECONDS);
278     }
279
280     void doCommit(final ListenableFuture<Boolean> canCommitFuture, final DOMStoreThreePhaseCommitCohort cohort)
281             throws Exception {
282         Boolean canCommit = canCommitFuture.get(7, TimeUnit.SECONDS);
283         assertEquals("canCommit", true, canCommit);
284         cohort.preCommit().get(5, TimeUnit.SECONDS);
285         cohort.commit().get(5, TimeUnit.SECONDS);
286     }
287
288     @SuppressWarnings("checkstyle:IllegalCatch")
289     void assertExceptionOnCall(final Callable<Void> callable, final Class<? extends Exception> expType)
290             throws Exception {
291         try {
292             callable.call();
293             fail("Expected " + expType.getSimpleName());
294         } catch (Exception e) {
295             assertEquals("Exception type", expType, e.getClass());
296         }
297     }
298
299     void assertExceptionOnTxChainCreates(final DOMStoreTransactionChain txChain,
300             final Class<? extends Exception> expType) throws Exception {
301         assertExceptionOnCall(() -> {
302             txChain.newWriteOnlyTransaction();
303             return null;
304         }, expType);
305
306         assertExceptionOnCall(() -> {
307             txChain.newReadWriteTransaction();
308             return null;
309         }, expType);
310
311         assertExceptionOnCall(() -> {
312             txChain.newReadOnlyTransaction();
313             return null;
314         }, expType);
315     }
316
317     public interface ShardStatsVerifier {
318         void verify(ShardStats stats);
319     }
320 }