BUG 1712 - Distributed DataStore does not work properly with Transaction Chains
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / DistributedDataStoreIntegrationTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import akka.actor.ActorSystem;
4 import akka.event.Logging;
5 import akka.testkit.JavaTestKit;
6
7 import com.google.common.base.Optional;
8 import com.google.common.util.concurrent.ListenableFuture;
9
10 import junit.framework.Assert;
11
12 import org.apache.commons.io.FileUtils;
13 import org.junit.After;
14 import org.junit.Before;
15 import org.junit.Test;
16 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
17 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
18 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
19 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
20 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
21 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
22 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
23 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
24 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
25 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
26 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
27
28 import java.io.File;
29 import java.io.IOException;
30 import java.util.concurrent.ExecutionException;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.TimeoutException;
33
34 import static junit.framework.Assert.assertEquals;
35 import static junit.framework.Assert.assertTrue;
36 import static junit.framework.Assert.fail;
37
38 public class DistributedDataStoreIntegrationTest {
39
40     private static ActorSystem system;
41
42     @Before
43     public void setUp() throws IOException {
44         File journal = new File("journal");
45
46         if(journal.exists()) {
47             FileUtils.deleteDirectory(journal);
48         }
49
50
51         System.setProperty("shard.persistent", "false");
52         system = ActorSystem.create("test");
53     }
54
55     @After
56     public void tearDown() {
57         JavaTestKit.shutdownActorSystem(system);
58         system = null;
59     }
60
61     protected ActorSystem getSystem() {
62         return system;
63     }
64
65     @Test
66     public void integrationTest() throws Exception {
67         final Configuration configuration = new ConfigurationImpl("module-shards.conf", "modules.conf");
68         ShardStrategyFactory.setConfiguration(configuration);
69
70
71
72         new JavaTestKit(getSystem()) {
73             {
74
75                 new Within(duration("10 seconds")) {
76                     @Override
77                     protected void run() {
78                         try {
79                             final DistributedDataStore distributedDataStore =
80                                 new DistributedDataStore(getSystem(), "config",
81                                         new MockClusterWrapper(), configuration,
82                                         new DatastoreContext());
83
84                             distributedDataStore.onGlobalContextUpdated(TestModel.createTestContext());
85
86                             // Wait for a specific log message to show up
87                             final boolean result =
88                                 new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
89                                     ) {
90                                     @Override
91                                     protected Boolean run() {
92                                         return true;
93                                     }
94                                 }.from("akka://test/user/shardmanager-config/member-1-shard-test-1-config")
95                                     .message("Switching from state Candidate to Leader")
96                                     .occurrences(1).exec();
97
98                             assertEquals(true, result);
99
100                             DOMStoreReadWriteTransaction transaction =
101                                 distributedDataStore.newReadWriteTransaction();
102
103                             transaction
104                                 .write(TestModel.TEST_PATH, ImmutableNodes
105                                     .containerNode(TestModel.TEST_QNAME));
106
107                             ListenableFuture<Optional<NormalizedNode<?, ?>>>
108                                 future =
109                                 transaction.read(TestModel.TEST_PATH);
110
111                             Optional<NormalizedNode<?, ?>> optional =
112                                 future.get();
113
114                             Assert.assertTrue("Node not found", optional.isPresent());
115
116                             NormalizedNode<?, ?> normalizedNode =
117                                 optional.get();
118
119                             assertEquals(TestModel.TEST_QNAME,
120                                 normalizedNode.getNodeType());
121
122                             DOMStoreThreePhaseCommitCohort ready =
123                                 transaction.ready();
124
125                             ListenableFuture<Boolean> canCommit =
126                                 ready.canCommit();
127
128                             assertTrue(canCommit.get(5, TimeUnit.SECONDS));
129
130                             ListenableFuture<Void> preCommit =
131                                 ready.preCommit();
132
133                             preCommit.get(5, TimeUnit.SECONDS);
134
135                             ListenableFuture<Void> commit = ready.commit();
136
137                             commit.get(5, TimeUnit.SECONDS);
138                         } catch (ExecutionException | TimeoutException | InterruptedException e){
139                             fail(e.getMessage());
140                         }
141                     }
142                 };
143             }
144         };
145
146     }
147
148     @Test
149     public void transactionChainIntegrationTest() throws Exception {
150         final Configuration configuration = new ConfigurationImpl("module-shards.conf", "modules.conf");
151         ShardStrategyFactory.setConfiguration(configuration);
152
153
154
155         new JavaTestKit(getSystem()) {
156             {
157
158                 new Within(duration("10 seconds")) {
159                     @Override
160                     protected void run() {
161                         try {
162                             final DistributedDataStore distributedDataStore =
163                                 new DistributedDataStore(getSystem(), "config",
164                                     new MockClusterWrapper(), configuration,
165                                     new DatastoreContext());
166
167                             distributedDataStore.onGlobalContextUpdated(TestModel.createTestContext());
168
169                             // Wait for a specific log message to show up
170                             final boolean result =
171                                 new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
172                                 ) {
173                                     @Override
174                                     protected Boolean run() {
175                                         return true;
176                                     }
177                                 }.from("akka://test/user/shardmanager-config/member-1-shard-test-1-config")
178                                     .message("Switching from state Candidate to Leader")
179                                     .occurrences(1).exec();
180
181                             assertEquals(true, result);
182
183                             DOMStoreTransactionChain transactionChain =
184                                 distributedDataStore.createTransactionChain();
185
186                             DOMStoreReadWriteTransaction transaction =
187                                 transactionChain.newReadWriteTransaction();
188
189                             transaction
190                                 .write(TestModel.TEST_PATH, ImmutableNodes
191                                     .containerNode(TestModel.TEST_QNAME));
192
193                             ListenableFuture<Optional<NormalizedNode<?, ?>>>
194                                 future =
195                                 transaction.read(TestModel.TEST_PATH);
196
197                             Optional<NormalizedNode<?, ?>> optional =
198                                 future.get();
199
200                             Assert.assertTrue("Node not found", optional.isPresent());
201
202                             NormalizedNode<?, ?> normalizedNode =
203                                 optional.get();
204
205                             assertEquals(TestModel.TEST_QNAME,
206                                 normalizedNode.getNodeType());
207
208                             DOMStoreThreePhaseCommitCohort ready =
209                                 transaction.ready();
210
211                             ListenableFuture<Boolean> canCommit =
212                                 ready.canCommit();
213
214                             assertTrue(canCommit.get(5, TimeUnit.SECONDS));
215
216                             ListenableFuture<Void> preCommit =
217                                 ready.preCommit();
218
219                             preCommit.get(5, TimeUnit.SECONDS);
220
221                             ListenableFuture<Void> commit = ready.commit();
222
223                             commit.get(5, TimeUnit.SECONDS);
224
225                             transactionChain.close();
226                         } catch (ExecutionException | TimeoutException | InterruptedException e){
227                             fail(e.getMessage());
228                         }
229                     }
230                 };
231             }
232         };
233
234     }
235
236
237     //FIXME : Disabling test because it's flaky
238     //@Test
239     public void integrationTestWithMultiShardConfiguration()
240         throws ExecutionException, InterruptedException, TimeoutException {
241         final Configuration configuration = new ConfigurationImpl("module-shards.conf", "modules.conf");
242
243         ShardStrategyFactory.setConfiguration(configuration);
244
245         new JavaTestKit(getSystem()) {
246             {
247
248                 new Within(duration("10 seconds")) {
249                     @Override
250                     protected void run() {
251                         try {
252                             final DistributedDataStore distributedDataStore =
253                                 new DistributedDataStore(getSystem(), "config",
254                                     new MockClusterWrapper(), configuration, null);
255
256                             distributedDataStore.onGlobalContextUpdated(
257                                 SchemaContextHelper.full());
258
259                             // Wait for a specific log message to show up
260                             final boolean result =
261                                 new JavaTestKit.EventFilter<Boolean>(
262                                     Logging.Info.class
263                                 ) {
264                                     @Override
265                                     protected Boolean run() {
266                                         return true;
267                                     }
268                                 }.from(
269                                     "akka://test/user/shardmanager-config/member-1-shard-cars-1-config")
270                                     .message(
271                                         "Switching from state Candidate to Leader")
272                                     .occurrences(1)
273                                     .exec();
274
275                             Thread.sleep(1000);
276
277
278                             DOMStoreReadWriteTransaction transaction =
279                                 distributedDataStore.newReadWriteTransaction();
280
281                             transaction.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
282                             transaction.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
283
284                             DOMStoreThreePhaseCommitCohort ready = transaction.ready();
285
286                             ListenableFuture<Boolean> canCommit = ready.canCommit();
287
288                             assertTrue(canCommit.get(5, TimeUnit.SECONDS));
289
290                             ListenableFuture<Void> preCommit = ready.preCommit();
291
292                             preCommit.get(5, TimeUnit.SECONDS);
293
294                             ListenableFuture<Void> commit = ready.commit();
295
296                             commit.get(5, TimeUnit.SECONDS);
297
298                             assertEquals(true, result);
299                         } catch(ExecutionException | TimeoutException | InterruptedException e){
300                             fail(e.getMessage());
301                         }
302                     }
303                 };
304             }
305         };
306
307
308     }
309
310 }