d35c36fb0a2c883649ec7fcccc42ac981f20e256
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / DistributedDataStoreIntegrationTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import akka.actor.ActorRef;
4 import akka.actor.ActorSystem;
5 import akka.actor.PoisonPill;
6 import com.google.common.base.Optional;
7 import com.google.common.collect.Lists;
8 import com.google.common.util.concurrent.Uninterruptibles;
9 import static org.junit.Assert.assertEquals;
10 import static org.junit.Assert.assertNotNull;
11 import static org.junit.Assert.assertTrue;
12 import org.junit.Test;
13 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
14 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
15 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
16 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
17 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
18 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
19 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
20 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
21 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
22 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
23 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
24 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
25 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
26 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
27 import org.opendaylight.yangtools.concepts.ListenerRegistration;
28 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
29 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
30 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
31 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
32 import java.util.List;
33 import java.util.concurrent.CountDownLatch;
34 import java.util.concurrent.TimeUnit;
35
36 public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
37
38     @Test
39     public void testWriteTransactionWithSingleShard() throws Exception{
40         System.setProperty("shard.persistent", "true");
41         new IntegrationTestKit(getSystem()) {{
42             DistributedDataStore dataStore =
43                     setupDistributedDataStore("transactionIntegrationTest", "test-1");
44
45             testWriteTransaction(dataStore, TestModel.TEST_PATH,
46                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
47
48             testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
49                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
50
51             cleanup(dataStore);
52         }};
53     }
54
55     @Test
56     public void testWriteTransactionWithMultipleShards() throws Exception{
57         System.setProperty("shard.persistent", "true");
58         new IntegrationTestKit(getSystem()) {{
59             DistributedDataStore dataStore =
60                     setupDistributedDataStore("testWriteTransactionWithMultipleShards", "cars-1", "people-1");
61
62             DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
63             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
64
65             YangInstanceIdentifier nodePath1 = CarsModel.BASE_PATH;
66             NormalizedNode<?, ?> nodeToWrite1 = CarsModel.emptyContainer();
67             writeTx.write(nodePath1, nodeToWrite1);
68
69             YangInstanceIdentifier nodePath2 = PeopleModel.BASE_PATH;
70             NormalizedNode<?, ?> nodeToWrite2 = PeopleModel.emptyContainer();
71             writeTx.write(nodePath2, nodeToWrite2);
72
73             DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
74
75             Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS);
76             assertEquals("canCommit", true, canCommit);
77             cohort.preCommit().get(5, TimeUnit.SECONDS);
78             cohort.commit().get(5, TimeUnit.SECONDS);
79
80             // 5. Verify the data in the store
81
82             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
83
84             Optional<NormalizedNode<?, ?>> optional = readTx.read(nodePath1).get();
85             assertEquals("isPresent", true, optional.isPresent());
86             assertEquals("Data node", nodeToWrite1, optional.get());
87
88             optional = readTx.read(nodePath2).get();
89             assertEquals("isPresent", true, optional.isPresent());
90             assertEquals("Data node", nodeToWrite2, optional.get());
91
92             cleanup(dataStore);
93         }};
94     }
95
96     @Test
97     public void testReadWriteTransaction() throws Exception{
98         System.setProperty("shard.persistent", "true");
99         new IntegrationTestKit(getSystem()) {{
100             DistributedDataStore dataStore =
101                     setupDistributedDataStore("testReadWriteTransaction", "test-1");
102
103          // 1. Create a read-write Tx
104
105             DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
106             assertNotNull("newReadWriteTransaction returned null", readWriteTx);
107
108             // 2. Write some data
109
110             YangInstanceIdentifier nodePath = TestModel.TEST_PATH;
111             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
112             readWriteTx.write(nodePath, nodeToWrite );
113
114             // 3. Read the data from Tx
115
116             Boolean exists = readWriteTx.exists(nodePath).checkedGet(5, TimeUnit.SECONDS);
117             assertEquals("exists", true, exists);
118
119             Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS);
120             assertEquals("isPresent", true, optional.isPresent());
121             assertEquals("Data node", nodeToWrite, optional.get());
122
123             // 4. Ready the Tx for commit
124
125             DOMStoreThreePhaseCommitCohort cohort = readWriteTx.ready();
126
127             // 5. Commit the Tx
128
129             Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS);
130             assertEquals("canCommit", true, canCommit);
131             cohort.preCommit().get(5, TimeUnit.SECONDS);
132             cohort.commit().get(5, TimeUnit.SECONDS);
133
134             // 6. Verify the data in the store
135
136             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
137
138             optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
139             assertEquals("isPresent", true, optional.isPresent());
140             assertEquals("Data node", nodeToWrite, optional.get());
141
142             cleanup(dataStore);
143         }};
144     }
145
146     @Test
147     public void testTransactionAbort() throws Exception{
148         System.setProperty("shard.persistent", "true");
149         new IntegrationTestKit(getSystem()) {{
150             DistributedDataStore dataStore =
151                     setupDistributedDataStore("transactionAbortIntegrationTest", "test-1");
152
153             DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
154             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
155
156             writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
157
158             DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
159
160             cohort.canCommit().get(5, TimeUnit.SECONDS);
161
162             cohort.abort().get(5, TimeUnit.SECONDS);
163
164             testWriteTransaction(dataStore, TestModel.TEST_PATH,
165                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
166
167             cleanup(dataStore);
168         }};
169     }
170
171     @Test
172     public void testTransactionChain() throws Exception{
173         System.setProperty("shard.persistent", "true");
174         new IntegrationTestKit(getSystem()) {{
175             DistributedDataStore dataStore =
176                     setupDistributedDataStore("transactionChainIntegrationTest", "test-1");
177
178             // 1. Create a Tx chain and write-only Tx
179
180             DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
181
182             DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
183             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
184
185             // 2. Write some data
186
187             NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
188             writeTx.write(TestModel.TEST_PATH, containerNode);
189
190             // 3. Ready the Tx for commit
191
192             DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
193
194             // 4. Commit the Tx
195
196             Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS);
197             assertEquals("canCommit", true, canCommit);
198             cohort.preCommit().get(5, TimeUnit.SECONDS);
199             cohort.commit().get(5, TimeUnit.SECONDS);
200
201             // 5. Verify the data in the store
202
203             DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction();
204
205             Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
206             assertEquals("isPresent", true, optional.isPresent());
207             assertEquals("Data node", containerNode, optional.get());
208
209             txChain.close();
210
211             cleanup(dataStore);
212         }};
213     }
214
215     @Test
216     public void testChangeListenerRegistration() throws Exception{
217         new IntegrationTestKit(getSystem()) {{
218             DistributedDataStore dataStore =
219                     setupDistributedDataStore("testChangeListenerRegistration", "test-1");
220
221             final List<AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>>>
222                                                                 changeList = Lists.newArrayList();
223             final CountDownLatch changeLatch = new CountDownLatch(3);
224             AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener =
225                     new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
226                 @Override
227                 public void onDataChanged(AsyncDataChangeEvent<YangInstanceIdentifier,
228                                                                NormalizedNode<?, ?>> change) {
229                     changeList.add(change);
230                     changeLatch.countDown();
231                 }
232             };
233
234             ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
235                     listenerReg = dataStore.registerChangeListener(TestModel.TEST_PATH, listener,
236                             DataChangeScope.SUBTREE);
237
238             assertNotNull("registerChangeListener returned null", listenerReg);
239
240             testWriteTransaction(dataStore, TestModel.TEST_PATH,
241                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
242
243             testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
244                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
245
246             YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
247                     nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
248             testWriteTransaction(dataStore, listPath,
249                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
250
251             assertEquals("Change notifications complete", true,
252                     Uninterruptibles.awaitUninterruptibly(changeLatch, 5, TimeUnit.SECONDS));
253
254             assertTrue("Change 1 does not contain " + TestModel.TEST_PATH,
255                     changeList.get(0).getCreatedData().containsKey(TestModel.TEST_PATH));
256
257             assertTrue("Change 2 does not contain " + TestModel.OUTER_LIST_PATH,
258                     changeList.get(1).getCreatedData().containsKey(TestModel.OUTER_LIST_PATH));
259
260             assertTrue("Change 3 does not contain " + listPath,
261                     changeList.get(2).getCreatedData().containsKey(listPath));
262
263             listenerReg.close();
264
265             testWriteTransaction(dataStore, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
266                     nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(),
267                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2));
268
269             Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
270
271             assertEquals("Received unexpected change after close", 3, changeList.size());
272
273             cleanup(dataStore);
274         }};
275     }
276
277     class IntegrationTestKit extends ShardTestKit {
278
279         IntegrationTestKit(ActorSystem actorSystem) {
280             super(actorSystem);
281         }
282
283         DistributedDataStore setupDistributedDataStore(String typeName, String... shardNames) {
284             MockClusterWrapper cluster = new MockClusterWrapper();
285             Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf");
286             ShardStrategyFactory.setConfiguration(config);
287
288             DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
289             DistributedDataStore dataStore = new DistributedDataStore(getSystem(), typeName, cluster,
290                     config, datastoreContext);
291
292             SchemaContext schemaContext = SchemaContextHelper.full();
293             dataStore.onGlobalContextUpdated(schemaContext);
294
295             for(String shardName: shardNames) {
296                 ActorRef shard = null;
297                 for(int i = 0; i < 20 * 5 && shard == null; i++) {
298                     Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
299                     Optional<ActorRef> shardReply = dataStore.getActorContext().findLocalShard(shardName);
300                     if(shardReply.isPresent()) {
301                         shard = shardReply.get();
302                     }
303                 }
304
305                 assertNotNull("Shard was not created", shard);
306
307                 System.out.println("!!!!!!shard: "+shard.path().toString());
308                 waitUntilLeader(shard);
309             }
310
311             return dataStore;
312         }
313
314         void testWriteTransaction(DistributedDataStore dataStore, YangInstanceIdentifier nodePath,
315                 NormalizedNode<?, ?> nodeToWrite) throws Exception {
316
317             // 1. Create a write-only Tx
318
319             DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
320             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
321
322             // 2. Write some data
323
324             writeTx.write(nodePath, nodeToWrite);
325
326             // 3. Ready the Tx for commit
327
328             DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
329
330             // 4. Commit the Tx
331
332             Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS);
333             assertEquals("canCommit", true, canCommit);
334             cohort.preCommit().get(5, TimeUnit.SECONDS);
335             cohort.commit().get(5, TimeUnit.SECONDS);
336
337             // 5. Verify the data in the store
338
339             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
340
341             Optional<NormalizedNode<?, ?>> optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
342             assertEquals("isPresent", true, optional.isPresent());
343             assertEquals("Data node", nodeToWrite, optional.get());
344         }
345
346         void cleanup(DistributedDataStore dataStore) {
347             dataStore.getActorContext().getShardManager().tell(PoisonPill.getInstance(), null);
348         }
349     }
350
351 }