Merge "Small fix to xsql dependencies"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / DistributedDataStoreIntegrationTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import akka.actor.ActorSystem;
4 import akka.event.Logging;
5 import akka.testkit.JavaTestKit;
6
7 import com.google.common.base.Optional;
8 import com.google.common.util.concurrent.ListenableFuture;
9
10 import junit.framework.Assert;
11
12 import org.apache.commons.io.FileUtils;
13 import org.junit.After;
14 import org.junit.Before;
15 import org.junit.Test;
16 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
17 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
18 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
19 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
20 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
21 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
22 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
23 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
24 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
25 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
26
27 import java.io.File;
28 import java.io.IOException;
29 import java.util.concurrent.ExecutionException;
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.TimeoutException;
32
33 import static junit.framework.Assert.assertEquals;
34 import static junit.framework.Assert.assertTrue;
35 import static junit.framework.Assert.fail;
36
37 public class DistributedDataStoreIntegrationTest {
38
39     private static ActorSystem system;
40
41     @Before
42     public void setUp() throws IOException {
43         File journal = new File("journal");
44
45         if(journal.exists()) {
46             FileUtils.deleteDirectory(journal);
47         }
48
49
50         System.setProperty("shard.persistent", "false");
51         system = ActorSystem.create("test");
52     }
53
54     @After
55     public void tearDown() {
56         JavaTestKit.shutdownActorSystem(system);
57         system = null;
58     }
59
60     protected ActorSystem getSystem() {
61         return system;
62     }
63
64     @Test
65     public void integrationTest() throws Exception {
66         final Configuration configuration = new ConfigurationImpl("module-shards.conf", "modules.conf");
67         ShardStrategyFactory.setConfiguration(configuration);
68
69
70
71         new JavaTestKit(getSystem()) {
72             {
73
74                 new Within(duration("10 seconds")) {
75                     @Override
76                     protected void run() {
77                         try {
78                             final DistributedDataStore distributedDataStore =
79                                 new DistributedDataStore(getSystem(), "config",
80                                         new MockClusterWrapper(), configuration,
81                                         new DistributedDataStoreProperties());
82
83                             distributedDataStore.onGlobalContextUpdated(TestModel.createTestContext());
84
85                             // Wait for a specific log message to show up
86                             final boolean result =
87                                 new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
88                                     ) {
89                                     @Override
90                                     protected Boolean run() {
91                                         return true;
92                                     }
93                                 }.from("akka://test/user/shardmanager-config/member-1-shard-test-1-config")
94                                     .message("Switching from state Candidate to Leader")
95                                     .occurrences(1).exec();
96
97                             assertEquals(true, result);
98
99                             DOMStoreReadWriteTransaction transaction =
100                                 distributedDataStore.newReadWriteTransaction();
101
102                             transaction
103                                 .write(TestModel.TEST_PATH, ImmutableNodes
104                                     .containerNode(TestModel.TEST_QNAME));
105
106                             ListenableFuture<Optional<NormalizedNode<?, ?>>>
107                                 future =
108                                 transaction.read(TestModel.TEST_PATH);
109
110                             Optional<NormalizedNode<?, ?>> optional =
111                                 future.get();
112
113                             Assert.assertTrue("Node not found", optional.isPresent());
114
115                             NormalizedNode<?, ?> normalizedNode =
116                                 optional.get();
117
118                             assertEquals(TestModel.TEST_QNAME,
119                                 normalizedNode.getNodeType());
120
121                             DOMStoreThreePhaseCommitCohort ready =
122                                 transaction.ready();
123
124                             ListenableFuture<Boolean> canCommit =
125                                 ready.canCommit();
126
127                             assertTrue(canCommit.get(5, TimeUnit.SECONDS));
128
129                             ListenableFuture<Void> preCommit =
130                                 ready.preCommit();
131
132                             preCommit.get(5, TimeUnit.SECONDS);
133
134                             ListenableFuture<Void> commit = ready.commit();
135
136                             commit.get(5, TimeUnit.SECONDS);
137                         } catch (ExecutionException | TimeoutException | InterruptedException e){
138                             fail(e.getMessage());
139                         }
140                     }
141                 };
142             }
143         };
144
145     }
146
147
148     //FIXME : Disabling test because it's flaky
149     //@Test
150     public void integrationTestWithMultiShardConfiguration()
151         throws ExecutionException, InterruptedException, TimeoutException {
152         final Configuration configuration = new ConfigurationImpl("module-shards.conf", "modules.conf");
153
154         ShardStrategyFactory.setConfiguration(configuration);
155
156         new JavaTestKit(getSystem()) {
157             {
158
159                 new Within(duration("10 seconds")) {
160                     @Override
161                     protected void run() {
162                         try {
163                             final DistributedDataStore distributedDataStore =
164                                 new DistributedDataStore(getSystem(), "config",
165                                     new MockClusterWrapper(), configuration, null);
166
167                             distributedDataStore.onGlobalContextUpdated(
168                                 SchemaContextHelper.full());
169
170                             // Wait for a specific log message to show up
171                             final boolean result =
172                                 new JavaTestKit.EventFilter<Boolean>(
173                                     Logging.Info.class
174                                 ) {
175                                     @Override
176                                     protected Boolean run() {
177                                         return true;
178                                     }
179                                 }.from(
180                                     "akka://test/user/shardmanager-config/member-1-shard-cars-1-config")
181                                     .message(
182                                         "Switching from state Candidate to Leader")
183                                     .occurrences(1)
184                                     .exec();
185
186                             Thread.sleep(1000);
187
188
189                             DOMStoreReadWriteTransaction transaction =
190                                 distributedDataStore.newReadWriteTransaction();
191
192                             transaction.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
193                             transaction.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
194
195                             DOMStoreThreePhaseCommitCohort ready = transaction.ready();
196
197                             ListenableFuture<Boolean> canCommit = ready.canCommit();
198
199                             assertTrue(canCommit.get(5, TimeUnit.SECONDS));
200
201                             ListenableFuture<Void> preCommit = ready.preCommit();
202
203                             preCommit.get(5, TimeUnit.SECONDS);
204
205                             ListenableFuture<Void> commit = ready.commit();
206
207                             commit.get(5, TimeUnit.SECONDS);
208
209                             assertEquals(true, result);
210                         } catch(ExecutionException | TimeoutException | InterruptedException e){
211                             fail(e.getMessage());
212                         }
213                     }
214                 };
215             }
216         };
217
218
219     }
220
221 }