1 package org.opendaylight.controller.cluster.datastore;
3 import akka.actor.ActorRef;
4 import akka.actor.ActorSystem;
5 import akka.actor.PoisonPill;
6 import com.google.common.base.Optional;
7 import com.google.common.util.concurrent.Uninterruptibles;
8 import static org.junit.Assert.assertEquals;
9 import static org.junit.Assert.assertNotNull;
10 import org.junit.Test;
11 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
12 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
13 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
14 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
15 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
16 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
17 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
18 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
19 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
20 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
21 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
22 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
23 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
24 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
25 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
26 import java.util.concurrent.TimeUnit;
28 public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
31 public void testWriteTransactionWithSingleShard() throws Exception{
32 System.setProperty("shard.persistent", "true");
33 new IntegrationTestKit(getSystem()) {{
34 DistributedDataStore dataStore =
35 setupDistributedDataStore("transactionIntegrationTest", "test-1");
37 testWriteTransaction(dataStore, TestModel.TEST_PATH,
38 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
40 testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
41 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
48 public void testWriteTransactionWithMultipleShards() throws Exception{
49 System.setProperty("shard.persistent", "true");
50 new IntegrationTestKit(getSystem()) {{
51 DistributedDataStore dataStore =
52 setupDistributedDataStore("testWriteTransactionWithMultipleShards", "cars-1", "people-1");
54 DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
55 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
57 YangInstanceIdentifier nodePath1 = CarsModel.BASE_PATH;
58 NormalizedNode<?, ?> nodeToWrite1 = CarsModel.emptyContainer();
59 writeTx.write(nodePath1, nodeToWrite1);
61 YangInstanceIdentifier nodePath2 = PeopleModel.BASE_PATH;
62 NormalizedNode<?, ?> nodeToWrite2 = PeopleModel.emptyContainer();
63 writeTx.write(nodePath2, nodeToWrite2);
65 DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
67 Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS);
68 assertEquals("canCommit", true, canCommit);
69 cohort.preCommit().get(5, TimeUnit.SECONDS);
70 cohort.commit().get(5, TimeUnit.SECONDS);
72 // 5. Verify the data in the store
74 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
76 Optional<NormalizedNode<?, ?>> optional = readTx.read(nodePath1).get();
77 assertEquals("isPresent", true, optional.isPresent());
78 assertEquals("Data node", nodeToWrite1, optional.get());
80 optional = readTx.read(nodePath2).get();
81 assertEquals("isPresent", true, optional.isPresent());
82 assertEquals("Data node", nodeToWrite2, optional.get());
89 public void testReadWriteTransaction() throws Exception{
90 System.setProperty("shard.persistent", "true");
91 new IntegrationTestKit(getSystem()) {{
92 DistributedDataStore dataStore =
93 setupDistributedDataStore("testReadWriteTransaction", "test-1");
95 // 1. Create a read-write Tx
97 DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
98 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
100 // 2. Write some data
102 YangInstanceIdentifier nodePath = TestModel.TEST_PATH;
103 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
104 readWriteTx.write(nodePath, nodeToWrite );
106 // 3. Read the data from Tx
108 Boolean exists = readWriteTx.exists(nodePath).checkedGet(5, TimeUnit.SECONDS);
109 assertEquals("exists", true, exists);
111 Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS);
112 assertEquals("isPresent", true, optional.isPresent());
113 assertEquals("Data node", nodeToWrite, optional.get());
115 // 4. Ready the Tx for commit
117 DOMStoreThreePhaseCommitCohort cohort = readWriteTx.ready();
121 Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS);
122 assertEquals("canCommit", true, canCommit);
123 cohort.preCommit().get(5, TimeUnit.SECONDS);
124 cohort.commit().get(5, TimeUnit.SECONDS);
126 // 6. Verify the data in the store
128 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
130 optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
131 assertEquals("isPresent", true, optional.isPresent());
132 assertEquals("Data node", nodeToWrite, optional.get());
139 public void testTransactionAbort() throws Exception{
140 System.setProperty("shard.persistent", "true");
141 new IntegrationTestKit(getSystem()) {{
142 DistributedDataStore dataStore =
143 setupDistributedDataStore("transactionAbortIntegrationTest", "test-1");
145 DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
146 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
148 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
150 DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
152 cohort.canCommit().get(5, TimeUnit.SECONDS);
154 cohort.abort().get(5, TimeUnit.SECONDS);
156 testWriteTransaction(dataStore, TestModel.TEST_PATH,
157 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
164 public void testTransactionChain() throws Exception{
165 System.setProperty("shard.persistent", "true");
166 new IntegrationTestKit(getSystem()) {{
167 DistributedDataStore dataStore =
168 setupDistributedDataStore("transactionChainIntegrationTest", "test-1");
170 // 1. Create a Tx chain and write-only Tx
172 DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
174 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
175 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
177 // 2. Write some data
179 NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
180 writeTx.write(TestModel.TEST_PATH, containerNode);
182 // 3. Ready the Tx for commit
184 DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
188 Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS);
189 assertEquals("canCommit", true, canCommit);
190 cohort.preCommit().get(5, TimeUnit.SECONDS);
191 cohort.commit().get(5, TimeUnit.SECONDS);
193 // 5. Verify the data in the store
195 DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction();
197 Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
198 assertEquals("isPresent", true, optional.isPresent());
199 assertEquals("Data node", containerNode, optional.get());
207 class IntegrationTestKit extends ShardTestKit {
209 IntegrationTestKit(ActorSystem actorSystem) {
213 DistributedDataStore setupDistributedDataStore(String typeName, String... shardNames) {
214 MockClusterWrapper cluster = new MockClusterWrapper();
215 Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf");
216 ShardStrategyFactory.setConfiguration(config);
218 DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
219 DistributedDataStore dataStore = new DistributedDataStore(getSystem(), typeName, cluster,
220 config, datastoreContext);
222 SchemaContext schemaContext = SchemaContextHelper.full();
223 dataStore.onGlobalContextUpdated(schemaContext);
225 for(String shardName: shardNames) {
226 ActorRef shard = null;
227 for(int i = 0; i < 20 * 5 && shard == null; i++) {
228 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
229 Optional<ActorRef> shardReply = dataStore.getActorContext().findLocalShard(shardName);
230 if(shardReply.isPresent()) {
231 shard = shardReply.get();
235 assertNotNull("Shard was not created", shard);
237 System.out.println("!!!!!!shard: "+shard.path().toString());
238 waitUntilLeader(shard);
244 void testWriteTransaction(DistributedDataStore dataStore, YangInstanceIdentifier nodePath,
245 NormalizedNode<?, ?> nodeToWrite) throws Exception {
247 // 1. Create a write-only Tx
249 DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
250 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
252 // 2. Write some data
254 writeTx.write(nodePath, nodeToWrite);
256 // 3. Ready the Tx for commit
258 DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
262 Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS);
263 assertEquals("canCommit", true, canCommit);
264 cohort.preCommit().get(5, TimeUnit.SECONDS);
265 cohort.commit().get(5, TimeUnit.SECONDS);
267 // 5. Verify the data in the store
269 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
271 Optional<NormalizedNode<?, ?>> optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
272 assertEquals("isPresent", true, optional.isPresent());
273 assertEquals("Data node", nodeToWrite, optional.get());
276 void cleanup(DistributedDataStore dataStore) {
277 dataStore.getActorContext().getShardManager().tell(PoisonPill.getInstance(), null);