1 package org.opendaylight.controller.cluster.datastore;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import akka.actor.ActorRef;
6 import akka.actor.ActorSystem;
7 import akka.actor.PoisonPill;
8 import com.google.common.base.Optional;
9 import com.google.common.util.concurrent.CheckedFuture;
10 import com.google.common.util.concurrent.Uninterruptibles;
11 import java.util.concurrent.CountDownLatch;
12 import java.util.concurrent.ExecutionException;
13 import java.util.concurrent.TimeUnit;
14 import java.util.concurrent.atomic.AtomicReference;
15 import org.junit.Test;
16 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
17 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
18 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
19 import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
20 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
21 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
22 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
23 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
24 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
25 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
26 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
27 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
28 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
29 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
30 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
31 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
32 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
33 import org.opendaylight.yangtools.concepts.ListenerRegistration;
34 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
35 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
36 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
37 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
38 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
40 public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
// Shared context builder used by every test's setupDistributedDataStore call.
// The heartbeat interval is lowered to 100ms so Raft leader election completes
// quickly; individual tests mutate this builder (e.g. shardInitializationTimeout,
// shardLeaderElectionTimeout) before creating their data store.
42 private final DatastoreContext.Builder datastoreContextBuilder =
43 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100);
// End-to-end write/commit against a single shard ("test-1"): commits a
// container node and then an empty outer-list node, each verified by the
// testWriteTransaction helper (write -> ready -> 3-phase commit -> read back).
// NOTE(review): this listing is truncated — the @Test annotation and the
// closing braces for the anonymous IntegrationTestKit block are not visible.
46 public void testWriteTransactionWithSingleShard() throws Exception{
47 new IntegrationTestKit(getSystem()) {{
48 DistributedDataStore dataStore =
49 setupDistributedDataStore("transactionIntegrationTest", "test-1");
51 testWriteTransaction(dataStore, TestModel.TEST_PATH,
52 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
54 testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
55 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
// Verifies that a single write-only transaction spanning two shards
// ("cars-1" and "people-1") commits atomically via the 3-phase cohort
// protocol, and that both writes are visible in a subsequent read-only Tx.
62 public void testWriteTransactionWithMultipleShards() throws Exception{
63 new IntegrationTestKit(getSystem()) {{
64 DistributedDataStore dataStore =
65 setupDistributedDataStore("testWriteTransactionWithMultipleShards", "cars-1", "people-1");
67 DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
68 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
// One write per shard: cars and people live on different shards per the
// module-shards configuration used by setupDistributedDataStore.
70 YangInstanceIdentifier nodePath1 = CarsModel.BASE_PATH;
71 NormalizedNode<?, ?> nodeToWrite1 = CarsModel.emptyContainer();
72 writeTx.write(nodePath1, nodeToWrite1);
74 YangInstanceIdentifier nodePath2 = PeopleModel.BASE_PATH;
75 NormalizedNode<?, ?> nodeToWrite2 = PeopleModel.emptyContainer();
76 writeTx.write(nodePath2, nodeToWrite2);
78 DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
// Full 3-phase commit: canCommit -> preCommit -> commit, 5s timeout each.
80 Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS);
81 assertEquals("canCommit", true, canCommit);
82 cohort.preCommit().get(5, TimeUnit.SECONDS);
83 cohort.commit().get(5, TimeUnit.SECONDS);
85 // Verify the data in the store
87 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
// NOTE(review): these get() calls have no timeout, unlike the 5-second
// get(5, TimeUnit.SECONDS) pattern used everywhere else in this file —
// a hung read would block the test forever. Consider adding a timeout.
89 Optional<NormalizedNode<?, ?>> optional = readTx.read(nodePath1).get();
90 assertEquals("isPresent", true, optional.isPresent());
91 assertEquals("Data node", nodeToWrite1, optional.get());
93 optional = readTx.read(nodePath2).get();
94 assertEquals("isPresent", true, optional.isPresent());
95 assertEquals("Data node", nodeToWrite2, optional.get());
// Exercises a read-write transaction end to end: write a container node,
// read it back within the same Tx (exists + read), ready and 3-phase commit
// it, then verify the data from a fresh read-only transaction.
102 public void testReadWriteTransaction() throws Exception{
// NOTE(review): system property is set but never restored — it leaks into
// subsequent tests in the same JVM.
103 System.setProperty("shard.persistent", "true");
104 new IntegrationTestKit(getSystem()) {{
105 DistributedDataStore dataStore =
106 setupDistributedDataStore("testReadWriteTransaction", "test-1");
108 // 1. Create a read-write Tx
110 DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
111 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
113 // 2. Write some data
115 YangInstanceIdentifier nodePath = TestModel.TEST_PATH;
116 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
117 readWriteTx.write(nodePath, nodeToWrite );
119 // 3. Read the data from Tx
// Uncommitted data must be visible to reads within the same transaction.
121 Boolean exists = readWriteTx.exists(nodePath).checkedGet(5, TimeUnit.SECONDS);
122 assertEquals("exists", true, exists);
124 Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS);
125 assertEquals("isPresent", true, optional.isPresent());
126 assertEquals("Data node", nodeToWrite, optional.get());
128 // 4. Ready the Tx for commit
130 DOMStoreThreePhaseCommitCohort cohort = readWriteTx.ready();
// 5. (implied) Run the 3-phase commit with 5s timeouts on each phase.
134 Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS);
135 assertEquals("canCommit", true, canCommit);
136 cohort.preCommit().get(5, TimeUnit.SECONDS);
137 cohort.commit().get(5, TimeUnit.SECONDS);
139 // 6. Verify the data in the store
141 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
143 optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
144 assertEquals("isPresent", true, optional.isPresent());
145 assertEquals("Data node", nodeToWrite, optional.get());
// Verifies that write-transaction operations issued BEFORE the shard has
// finished recovering are queued rather than failed: recovery is blocked via
// an InMemoryJournal latch, the Tx is built/readied on a separate thread,
// then the latch is released and the commit is expected to complete normally.
152 public void testTransactionWritesWithShardNotInitiallyReady() throws Exception{
153 new IntegrationTestKit(getSystem()) {{
154 String testName = "testTransactionWritesWithShardNotInitiallyReady";
155 String shardName = "test-1";
157 // Setup the InMemoryJournal to block shard recovery to ensure the shard isn't
158 // initialized until we create and submit the write the Tx.
159 String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
160 CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
161 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
// waitUntilLeader=false: the shard cannot become leader while recovery is blocked.
163 DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
165 // Create the write Tx
167 final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
// NOTE(review): assertion message says "newReadWriteTransaction" but this is
// a write-only transaction — copy/paste error in the message string.
168 assertNotNull("newReadWriteTransaction returned null", writeTx);
170 // Do some modification operations and ready the Tx on a separate thread.
172 final YangInstanceIdentifier listEntryPath = YangInstanceIdentifier.builder(
173 TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME,
174 TestModel.ID_QNAME, 1).build();
176 final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
177 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
178 final CountDownLatch txReady = new CountDownLatch(1);
// NOTE(review): listing truncated — the run() declaration, catch body,
// txReady.countDown() and txThread.start() lines are missing from this chunk.
179 Thread txThread = new Thread() {
183 writeTx.write(TestModel.TEST_PATH,
184 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
186 writeTx.merge(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(
187 TestModel.OUTER_LIST_QNAME).build());
// Write a list entry then delete it — the final verification below expects
// this path to be absent after commit.
189 writeTx.write(listEntryPath, ImmutableNodes.mapEntry(
190 TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
192 writeTx.delete(listEntryPath);
194 txCohort.set(writeTx.ready());
195 } catch(Exception e) {
206 // Wait for the Tx operations to complete.
208 boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
// Surface any exception captured on the Tx thread before asserting readiness.
209 if(caughtEx.get() != null) {
210 throw caughtEx.get();
213 assertEquals("Tx ready", true, done);
215 // At this point the Tx operations should be waiting for the shard to initialize so
216 // trigger the latch to let the shard recovery to continue.
218 blockRecoveryLatch.countDown();
220 // Wait for the Tx commit to complete.
222 assertEquals("canCommit", true, txCohort.get().canCommit().get(5, TimeUnit.SECONDS));
223 txCohort.get().preCommit().get(5, TimeUnit.SECONDS);
224 txCohort.get().commit().get(5, TimeUnit.SECONDS);
226 // Verify the data in the store
228 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
230 Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).
231 get(5, TimeUnit.SECONDS);
232 assertEquals("isPresent", true, optional.isPresent());
234 optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
235 assertEquals("isPresent", true, optional.isPresent());
// The written-then-deleted list entry must not exist after commit.
237 optional = readTx.read(listEntryPath).get(5, TimeUnit.SECONDS);
238 assertEquals("isPresent", false, optional.isPresent());
// Read-side counterpart of the previous test: exists/read futures obtained
// while shard recovery is latched must complete successfully once recovery
// is released, seeing the data written earlier in the same read-write Tx.
245 public void testTransactionReadsWithShardNotInitiallyReady() throws Exception{
246 new IntegrationTestKit(getSystem()) {{
247 String testName = "testTransactionReadsWithShardNotInitiallyReady";
248 String shardName = "test-1";
250 // Setup the InMemoryJournal to block shard recovery to ensure the shard isn't
251 // initialized until we create the Tx.
252 String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
253 CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
254 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
256 DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
258 // Create the read-write Tx
260 final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
261 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
263 // Do some reads on the Tx on a separate thread.
// Futures are stashed in AtomicReferences so the main thread can await them
// after recovery is unblocked.
265 final AtomicReference<CheckedFuture<Boolean, ReadFailedException>> txExistsFuture =
266 new AtomicReference<>();
267 final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
268 txReadFuture = new AtomicReference<>();
269 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
270 final CountDownLatch txReadsDone = new CountDownLatch(1);
// NOTE(review): listing truncated — run() declaration, catch body, finally
// block and txThread.start() are missing from this chunk.
271 Thread txThread = new Thread() {
275 readWriteTx.write(TestModel.TEST_PATH,
276 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
278 txExistsFuture.set(readWriteTx.exists(TestModel.TEST_PATH));
280 txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
281 } catch(Exception e) {
285 txReadsDone.countDown();
292 // Wait for the Tx operations to complete.
294 boolean done = Uninterruptibles.awaitUninterruptibly(txReadsDone, 5, TimeUnit.SECONDS);
295 if(caughtEx.get() != null) {
296 throw caughtEx.get();
299 assertEquals("Tx reads done", true, done);
301 // At this point the Tx operations should be waiting for the shard to initialize so
302 // trigger the latch to let the shard recovery to continue.
304 blockRecoveryLatch.countDown();
306 // Wait for the reads to complete and verify.
308 assertEquals("exists", true, txExistsFuture.get().checkedGet(5, TimeUnit.SECONDS));
309 assertEquals("read", true, txReadFuture.get().checkedGet(5, TimeUnit.SECONDS).isPresent());
// Negative case: shard recovery is blocked indefinitely and the shard
// initialization timeout is set low (300ms), so canCommit() must fail.
// The test expects NotInitializedException; the ExecutionException from the
// commit future is caught so its cause can be rethrown (unwrapping lines are
// in the truncated portion of this listing).
317 @Test(expected=NotInitializedException.class)
318 public void testTransactionCommitFailureWithShardNotInitialized() throws Throwable{
319 new IntegrationTestKit(getSystem()) {{
320 String testName = "testTransactionCommitFailureWithShardNotInitialized";
321 String shardName = "test-1";
323 // Set the shard initialization timeout low for the test.
325 datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
327 // Setup the InMemoryJournal to block shard recovery indefinitely.
329 String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
330 CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
331 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
333 DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
335 // Create the write Tx
337 final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
// NOTE(review): message says "newReadWriteTransaction" for a write-only Tx.
338 assertNotNull("newReadWriteTransaction returned null", writeTx);
340 // Do some modifications and ready the Tx on a separate thread.
342 final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
343 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
344 final CountDownLatch txReady = new CountDownLatch(1);
// NOTE(review): listing truncated — run()/catch/countDown/start lines missing.
345 Thread txThread = new Thread() {
349 writeTx.write(TestModel.TEST_PATH,
350 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
352 txCohort.set(writeTx.ready());
353 } catch(Exception e) {
364 // Wait for the Tx operations to complete.
366 boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
367 if(caughtEx.get() != null) {
368 throw caughtEx.get();
371 assertEquals("Tx ready", true, done);
373 // Wait for the commit to complete. Since the shard never initialized, the Tx should
374 // have timed out and throw an appropriate exception cause.
377 txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
378 } catch(ExecutionException e) {
// Unblock recovery so the actor system can shut down cleanly after the test.
381 blockRecoveryLatch.countDown();
// Negative read case: with recovery blocked indefinitely and a 300ms shard
// initialization timeout, a read on a read-write Tx must fail with a
// ReadFailedException whose cause is NotInitializedException (the expected
// exception of this test; cause-unwrapping lines fall in the truncated part).
387 @Test(expected=NotInitializedException.class)
388 public void testTransactionReadFailureWithShardNotInitialized() throws Throwable{
389 new IntegrationTestKit(getSystem()) {{
390 String testName = "testTransactionReadFailureWithShardNotInitialized";
391 String shardName = "test-1";
393 // Set the shard initialization timeout low for the test.
395 datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
397 // Setup the InMemoryJournal to block shard recovery indefinitely.
399 String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
400 CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
401 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
403 DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
405 // Create the read-write Tx
407 final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
408 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
410 // Do a read on the Tx on a separate thread.
412 final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
413 txReadFuture = new AtomicReference<>();
414 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
415 final CountDownLatch txReadDone = new CountDownLatch(1);
// NOTE(review): listing truncated — run()/catch/finally/start lines missing.
416 Thread txThread = new Thread() {
420 readWriteTx.write(TestModel.TEST_PATH,
421 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
423 txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
426 } catch(Exception e) {
430 txReadDone.countDown();
437 // Wait for the Tx operations to complete.
439 boolean done = Uninterruptibles.awaitUninterruptibly(txReadDone, 5, TimeUnit.SECONDS);
440 if(caughtEx.get() != null) {
441 throw caughtEx.get();
444 assertEquals("Tx read done", true, done);
446 // Wait for the read to complete. Since the shard never initialized, the Tx should
447 // have timed out and throw an appropriate exception cause.
450 txReadFuture.get().checkedGet(5, TimeUnit.SECONDS);
451 } catch(ReadFailedException e) {
// Unblock recovery so the actor system can shut down cleanly after the test.
454 blockRecoveryLatch.countDown();
// Negative case: leader election is effectively disabled (30s heartbeat ->
// very long election timeout) while the shard-leader-election wait is 1ms,
// so canCommit() must fail; the test expects NoShardLeaderException as the
// cause of the ExecutionException (unwrapping lines are truncated here).
460 @Test(expected=NoShardLeaderException.class)
461 public void testTransactionCommitFailureWithNoShardLeader() throws Throwable{
462 new IntegrationTestKit(getSystem()) {{
463 String testName = "testTransactionCommitFailureWithNoShardLeader";
464 String shardName = "test-1";
466 // We don't want the shard to become the leader so prevent shard election from completing
467 // by setting the election timeout, which is based on the heartbeat interval, really high.
469 datastoreContextBuilder.shardHeartbeatIntervalInMillis(30000);
471 // Set the leader election timeout low for the test.
473 datastoreContextBuilder.shardLeaderElectionTimeout(1, TimeUnit.MILLISECONDS);
475 DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
477 // Create the write Tx.
479 final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
// NOTE(review): message says "newReadWriteTransaction" for a write-only Tx.
480 assertNotNull("newReadWriteTransaction returned null", writeTx);
482 // Do some modifications and ready the Tx on a separate thread.
484 final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
485 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
486 final CountDownLatch txReady = new CountDownLatch(1);
// NOTE(review): listing truncated — run()/catch/countDown/start lines missing.
487 Thread txThread = new Thread() {
491 writeTx.write(TestModel.TEST_PATH,
492 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
494 txCohort.set(writeTx.ready());
495 } catch(Exception e) {
506 // Wait for the Tx operations to complete.
508 boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
509 if(caughtEx.get() != null) {
510 throw caughtEx.get();
513 assertEquals("Tx ready", true, done);
515 // Wait for the commit to complete. Since no shard leader was elected in time, the Tx
516 // should have timed out and throw an appropriate exception cause.
519 txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
520 } catch(ExecutionException e) {
// Verifies abort: a readied transaction that has passed canCommit is aborted,
// then a fresh write to the same path is committed via testWriteTransaction
// to prove the store is still usable and the aborted data was not applied.
529 public void testTransactionAbort() throws Exception{
// NOTE(review): system property is set but never restored — leaks into
// subsequent tests in the same JVM.
530 System.setProperty("shard.persistent", "true");
531 new IntegrationTestKit(getSystem()) {{
532 DistributedDataStore dataStore =
533 setupDistributedDataStore("transactionAbortIntegrationTest", "test-1");
535 DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
536 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
538 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
540 DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
// Abort after canCommit: the cohort must release its state cleanly.
542 cohort.canCommit().get(5, TimeUnit.SECONDS);
544 cohort.abort().get(5, TimeUnit.SECONDS);
546 testWriteTransaction(dataStore, TestModel.TEST_PATH,
547 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
// Exercises transaction-chain ordering semantics: data readied (but not yet
// committed) by an earlier Tx in the chain must be visible to later read
// transactions created from the same chain, and the deferred commits must
// still succeed and be visible from a plain store-level read afterwards.
554 public void testTransactionChain() throws Exception{
555 System.setProperty("shard.persistent", "true");
556 new IntegrationTestKit(getSystem()) {{
557 DistributedDataStore dataStore =
558 setupDistributedDataStore("transactionChainIntegrationTest", "test-1");
560 // 1. Create a Tx chain and write-only Tx
562 DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
564 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
565 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
567 // 2. Write some data
569 NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
570 writeTx.write(TestModel.TEST_PATH, testNode);
572 // 3. Ready the Tx for commit
574 final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
576 // 4. Commit the Tx on another thread that first waits for the second read Tx.
// The committing thread blocks on continueCommit1 so this test can perform
// the chained reads BEFORE cohort1 actually commits.
// NOTE(review): listing truncated — the thread creation/run() wrapper and
// the doCommit(cohort1) invocation around these lines are missing.
578 final CountDownLatch continueCommit1 = new CountDownLatch(1);
579 final CountDownLatch commit1Done = new CountDownLatch(1);
580 final AtomicReference<Exception> commit1Error = new AtomicReference<>();
585 continueCommit1.await();
587 } catch (Exception e) {
590 commit1Done.countDown();
595 // 5. Create a new read Tx from the chain to read and verify the data from the first
596 // Tx is visible after being readied.
598 DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction();
599 Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
600 assertEquals("isPresent", true, optional.isPresent());
601 assertEquals("Data node", testNode, optional.get());
603 // 6. Create a new RW Tx from the chain, write more data, and ready it
605 DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
606 MapNode outerNode = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
607 rwTx.write(TestModel.OUTER_LIST_PATH, outerNode);
609 DOMStoreThreePhaseCommitCohort cohort2 = rwTx.ready();
611 // 7. Create a new read Tx from the chain to read the data from the last RW Tx to
612 // verify it is visible.
614 readTx = txChain.newReadOnlyTransaction();
615 optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
616 assertEquals("isPresent", true, optional.isPresent());
617 assertEquals("Data node", outerNode, optional.get());
619 // 8. Wait for the 2 commits to complete and close the chain.
621 continueCommit1.countDown();
622 Uninterruptibles.awaitUninterruptibly(commit1Done, 5, TimeUnit.SECONDS);
624 if(commit1Error.get() != null) {
625 throw commit1Error.get();
632 // 9. Create a new read Tx from the data store and verify committed data.
634 readTx = dataStore.newReadOnlyTransaction();
635 optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
636 assertEquals("isPresent", true, optional.isPresent());
637 assertEquals("Data node", outerNode, optional.get());
// Runs the full 3-phase commit on the given cohort (canCommit -> preCommit
// -> commit) with a 5-second timeout on each phase, asserting canCommit
// returned true. Throws on any phase failure or timeout.
642 private void doCommit(final DOMStoreThreePhaseCommitCohort cohort1) throws Exception {
643 Boolean canCommit = cohort1.canCommit().get(5, TimeUnit.SECONDS);
644 assertEquals("canCommit", true, canCommit);
645 cohort1.preCommit().get(5, TimeUnit.SECONDS);
646 cohort1.commit().get(5, TimeUnit.SECONDS);
// Verifies data-change listener registration: the listener (expecting 3
// events) must observe the three committed writes under TEST_PATH, and after
// the registration is closed (close() call falls in the truncated lines
// before the final write) a further write must produce no more events.
651 public void testChangeListenerRegistration() throws Exception{
652 new IntegrationTestKit(getSystem()) {{
653 DistributedDataStore dataStore =
654 setupDistributedDataStore("testChangeListenerRegistration", "test-1");
// Listener is primed to expect exactly 3 change events.
656 MockDataChangeListener listener = new MockDataChangeListener(3);
658 ListenerRegistration<MockDataChangeListener>
659 listenerReg = dataStore.registerChangeListener(TestModel.TEST_PATH, listener,
660 DataChangeScope.SUBTREE);
662 assertNotNull("registerChangeListener returned null", listenerReg);
664 testWriteTransaction(dataStore, TestModel.TEST_PATH,
665 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
667 testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
668 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
670 YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
671 nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
672 testWriteTransaction(dataStore, listPath,
673 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
675 listener.waitForChangeEvents(TestModel.TEST_PATH, TestModel.OUTER_LIST_PATH, listPath );
// This write happens after the registration is closed, so no event expected.
679 testWriteTransaction(dataStore, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
680 nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(),
681 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2));
683 listener.expectNoMoreChanges("Received unexpected change after close");
// Test harness extending ShardTestKit: wires up a DistributedDataStore
// backed by mock cluster/config, optionally waits for shard leadership,
// and provides write/commit/verify and cleanup helpers for the tests above.
689 class IntegrationTestKit extends ShardTestKit {
// Constructor body (super(actorSystem) call) falls in the truncated lines.
691 IntegrationTestKit(ActorSystem actorSystem) {
// Convenience overload: sets up the store and waits for each named shard
// to be created and elect a leader (waitUntilLeader = true).
695 DistributedDataStore setupDistributedDataStore(String typeName, String... shardNames) {
696 return setupDistributedDataStore(typeName, true, shardNames);
// Creates a DistributedDataStore with a mock cluster wrapper and the
// module-shards.conf/modules.conf configuration, pushes the full test
// SchemaContext, and — when waitUntilLeader is true — polls up to ~5s
// (100 iterations x 50ms) for each shard actor to appear, then blocks
// until that shard becomes leader.
699 DistributedDataStore setupDistributedDataStore(String typeName, boolean waitUntilLeader,
700 String... shardNames) {
701 MockClusterWrapper cluster = new MockClusterWrapper();
702 Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf");
// ShardStrategyFactory holds global state; last configuration set wins.
703 ShardStrategyFactory.setConfiguration(config);
// Builds from the shared datastoreContextBuilder, picking up any timeout
// tweaks the calling test applied beforehand.
705 DatastoreContext datastoreContext = datastoreContextBuilder.build();
706 DistributedDataStore dataStore = new DistributedDataStore(getSystem(), typeName, cluster,
707 config, datastoreContext);
709 SchemaContext schemaContext = SchemaContextHelper.full();
710 dataStore.onGlobalContextUpdated(schemaContext);
712 if(waitUntilLeader) {
713 for(String shardName: shardNames) {
714 ActorRef shard = null;
// Poll for the local shard actor: 20 * 5 = 100 attempts, 50ms apart.
715 for(int i = 0; i < 20 * 5 && shard == null; i++) {
716 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
717 Optional<ActorRef> shardReply = dataStore.getActorContext().findLocalShard(shardName);
718 if(shardReply.isPresent()) {
719 shard = shardReply.get();
723 assertNotNull("Shard was not created", shard);
// Inherited from ShardTestKit: blocks until the shard reports leadership.
725 waitUntilLeader(shard);
// Shared helper: writes nodeToWrite at nodePath in a new write-only Tx,
// drives the full 3-phase commit (5s timeout per phase), then reads the
// path back in a fresh read-only Tx and asserts the committed value.
732 void testWriteTransaction(DistributedDataStore dataStore, YangInstanceIdentifier nodePath,
733 NormalizedNode<?, ?> nodeToWrite) throws Exception {
735 // 1. Create a write-only Tx
737 DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
738 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
740 // 2. Write some data
742 writeTx.write(nodePath, nodeToWrite);
744 // 3. Ready the Tx for commit
746 DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
// 4. (implied) Run the 3-phase commit.
750 Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS);
751 assertEquals("canCommit", true, canCommit);
752 cohort.preCommit().get(5, TimeUnit.SECONDS);
753 cohort.commit().get(5, TimeUnit.SECONDS);
755 // 5. Verify the data in the store
757 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
759 Optional<NormalizedNode<?, ?>> optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
760 assertEquals("isPresent", true, optional.isPresent());
761 assertEquals("Data node", nodeToWrite, optional.get());
// Tears down the store's actor hierarchy by poison-pilling the shard
// manager (sender = null); stops the shard manager and its child shards.
764 void cleanup(DistributedDataStore dataStore) {
765 dataStore.getActorContext().getShardManager().tell(PoisonPill.getInstance(), null);