1 package org.opendaylight.controller.cluster.datastore;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import static org.junit.Assert.fail;
7 import static org.mockito.Matchers.any;
8 import static org.mockito.Matchers.eq;
9 import static org.mockito.Mockito.timeout;
10 import static org.mockito.Mockito.verify;
11 import akka.actor.ActorSystem;
12 import akka.actor.Address;
13 import akka.actor.AddressFromURIString;
14 import akka.cluster.Cluster;
15 import akka.testkit.JavaTestKit;
16 import com.google.common.base.Optional;
17 import com.google.common.collect.ImmutableMap;
18 import com.google.common.util.concurrent.CheckedFuture;
19 import com.google.common.util.concurrent.ListenableFuture;
20 import com.google.common.util.concurrent.MoreExecutors;
21 import com.google.common.util.concurrent.Uninterruptibles;
22 import com.typesafe.config.ConfigFactory;
23 import java.io.IOException;
24 import java.math.BigInteger;
25 import java.util.ArrayList;
26 import java.util.Collection;
27 import java.util.List;
28 import java.util.concurrent.CountDownLatch;
29 import java.util.concurrent.ExecutionException;
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.atomic.AtomicReference;
32 import org.junit.AfterClass;
33 import org.junit.BeforeClass;
34 import org.junit.Test;
35 import org.mockito.Mockito;
36 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
37 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
38 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
39 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
40 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
41 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
42 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
43 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
44 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
45 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
46 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
47 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
48 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
49 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
50 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
51 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
52 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
53 import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
54 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
55 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
56 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
57 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
58 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
59 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
60 import org.opendaylight.yangtools.concepts.ListenerRegistration;
61 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
62 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
63 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
64 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
65 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
66 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
67 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
69 public class DistributedDataStoreIntegrationTest {
71 private static ActorSystem system;
73 private final DatastoreContext.Builder datastoreContextBuilder =
74 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100);
77 public static void setUpClass() throws IOException {
78 system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
79 Address member1Address = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
80 Cluster.get(system).join(member1Address);
84 public static void tearDownClass() throws IOException {
85 JavaTestKit.shutdownActorSystem(system);
89 protected ActorSystem getSystem() {
94 public void testWriteTransactionWithSingleShard() throws Exception{
95 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
96 DistributedDataStore dataStore =
97 setupDistributedDataStore("transactionIntegrationTest", "test-1");
99 testWriteTransaction(dataStore, TestModel.TEST_PATH,
100 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
102 testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
103 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
110 public void testWriteTransactionWithMultipleShards() throws Exception{
111 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
112 DistributedDataStore dataStore =
113 setupDistributedDataStore("testWriteTransactionWithMultipleShards", "cars-1", "people-1");
115 DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
116 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
118 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
119 writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
121 doCommit(writeTx.ready());
123 writeTx = dataStore.newWriteOnlyTransaction();
125 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
126 writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
128 doCommit(writeTx.ready());
130 writeTx = dataStore.newWriteOnlyTransaction();
132 MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
133 YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
134 writeTx.write(carPath, car);
136 MapEntryNode person = PeopleModel.newPersonEntry("jack");
137 YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
138 writeTx.write(personPath, person);
140 doCommit(writeTx.ready());
142 // Verify the data in the store
144 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
146 Optional<NormalizedNode<?, ?>> optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
147 assertEquals("isPresent", true, optional.isPresent());
148 assertEquals("Data node", car, optional.get());
150 optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
151 assertEquals("isPresent", true, optional.isPresent());
152 assertEquals("Data node", person, optional.get());
159 public void testReadWriteTransactionWithSingleShard() throws Exception{
160 System.setProperty("shard.persistent", "true");
161 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
162 DistributedDataStore dataStore =
163 setupDistributedDataStore("testReadWriteTransactionWithSingleShard", "test-1");
165 // 1. Create a read-write Tx
167 DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
168 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
170 // 2. Write some data
172 YangInstanceIdentifier nodePath = TestModel.TEST_PATH;
173 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
174 readWriteTx.write(nodePath, nodeToWrite );
176 // 3. Read the data from Tx
178 Boolean exists = readWriteTx.exists(nodePath).checkedGet(5, TimeUnit.SECONDS);
179 assertEquals("exists", true, exists);
181 Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS);
182 assertEquals("isPresent", true, optional.isPresent());
183 assertEquals("Data node", nodeToWrite, optional.get());
185 // 4. Ready the Tx for commit
187 DOMStoreThreePhaseCommitCohort cohort = readWriteTx.ready();
193 // 6. Verify the data in the store
195 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
197 optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
198 assertEquals("isPresent", true, optional.isPresent());
199 assertEquals("Data node", nodeToWrite, optional.get());
206 public void testReadWriteTransactionWithMultipleShards() throws Exception{
207 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
208 DistributedDataStore dataStore =
209 setupDistributedDataStore("testReadWriteTransactionWithMultipleShards", "cars-1", "people-1");
211 DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
212 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
214 readWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
215 readWriteTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
217 doCommit(readWriteTx.ready());
219 readWriteTx = dataStore.newReadWriteTransaction();
221 readWriteTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
222 readWriteTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
224 doCommit(readWriteTx.ready());
226 readWriteTx = dataStore.newReadWriteTransaction();
228 MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
229 YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
230 readWriteTx.write(carPath, car);
232 MapEntryNode person = PeopleModel.newPersonEntry("jack");
233 YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
234 readWriteTx.write(personPath, person);
236 Boolean exists = readWriteTx.exists(carPath).checkedGet(5, TimeUnit.SECONDS);
237 assertEquals("exists", true, exists);
239 Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
240 assertEquals("isPresent", true, optional.isPresent());
241 assertEquals("Data node", car, optional.get());
243 doCommit(readWriteTx.ready());
245 // Verify the data in the store
247 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
249 optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
250 assertEquals("isPresent", true, optional.isPresent());
251 assertEquals("Data node", car, optional.get());
253 optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
254 assertEquals("isPresent", true, optional.isPresent());
255 assertEquals("Data node", person, optional.get());
262 public void testSingleTransactionsWritesInQuickSuccession() throws Exception{
263 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
264 DistributedDataStore dataStore = setupDistributedDataStore(
265 "testSingleTransactionsWritesInQuickSuccession", "cars-1");
267 DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
269 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
270 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
271 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
272 doCommit(writeTx.ready());
274 writeTx = txChain.newWriteOnlyTransaction();
277 for(int i = 0; i < nCars; i++) {
278 writeTx.write(CarsModel.newCarPath("car" + i),
279 CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
282 doCommit(writeTx.ready());
284 Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction().read(
285 CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
286 assertEquals("isPresent", true, optional.isPresent());
287 assertEquals("# cars", nCars, ((Collection<?>)optional.get().getValue()).size());
293 private void testTransactionWritesWithShardNotInitiallyReady(final String testName,
294 final boolean writeOnly) throws Exception {
295 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
296 String shardName = "test-1";
298 // Setup the InMemoryJournal to block shard recovery to ensure the shard isn't
299 // initialized until we create and submit the write the Tx.
300 String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
301 CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
302 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
304 DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
306 // Create the write Tx
308 final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() :
309 dataStore.newReadWriteTransaction();
310 assertNotNull("newReadWriteTransaction returned null", writeTx);
312 // Do some modification operations and ready the Tx on a separate thread.
314 final YangInstanceIdentifier listEntryPath = YangInstanceIdentifier.builder(
315 TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME,
316 TestModel.ID_QNAME, 1).build();
318 final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
319 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
320 final CountDownLatch txReady = new CountDownLatch(1);
321 Thread txThread = new Thread() {
325 writeTx.write(TestModel.TEST_PATH,
326 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
328 writeTx.merge(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(
329 TestModel.OUTER_LIST_QNAME).build());
331 writeTx.write(listEntryPath, ImmutableNodes.mapEntry(
332 TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
334 writeTx.delete(listEntryPath);
336 txCohort.set(writeTx.ready());
337 } catch(Exception e) {
348 // Wait for the Tx operations to complete.
350 boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
351 if(caughtEx.get() != null) {
352 throw caughtEx.get();
355 assertEquals("Tx ready", true, done);
357 // At this point the Tx operations should be waiting for the shard to initialize so
358 // trigger the latch to let the shard recovery to continue.
360 blockRecoveryLatch.countDown();
362 // Wait for the Tx commit to complete.
364 doCommit(txCohort.get());
366 // Verify the data in the store
368 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
370 Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).
371 get(5, TimeUnit.SECONDS);
372 assertEquals("isPresent", true, optional.isPresent());
374 optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
375 assertEquals("isPresent", true, optional.isPresent());
377 optional = readTx.read(listEntryPath).get(5, TimeUnit.SECONDS);
378 assertEquals("isPresent", false, optional.isPresent());
385 public void testWriteOnlyTransactionWithShardNotInitiallyReady() throws Exception {
386 datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
387 testTransactionWritesWithShardNotInitiallyReady("testWriteOnlyTransactionWithShardNotInitiallyReady", true);
391 public void testReadWriteTransactionWithShardNotInitiallyReady() throws Exception {
392 testTransactionWritesWithShardNotInitiallyReady("testReadWriteTransactionWithShardNotInitiallyReady", false);
396 public void testTransactionReadsWithShardNotInitiallyReady() throws Exception {
397 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
398 String testName = "testTransactionReadsWithShardNotInitiallyReady";
399 String shardName = "test-1";
401 // Setup the InMemoryJournal to block shard recovery to ensure the shard isn't
402 // initialized until we create the Tx.
403 String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
404 CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
405 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
407 DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
409 // Create the read-write Tx
411 final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
412 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
414 // Do some reads on the Tx on a separate thread.
416 final AtomicReference<CheckedFuture<Boolean, ReadFailedException>> txExistsFuture =
417 new AtomicReference<>();
418 final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
419 txReadFuture = new AtomicReference<>();
420 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
421 final CountDownLatch txReadsDone = new CountDownLatch(1);
422 Thread txThread = new Thread() {
426 readWriteTx.write(TestModel.TEST_PATH,
427 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
429 txExistsFuture.set(readWriteTx.exists(TestModel.TEST_PATH));
431 txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
432 } catch(Exception e) {
436 txReadsDone.countDown();
443 // Wait for the Tx operations to complete.
445 boolean done = Uninterruptibles.awaitUninterruptibly(txReadsDone, 5, TimeUnit.SECONDS);
446 if(caughtEx.get() != null) {
447 throw caughtEx.get();
450 assertEquals("Tx reads done", true, done);
452 // At this point the Tx operations should be waiting for the shard to initialize so
453 // trigger the latch to let the shard recovery to continue.
455 blockRecoveryLatch.countDown();
457 // Wait for the reads to complete and verify.
459 assertEquals("exists", true, txExistsFuture.get().checkedGet(5, TimeUnit.SECONDS));
460 assertEquals("read", true, txReadFuture.get().checkedGet(5, TimeUnit.SECONDS).isPresent());
468 @Test(expected=NotInitializedException.class)
469 public void testTransactionCommitFailureWithShardNotInitialized() throws Throwable{
470 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
471 String testName = "testTransactionCommitFailureWithShardNotInitialized";
472 String shardName = "test-1";
474 // Set the shard initialization timeout low for the test.
476 datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
478 // Setup the InMemoryJournal to block shard recovery indefinitely.
480 String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
481 CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
482 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
484 DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
486 // Create the write Tx
488 final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
489 assertNotNull("newReadWriteTransaction returned null", writeTx);
491 // Do some modifications and ready the Tx on a separate thread.
493 final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
494 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
495 final CountDownLatch txReady = new CountDownLatch(1);
496 Thread txThread = new Thread() {
500 writeTx.write(TestModel.TEST_PATH,
501 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
503 txCohort.set(writeTx.ready());
504 } catch(Exception e) {
515 // Wait for the Tx operations to complete.
517 boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
518 if(caughtEx.get() != null) {
519 throw caughtEx.get();
522 assertEquals("Tx ready", true, done);
524 // Wait for the commit to complete. Since the shard never initialized, the Tx should
525 // have timed out and throw an appropriate exception cause.
528 txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
529 } catch(ExecutionException e) {
532 blockRecoveryLatch.countDown();
538 @Test(expected=NotInitializedException.class)
539 public void testTransactionReadFailureWithShardNotInitialized() throws Throwable{
540 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
541 String testName = "testTransactionReadFailureWithShardNotInitialized";
542 String shardName = "test-1";
544 // Set the shard initialization timeout low for the test.
546 datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
548 // Setup the InMemoryJournal to block shard recovery indefinitely.
550 String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
551 CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
552 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
554 DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
556 // Create the read-write Tx
558 final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
559 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
561 // Do a read on the Tx on a separate thread.
563 final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
564 txReadFuture = new AtomicReference<>();
565 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
566 final CountDownLatch txReadDone = new CountDownLatch(1);
567 Thread txThread = new Thread() {
571 readWriteTx.write(TestModel.TEST_PATH,
572 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
574 txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
577 } catch(Exception e) {
581 txReadDone.countDown();
588 // Wait for the Tx operations to complete.
590 boolean done = Uninterruptibles.awaitUninterruptibly(txReadDone, 5, TimeUnit.SECONDS);
591 if(caughtEx.get() != null) {
592 throw caughtEx.get();
595 assertEquals("Tx read done", true, done);
597 // Wait for the read to complete. Since the shard never initialized, the Tx should
598 // have timed out and throw an appropriate exception cause.
601 txReadFuture.get().checkedGet(5, TimeUnit.SECONDS);
602 } catch(ReadFailedException e) {
605 blockRecoveryLatch.countDown();
611 private void testTransactionCommitFailureWithNoShardLeader(final boolean writeOnly) throws Throwable {
612 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
613 String testName = "testTransactionCommitFailureWithNoShardLeader";
614 String shardName = "default";
616 // We don't want the shard to become the leader so prevent shard election from completing
617 // by setting the election timeout, which is based on the heartbeat interval, really high.
619 datastoreContextBuilder.shardHeartbeatIntervalInMillis(30000);
621 DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
623 Object result = dataStore.getActorContext().executeOperation(dataStore.getActorContext().getShardManager(),
624 new FindLocalShard(shardName, true));
625 assertTrue("Expected LocalShardFound. Actual: " + result, result instanceof LocalShardFound);
627 // The ShardManager uses the election timeout for FindPrimary so reset it low so it will timeout quickly.
629 datastoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(1);
630 dataStore.onDatastoreContextUpdated(datastoreContextBuilder.build());
632 // Create the write Tx.
634 final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() :
635 dataStore.newReadWriteTransaction();
636 assertNotNull("newReadWriteTransaction returned null", writeTx);
638 // Do some modifications and ready the Tx on a separate thread.
640 final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
641 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
642 final CountDownLatch txReady = new CountDownLatch(1);
643 Thread txThread = new Thread() {
647 writeTx.write(TestModel.JUNK_PATH,
648 ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
650 txCohort.set(writeTx.ready());
651 } catch(Exception e) {
662 // Wait for the Tx operations to complete.
664 boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
665 if(caughtEx.get() != null) {
666 throw caughtEx.get();
669 assertEquals("Tx ready", true, done);
671 // Wait for the commit to complete. Since no shard leader was elected in time, the Tx
672 // should have timed out and throw an appropriate exception cause.
675 txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
676 } catch(ExecutionException e) {
684 @Test(expected=NoShardLeaderException.class)
685 public void testWriteOnlyTransactionCommitFailureWithNoShardLeader() throws Throwable {
686 datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
687 testTransactionCommitFailureWithNoShardLeader(true);
690 @Test(expected=NoShardLeaderException.class)
691 public void testReadWriteTransactionCommitFailureWithNoShardLeader() throws Throwable {
692 testTransactionCommitFailureWithNoShardLeader(false);
696 public void testTransactionAbort() throws Exception{
697 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
698 DistributedDataStore dataStore =
699 setupDistributedDataStore("transactionAbortIntegrationTest", "test-1");
701 DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
702 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
704 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
706 DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
708 cohort.canCommit().get(5, TimeUnit.SECONDS);
710 cohort.abort().get(5, TimeUnit.SECONDS);
712 testWriteTransaction(dataStore, TestModel.TEST_PATH,
713 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
720 public void testTransactionChainWithSingleShard() throws Exception{
721 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
722 DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChainWithSingleShard", "test-1");
724 // 1. Create a Tx chain and write-only Tx
726 DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
728 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
729 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
731 // 2. Write some data
733 NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
734 writeTx.write(TestModel.TEST_PATH, testNode);
736 // 3. Ready the Tx for commit
738 final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
740 // 4. Commit the Tx on another thread that first waits for the second read Tx.
742 final CountDownLatch continueCommit1 = new CountDownLatch(1);
743 final CountDownLatch commit1Done = new CountDownLatch(1);
744 final AtomicReference<Exception> commit1Error = new AtomicReference<>();
749 continueCommit1.await();
751 } catch (Exception e) {
754 commit1Done.countDown();
759 // 5. Create a new read Tx from the chain to read and verify the data from the first
760 // Tx is visible after being readied.
762 DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction();
763 Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
764 assertEquals("isPresent", true, optional.isPresent());
765 assertEquals("Data node", testNode, optional.get());
767 // 6. Create a new RW Tx from the chain, write more data, and ready it
769 DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
770 MapNode outerNode = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
771 rwTx.write(TestModel.OUTER_LIST_PATH, outerNode);
773 DOMStoreThreePhaseCommitCohort cohort2 = rwTx.ready();
775 // 7. Create a new read Tx from the chain to read the data from the last RW Tx to
776 // verify it is visible.
778 readTx = txChain.newReadWriteTransaction();
779 optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
780 assertEquals("isPresent", true, optional.isPresent());
781 assertEquals("Data node", outerNode, optional.get());
783 // 8. Wait for the 2 commits to complete and close the chain.
785 continueCommit1.countDown();
786 Uninterruptibles.awaitUninterruptibly(commit1Done, 5, TimeUnit.SECONDS);
788 if(commit1Error.get() != null) {
789 throw commit1Error.get();
796 // 9. Create a new read Tx from the data store and verify committed data.
798 readTx = dataStore.newReadOnlyTransaction();
799 optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
800 assertEquals("isPresent", true, optional.isPresent());
801 assertEquals("Data node", outerNode, optional.get());
808 public void testTransactionChainWithMultipleShards() throws Exception{
809 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
810 DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChainWithMultipleShards",
811 "cars-1", "people-1");
813 DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
815 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
816 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
818 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
819 writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
821 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
822 writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
824 DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
826 DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction();
828 MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
829 YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
830 readWriteTx.write(carPath, car);
832 MapEntryNode person = PeopleModel.newPersonEntry("jack");
833 YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
834 readWriteTx.merge(personPath, person);
836 Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
837 assertEquals("isPresent", true, optional.isPresent());
838 assertEquals("Data node", car, optional.get());
840 optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS);
841 assertEquals("isPresent", true, optional.isPresent());
842 assertEquals("Data node", person, optional.get());
844 DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready();
846 writeTx = txChain.newWriteOnlyTransaction();
848 writeTx.delete(carPath);
850 DOMStoreThreePhaseCommitCohort cohort3 = writeTx.ready();
852 ListenableFuture<Boolean> canCommit1 = cohort1.canCommit();
853 ListenableFuture<Boolean> canCommit2 = cohort2.canCommit();
855 doCommit(canCommit1, cohort1);
856 doCommit(canCommit2, cohort2);
861 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
863 optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
864 assertEquals("isPresent", false, optional.isPresent());
866 optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
867 assertEquals("isPresent", true, optional.isPresent());
868 assertEquals("Data node", person, optional.get());
875 public void testCreateChainedTransactionsInQuickSuccession() throws Exception{
876 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
877 DistributedDataStore dataStore = setupDistributedDataStore(
878 "testCreateChainedTransactionsInQuickSuccession", "cars-1");
880 ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
881 ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
882 LogicalDatastoreType.CONFIGURATION, dataStore).build(), MoreExecutors.directExecutor());
884 TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
885 DOMTransactionChain txChain = broker.createTransactionChain(listener);
887 List<CheckedFuture<Void, TransactionCommitFailedException>> futures = new ArrayList<>();
889 DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
890 writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, CarsModel.emptyContainer());
891 writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
892 futures.add(writeTx.submit());
895 for(int i = 0; i < nCars; i++) {
896 DOMDataReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
898 rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.newCarPath("car" + i),
899 CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
901 futures.add(rwTx.submit());
904 for(CheckedFuture<Void, TransactionCommitFailedException> f: futures) {
908 Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction().read(
909 LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
910 assertEquals("isPresent", true, optional.isPresent());
911 assertEquals("# cars", nCars, ((Collection<?>)optional.get().getValue()).size());
922 public void testCreateChainedTransactionAfterEmptyTxReadied() throws Exception{
923 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
924 DistributedDataStore dataStore = setupDistributedDataStore(
925 "testCreateChainedTransactionAfterEmptyTxReadied", "test-1");
927 DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
929 DOMStoreReadWriteTransaction rwTx1 = txChain.newReadWriteTransaction();
933 DOMStoreReadWriteTransaction rwTx2 = txChain.newReadWriteTransaction();
935 Optional<NormalizedNode<?, ?>> optional = rwTx2.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
936 assertEquals("isPresent", false, optional.isPresent());
945 public void testCreateChainedTransactionWhenPreviousNotReady() throws Throwable {
946 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
947 DistributedDataStore dataStore = setupDistributedDataStore(
948 "testCreateChainedTransactionWhenPreviousNotReady", "test-1");
950 final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
952 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
953 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
955 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
957 // Try to create another Tx of each type - each should fail b/c the previous Tx wasn't
960 assertExceptionOnTxChainCreates(txChain, IllegalStateException.class);
965 public void testCreateChainedTransactionAfterClose() throws Throwable {
966 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
967 DistributedDataStore dataStore = setupDistributedDataStore(
968 "testCreateChainedTransactionAfterClose", "test-1");
970 DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
974 // Try to create another Tx of each type - should fail b/c the previous Tx was closed.
976 assertExceptionOnTxChainCreates(txChain, TransactionChainClosedException.class);
981 public void testChainedTransactionFailureWithSingleShard() throws Exception{
982 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
983 DistributedDataStore dataStore = setupDistributedDataStore(
984 "testChainedTransactionFailureWithSingleShard", "cars-1");
986 ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
987 ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
988 LogicalDatastoreType.CONFIGURATION, dataStore).build(), MoreExecutors.directExecutor());
990 TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
991 DOMTransactionChain txChain = broker.createTransactionChain(listener);
993 DOMDataReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
995 ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
996 new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)).
997 withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
999 rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
1002 rwTx.submit().checkedGet(5, TimeUnit.SECONDS);
1003 fail("Expected TransactionCommitFailedException");
1004 } catch (TransactionCommitFailedException e) {
1008 verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(rwTx), any(Throwable.class));
1017 public void testChainedTransactionFailureWithMultipleShards() throws Exception{
1018 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
1019 DistributedDataStore dataStore = setupDistributedDataStore(
1020 "testChainedTransactionFailureWithMultipleShards", "cars-1", "people-1");
1022 ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
1023 ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
1024 LogicalDatastoreType.CONFIGURATION, dataStore).build(), MoreExecutors.directExecutor());
1026 TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
1027 DOMTransactionChain txChain = broker.createTransactionChain(listener);
1029 DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
1031 writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
1033 ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
1034 new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)).
1035 withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
1037 // Note that merge will validate the data and fail but put succeeds b/c deep validation is not
1038 // done for put for performance reasons.
1039 writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
1042 writeTx.submit().checkedGet(5, TimeUnit.SECONDS);
1043 fail("Expected TransactionCommitFailedException");
1044 } catch (TransactionCommitFailedException e) {
1048 verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx), any(Throwable.class));
1057 public void testChangeListenerRegistration() throws Exception{
1058 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
1059 DistributedDataStore dataStore =
1060 setupDistributedDataStore("testChangeListenerRegistration", "test-1");
1062 testWriteTransaction(dataStore, TestModel.TEST_PATH,
1063 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1065 MockDataChangeListener listener = new MockDataChangeListener(1);
1067 ListenerRegistration<MockDataChangeListener>
1068 listenerReg = dataStore.registerChangeListener(TestModel.TEST_PATH, listener,
1069 DataChangeScope.SUBTREE);
1071 assertNotNull("registerChangeListener returned null", listenerReg);
1073 // Wait for the initial notification
1075 listener.waitForChangeEvents(TestModel.TEST_PATH);
1081 testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
1082 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1084 YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
1085 nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
1086 testWriteTransaction(dataStore, listPath,
1087 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
1089 // Wait for the 2 updates.
1091 listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath);
1093 listenerReg.close();
1095 testWriteTransaction(dataStore, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
1096 nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(),
1097 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2));
1099 listener.expectNoMoreChanges("Received unexpected change after close");