1 package org.opendaylight.controller.cluster.datastore;
3 import akka.actor.ActorSystem;
4 import akka.event.Logging;
5 import akka.testkit.JavaTestKit;
7 import com.google.common.base.Optional;
8 import com.google.common.util.concurrent.ListenableFuture;
10 import junit.framework.Assert;
12 import org.apache.commons.io.FileUtils;
13 import org.junit.After;
14 import org.junit.Before;
15 import org.junit.Test;
16 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
17 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
18 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
19 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
20 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
21 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
22 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
23 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
24 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
25 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
26 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
29 import java.io.IOException;
30 import java.util.concurrent.ExecutionException;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.TimeoutException;
34 import static junit.framework.Assert.assertEquals;
35 import static junit.framework.Assert.assertTrue;
36 import static junit.framework.Assert.fail;
// Integration tests that exercise DistributedDataStore against an ActorSystem created in-process.
38 public class DistributedDataStoreIntegrationTest {
// Actor system used by the tests; assigned in setUp() and shut down in tearDown().
40 private static ActorSystem system;
// Test setup: deletes an existing "journal" directory, sets the "shard.persistent" system
// property to "false", and creates the ActorSystem named "test".
// NOTE(review): presumably annotated with @Before (imported above); the annotation and the
// closing braces are not visible in this view - confirm.
43 public void setUp() throws IOException {
44 File journal = new File("journal");
// Only delete when the directory is present (path is relative to the working directory).
46 if(journal.exists()) {
47 FileUtils.deleteDirectory(journal);
51 System.setProperty("shard.persistent", "false");
// Stored in the static field that tearDown() later shuts down.
52 system = ActorSystem.create("test");
// Test teardown: shuts down the ActorSystem held in the static field via JavaTestKit.
// NOTE(review): presumably annotated with @After (imported above) - confirm.
56 public void tearDown() {
57 JavaTestKit.shutdownActorSystem(system);
// Accessor for the ActorSystem used by the tests below.
// NOTE(review): the method body is not visible in this view; presumably it returns the static
// `system` field - confirm.
61 protected ActorSystem getSystem() {
// Single data store round trip: waits for the shard's "Candidate to Leader" log message, then
// writes a container node at TestModel.TEST_PATH in a read-write transaction, reads it back,
// and takes the transaction through the canCommit, preCommit and commit phases.
// NOTE(review): several short lines of this method are not visible in this view.
66 public void integrationTest() throws Exception {
// Load the shard/module configuration files and register the result with ShardStrategyFactory.
67 final Configuration configuration = new ConfigurationImpl("module-shards.conf", "modules.conf");
68 ShardStrategyFactory.setConfiguration(configuration);
72 new JavaTestKit(getSystem()) {
// Akka TestKit timing block declared with a 10 second duration.
75 new Within(duration("10 seconds")) {
77 protected void run() {
// Data store under test: named "config", built with a mock cluster wrapper and a new DatastoreContext.
79 final DistributedDataStore distributedDataStore =
80 new DistributedDataStore(getSystem(), "config",
81 new MockClusterWrapper(), configuration,
82 new DatastoreContext());
// Pass the context built by TestModel.createTestContext() to the data store.
84 distributedDataStore.onGlobalContextUpdated(TestModel.createTestContext());
86 // Wait for a specific log message to show up
// The filter is configured for one Info event from the member-1-shard-test-1-config actor.
87 final boolean result =
88 new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
91 protected Boolean run() {
94 }.from("akka://test/user/shardmanager-config/member-1-shard-test-1-config")
95 .message("Switching from state Candidate to Leader")
96 .occurrences(1).exec();
98 assertEquals(true, result);
// Open a read-write transaction directly on the data store.
100 DOMStoreReadWriteTransaction transaction =
101 distributedDataStore.newReadWriteTransaction();
// Write a container node built from TestModel.TEST_QNAME at TestModel.TEST_PATH.
104 .write(TestModel.TEST_PATH, ImmutableNodes
105 .containerNode(TestModel.TEST_QNAME));
// Read the same path back through the same transaction.
107 ListenableFuture<Optional<NormalizedNode<?, ?>>>
109 transaction.read(TestModel.TEST_PATH);
111 Optional<NormalizedNode<?, ?>> optional =
// The result must hold a node, and that node's type must be TEST_QNAME.
114 Assert.assertTrue("Node not found", optional.isPresent());
116 NormalizedNode<?, ?> normalizedNode =
119 assertEquals(TestModel.TEST_QNAME,
120 normalizedNode.getNodeType());
// Commit phases: each phase's future is awaited for at most 5 seconds.
122 DOMStoreThreePhaseCommitCohort ready =
125 ListenableFuture<Boolean> canCommit =
128 assertTrue(canCommit.get(5, TimeUnit.SECONDS));
130 ListenableFuture<Void> preCommit =
133 preCommit.get(5, TimeUnit.SECONDS);
135 ListenableFuture<Void> commit = ready.commit();
137 commit.get(5, TimeUnit.SECONDS);
// Exceptions raised while waiting on a future become a test failure carrying only the message.
// NOTE(review): the cause and stack trace are dropped, and getMessage() may be null - consider rethrowing.
138 } catch (ExecutionException | TimeoutException | InterruptedException e){
139 fail(e.getMessage());
// Same round trip as a plain read-write transaction test, but the transaction is obtained from
// a DOMStoreTransactionChain: waits for the shard's "Candidate to Leader" log message, writes a
// container node at TestModel.TEST_PATH, reads it back, runs the canCommit, preCommit and
// commit phases, and finally closes the chain.
// NOTE(review): several short lines of this method are not visible in this view.
149 public void transactionChainIntegrationTest() throws Exception {
// Load the shard/module configuration files and register the result with ShardStrategyFactory.
150 final Configuration configuration = new ConfigurationImpl("module-shards.conf", "modules.conf");
151 ShardStrategyFactory.setConfiguration(configuration);
155 new JavaTestKit(getSystem()) {
// Akka TestKit timing block declared with a 10 second duration.
158 new Within(duration("10 seconds")) {
160 protected void run() {
// Data store under test: named "config", built with a mock cluster wrapper and a new DatastoreContext.
162 final DistributedDataStore distributedDataStore =
163 new DistributedDataStore(getSystem(), "config",
164 new MockClusterWrapper(), configuration,
165 new DatastoreContext());
// Pass the context built by TestModel.createTestContext() to the data store.
167 distributedDataStore.onGlobalContextUpdated(TestModel.createTestContext());
169 // Wait for a specific log message to show up
// The filter is configured for one Info event from the member-1-shard-test-1-config actor.
170 final boolean result =
171 new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
174 protected Boolean run() {
177 }.from("akka://test/user/shardmanager-config/member-1-shard-test-1-config")
178 .message("Switching from state Candidate to Leader")
179 .occurrences(1).exec();
181 assertEquals(true, result);
// Create a transaction chain and take the read-write transaction from it.
183 DOMStoreTransactionChain transactionChain =
184 distributedDataStore.createTransactionChain();
186 DOMStoreReadWriteTransaction transaction =
187 transactionChain.newReadWriteTransaction();
// Write a container node built from TestModel.TEST_QNAME at TestModel.TEST_PATH.
190 .write(TestModel.TEST_PATH, ImmutableNodes
191 .containerNode(TestModel.TEST_QNAME));
// Read the same path back through the same transaction.
193 ListenableFuture<Optional<NormalizedNode<?, ?>>>
195 transaction.read(TestModel.TEST_PATH);
197 Optional<NormalizedNode<?, ?>> optional =
// The result must hold a node, and that node's type must be TEST_QNAME.
200 Assert.assertTrue("Node not found", optional.isPresent());
202 NormalizedNode<?, ?> normalizedNode =
205 assertEquals(TestModel.TEST_QNAME,
206 normalizedNode.getNodeType());
// Commit phases: each phase's future is awaited for at most 5 seconds.
208 DOMStoreThreePhaseCommitCohort ready =
211 ListenableFuture<Boolean> canCommit =
214 assertTrue(canCommit.get(5, TimeUnit.SECONDS));
216 ListenableFuture<Void> preCommit =
219 preCommit.get(5, TimeUnit.SECONDS);
221 ListenableFuture<Void> commit = ready.commit();
223 commit.get(5, TimeUnit.SECONDS);
// Close the chain once the commit has completed.
// NOTE(review): close() is skipped if any call above throws - confirm that is acceptable for a test.
225 transactionChain.close();
// Exceptions raised while waiting on a future become a test failure carrying only the message.
// NOTE(review): the cause and stack trace are dropped, and getMessage() may be null - consider rethrowing.
226 } catch (ExecutionException | TimeoutException | InterruptedException e){
227 fail(e.getMessage());
237 //FIXME : Disabling test because it's flaky
// Writes the empty cars and people containers in one read-write transaction and runs the
// canCommit, preCommit and commit phases, using the schema context from SchemaContextHelper.full().
// NOTE(review): presumably CarsModel and PeopleModel map to different shards in
// module-shards.conf, which is what makes this a multi-shard test - confirm.
// NOTE(review): several short lines of this method are not visible in this view.
239 public void integrationTestWithMultiShardConfiguration()
240 throws ExecutionException, InterruptedException, TimeoutException {
// Load the shard/module configuration files and register the result with ShardStrategyFactory.
241 final Configuration configuration = new ConfigurationImpl("module-shards.conf", "modules.conf");
243 ShardStrategyFactory.setConfiguration(configuration);
245 new JavaTestKit(getSystem()) {
// Akka TestKit timing block declared with a 10 second duration.
248 new Within(duration("10 seconds")) {
250 protected void run() {
// NOTE(review): the last constructor argument is null here, whereas the other tests in this
// class pass new DatastoreContext() - confirm this is intended.
252 final DistributedDataStore distributedDataStore =
253 new DistributedDataStore(getSystem(), "config",
254 new MockClusterWrapper(), configuration, null);
256 distributedDataStore.onGlobalContextUpdated(
257 SchemaContextHelper.full());
259 // Wait for a specific log message to show up
// Only the cars shard actor path appears in the visible filter arguments.
260 final boolean result =
261 new JavaTestKit.EventFilter<Boolean>(
265 protected Boolean run() {
269 "akka://test/user/shardmanager-config/member-1-shard-cars-1-config")
271 "Switching from state Candidate to Leader")
// Open a read-write transaction directly on the data store.
278 DOMStoreReadWriteTransaction transaction =
279 distributedDataStore.newReadWriteTransaction();
// Both writes go through the same transaction.
281 transaction.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
282 transaction.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
// Commit phases: each phase's future is awaited for at most 5 seconds.
284 DOMStoreThreePhaseCommitCohort ready = transaction.ready();
286 ListenableFuture<Boolean> canCommit = ready.canCommit();
288 assertTrue(canCommit.get(5, TimeUnit.SECONDS));
290 ListenableFuture<Void> preCommit = ready.preCommit();
292 preCommit.get(5, TimeUnit.SECONDS);
294 ListenableFuture<Void> commit = ready.commit();
296 commit.get(5, TimeUnit.SECONDS);
// NOTE(review): the log-wait result is asserted only after the commit, unlike the other tests,
// which assert it before opening a transaction - confirm the ordering is intended.
298 assertEquals(true, result);
// Exceptions raised while waiting on a future become a test failure carrying only the message.
// NOTE(review): the cause and stack trace are dropped, and getMessage() may be null - consider rethrowing.
299 } catch(ExecutionException | TimeoutException | InterruptedException e){
300 fail(e.getMessage());