1 package org.opendaylight.controller.cluster.datastore;
3 import akka.actor.ActorRef;
4 import akka.actor.ActorSystem;
5 import akka.actor.PoisonPill;
6 import com.google.common.base.Optional;
7 import com.google.common.util.concurrent.Uninterruptibles;
8 import static org.junit.Assert.assertEquals;
9 import static org.junit.Assert.assertNotNull;
10 import org.junit.Test;
11 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
12 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
13 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
14 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
15 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
16 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
17 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
18 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
19 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
20 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
21 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
22 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
23 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
24 import org.opendaylight.yangtools.concepts.ListenerRegistration;
25 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
26 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
27 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
28 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
29 import java.util.concurrent.TimeUnit;
/**
 * Integration tests that drive a {@link DistributedDataStore} through write-only and
 * read-write transactions, a transaction abort, a transaction chain and a data change
 * listener registration, using the actor system obtained from {@code getSystem()}.
 */
31 public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
/**
 * Sets up a data store that waits for the single shard "test-1" and runs two
 * write-and-verify rounds through {@code testWriteTransaction}: first a container node at
 * {@code TestModel.TEST_PATH}, then an empty map node at {@code TestModel.OUTER_LIST_PATH}.
 */
34 public void testWriteTransactionWithSingleShard() throws Exception{
// presumably turns shard persistence on for this run -- confirm where "shard.persistent" is read
// NOTE(review): this is a JVM-wide property; confirm it is restored so it cannot leak into other tests
35 System.setProperty("shard.persistent", "true");
// Double-brace initialisation: the statements below run inside an anonymous IntegrationTestKit
// subclass, which is why the kit's helper methods can be called unqualified.
36 new IntegrationTestKit(getSystem()) {{
37 DistributedDataStore dataStore =
38 setupDistributedDataStore("transactionIntegrationTest", "test-1");
// Write the container node first, then the map node.
40 testWriteTransaction(dataStore, TestModel.TEST_PATH,
41 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
43 testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
44 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
/**
 * Writes the empty cars container and the empty people container in one write-only
 * transaction on a data store set up with the "cars-1" and "people-1" shards, runs the
 * three-phase commit, and reads both paths back through a read-only transaction.
 */
51 public void testWriteTransactionWithMultipleShards() throws Exception{
// presumably turns shard persistence on for this run -- confirm where "shard.persistent" is read
52 System.setProperty("shard.persistent", "true");
// Double-brace initialisation: the body runs inside an anonymous IntegrationTestKit subclass.
53 new IntegrationTestKit(getSystem()) {{
54 DistributedDataStore dataStore =
55 setupDistributedDataStore("testWriteTransactionWithMultipleShards", "cars-1", "people-1");
// One write-only transaction carries both writes.
57 DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
58 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
// First write: the cars base path.
60 YangInstanceIdentifier nodePath1 = CarsModel.BASE_PATH;
61 NormalizedNode<?, ?> nodeToWrite1 = CarsModel.emptyContainer();
62 writeTx.write(nodePath1, nodeToWrite1);
// Second write: the people base path.
64 YangInstanceIdentifier nodePath2 = PeopleModel.BASE_PATH;
65 NormalizedNode<?, ?> nodeToWrite2 = PeopleModel.emptyContainer();
66 writeTx.write(nodePath2, nodeToWrite2);
// Ready the transaction and drive the cohort through canCommit, preCommit and commit,
// allowing 5 seconds for each phase.
68 DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
70 Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS);
71 assertEquals("canCommit", true, canCommit);
72 cohort.preCommit().get(5, TimeUnit.SECONDS);
73 cohort.commit().get(5, TimeUnit.SECONDS);
75 // 5. Verify the data in the store
77 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
// NOTE(review): unlike the other reads in this class, the two reads below call get() with no
// timeout, so a read that never completes would block the test -- consider get(5, TimeUnit.SECONDS).
79 Optional<NormalizedNode<?, ?>> optional = readTx.read(nodePath1).get();
80 assertEquals("isPresent", true, optional.isPresent());
81 assertEquals("Data node", nodeToWrite1, optional.get());
83 optional = readTx.read(nodePath2).get();
84 assertEquals("isPresent", true, optional.isPresent());
85 assertEquals("Data node", nodeToWrite2, optional.get());
/**
 * Writes a container node through a read-write transaction, checks that the same
 * transaction can see it via {@code exists} and {@code read} before committing, runs the
 * three-phase commit, and then reads the node back through a separate read-only transaction.
 */
92 public void testReadWriteTransaction() throws Exception{
// presumably turns shard persistence on for this run -- confirm where "shard.persistent" is read
93 System.setProperty("shard.persistent", "true");
// Double-brace initialisation: the body runs inside an anonymous IntegrationTestKit subclass.
94 new IntegrationTestKit(getSystem()) {{
95 DistributedDataStore dataStore =
96 setupDistributedDataStore("testReadWriteTransaction", "test-1");
98 // 1. Create a read-write Tx
100 DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
101 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
103 // 2. Write some data
105 YangInstanceIdentifier nodePath = TestModel.TEST_PATH;
106 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
107 readWriteTx.write(nodePath, nodeToWrite );
109 // 3. Read the data from Tx
// The uncommitted write must already be visible to the transaction that made it.
111 Boolean exists = readWriteTx.exists(nodePath).checkedGet(5, TimeUnit.SECONDS);
112 assertEquals("exists", true, exists);
114 Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS);
115 assertEquals("isPresent", true, optional.isPresent());
116 assertEquals("Data node", nodeToWrite, optional.get());
118 // 4. Ready the Tx for commit
120 DOMStoreThreePhaseCommitCohort cohort = readWriteTx.ready();
// Drive the cohort through canCommit, preCommit and commit, allowing 5 seconds per phase.
124 Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS);
125 assertEquals("canCommit", true, canCommit);
126 cohort.preCommit().get(5, TimeUnit.SECONDS);
127 cohort.commit().get(5, TimeUnit.SECONDS);
129 // 6. Verify the data in the store
// A new read-only transaction is used so the check goes through the store, not the original Tx.
131 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
133 optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
134 assertEquals("isPresent", true, optional.isPresent());
135 assertEquals("Data node", nodeToWrite, optional.get());
/**
 * Readies a write-only transaction, calls {@code canCommit} and then {@code abort} on its
 * cohort, and afterwards runs a full write-and-verify round on the same path through
 * {@code testWriteTransaction}.
 */
142 public void testTransactionAbort() throws Exception{
// presumably turns shard persistence on for this run -- confirm where "shard.persistent" is read
143 System.setProperty("shard.persistent", "true");
// Double-brace initialisation: the body runs inside an anonymous IntegrationTestKit subclass.
144 new IntegrationTestKit(getSystem()) {{
145 DistributedDataStore dataStore =
146 setupDistributedDataStore("transactionAbortIntegrationTest", "test-1");
148 DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
149 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
151 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
153 DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
// The canCommit result is awaited but not asserted; the transaction is aborted right after.
155 cohort.canCommit().get(5, TimeUnit.SECONDS);
157 cohort.abort().get(5, TimeUnit.SECONDS);
// A fresh transaction writing the same path must still commit and be readable.
159 testWriteTransaction(dataStore, TestModel.TEST_PATH,
160 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
/**
 * Creates a transaction chain on the data store, commits a container node through a
 * write-only transaction obtained from the chain, and reads the node back through a
 * read-only transaction obtained from the same chain.
 */
167 public void testTransactionChain() throws Exception{
// presumably turns shard persistence on for this run -- confirm where "shard.persistent" is read
168 System.setProperty("shard.persistent", "true");
// Double-brace initialisation: the body runs inside an anonymous IntegrationTestKit subclass.
169 new IntegrationTestKit(getSystem()) {{
170 DistributedDataStore dataStore =
171 setupDistributedDataStore("transactionChainIntegrationTest", "test-1");
173 // 1. Create a Tx chain and write-only Tx
175 DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
177 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
178 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
180 // 2. Write some data
182 NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
183 writeTx.write(TestModel.TEST_PATH, containerNode);
185 // 3. Ready the Tx for commit
187 DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
// Drive the cohort through canCommit, preCommit and commit, allowing 5 seconds per phase.
191 Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS);
192 assertEquals("canCommit", true, canCommit);
193 cohort.preCommit().get(5, TimeUnit.SECONDS);
194 cohort.commit().get(5, TimeUnit.SECONDS);
196 // 5. Verify the data in the store
// The read-only transaction comes from the chain, not directly from the data store.
198 DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction();
200 Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
201 assertEquals("isPresent", true, optional.isPresent());
202 assertEquals("Data node", containerNode, optional.get());
/**
 * Registers a {@link MockDataChangeListener} on {@code TestModel.TEST_PATH} with SUBTREE
 * scope, performs three write-and-verify rounds, waits for the listener to report changes for
 * the three written paths, then writes a second list entry and expects the listener to
 * receive no further changes.
 */
211 public void testChangeListenerRegistration() throws Exception{
// Double-brace initialisation: the body runs inside an anonymous IntegrationTestKit subclass.
212 new IntegrationTestKit(getSystem()) {{
213 DistributedDataStore dataStore =
214 setupDistributedDataStore("testChangeListenerRegistration", "test-1");
// presumably 3 is the number of change events the mock waits for -- confirm against MockDataChangeListener
216 MockDataChangeListener listener = new MockDataChangeListener(3);
218 ListenerRegistration<MockDataChangeListener>
219 listenerReg = dataStore.registerChangeListener(TestModel.TEST_PATH, listener,
220 DataChangeScope.SUBTREE);
222 assertNotNull("registerChangeListener returned null", listenerReg);
// Three writes: the test container, the outer list, and the list entry keyed by id 1.
224 testWriteTransaction(dataStore, TestModel.TEST_PATH,
225 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
227 testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
228 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
230 YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
231 nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
232 testWriteTransaction(dataStore, listPath,
233 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
// Wait for the listener to report the three paths written above.
235 listener.waitForChangeEvents(TestModel.TEST_PATH, TestModel.OUTER_LIST_PATH, listPath );
// Write a second list entry (id 2); the listener must not be notified about it.
239 testWriteTransaction(dataStore, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
240 nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(),
241 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2));
243 listener.expectNoMoreChanges("Received unexpected change after close");
/**
 * Test kit, built on {@link ShardTestKit}, that holds the helpers shared by the tests in this
 * file: data store setup, a write-and-verify routine, and shard manager shutdown.
 */
249 class IntegrationTestKit extends ShardTestKit {
// Creates the kit for the given actor system.
251 IntegrationTestKit(ActorSystem actorSystem) {
255 DistributedDataStore setupDistributedDataStore(String typeName, String... shardNames) {
256 MockClusterWrapper cluster = new MockClusterWrapper();
257 Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf");
258 ShardStrategyFactory.setConfiguration(config);
260 DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
261 DistributedDataStore dataStore = new DistributedDataStore(getSystem(), typeName, cluster,
262 config, datastoreContext);
264 SchemaContext schemaContext = SchemaContextHelper.full();
265 dataStore.onGlobalContextUpdated(schemaContext);
267 for(String shardName: shardNames) {
268 ActorRef shard = null;
269 for(int i = 0; i < 20 * 5 && shard == null; i++) {
270 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
271 Optional<ActorRef> shardReply = dataStore.getActorContext().findLocalShard(shardName);
272 if(shardReply.isPresent()) {
273 shard = shardReply.get();
277 assertNotNull("Shard was not created", shard);
279 System.out.println("!!!!!!shard: "+shard.path().toString());
280 waitUntilLeader(shard);
286 void testWriteTransaction(DistributedDataStore dataStore, YangInstanceIdentifier nodePath,
287 NormalizedNode<?, ?> nodeToWrite) throws Exception {
289 // 1. Create a write-only Tx
291 DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
292 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
294 // 2. Write some data
296 writeTx.write(nodePath, nodeToWrite);
298 // 3. Ready the Tx for commit
300 DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
304 Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS);
305 assertEquals("canCommit", true, canCommit);
306 cohort.preCommit().get(5, TimeUnit.SECONDS);
307 cohort.commit().get(5, TimeUnit.SECONDS);
309 // 5. Verify the data in the store
311 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
313 Optional<NormalizedNode<?, ?>> optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
314 assertEquals("isPresent", true, optional.isPresent());
315 assertEquals("Data node", nodeToWrite, optional.get());
318 void cleanup(DistributedDataStore dataStore) {
319 dataStore.getActorContext().getShardManager().tell(PoisonPill.getInstance(), null);