1 package org.opendaylight.controller.cluster.datastore;
3 import akka.actor.ActorRef;
4 import akka.actor.ActorSystem;
5 import akka.actor.PoisonPill;
6 import com.google.common.base.Optional;
7 import com.google.common.util.concurrent.CheckedFuture;
8 import com.google.common.util.concurrent.Uninterruptibles;
10 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
11 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
12 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
13 import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
14 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
15 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
16 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
17 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
18 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
19 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
20 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
21 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
22 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
23 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
24 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
25 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
26 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
27 import org.opendaylight.yangtools.concepts.ListenerRegistration;
28 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
29 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
30 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
31 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
32 import java.util.concurrent.CountDownLatch;
33 import java.util.concurrent.ExecutionException;
34 import java.util.concurrent.TimeUnit;
35 import java.util.concurrent.atomic.AtomicReference;
37 import static org.junit.Assert.assertEquals;
38 import static org.junit.Assert.assertNotNull;
/**
 * Integration tests that drive DistributedDataStore transactions, transaction chains and data
 * change listener registration through the IntegrationTestKit helper declared later in this file.
 */
40 public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
// Builder for the DatastoreContext handed to each DistributedDataStore created by
// setupDistributedDataStore(). It starts with a 100 ms shard heartbeat interval; individual
// tests adjust further settings on it before the datastore is created.
42 private final DatastoreContext.Builder datastoreContextBuilder =
43 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100);
// Commits two separate write transactions (the test container, then an empty outer list) against
// a datastore set up with the single shard "test-1".
46 public void testWriteTransactionWithSingleShard() throws Exception{
47 new IntegrationTestKit(getSystem()) {{
// This setup overload waits for the shard leader before returning.
48 DistributedDataStore dataStore =
49 setupDistributedDataStore("transactionIntegrationTest", "test-1");
// Each testWriteTransaction() call writes the node, runs the three-phase commit and reads it back.
51 testWriteTransaction(dataStore, TestModel.TEST_PATH,
52 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
54 testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
55 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
// Writes the empty cars and people containers in ONE write-only transaction on a datastore set up
// with the shards "cars-1" and "people-1", runs the three-phase commit and reads both nodes back.
62 public void testWriteTransactionWithMultipleShards() throws Exception{
63 new IntegrationTestKit(getSystem()) {{
64 DistributedDataStore dataStore =
65 setupDistributedDataStore("testWriteTransactionWithMultipleShards", "cars-1", "people-1");
67 DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
68 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
// Both writes go through the same transaction.
70 YangInstanceIdentifier nodePath1 = CarsModel.BASE_PATH;
71 NormalizedNode<?, ?> nodeToWrite1 = CarsModel.emptyContainer();
72 writeTx.write(nodePath1, nodeToWrite1);
74 YangInstanceIdentifier nodePath2 = PeopleModel.BASE_PATH;
75 NormalizedNode<?, ?> nodeToWrite2 = PeopleModel.emptyContainer();
76 writeTx.write(nodePath2, nodeToWrite2);
78 DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
// Three-phase commit; each phase is bounded to 5 seconds so a stuck shard fails the test.
80 Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS);
81 assertEquals("canCommit", true, canCommit);
82 cohort.preCommit().get(5, TimeUnit.SECONDS);
83 cohort.commit().get(5, TimeUnit.SECONDS);
85 // Verify the data in the store
87 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
// The reads are bounded by the same 5 second timeout as the commit phases; an untimed get()
// would block the whole test run indefinitely if a read reply never arrived.
89 Optional<NormalizedNode<?, ?>> optional = readTx.read(nodePath1).get(5, TimeUnit.SECONDS);
90 assertEquals("isPresent", true, optional.isPresent());
91 assertEquals("Data node", nodeToWrite1, optional.get());
93 optional = readTx.read(nodePath2).get(5, TimeUnit.SECONDS);
94 assertEquals("isPresent", true, optional.isPresent());
95 assertEquals("Data node", nodeToWrite2, optional.get());
// Writes a container through a read-write transaction, checks that the uncommitted data is visible
// via exists()/read() on that same transaction, commits, then re-reads it with a new read-only Tx.
102 public void testReadWriteTransaction() throws Exception{
// NOTE(review): this JVM-wide system property is not restored anywhere in the visible lines, so
// it may leak into tests that run later — confirm whether that is intended.
103 System.setProperty("shard.persistent", "true");
104 new IntegrationTestKit(getSystem()) {{
105 DistributedDataStore dataStore =
106 setupDistributedDataStore("testReadWriteTransaction", "test-1");
108 // 1. Create a read-write Tx
110 DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
111 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
113 // 2. Write some data
115 YangInstanceIdentifier nodePath = TestModel.TEST_PATH;
116 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
117 readWriteTx.write(nodePath, nodeToWrite );
119 // 3. Read the data from Tx
// Both checks run on the still-open transaction, i.e. before anything has been committed.
121 Boolean exists = readWriteTx.exists(nodePath).checkedGet(5, TimeUnit.SECONDS);
122 assertEquals("exists", true, exists);
124 Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS);
125 assertEquals("isPresent", true, optional.isPresent());
126 assertEquals("Data node", nodeToWrite, optional.get());
128 // 4. Ready the Tx for commit
130 DOMStoreThreePhaseCommitCohort cohort = readWriteTx.ready();
// Three-phase commit: canCommit, preCommit, commit, each bounded to 5 seconds.
134 Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS);
135 assertEquals("canCommit", true, canCommit);
136 cohort.preCommit().get(5, TimeUnit.SECONDS);
137 cohort.commit().get(5, TimeUnit.SECONDS);
139 // 6. Verify the data in the store
// A fresh read-only transaction must see the committed node.
141 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
143 optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
144 assertEquals("isPresent", true, optional.isPresent());
145 assertEquals("Data node", nodeToWrite, optional.get());
// Verifies that write/merge/delete operations issued while shard recovery is still blocked are
// applied once recovery is released: the Tx is readied on a separate thread, the recovery latch
// is then counted down, and the commit result is verified with a read-only transaction.
152 public void testTransactionWritesWithShardNotInitiallyReady() throws Exception{
153 new IntegrationTestKit(getSystem()) {{
154 String testName = "testTransactionWritesWithShardNotInitiallyReady";
155 String shardName = "test-1";
157 // Setup the InMemoryJournal to block shard recovery to ensure the shard isn't
158 // initialized until we create and submit the write Tx.
159 String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
160 CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
161 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
// waitUntilLeader is false: recovery is blocked above, so setup must not wait for a leader.
163 DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
165 // Create the write Tx
167 final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
// The failure message names the factory method that was actually called.
168 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
170 // Do some modification operations and ready the Tx on a separate thread.
172 final YangInstanceIdentifier listEntryPath = YangInstanceIdentifier.builder(
173 TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME,
174 TestModel.ID_QNAME, 1).build();
// Holders used to pass the cohort, or a failure, from the worker thread back to the test thread.
176 final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
177 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
178 final CountDownLatch txReady = new CountDownLatch(1);
179 Thread txThread = new Thread() {
183 writeTx.write(TestModel.TEST_PATH,
184 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
186 writeTx.merge(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(
187 TestModel.OUTER_LIST_QNAME).build());
// The list entry is written and then deleted in the same Tx, so it must be absent after commit.
189 writeTx.write(listEntryPath, ImmutableNodes.mapEntry(
190 TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
192 writeTx.delete(listEntryPath);
194 txCohort.set(writeTx.ready());
// NOTE(review): the handler body is not visible in this excerpt; presumably it records e in
// caughtEx and txReady is counted down afterwards — confirm.
195 } catch(Exception e) {
206 // Wait for the Tx operations to complete.
208 boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
// Surface a worker-thread failure before checking the latch result.
209 if(caughtEx.get() != null) {
210 throw caughtEx.get();
213 assertEquals("Tx ready", true, done);
215 // At this point the Tx operations should be waiting for the shard to initialize so
216 // trigger the latch to let the shard recovery to continue.
218 blockRecoveryLatch.countDown();
220 // Wait for the Tx commit to complete.
222 assertEquals("canCommit", true, txCohort.get().canCommit().get(5, TimeUnit.SECONDS));
223 txCohort.get().preCommit().get(5, TimeUnit.SECONDS);
224 txCohort.get().commit().get(5, TimeUnit.SECONDS);
226 // Verify the data in the store
228 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
230 Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).
231 get(5, TimeUnit.SECONDS);
232 assertEquals("isPresent", true, optional.isPresent());
234 optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
235 assertEquals("isPresent", true, optional.isPresent());
// The deleted list entry must not be present.
237 optional = readTx.read(listEntryPath).get(5, TimeUnit.SECONDS);
238 assertEquals("isPresent", false, optional.isPresent());
// Verifies that exists()/read() calls issued on a read-write Tx while shard recovery is still
// blocked complete successfully once the recovery latch is released.
245 public void testTransactionReadsWithShardNotInitiallyReady() throws Exception{
246 new IntegrationTestKit(getSystem()) {{
247 String testName = "testTransactionReadsWithShardNotInitiallyReady";
248 String shardName = "test-1";
250 // Setup the InMemoryJournal to block shard recovery to ensure the shard isn't
251 // initialized until we create the Tx.
252 String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
253 CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
254 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
// waitUntilLeader is false: recovery is blocked above, so setup must not wait for a leader.
256 DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
258 // Create the read-write Tx
260 final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
261 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
263 // Do some reads on the Tx on a separate thread.
// Holders used to pass the pending futures, or a failure, from the worker thread to the test thread.
265 final AtomicReference<CheckedFuture<Boolean, ReadFailedException>> txExistsFuture =
266 new AtomicReference<>();
267 final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
268 txReadFuture = new AtomicReference<>();
269 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
270 final CountDownLatch txReadsDone = new CountDownLatch(1);
271 Thread txThread = new Thread() {
// Write first so that the exists()/read() calls below have data to find.
275 readWriteTx.write(TestModel.TEST_PATH,
276 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
// Only the futures are captured here; they are resolved on the test thread further down.
278 txExistsFuture.set(readWriteTx.exists(TestModel.TEST_PATH));
280 txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
// NOTE(review): the handler body is not visible in this excerpt; presumably it records e in
// caughtEx — confirm.
281 } catch(Exception e) {
285 txReadsDone.countDown();
292 // Wait for the Tx operations to complete.
294 boolean done = Uninterruptibles.awaitUninterruptibly(txReadsDone, 5, TimeUnit.SECONDS);
// Surface a worker-thread failure before checking the latch result.
295 if(caughtEx.get() != null) {
296 throw caughtEx.get();
299 assertEquals("Tx reads done", true, done);
301 // At this point the Tx operations should be waiting for the shard to initialize so
302 // trigger the latch to let the shard recovery to continue.
304 blockRecoveryLatch.countDown();
306 // Wait for the reads to complete and verify.
308 assertEquals("exists", true, txExistsFuture.get().checkedGet(5, TimeUnit.SECONDS));
309 assertEquals("read", true, txReadFuture.get().checkedGet(5, TimeUnit.SECONDS).isPresent());
// Expects a NotInitializedException when a write Tx is committed against a shard whose recovery
// stays blocked for longer than the 300 ms shard initialization timeout configured below.
317 @Test(expected=NotInitializedException.class)
318 public void testTransactionCommitFailureWithShardNotInitialized() throws Throwable{
319 new IntegrationTestKit(getSystem()) {{
320 String testName = "testTransactionCommitFailureWithShardNotInitialized";
321 String shardName = "test-1";
323 // Set the shard initialization timeout low for the test.
325 datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
327 // Setup the InMemoryJournal to block shard recovery indefinitely.
329 String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
330 CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
331 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
// waitUntilLeader is false: recovery is blocked above, so setup must not wait for a leader.
333 DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
335 // Create the write Tx
337 final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
// The failure message names the factory method that was actually called.
338 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
340 // Do some modifications and ready the Tx on a separate thread.
// Holders used to pass the cohort, or a failure, from the worker thread back to the test thread.
342 final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
343 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
344 final CountDownLatch txReady = new CountDownLatch(1);
345 Thread txThread = new Thread() {
349 writeTx.write(TestModel.TEST_PATH,
350 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
352 txCohort.set(writeTx.ready());
// NOTE(review): the handler body is not visible in this excerpt; presumably it records e in
// caughtEx and txReady is counted down afterwards — confirm.
353 } catch(Exception e) {
364 // Wait for the Tx operations to complete.
366 boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
// Surface a worker-thread failure before checking the latch result.
367 if(caughtEx.get() != null) {
368 throw caughtEx.get();
371 assertEquals("Tx ready", true, done);
373 // Wait for the commit to complete. Since the shard never initialized, the Tx should
374 // have timed out and throw an appropriate exception cause.
377 txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
// NOTE(review): the handler body is not visible in this excerpt; presumably it rethrows the
// cause so that the expected= check on the annotation sees NotInitializedException — confirm.
378 } catch(ExecutionException e) {
// Release the recovery latch so the blocked shard recovery can proceed.
381 blockRecoveryLatch.countDown();
// Expects a NotInitializedException when a read is issued on a read-write Tx against a shard whose
// recovery stays blocked for longer than the 300 ms shard initialization timeout configured below.
387 @Test(expected=NotInitializedException.class)
388 public void testTransactionReadFailureWithShardNotInitialized() throws Throwable{
389 new IntegrationTestKit(getSystem()) {{
390 String testName = "testTransactionReadFailureWithShardNotInitialized";
391 String shardName = "test-1";
393 // Set the shard initialization timeout low for the test.
395 datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
397 // Setup the InMemoryJournal to block shard recovery indefinitely.
399 String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
400 CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
401 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
// waitUntilLeader is false: recovery is blocked above, so setup must not wait for a leader.
403 DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
405 // Create the read-write Tx
407 final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
408 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
410 // Do a read on the Tx on a separate thread.
// Holders used to pass the pending future, or a failure, from the worker thread to the test thread.
412 final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
413 txReadFuture = new AtomicReference<>();
414 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
415 final CountDownLatch txReadDone = new CountDownLatch(1);
416 Thread txThread = new Thread() {
420 readWriteTx.write(TestModel.TEST_PATH,
421 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
// Only the future is captured here; it is resolved on the test thread further down.
423 txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
// NOTE(review): the handler body is not visible in this excerpt; presumably it records e in
// caughtEx — confirm.
426 } catch(Exception e) {
430 txReadDone.countDown();
437 // Wait for the Tx operations to complete.
439 boolean done = Uninterruptibles.awaitUninterruptibly(txReadDone, 5, TimeUnit.SECONDS);
// Surface a worker-thread failure before checking the latch result.
440 if(caughtEx.get() != null) {
441 throw caughtEx.get();
444 assertEquals("Tx read done", true, done);
446 // Wait for the read to complete. Since the shard never initialized, the Tx should
447 // have timed out and throw an appropriate exception cause.
450 txReadFuture.get().checkedGet(5, TimeUnit.SECONDS);
// NOTE(review): the handler body is not visible in this excerpt; presumably it rethrows the
// cause so that the expected= check on the annotation sees NotInitializedException — confirm.
451 } catch(ReadFailedException e) {
// Release the recovery latch so the blocked shard recovery can proceed.
454 blockRecoveryLatch.countDown();
// Expects a NoShardLeaderException when a write Tx is committed while no shard leader can be
// elected: the heartbeat interval is raised to 30 s and the leader election timeout cut to 1 ms.
460 @Test(expected=NoShardLeaderException.class)
461 public void testTransactionCommitFailureWithNoShardLeader() throws Throwable{
462 new IntegrationTestKit(getSystem()) {{
463 String testName = "testTransactionCommitFailureWithNoShardLeader";
464 String shardName = "test-1";
466 // We don't want the shard to become the leader so prevent shard election from completing
467 // by setting the election timeout, which is based on the heartbeat interval, really high.
469 datastoreContextBuilder.shardHeartbeatIntervalInMillis(30000);
471 // Set the leader election timeout low for the test.
473 datastoreContextBuilder.shardLeaderElectionTimeout(1, TimeUnit.MILLISECONDS);
// waitUntilLeader is false: no leader is expected, so setup must not wait for one.
475 DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
477 // Create the write Tx.
479 final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
// The failure message names the factory method that was actually called.
480 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
482 // Do some modifications and ready the Tx on a separate thread.
// Holders used to pass the cohort, or a failure, from the worker thread back to the test thread.
484 final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
485 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
486 final CountDownLatch txReady = new CountDownLatch(1);
487 Thread txThread = new Thread() {
491 writeTx.write(TestModel.TEST_PATH,
492 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
494 txCohort.set(writeTx.ready());
// NOTE(review): the handler body is not visible in this excerpt; presumably it records e in
// caughtEx and txReady is counted down afterwards — confirm.
495 } catch(Exception e) {
506 // Wait for the Tx operations to complete.
508 boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
// Surface a worker-thread failure before checking the latch result.
509 if(caughtEx.get() != null) {
510 throw caughtEx.get();
513 assertEquals("Tx ready", true, done);
515 // Wait for the commit to complete. Since no shard leader was elected in time, the Tx
516 // should have timed out and throw an appropriate exception cause.
519 txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
// NOTE(review): the handler body is not visible in this excerpt; presumably it rethrows the
// cause so that the expected= check on the annotation sees NoShardLeaderException — confirm.
520 } catch(ExecutionException e) {
// Readies a write Tx, runs canCommit, then aborts it, and afterwards checks that a fresh write
// transaction on the same path still commits and reads back.
529 public void testTransactionAbort() throws Exception{
// NOTE(review): this JVM-wide system property is not restored anywhere in the visible lines, so
// it may leak into tests that run later — confirm whether that is intended.
530 System.setProperty("shard.persistent", "true");
531 new IntegrationTestKit(getSystem()) {{
532 DistributedDataStore dataStore =
533 setupDistributedDataStore("transactionAbortIntegrationTest", "test-1");
535 DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
536 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
538 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
540 DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
// The canCommit result is not asserted; the call only has to complete within 5 seconds.
542 cohort.canCommit().get(5, TimeUnit.SECONDS);
// Abort instead of preCommit/commit.
544 cohort.abort().get(5, TimeUnit.SECONDS);
// A new transaction on the same path must still go through the full write/commit/read cycle.
546 testWriteTransaction(dataStore, TestModel.TEST_PATH,
547 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
// Writes and commits a container through a write-only Tx obtained from a transaction chain, then
// reads it back through a read-only Tx obtained from the same chain.
554 public void testTransactionChain() throws Exception{
// NOTE(review): this JVM-wide system property is not restored anywhere in the visible lines, so
// it may leak into tests that run later — confirm whether that is intended.
555 System.setProperty("shard.persistent", "true");
556 new IntegrationTestKit(getSystem()) {{
557 DistributedDataStore dataStore =
558 setupDistributedDataStore("transactionChainIntegrationTest", "test-1");
560 // 1. Create a Tx chain and write-only Tx
562 DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
564 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
565 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
567 // 2. Write some data
569 NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
570 writeTx.write(TestModel.TEST_PATH, containerNode);
572 // 3. Ready the Tx for commit
574 DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
// Three-phase commit: canCommit, preCommit, commit, each bounded to 5 seconds.
578 Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS);
579 assertEquals("canCommit", true, canCommit);
580 cohort.preCommit().get(5, TimeUnit.SECONDS);
581 cohort.commit().get(5, TimeUnit.SECONDS);
583 // 5. Verify the data in the store
// The read-only Tx comes from the chain, not directly from the datastore.
585 DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction();
587 Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
588 assertEquals("isPresent", true, optional.isPresent());
589 assertEquals("Data node", containerNode, optional.get());
// Registers a SUBTREE-scoped data change listener on the test path, performs three writes and
// waits for the corresponding change events, then performs a fourth write and expects no further
// notification.
598 public void testChangeListenerRegistration() throws Exception{
599 new IntegrationTestKit(getSystem()) {{
600 DistributedDataStore dataStore =
601 setupDistributedDataStore("testChangeListenerRegistration", "test-1");
// NOTE(review): the constructor argument 3 presumably is the number of expected change events
// (three writes follow before waitForChangeEvents) — confirm against MockDataChangeListener.
603 MockDataChangeListener listener = new MockDataChangeListener(3);
605 ListenerRegistration<MockDataChangeListener>
606 listenerReg = dataStore.registerChangeListener(TestModel.TEST_PATH, listener,
607 DataChangeScope.SUBTREE);
609 assertNotNull("registerChangeListener returned null", listenerReg);
// Three writes: the test container, the empty outer list, and the list entry with id 1.
611 testWriteTransaction(dataStore, TestModel.TEST_PATH,
612 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
614 testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
615 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
617 YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
618 nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
619 testWriteTransaction(dataStore, listPath,
620 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
// Wait for a change event for each of the three written paths.
622 listener.waitForChangeEvents(TestModel.TEST_PATH, TestModel.OUTER_LIST_PATH, listPath );
// NOTE(review): the "after close" message below implies the registration is closed before this
// fourth write; that call is not visible in this excerpt — confirm.
626 testWriteTransaction(dataStore, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
627 nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(),
628 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2));
630 listener.expectNoMoreChanges("Received unexpected change after close");
// Test kit shared by the tests in this file: creates a DistributedDataStore, runs a standard
// write/commit/read cycle against it, and stops its shard manager.
636 class IntegrationTestKit extends ShardTestKit {
// NOTE(review): the constructor body is not visible in this excerpt; presumably it passes
// actorSystem to the ShardTestKit constructor — confirm.
638 IntegrationTestKit(ActorSystem actorSystem) {
// Convenience overload: creates the datastore and waits until each named shard has a leader
// (delegates with waitUntilLeader = true).
642 DistributedDataStore setupDistributedDataStore(String typeName, String... shardNames) {
643 return setupDistributedDataStore(typeName, true, shardNames);
// Creates a DistributedDataStore of the given type name on this kit's actor system, using a
// MockClusterWrapper, the "module-shards.conf"/"modules.conf" configuration and a DatastoreContext
// built from the enclosing test's datastoreContextBuilder, then pushes the full schema context.
// When waitUntilLeader is true, each name in shardNames is looked up as a local shard and
// waitUntilLeader(shard) is called on it; when false, shardNames is not used.
646 DistributedDataStore setupDistributedDataStore(String typeName, boolean waitUntilLeader,
647 String... shardNames) {
648 MockClusterWrapper cluster = new MockClusterWrapper();
649 Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf");
// Static setter: this replaces the configuration held by ShardStrategyFactory for all users.
650 ShardStrategyFactory.setConfiguration(config);
// The context is built at this point, so builder changes made by a test before this call apply.
652 DatastoreContext datastoreContext = datastoreContextBuilder.build();
653 DistributedDataStore dataStore = new DistributedDataStore(getSystem(), typeName, cluster,
654 config, datastoreContext);
656 SchemaContext schemaContext = SchemaContextHelper.full();
657 dataStore.onGlobalContextUpdated(schemaContext);
659 if(waitUntilLeader) {
660 for(String shardName: shardNames) {
661 ActorRef shard = null;
// Poll for the local shard: at most 20 * 5 = 100 attempts, sleeping 50 ms before each one
// (about 5 seconds in total), stopping as soon as the shard is found.
662 for(int i = 0; i < 20 * 5 && shard == null; i++) {
663 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
664 Optional<ActorRef> shardReply = dataStore.getActorContext().findLocalShard(shardName);
665 if(shardReply.isPresent()) {
666 shard = shardReply.get();
// Fail if the shard never appeared within the polling window.
670 assertNotNull("Shard was not created", shard);
672 waitUntilLeader(shard);
// Shared write/commit/read cycle: writes nodeToWrite at nodePath in a new write-only transaction,
// runs the three-phase commit, then asserts that a new read-only transaction returns an equal node.
// Every blocking step is bounded to 5 seconds.
679 void testWriteTransaction(DistributedDataStore dataStore, YangInstanceIdentifier nodePath,
680 NormalizedNode<?, ?> nodeToWrite) throws Exception {
682 // 1. Create a write-only Tx
684 DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
685 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
687 // 2. Write some data
689 writeTx.write(nodePath, nodeToWrite);
691 // 3. Ready the Tx for commit
693 DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
// Three-phase commit: canCommit must report true, followed by preCommit and commit.
697 Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS);
698 assertEquals("canCommit", true, canCommit);
699 cohort.preCommit().get(5, TimeUnit.SECONDS);
700 cohort.commit().get(5, TimeUnit.SECONDS);
702 // 5. Verify the data in the store
704 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
706 Optional<NormalizedNode<?, ?>> optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
707 assertEquals("isPresent", true, optional.isPresent());
708 assertEquals("Data node", nodeToWrite, optional.get());
// Stops the datastore's shard manager actor by sending it a PoisonPill. The message has no
// sender, which is expressed with ActorRef.noSender() rather than a bare null.
711 void cleanup(DistributedDataStore dataStore) {
712 dataStore.getActorContext().getShardManager().tell(PoisonPill.getInstance(), ActorRef.noSender());