c3d216be9ca9d144c51741194c06bb0550919549
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / IntegrationTestKit.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertNotNull;
12 import static org.junit.Assert.fail;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSystem;
15 import akka.actor.PoisonPill;
16 import com.google.common.base.Optional;
17 import com.google.common.base.Stopwatch;
18 import com.google.common.util.concurrent.ListenableFuture;
19 import com.google.common.util.concurrent.Uninterruptibles;
20 import java.util.concurrent.Callable;
21 import java.util.concurrent.TimeUnit;
22 import org.mockito.Mockito;
23 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
24 import org.opendaylight.controller.cluster.datastore.config.Configuration;
25 import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
26 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
27 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
28 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
29 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
30 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
31 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
32 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
33 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
34 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
35 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
36 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
37 import scala.concurrent.Await;
38 import scala.concurrent.Future;
39 import scala.concurrent.duration.Duration;
40
41 public class IntegrationTestKit extends ShardTestKit {
42
43     protected DatastoreContext.Builder datastoreContextBuilder;
44     protected DatastoreSnapshot restoreFromSnapshot;
45
46     public IntegrationTestKit(ActorSystem actorSystem, Builder datastoreContextBuilder) {
47         super(actorSystem);
48         this.datastoreContextBuilder = datastoreContextBuilder;
49     }
50
51     public DatastoreContext.Builder getDatastoreContextBuilder() {
52         return datastoreContextBuilder;
53     }
54
55     public DistributedDataStore setupDistributedDataStore(String typeName, String... shardNames) {
56         return setupDistributedDataStore(typeName, "module-shards.conf", true, SchemaContextHelper.full(), shardNames);
57     }
58
59     public DistributedDataStore setupDistributedDataStore(String typeName, boolean waitUntilLeader,
60             String... shardNames) {
61         return setupDistributedDataStore(typeName, "module-shards.conf", waitUntilLeader,
62                 SchemaContextHelper.full(), shardNames);
63     }
64
65     public DistributedDataStore setupDistributedDataStore(String typeName, String moduleShardsConfig,
66             boolean waitUntilLeader, String... shardNames) {
67         return setupDistributedDataStore(typeName, moduleShardsConfig, waitUntilLeader,
68                 SchemaContextHelper.full(), shardNames);
69     }
70
71     public DistributedDataStore setupDistributedDataStore(String typeName, String moduleShardsConfig,
72             boolean waitUntilLeader, SchemaContext schemaContext, String... shardNames) {
73         ClusterWrapper cluster = new ClusterWrapperImpl(getSystem());
74         Configuration config = new ConfigurationImpl(moduleShardsConfig, "modules.conf");
75
76         datastoreContextBuilder.dataStoreName(typeName);
77
78         DatastoreContext datastoreContext = datastoreContextBuilder.build();
79         DatastoreContextFactory mockContextFactory = Mockito.mock(DatastoreContextFactory.class);
80         Mockito.doReturn(datastoreContext).when(mockContextFactory).getBaseDatastoreContext();
81         Mockito.doReturn(datastoreContext).when(mockContextFactory).getShardDatastoreContext(Mockito.anyString());
82
83         DistributedDataStore dataStore = new DistributedDataStore(getSystem(), cluster, config, mockContextFactory,
84                 restoreFromSnapshot);
85
86         dataStore.onGlobalContextUpdated(schemaContext);
87
88         if(waitUntilLeader) {
89             waitUntilLeader(dataStore.getActorContext(), shardNames);
90         }
91
92         datastoreContextBuilder = DatastoreContext.newBuilderFrom(datastoreContext);
93         return dataStore;
94     }
95
96     public void waitUntilLeader(ActorContext actorContext, String... shardNames) {
97         for(String shardName: shardNames) {
98             ActorRef shard = findLocalShard(actorContext, shardName);
99
100             assertNotNull("Shard was not created for " + shardName, shard);
101
102             waitUntilLeader(shard);
103         }
104     }
105
106     public void waitUntilNoLeader(ActorContext actorContext, String... shardNames) {
107         for(String shardName: shardNames) {
108             ActorRef shard = findLocalShard(actorContext, shardName);
109             assertNotNull("No local shard found for " + shardName, shard);
110
111             waitUntilNoLeader(shard);
112         }
113     }
114
115     public static ActorRef findLocalShard(ActorContext actorContext, String shardName) {
116         ActorRef shard = null;
117         for(int i = 0; i < 20 * 5 && shard == null; i++) {
118             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
119             Optional<ActorRef> shardReply = actorContext.findLocalShard(shardName);
120             if(shardReply.isPresent()) {
121                 shard = shardReply.get();
122             }
123         }
124         return shard;
125     }
126
127     public static void verifyShardStats(DistributedDataStore datastore, String shardName, ShardStatsVerifier verifier)
128             throws Exception {
129         ActorContext actorContext = datastore.getActorContext();
130
131         Future<ActorRef> future = actorContext.findLocalShardAsync(shardName);
132         ActorRef shardActor = Await.result(future, Duration.create(10, TimeUnit.SECONDS));
133
134         AssertionError lastError = null;
135         Stopwatch sw = Stopwatch.createStarted();
136         while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
137             ShardStats shardStats = (ShardStats)actorContext.
138                     executeOperation(shardActor, Shard.GET_SHARD_MBEAN_MESSAGE);
139
140             try {
141                 verifier.verify(shardStats);
142                 return;
143             } catch (AssertionError e) {
144                 lastError = e;
145                 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
146             }
147         }
148
149         throw lastError;
150     }
151
152     void testWriteTransaction(DistributedDataStore dataStore, YangInstanceIdentifier nodePath,
153             NormalizedNode<?, ?> nodeToWrite) throws Exception {
154
155         // 1. Create a write-only Tx
156
157         DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
158         assertNotNull("newWriteOnlyTransaction returned null", writeTx);
159
160         // 2. Write some data
161
162         writeTx.write(nodePath, nodeToWrite);
163
164         // 3. Ready the Tx for commit
165
166         DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
167
168         // 4. Commit the Tx
169
170         doCommit(cohort);
171
172         // 5. Verify the data in the store
173
174         DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
175
176         Optional<NormalizedNode<?, ?>> optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
177         assertEquals("isPresent", true, optional.isPresent());
178         assertEquals("Data node", nodeToWrite, optional.get());
179     }
180
181     public void doCommit(final DOMStoreThreePhaseCommitCohort cohort) throws Exception {
182         Boolean canCommit = cohort.canCommit().get(7, TimeUnit.SECONDS);
183         assertEquals("canCommit", true, canCommit);
184         cohort.preCommit().get(5, TimeUnit.SECONDS);
185         cohort.commit().get(5, TimeUnit.SECONDS);
186     }
187
188     void doCommit(final ListenableFuture<Boolean> canCommitFuture, final DOMStoreThreePhaseCommitCohort cohort) throws Exception {
189         Boolean canCommit = canCommitFuture.get(7, TimeUnit.SECONDS);
190         assertEquals("canCommit", true, canCommit);
191         cohort.preCommit().get(5, TimeUnit.SECONDS);
192         cohort.commit().get(5, TimeUnit.SECONDS);
193     }
194
195     public void cleanup(DistributedDataStore dataStore) {
196         if(dataStore != null) {
197             dataStore.getActorContext().getShardManager().tell(PoisonPill.getInstance(), null);
198         }
199     }
200
201     void assertExceptionOnCall(Callable<Void> callable, Class<? extends Exception> expType)
202             throws Exception {
203         try {
204             callable.call();
205             fail("Expected " + expType.getSimpleName());
206         } catch(Exception e) {
207             assertEquals("Exception type", expType, e.getClass());
208         }
209     }
210
211     void assertExceptionOnTxChainCreates(final DOMStoreTransactionChain txChain,
212             Class<? extends Exception> expType) throws Exception {
213         assertExceptionOnCall(new Callable<Void>() {
214             @Override
215             public Void call() throws Exception {
216                 txChain.newWriteOnlyTransaction();
217                 return null;
218             }
219         }, expType);
220
221         assertExceptionOnCall(new Callable<Void>() {
222             @Override
223             public Void call() throws Exception {
224                 txChain.newReadWriteTransaction();
225                 return null;
226             }
227         }, expType);
228
229         assertExceptionOnCall(new Callable<Void>() {
230             @Override
231             public Void call() throws Exception {
232                 txChain.newReadOnlyTransaction();
233                 return null;
234             }
235         }, expType);
236     }
237
238     public interface ShardStatsVerifier {
239         void verify(ShardStats stats);
240     }
241 }