2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertTrue;
14 import static org.junit.Assert.fail;
15 import static org.mockito.Matchers.any;
16 import static org.mockito.Matchers.eq;
17 import static org.mockito.Mockito.timeout;
18 import static org.mockito.Mockito.verify;
19 import akka.actor.ActorSystem;
20 import akka.actor.Address;
21 import akka.actor.AddressFromURIString;
22 import akka.cluster.Cluster;
23 import akka.testkit.JavaTestKit;
24 import com.google.common.base.Optional;
25 import com.google.common.collect.ImmutableMap;
26 import com.google.common.util.concurrent.CheckedFuture;
27 import com.google.common.util.concurrent.ListenableFuture;
28 import com.google.common.util.concurrent.MoreExecutors;
29 import com.google.common.util.concurrent.Uninterruptibles;
30 import com.typesafe.config.ConfigFactory;
31 import java.io.IOException;
32 import java.math.BigInteger;
33 import java.util.ArrayList;
34 import java.util.Collection;
35 import java.util.List;
36 import java.util.concurrent.CountDownLatch;
37 import java.util.concurrent.ExecutionException;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.atomic.AtomicReference;
40 import org.junit.AfterClass;
41 import org.junit.BeforeClass;
42 import org.junit.Test;
43 import org.mockito.Mockito;
44 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
45 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
46 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
47 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
48 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
49 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
50 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
51 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
52 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
53 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
54 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
55 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
56 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
57 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
58 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
59 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
60 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
61 import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
62 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
63 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
64 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
65 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
66 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
67 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
68 import org.opendaylight.yangtools.concepts.ListenerRegistration;
69 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
70 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
71 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
72 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
73 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
74 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
75 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
77 public class DistributedDataStoreIntegrationTest {
// Actor system shared by every test in this class; assigned in setUpClass().
79 private static ActorSystem system;
// Builder for the DatastoreContext handed to each IntegrationTestKit. It starts with a
// 100 ms shard heartbeat interval; individual tests adjust it further before use.
81 private final DatastoreContext.Builder datastoreContextBuilder =
82 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100);
/**
 * Creates the shared "cluster-test" actor system from the "Member1" section of the loaded
 * configuration and joins it to the cluster at akka.tcp://cluster-test@127.0.0.1:2558.
 *
 * @throws IOException as declared by the method signature
 */
85 public static void setUpClass() throws IOException {
86 system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
87 Address member1Address = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
88 Cluster.get(system).join(member1Address);
/**
 * Shuts down the shared actor system through JavaTestKit.
 *
 * @throws IOException as declared by the method signature
 */
92 public static void tearDownClass() throws IOException {
93 JavaTestKit.shutdownActorSystem(system);
/**
 * Supplies the ActorSystem that every test passes to its IntegrationTestKit.
 * NOTE(review): presumably returns the static {@code system} field - confirm against the body.
 */
97 protected ActorSystem getSystem() {
/**
 * Sets up a data store backed by the single shard "test-1" and runs the testWriteTransaction
 * helper twice: once with a container node at TestModel.TEST_PATH and once with an empty map
 * node at TestModel.OUTER_LIST_PATH.
 */
102 public void testWriteTransactionWithSingleShard() throws Exception{
103 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
104 DistributedDataStore dataStore =
105 setupDistributedDataStore("transactionIntegrationTest", "test-1");
107 testWriteTransaction(dataStore, TestModel.TEST_PATH,
108 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
110 testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
111 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
/**
 * Commits three successive write-only transactions, each touching both the "cars-1" and
 * "people-1" shards (base containers, then list nodes, then one car and one person entry),
 * and verifies through a read-only transaction that both entries can be read back.
 */
118 public void testWriteTransactionWithMultipleShards() throws Exception{
119 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
120 DistributedDataStore dataStore =
121 setupDistributedDataStore("testWriteTransactionWithMultipleShards", "cars-1", "people-1");
// Tx 1: create the top-level containers in both shards.
123 DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
124 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
126 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
127 writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
129 doCommit(writeTx.ready());
// Tx 2: create the car and person list nodes.
131 writeTx = dataStore.newWriteOnlyTransaction();
133 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
134 writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
136 doCommit(writeTx.ready());
// Tx 3: add one entry to each list.
138 writeTx = dataStore.newWriteOnlyTransaction();
140 MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
141 YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
142 writeTx.write(carPath, car);
144 MapEntryNode person = PeopleModel.newPersonEntry("jack");
145 YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
146 writeTx.write(personPath, person);
148 doCommit(writeTx.ready());
150 // Verify the data in the store
152 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
154 Optional<NormalizedNode<?, ?>> optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
155 assertEquals("isPresent", true, optional.isPresent());
156 assertEquals("Data node", car, optional.get());
158 optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
159 assertEquals("isPresent", true, optional.isPresent());
160 assertEquals("Data node", person, optional.get());
/**
 * Writes a container node at TestModel.TEST_PATH in a read-write transaction, checks that
 * exists() and read() on the same transaction already see it, readies the transaction, and
 * finally verifies the node through a new read-only transaction on the data store.
 */
167 public void testReadWriteTransactionWithSingleShard() throws Exception{
// NOTE(review): this sets a JVM-wide system property and nothing here restores it, so the
// value may leak into tests that run afterwards - confirm that is intended.
168 System.setProperty("shard.persistent", "true");
169 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
170 DistributedDataStore dataStore =
171 setupDistributedDataStore("testReadWriteTransactionWithSingleShard", "test-1");
173 // 1. Create a read-write Tx
175 DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
176 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
178 // 2. Write some data
180 YangInstanceIdentifier nodePath = TestModel.TEST_PATH;
181 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
182 readWriteTx.write(nodePath, nodeToWrite );
184 // 3. Read the data from Tx
186 Boolean exists = readWriteTx.exists(nodePath).checkedGet(5, TimeUnit.SECONDS);
187 assertEquals("exists", true, exists);
189 Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS);
190 assertEquals("isPresent", true, optional.isPresent());
191 assertEquals("Data node", nodeToWrite, optional.get());
193 // 4. Ready the Tx for commit
195 DOMStoreThreePhaseCommitCohort cohort = readWriteTx.ready();
201 // 6. Verify the data in the store
203 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
205 optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
206 assertEquals("isPresent", true, optional.isPresent());
207 assertEquals("Data node", nodeToWrite, optional.get());
/**
 * Read-write counterpart of the multi-shard write test: commits three read-write
 * transactions spanning the "cars-1" and "people-1" shards, reads the car entry back from
 * within the third transaction before it is committed, and then verifies both entries
 * through a read-only transaction.
 */
214 public void testReadWriteTransactionWithMultipleShards() throws Exception{
215 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
216 DistributedDataStore dataStore =
217 setupDistributedDataStore("testReadWriteTransactionWithMultipleShards", "cars-1", "people-1");
// Tx 1: create the top-level containers in both shards.
219 DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
220 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
222 readWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
223 readWriteTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
225 doCommit(readWriteTx.ready());
// Tx 2: create the car and person list nodes.
227 readWriteTx = dataStore.newReadWriteTransaction();
229 readWriteTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
230 readWriteTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
232 doCommit(readWriteTx.ready());
// Tx 3: add one entry to each list and read the car back before committing.
234 readWriteTx = dataStore.newReadWriteTransaction();
236 MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
237 YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
238 readWriteTx.write(carPath, car);
240 MapEntryNode person = PeopleModel.newPersonEntry("jack");
241 YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
242 readWriteTx.write(personPath, person);
244 Boolean exists = readWriteTx.exists(carPath).checkedGet(5, TimeUnit.SECONDS);
245 assertEquals("exists", true, exists);
247 Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
248 assertEquals("isPresent", true, optional.isPresent());
249 assertEquals("Data node", car, optional.get());
251 doCommit(readWriteTx.ready());
253 // Verify the data in the store
255 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
257 optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
258 assertEquals("isPresent", true, optional.isPresent());
259 assertEquals("Data node", car, optional.get());
261 optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
262 assertEquals("isPresent", true, optional.isPresent());
263 assertEquals("Data node", person, optional.get());
/**
 * Using a single transaction chain on the "cars-1" shard, commits the cars container and
 * list node, then writes nCars car entries back-to-back in one write-only transaction and
 * verifies that the car list read back through the chain holds nCars entries.
 */
270 public void testSingleTransactionsWritesInQuickSuccession() throws Exception{
271 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
272 DistributedDataStore dataStore = setupDistributedDataStore(
273 "testSingleTransactionsWritesInQuickSuccession", "cars-1");
275 DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
// First chained Tx: create the container and the (empty) car list.
277 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
278 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
279 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
280 doCommit(writeTx.ready());
// Second chained Tx: write all car entries without waiting between writes.
282 writeTx = txChain.newWriteOnlyTransaction();
285 for(int i = 0; i < nCars; i++) {
286 writeTx.write(CarsModel.newCarPath("car" + i),
287 CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
290 doCommit(writeTx.ready());
// Read the list back through the chain and check the number of entries.
292 Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction().read(
293 CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
294 assertEquals("isPresent", true, optional.isPresent());
295 assertEquals("# cars", nCars, ((Collection<?>)optional.get().getValue()).size());
/**
 * Shared scenario: modifications issued on a transaction while shard recovery is still
 * blocked must take effect once recovery is released and the transaction is committed.
 *
 * @param testName passed to setupDistributedDataStore and used as the suffix of the shard's
 *        persistence id
 * @param writeOnly {@code true} to use newWriteOnlyTransaction(), {@code false} to use
 *        newReadWriteTransaction()
 * @throws Exception whatever exception {@code caughtEx} holds after the Tx thread ran, or any
 *         failure from the commit and read calls
 */
301 private void testTransactionWritesWithShardNotInitiallyReady(final String testName,
302 final boolean writeOnly) throws Exception {
303 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
304 String shardName = "test-1";
306 // Set up the InMemoryJournal to block shard recovery to ensure the shard isn't
307 // initialized until we create and submit the write Tx.
308 String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
309 CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
310 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
// NOTE(review): the 'false' argument presumably tells the helper not to wait for the shard
// to become ready - confirm against IntegrationTestKit.
312 DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
314 // Create the write Tx
316 final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() :
317 dataStore.newReadWriteTransaction();
// NOTE(review): the message names newReadWriteTransaction even when writeOnly is true.
318 assertNotNull("newReadWriteTransaction returned null", writeTx);
320 // Do some modification operations and ready the Tx on a separate thread.
322 final YangInstanceIdentifier listEntryPath = YangInstanceIdentifier.builder(
323 TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME,
324 TestModel.ID_QNAME, 1).build();
326 final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
327 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
328 final CountDownLatch txReady = new CountDownLatch(1);
329 Thread txThread = new Thread() {
333 writeTx.write(TestModel.TEST_PATH,
334 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
336 writeTx.merge(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(
337 TestModel.OUTER_LIST_QNAME).build());
// The list entry is written and then deleted again, so it must be absent after commit.
339 writeTx.write(listEntryPath, ImmutableNodes.mapEntry(
340 TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
342 writeTx.delete(listEntryPath);
344 txCohort.set(writeTx.ready());
345 } catch(Exception e) {
356 // Wait for the Tx operations to complete.
358 boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
359 if(caughtEx.get() != null) {
360 throw caughtEx.get();
363 assertEquals("Tx ready", true, done);
365 // At this point the Tx operations should be waiting for the shard to initialize so
366 // trigger the latch to let the shard recovery continue.
368 blockRecoveryLatch.countDown();
370 // Wait for the Tx commit to complete.
372 doCommit(txCohort.get());
374 // Verify the data in the store
376 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
378 Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).
379 get(5, TimeUnit.SECONDS);
380 assertEquals("isPresent", true, optional.isPresent());
382 optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
383 assertEquals("isPresent", true, optional.isPresent());
385 optional = readTx.read(listEntryPath).get(5, TimeUnit.SECONDS);
386 assertEquals("isPresent", false, optional.isPresent());
/**
 * Runs the shard-not-initially-ready write scenario with a write-only transaction, after
 * enabling write-only transaction optimizations on the context builder.
 */
393 public void testWriteOnlyTransactionWithShardNotInitiallyReady() throws Exception {
394 datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
395 testTransactionWritesWithShardNotInitiallyReady("testWriteOnlyTransactionWithShardNotInitiallyReady", true);
/**
 * Runs the shard-not-initially-ready write scenario with a read-write transaction
 * (writeOnly = false).
 */
399 public void testReadWriteTransactionWithShardNotInitiallyReady() throws Exception {
400 testTransactionWritesWithShardNotInitiallyReady("testReadWriteTransactionWithShardNotInitiallyReady", false);
/**
 * Verifies that exists() and read() issued on a read-write transaction while shard recovery
 * is still blocked complete successfully once recovery is released: both futures are
 * obtained on a separate thread first, the recovery latch is then released, and each future
 * must report the written node within 5 seconds.
 */
404 public void testTransactionReadsWithShardNotInitiallyReady() throws Exception {
405 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
406 String testName = "testTransactionReadsWithShardNotInitiallyReady";
407 String shardName = "test-1";
409 // Set up the InMemoryJournal to block shard recovery to ensure the shard isn't
410 // initialized until we create the Tx.
411 String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
412 CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
413 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
415 DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
417 // Create the read-write Tx
419 final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
420 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
422 // Do some reads on the Tx on a separate thread.
424 final AtomicReference<CheckedFuture<Boolean, ReadFailedException>> txExistsFuture =
425 new AtomicReference<>();
426 final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
427 txReadFuture = new AtomicReference<>();
428 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
429 final CountDownLatch txReadsDone = new CountDownLatch(1);
430 Thread txThread = new Thread() {
// Write first so that the subsequent exists()/read() have something to find.
434 readWriteTx.write(TestModel.TEST_PATH,
435 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
437 txExistsFuture.set(readWriteTx.exists(TestModel.TEST_PATH));
439 txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
440 } catch(Exception e) {
444 txReadsDone.countDown();
451 // Wait for the Tx operations to complete.
453 boolean done = Uninterruptibles.awaitUninterruptibly(txReadsDone, 5, TimeUnit.SECONDS);
454 if(caughtEx.get() != null) {
455 throw caughtEx.get();
458 assertEquals("Tx reads done", true, done);
460 // At this point the Tx operations should be waiting for the shard to initialize so
461 // trigger the latch to let the shard recovery continue.
463 blockRecoveryLatch.countDown();
465 // Wait for the reads to complete and verify.
467 assertEquals("exists", true, txExistsFuture.get().checkedGet(5, TimeUnit.SECONDS));
468 assertEquals("read", true, txReadFuture.get().checkedGet(5, TimeUnit.SECONDS).isPresent());
/**
 * Expects a NotInitializedException when a write-only transaction is readied and canCommit()
 * is invoked while shard recovery stays blocked and the shard initialization timeout (set to
 * 300 ms here) elapses. The recovery latch is released only after the canCommit() attempt.
 */
476 @Test(expected=NotInitializedException.class)
477 public void testTransactionCommitFailureWithShardNotInitialized() throws Throwable{
478 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
479 String testName = "testTransactionCommitFailureWithShardNotInitialized";
480 String shardName = "test-1";
482 // Set the shard initialization timeout low for the test.
484 datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
486 // Set up the InMemoryJournal to block shard recovery indefinitely.
488 String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
489 CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
490 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
492 DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
494 // Create the write Tx
496 final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
// NOTE(review): the message names newReadWriteTransaction but the Tx comes from
// newWriteOnlyTransaction().
497 assertNotNull("newReadWriteTransaction returned null", writeTx);
499 // Do some modifications and ready the Tx on a separate thread.
501 final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
502 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
503 final CountDownLatch txReady = new CountDownLatch(1);
504 Thread txThread = new Thread() {
508 writeTx.write(TestModel.TEST_PATH,
509 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
511 txCohort.set(writeTx.ready());
512 } catch(Exception e) {
523 // Wait for the Tx operations to complete.
525 boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
526 if(caughtEx.get() != null) {
527 throw caughtEx.get();
530 assertEquals("Tx ready", true, done);
532 // Wait for the commit to complete. Since the shard never initialized, the Tx should
533 // have timed out and thrown an appropriate exception cause.
536 txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
// NOTE(review): presumably the ExecutionException's cause is rethrown so that the expected
// NotInitializedException reaches JUnit - confirm in the handler body.
537 } catch(ExecutionException e) {
// Unblock recovery so the shard is not left waiting on the latch.
540 blockRecoveryLatch.countDown();
/**
 * Expects a NotInitializedException when a read is issued on a read-write transaction while
 * shard recovery stays blocked and the shard initialization timeout (set to 300 ms here)
 * elapses. The recovery latch is released only after the read attempt.
 */
546 @Test(expected=NotInitializedException.class)
547 public void testTransactionReadFailureWithShardNotInitialized() throws Throwable{
548 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
549 String testName = "testTransactionReadFailureWithShardNotInitialized";
550 String shardName = "test-1";
552 // Set the shard initialization timeout low for the test.
554 datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
556 // Set up the InMemoryJournal to block shard recovery indefinitely.
558 String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
559 CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
560 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
562 DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
564 // Create the read-write Tx
566 final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
567 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
569 // Do a read on the Tx on a separate thread.
571 final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
572 txReadFuture = new AtomicReference<>();
573 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
574 final CountDownLatch txReadDone = new CountDownLatch(1);
575 Thread txThread = new Thread() {
579 readWriteTx.write(TestModel.TEST_PATH,
580 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
582 txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
585 } catch(Exception e) {
589 txReadDone.countDown();
596 // Wait for the Tx operations to complete.
598 boolean done = Uninterruptibles.awaitUninterruptibly(txReadDone, 5, TimeUnit.SECONDS);
599 if(caughtEx.get() != null) {
600 throw caughtEx.get();
603 assertEquals("Tx read done", true, done);
605 // Wait for the read to complete. Since the shard never initialized, the Tx should
606 // have timed out and thrown an appropriate exception cause.
609 txReadFuture.get().checkedGet(5, TimeUnit.SECONDS);
// NOTE(review): presumably the ReadFailedException's cause is rethrown so that the expected
// NotInitializedException reaches JUnit - confirm in the handler body.
610 } catch(ReadFailedException e) {
// Unblock recovery so the shard is not left waiting on the latch.
613 blockRecoveryLatch.countDown();
/**
 * Shared scenario: readies a transaction against the "default" shard while no shard leader
 * has been elected and then calls canCommit() on the resulting cohort. Both callers declare
 * NoShardLeaderException as the expected outcome.
 *
 * @param writeOnly {@code true} to use newWriteOnlyTransaction(), {@code false} to use
 *        newReadWriteTransaction()
 * @throws Throwable whatever exception {@code caughtEx} holds after the Tx thread ran, or the
 *         failure surfaced by canCommit()
 */
619 private void testTransactionCommitFailureWithNoShardLeader(final boolean writeOnly) throws Throwable {
620 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
621 String testName = "testTransactionCommitFailureWithNoShardLeader";
622 String shardName = "default";
624 // We don't want the shard to become the leader so prevent shard election from completing
625 // by setting the election timeout, which is based on the heartbeat interval, really high.
627 datastoreContextBuilder.shardHeartbeatIntervalInMillis(30000);
629 DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
// Ask the shard manager for the local shard and require a LocalShardFound reply before
// continuing.
631 Object result = dataStore.getActorContext().executeOperation(dataStore.getActorContext().getShardManager(),
632 new FindLocalShard(shardName, true));
633 assertTrue("Expected LocalShardFound. Actual: " + result, result instanceof LocalShardFound);
635 // The ShardManager uses the election timeout for FindPrimary so reset it low so it will timeout quickly.
637 datastoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(1);
638 dataStore.onDatastoreContextUpdated(datastoreContextBuilder.build());
640 // Create the write Tx.
642 final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() :
643 dataStore.newReadWriteTransaction();
// NOTE(review): the message names newReadWriteTransaction even when writeOnly is true.
644 assertNotNull("newReadWriteTransaction returned null", writeTx);
646 // Do some modifications and ready the Tx on a separate thread.
648 final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
649 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
650 final CountDownLatch txReady = new CountDownLatch(1);
651 Thread txThread = new Thread() {
655 writeTx.write(TestModel.JUNK_PATH,
656 ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
658 txCohort.set(writeTx.ready());
659 } catch(Exception e) {
670 // Wait for the Tx operations to complete.
672 boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
673 if(caughtEx.get() != null) {
674 throw caughtEx.get();
677 assertEquals("Tx ready", true, done);
679 // Wait for the commit to complete. Since no shard leader was elected in time, the Tx
680 // should have timed out and thrown an appropriate exception cause.
683 txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
// NOTE(review): presumably the ExecutionException's cause is rethrown so that the callers'
// expected NoShardLeaderException reaches JUnit - confirm in the handler body.
684 } catch(ExecutionException e) {
/**
 * Runs the no-shard-leader commit scenario with a write-only transaction, after enabling
 * write-only transaction optimizations; expects NoShardLeaderException.
 */
692 @Test(expected=NoShardLeaderException.class)
693 public void testWriteOnlyTransactionCommitFailureWithNoShardLeader() throws Throwable {
694 datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
695 testTransactionCommitFailureWithNoShardLeader(true);
/**
 * Runs the no-shard-leader commit scenario with a read-write transaction (writeOnly = false);
 * expects NoShardLeaderException.
 */
698 @Test(expected=NoShardLeaderException.class)
699 public void testReadWriteTransactionCommitFailureWithNoShardLeader() throws Throwable {
700 testTransactionCommitFailureWithNoShardLeader(false);
/**
 * Readies a write-only transaction, runs canCommit() followed by abort() on its cohort, and
 * then calls the testWriteTransaction helper for the same path - presumably to confirm that
 * the store still accepts a write after an abort.
 */
704 public void testTransactionAbort() throws Exception{
705 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
706 DistributedDataStore dataStore =
707 setupDistributedDataStore("transactionAbortIntegrationTest", "test-1");
709 DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
710 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
712 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
714 DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
// Go through canCommit first, then abort; each step is given up to 5 seconds.
716 cohort.canCommit().get(5, TimeUnit.SECONDS);
718 cohort.abort().get(5, TimeUnit.SECONDS);
720 testWriteTransaction(dataStore, TestModel.TEST_PATH,
721 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
/**
 * Exercises a transaction chain on the single shard "test-1": data written by a readied but
 * not yet committed transaction must be visible to later transactions created from the same
 * chain, and the data written through the chain must be readable from the data store once
 * the commits have completed.
 */
728 public void testTransactionChainWithSingleShard() throws Exception{
729 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
730 DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChainWithSingleShard", "test-1");
732 // 1. Create a Tx chain and write-only Tx
734 DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
736 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
737 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
739 // 2. Write some data
741 NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
742 writeTx.write(TestModel.TEST_PATH, testNode);
744 // 3. Ready the Tx for commit
746 final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
748 // 4. Commit the Tx on another thread that first waits for the second read Tx.
750 final CountDownLatch continueCommit1 = new CountDownLatch(1);
751 final CountDownLatch commit1Done = new CountDownLatch(1);
752 final AtomicReference<Exception> commit1Error = new AtomicReference<>();
757 continueCommit1.await();
759 } catch (Exception e) {
762 commit1Done.countDown();
767 // 5. Create a new read Tx from the chain to read and verify the data from the first
768 // Tx is visible after being readied.
770 DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction();
771 Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
772 assertEquals("isPresent", true, optional.isPresent());
773 assertEquals("Data node", testNode, optional.get());
775 // 6. Create a new RW Tx from the chain, write more data, and ready it
777 DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
778 MapNode outerNode = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
779 rwTx.write(TestModel.OUTER_LIST_PATH, outerNode);
781 DOMStoreThreePhaseCommitCohort cohort2 = rwTx.ready();
783 // 7. Create a new Tx from the chain (a read-write Tx, used here only for reading) to
784 // read the data from the last RW Tx and verify it is visible.
786 readTx = txChain.newReadWriteTransaction();
787 optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
788 assertEquals("isPresent", true, optional.isPresent());
789 assertEquals("Data node", outerNode, optional.get());
791 // 8. Wait for the 2 commits to complete and close the chain.
793 continueCommit1.countDown();
// NOTE(review): the boolean result of this wait is ignored, so a timeout of the first
// commit is not reported on this line.
794 Uninterruptibles.awaitUninterruptibly(commit1Done, 5, TimeUnit.SECONDS);
796 if(commit1Error.get() != null) {
797 throw commit1Error.get();
804 // 9. Create a new read Tx from the data store and verify committed data.
806 readTx = dataStore.newReadOnlyTransaction();
807 optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
808 assertEquals("isPresent", true, optional.isPresent());
809 assertEquals("Data node", outerNode, optional.get());
/**
 * Exercises a transaction chain that spans the "cars-1" and "people-1" shards: three chained
 * transactions are readied in sequence (create containers and lists, add a car and a person,
 * delete the car) and the final read from the data store expects the car to be absent and
 * the person to be present.
 */
816 public void testTransactionChainWithMultipleShards() throws Exception{
817 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
818 DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChainWithMultipleShards",
819 "cars-1", "people-1");
821 DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
// Chained Tx 1 (write-only): containers and list nodes for both models.
823 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
824 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
826 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
827 writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
829 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
830 writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
832 DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
// Chained Tx 2 (read-write): add a car (write) and a person (merge), then read both back
// from within the same Tx.
834 DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction();
836 MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
837 YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
838 readWriteTx.write(carPath, car);
840 MapEntryNode person = PeopleModel.newPersonEntry("jack");
841 YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
842 readWriteTx.merge(personPath, person);
844 Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
845 assertEquals("isPresent", true, optional.isPresent());
846 assertEquals("Data node", car, optional.get());
848 optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS);
849 assertEquals("isPresent", true, optional.isPresent());
850 assertEquals("Data node", person, optional.get());
852 DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready();
// Chained Tx 3 (write-only): delete the car again.
854 writeTx = txChain.newWriteOnlyTransaction();
856 writeTx.delete(carPath);
858 DOMStoreThreePhaseCommitCohort cohort3 = writeTx.ready();
// canCommit is requested for cohorts 1 and 2 up front, before either commit is driven.
860 ListenableFuture<Boolean> canCommit1 = cohort1.canCommit();
861 ListenableFuture<Boolean> canCommit2 = cohort2.canCommit();
863 doCommit(canCommit1, cohort1);
864 doCommit(canCommit2, cohort2);
// Verify the final state through a read-only Tx on the data store.
869 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
871 optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
872 assertEquals("isPresent", false, optional.isPresent());
874 optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
875 assertEquals("isPresent", true, optional.isPresent());
876 assertEquals("Data node", person, optional.get());
/**
 * Wraps the data store in a ConcurrentDOMDataBroker (CONFIGURATION store, direct executor),
 * submits one write-only transaction followed by nCars read-write transactions on a single
 * broker-level transaction chain without waiting between submits, and verifies that the car
 * list read back through the chain holds nCars entries.
 */
883 public void testCreateChainedTransactionsInQuickSuccession() throws Exception{
884 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
885 DistributedDataStore dataStore = setupDistributedDataStore(
886 "testCreateChainedTransactionsInQuickSuccession", "cars-1");
888 ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
889 ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
890 LogicalDatastoreType.CONFIGURATION, dataStore).build(), MoreExecutors.directExecutor());
// The chain listener is a Mockito mock.
892 TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
893 DOMTransactionChain txChain = broker.createTransactionChain(listener);
// Collects the submit future of every chained Tx.
895 List<CheckedFuture<Void, TransactionCommitFailedException>> futures = new ArrayList<>();
897 DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
898 writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, CarsModel.emptyContainer());
899 writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
900 futures.add(writeTx.submit());
// One new chained read-write Tx per car, each submitted immediately.
903 for(int i = 0; i < nCars; i++) {
904 DOMDataReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
906 rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.newCarPath("car" + i),
907 CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
909 futures.add(rwTx.submit());
912 for(CheckedFuture<Void, TransactionCommitFailedException> f: futures) {
916 Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction().read(
917 LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
918 assertEquals("isPresent", true, optional.isPresent());
919 assertEquals("# cars", nCars, ((Collection<?>)optional.get().getValue()).size());
/**
 * Creates two read-write transactions in sequence on one transaction chain and checks that a
 * read of {@code TestModel.TEST_PATH} through the second one completes and reports no data.
 */
930 public void testCreateChainedTransactionAfterEmptyTxReadied() throws Exception{
931 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
932 DistributedDataStore dataStore = setupDistributedDataStore(
933 "testCreateChainedTransactionAfterEmptyTxReadied", "test-1");
935 DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
// First transaction: no read or write is issued on it in the statements shown.
// NOTE(review): per the test name it is presumably readied before rwTx2 is created - confirm.
937 DOMStoreReadWriteTransaction rwTx1 = txChain.newReadWriteTransaction();
941 DOMStoreReadWriteTransaction rwTx2 = txChain.newReadWriteTransaction();
// The read must complete within 5 seconds and return an absent Optional.
943 Optional<NormalizedNode<?, ?>> optional = rwTx2.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
944 assertEquals("isPresent", false, optional.isPresent());
/**
 * Opens a write-only transaction on a transaction chain, writes to it, and then checks via
 * {@code assertExceptionOnTxChainCreates} that the chain is exercised with
 * {@code IllegalStateException} as the expected exception type.
 */
953 public void testCreateChainedTransactionWhenPreviousNotReady() throws Throwable {
954 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
955 DistributedDataStore dataStore = setupDistributedDataStore(
956 "testCreateChainedTransactionWhenPreviousNotReady", "test-1");
958 final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
960 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
961 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
// Write a container node at TEST_PATH; ready() is not called on writeTx in the statements
// shown, so it stays open.
963 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
965 // Try to create another Tx of each type - each should fail because the previous Tx wasn't readied.
968 assertExceptionOnTxChainCreates(txChain, IllegalStateException.class);
/**
 * Creates a transaction chain and checks via {@code assertExceptionOnTxChainCreates} that
 * the chain is exercised with {@code TransactionChainClosedException} as the expected
 * exception type.
 */
973 public void testCreateChainedTransactionAfterClose() throws Throwable {
974 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
975 DistributedDataStore dataStore = setupDistributedDataStore(
976 "testCreateChainedTransactionAfterClose", "test-1");
// NOTE(review): per the test name the chain is presumably closed before the assertion
// below - confirm.
978 DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
982 // Try to create another Tx of each type - should fail b/c the previous Tx was closed.
984 assertExceptionOnTxChainCreates(txChain, TransactionChainClosedException.class);
/**
 * Merges an invalid container node through a chained read-write transaction and expects the
 * submit to fail with {@code TransactionCommitFailedException}, and the chain listener to be
 * told that the chain failed for that transaction.
 */
989 public void testChainedTransactionFailureWithSingleShard() throws Exception{
990 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
991 DistributedDataStore dataStore = setupDistributedDataStore(
992 "testChainedTransactionFailureWithSingleShard", "cars-1");
// Register the data store as the broker's CONFIGURATION store.
994 ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
995 ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
996 LogicalDatastoreType.CONFIGURATION, dataStore).build(), MoreExecutors.directExecutor());
// Mocked listener so the failure callback can be verified at the end.
998 TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
999 DOMTransactionChain txChain = broker.createTransactionChain(listener);
1001 DOMDataReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
// Build a container identified by the cars base QName whose only child is a leaf named
// with TestModel.JUNK_QNAME; this is the data the test treats as invalid.
1003 ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
1004 new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)).
1005 withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
1007 rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
// Wait up to 5 seconds for the commit result; reaching fail() means no exception was thrown.
1010 rwTx.submit().checkedGet(5, TimeUnit.SECONDS);
1011 fail("Expected TransactionCommitFailedException");
1012 } catch (TransactionCommitFailedException e) {
// Allow up to 5000 ms for the listener to be notified of the chain failure for rwTx,
// with any Throwable as the cause.
1016 verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(rwTx), any(Throwable.class));
/**
 * Combines a put on the people model with a merge of an invalid cars container in one
 * chained write-only transaction, and expects the submit to fail with
 * {@code TransactionCommitFailedException} and the chain listener to be told that the chain
 * failed for that transaction.
 */
1025 public void testChainedTransactionFailureWithMultipleShards() throws Exception{
1026 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
// Two names ("cars-1" and "people-1") are passed to the data store setup here.
1027 DistributedDataStore dataStore = setupDistributedDataStore(
1028 "testChainedTransactionFailureWithMultipleShards", "cars-1", "people-1");
// Register the data store as the broker's CONFIGURATION store.
1030 ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
1031 ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
1032 LogicalDatastoreType.CONFIGURATION, dataStore).build(), MoreExecutors.directExecutor());
// Mocked listener so the failure callback can be verified at the end.
1034 TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
1035 DOMTransactionChain txChain = broker.createTransactionChain(listener);
1037 DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
// Put on the people model in the same transaction as the cars merge below.
1039 writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
// Build a container identified by the cars base QName whose only child is a leaf named
// with TestModel.JUNK_QNAME; this is the data the test treats as invalid.
1041 ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
1042 new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)).
1043 withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
1045 // Note that merge will validate the data and fail but put succeeds b/c deep validation is not
1046 // done for put for performance reasons.
1047 writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
// Wait up to 5 seconds for the commit result; reaching fail() means no exception was thrown.
1050 writeTx.submit().checkedGet(5, TimeUnit.SECONDS);
1051 fail("Expected TransactionCommitFailedException");
1052 } catch (TransactionCommitFailedException e) {
// Allow up to 5000 ms for the listener to be notified of the chain failure for writeTx,
// with any Throwable as the cause.
1056 verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx), any(Throwable.class));
/**
 * Registers a data change listener on {@code TestModel.TEST_PATH} with SUBTREE scope, waits
 * for change events produced by subsequent writes, then closes the registration and checks
 * that a further write produces no more change events on the listener.
 */
1065 public void testChangeListenerRegistration() throws Exception{
1066 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
1067 DistributedDataStore dataStore =
1068 setupDistributedDataStore("testChangeListenerRegistration", "test-1");
// Write the test container before the listener is registered.
1070 testWriteTransaction(dataStore, TestModel.TEST_PATH,
1071 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
// NOTE(review): the constructor argument is presumably the number of change events to
// expect initially - confirm against MockDataChangeListener.
1073 MockDataChangeListener listener = new MockDataChangeListener(1);
1075 ListenerRegistration<MockDataChangeListener>
1076 listenerReg = dataStore.registerChangeListener(TestModel.TEST_PATH, listener,
1077 DataChangeScope.SUBTREE);
1079 assertNotNull("registerChangeListener returned null", listenerReg);
1081 // Wait for the initial notification
1083 listener.waitForChangeEvents(TestModel.TEST_PATH);
// Write the (empty) outer list node, then one list entry keyed by ID_QNAME = 1.
1089 testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
1090 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1092 YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
1093 nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
1094 testWriteTransaction(dataStore, listPath,
1095 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
1097 // Wait for the 2 updates.
1099 listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath);
// Close the registration, then write a second list entry (ID_QNAME = 2); the listener is
// expected to see no further changes.
1101 listenerReg.close();
1103 testWriteTransaction(dataStore, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
1104 nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(),
1105 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2));
1107 listener.expectNoMoreChanges("Received unexpected change after close");