2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertTrue;
14 import static org.junit.Assert.fail;
15 import static org.mockito.Matchers.any;
16 import static org.mockito.Matchers.eq;
17 import static org.mockito.Mockito.timeout;
18 import static org.mockito.Mockito.verify;
19 import akka.actor.ActorSystem;
20 import akka.actor.Address;
21 import akka.actor.AddressFromURIString;
22 import akka.cluster.Cluster;
23 import akka.testkit.JavaTestKit;
24 import com.google.common.base.Optional;
25 import com.google.common.collect.ImmutableMap;
26 import com.google.common.util.concurrent.CheckedFuture;
27 import com.google.common.util.concurrent.ListenableFuture;
28 import com.google.common.util.concurrent.MoreExecutors;
29 import com.google.common.util.concurrent.Uninterruptibles;
30 import com.typesafe.config.ConfigFactory;
31 import java.io.IOException;
32 import java.math.BigInteger;
33 import java.util.ArrayList;
34 import java.util.Collection;
35 import java.util.List;
36 import java.util.concurrent.CountDownLatch;
37 import java.util.concurrent.ExecutionException;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.atomic.AtomicReference;
40 import org.junit.AfterClass;
41 import org.junit.BeforeClass;
42 import org.junit.Test;
43 import org.mockito.Mockito;
44 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
45 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
46 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
47 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
48 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
49 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
50 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
51 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
52 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
53 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
54 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
55 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
56 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
57 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
58 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
59 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
60 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
61 import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
62 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
63 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
64 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
65 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
66 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
67 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
68 import org.opendaylight.yangtools.concepts.ListenerRegistration;
69 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
70 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
71 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
72 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
73 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
74 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
75 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
77 public class DistributedDataStoreIntegrationTest {
// Actor system shared by the tests in this class; assigned in setUpClass() and shut down in
// tearDownClass().
79 private static ActorSystem system;
// Instance-level builder handed to each IntegrationTestKit. It starts with a shard heartbeat
// interval of 100 (milliseconds, per the setter name) and is further adjusted by some tests.
81 private final DatastoreContext.Builder datastoreContextBuilder =
82 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100);
/**
 * Creates the shared actor system named "cluster-test" from the "Member1" section of the loaded
 * configuration and joins it to the cluster at {@code member1Address}.
 *
 * @throws IOException declared by the signature
 */
85 public static void setUpClass() throws IOException {
86 system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
88 Cluster.get(system).join(member1Address);
/**
 * Shuts down the shared actor system through the Akka test kit.
 *
 * @throws IOException declared by the signature
 */
92 public static void tearDownClass() throws IOException {
93 JavaTestKit.shutdownActorSystem(system);
/**
 * Accessor for the actor system that every test passes to its IntegrationTestKit.
 * NOTE(review): presumably returns the static {@code system} field - confirm.
 */
97 protected ActorSystem getSystem() {
/**
 * Sets up a data store backed by the single shard "test-1" and calls testWriteTransaction twice:
 * once with the test container at TestModel.TEST_PATH and once with an empty outer list at
 * TestModel.OUTER_LIST_PATH.
 */
102 public void testWriteTransactionWithSingleShard() throws Exception{
103 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
104 DistributedDataStore dataStore =
105 setupDistributedDataStore("transactionIntegrationTest", "test-1");
107 testWriteTransaction(dataStore, TestModel.TEST_PATH,
108 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
110 testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
111 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
/**
 * Writes to the "cars-1" and "people-1" shards through three consecutive write-only
 * transactions (base containers, then list nodes, then one car and one person entry), committing
 * each with doCommit, and finally reads both entries back through a read-only transaction.
 */
118 public void testWriteTransactionWithMultipleShards() throws Exception{
119 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
120 DistributedDataStore dataStore =
121 setupDistributedDataStore("testWriteTransactionWithMultipleShards", "cars-1", "people-1");
// Tx 1: create the top-level containers in both shards.
123 DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
124 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
126 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
127 writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
129 doCommit(writeTx.ready());
// Tx 2: create the car and person list nodes.
131 writeTx = dataStore.newWriteOnlyTransaction();
133 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
134 writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
136 doCommit(writeTx.ready());
// Tx 3: add one entry to each list.
138 writeTx = dataStore.newWriteOnlyTransaction();
140 MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
141 YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
142 writeTx.write(carPath, car);
144 MapEntryNode person = PeopleModel.newPersonEntry("jack");
145 YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
146 writeTx.write(personPath, person);
148 doCommit(writeTx.ready());
150 // Verify the data in the store
152 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
// Each read is bounded to 5 seconds so a stuck shard fails the test instead of hanging it.
154 Optional<NormalizedNode<?, ?>> optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
155 assertEquals("isPresent", true, optional.isPresent());
156 assertEquals("Data node", car, optional.get());
158 optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
159 assertEquals("isPresent", true, optional.isPresent());
160 assertEquals("Data node", person, optional.get());
/**
 * Writes the test container through a read-write transaction on the single shard "test-1",
 * checks that the uncommitted data is visible via exists() and read() on that same transaction,
 * readies the transaction, and then verifies the node through a new read-only transaction.
 */
167 public void testReadWriteTransactionWithSingleShard() throws Exception{
// NOTE(review): this sets a JVM-wide system property and no reset is visible in this method -
// confirm that it cannot leak into other tests.
168 System.setProperty("shard.persistent", "true");
169 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
170 DistributedDataStore dataStore =
171 setupDistributedDataStore("testReadWriteTransactionWithSingleShard", "test-1");
173 // 1. Create a read-write Tx
175 DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
176 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
178 // 2. Write some data
180 YangInstanceIdentifier nodePath = TestModel.TEST_PATH;
181 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
182 readWriteTx.write(nodePath, nodeToWrite );
184 // 3. Read the data from Tx
186 Boolean exists = readWriteTx.exists(nodePath).checkedGet(5, TimeUnit.SECONDS);
187 assertEquals("exists", true, exists);
189 Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS);
190 assertEquals("isPresent", true, optional.isPresent());
191 assertEquals("Data node", nodeToWrite, optional.get());
193 // 4. Ready the Tx for commit
195 DOMStoreThreePhaseCommitCohort cohort = readWriteTx.ready();
201 // 6. Verify the data in the store
203 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
205 optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
206 assertEquals("isPresent", true, optional.isPresent());
207 assertEquals("Data node", nodeToWrite, optional.get());
/**
 * Read-write counterpart of the multi-shard write test: three read-write transactions populate
 * the "cars-1" and "people-1" shards (containers, list nodes, then one car and one person
 * entry). The third transaction also reads the car back before it is committed, and a final
 * read-only transaction verifies both entries in the store.
 */
214 public void testReadWriteTransactionWithMultipleShards() throws Exception{
215 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
216 DistributedDataStore dataStore =
217 setupDistributedDataStore("testReadWriteTransactionWithMultipleShards", "cars-1", "people-1");
219 DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
220 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
222 readWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
223 readWriteTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
225 doCommit(readWriteTx.ready());
227 readWriteTx = dataStore.newReadWriteTransaction();
229 readWriteTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
230 readWriteTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
232 doCommit(readWriteTx.ready());
234 readWriteTx = dataStore.newReadWriteTransaction();
236 MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
237 YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
238 readWriteTx.write(carPath, car);
240 MapEntryNode person = PeopleModel.newPersonEntry("jack");
241 YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
242 readWriteTx.write(personPath, person);
// The car written above must be visible within the same, not yet committed, transaction.
244 Boolean exists = readWriteTx.exists(carPath).checkedGet(5, TimeUnit.SECONDS);
245 assertEquals("exists", true, exists);
247 Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
248 assertEquals("isPresent", true, optional.isPresent());
249 assertEquals("Data node", car, optional.get());
251 doCommit(readWriteTx.ready());
253 // Verify the data in the store
255 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
257 optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
258 assertEquals("isPresent", true, optional.isPresent());
259 assertEquals("Data node", car, optional.get());
261 optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
262 assertEquals("isPresent", true, optional.isPresent());
263 assertEquals("Data node", person, optional.get());
/**
 * Uses a transaction chain on the "cars-1" shard: the first write-only transaction creates the
 * cars container and list, the second writes nCars entries named "car0", "car1", ... back to
 * back, and a read-only transaction from the chain then checks that the list holds nCars entries.
 */
270 public void testSingleTransactionsWritesInQuickSuccession() throws Exception{
271 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
272 DistributedDataStore dataStore = setupDistributedDataStore(
273 "testSingleTransactionsWritesInQuickSuccession", "cars-1");
275 DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
277 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
278 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
279 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
280 doCommit(writeTx.ready());
282 writeTx = txChain.newWriteOnlyTransaction();
// All entries go into the same transaction, one write call per car.
285 for(int i = 0; i < nCars; i++) {
286 writeTx.write(CarsModel.newCarPath("car" + i),
287 CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
290 doCommit(writeTx.ready());
292 Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction().read(
293 CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
294 assertEquals("isPresent", true, optional.isPresent());
// The value of the list node is cast to a Collection so its size can be compared to nCars.
295 assertEquals("# cars", nCars, ((Collection<?>)optional.get().getValue()).size());
/**
 * Shared scenario for the "shard not initially ready" write tests. Shard recovery is blocked
 * with an InMemoryJournal latch, a transaction is created and readied on a separate thread while
 * the shard is still uninitialized, recovery is then released, and the commit and resulting
 * data are verified.
 *
 * @param testName used as the data store type name and as part of the shard's persistence id
 * @param writeOnly true to use a write-only transaction, false to use a read-write transaction
 * @throws Exception any failure captured on the transaction thread or raised while verifying
 */
301 private void testTransactionWritesWithShardNotInitiallyReady(final String testName,
302 final boolean writeOnly) throws Exception {
303 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
304 String shardName = "test-1";
306 // Setup the InMemoryJournal to block shard recovery to ensure the shard isn't
307 // initialized until we create and submit the write Tx.
308 String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
309 CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
310 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
312 DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
314 // Create the write Tx
316 final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() :
317 dataStore.newReadWriteTransaction();
// Name the factory method that was actually used so a failure message is not misleading.
318 assertNotNull((writeOnly ? "newWriteOnlyTransaction" : "newReadWriteTransaction") + " returned null", writeTx);
320 // Do some modification operations and ready the Tx on a separate thread.
322 final YangInstanceIdentifier listEntryPath = YangInstanceIdentifier.builder(
323 TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME,
324 TestModel.ID_QNAME, 1).build();
326 final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
327 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
328 final CountDownLatch txReady = new CountDownLatch(1);
329 Thread txThread = new Thread() {
333 writeTx.write(TestModel.TEST_PATH,
334 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
336 writeTx.merge(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(
337 TestModel.OUTER_LIST_QNAME).build());
339 writeTx.write(listEntryPath, ImmutableNodes.mapEntry(
340 TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
// The entry is deleted again right away; the final read below expects it to be absent.
342 writeTx.delete(listEntryPath);
344 txCohort.set(writeTx.ready());
345 } catch(Exception e) {
356 // Wait for the Tx operations to complete.
358 boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
359 if(caughtEx.get() != null) {
360 throw caughtEx.get();
363 assertEquals("Tx ready", true, done);
365 // At this point the Tx operations should be waiting for the shard to initialize so
366 // trigger the latch to let the shard recovery continue.
368 blockRecoveryLatch.countDown();
370 // Wait for the Tx commit to complete.
372 doCommit(txCohort.get());
374 // Verify the data in the store
376 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
378 Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).
379 get(5, TimeUnit.SECONDS);
380 assertEquals("isPresent", true, optional.isPresent());
382 optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
383 assertEquals("isPresent", true, optional.isPresent());
385 optional = readTx.read(listEntryPath).get(5, TimeUnit.SECONDS);
386 assertEquals("isPresent", false, optional.isPresent());
/**
 * Runs the shared "shard not initially ready" write scenario with a write-only transaction,
 * after enabling write-only transaction optimizations on the datastore context builder.
 */
393 public void testWriteOnlyTransactionWithShardNotInitiallyReady() throws Exception {
394 datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
395 testTransactionWritesWithShardNotInitiallyReady("testWriteOnlyTransactionWithShardNotInitiallyReady", true);
/**
 * Runs the shared "shard not initially ready" write scenario with a read-write transaction.
 */
399 public void testReadWriteTransactionWithShardNotInitiallyReady() throws Exception {
400 testTransactionWritesWithShardNotInitiallyReady("testReadWriteTransactionWithShardNotInitiallyReady", false);
/**
 * Blocks shard recovery with an InMemoryJournal latch, issues a write followed by exists() and
 * read() on a read-write transaction from a separate thread while the shard is uninitialized,
 * then releases recovery and checks that both futures complete with the written data.
 */
404 public void testTransactionReadsWithShardNotInitiallyReady() throws Exception {
405 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
406 String testName = "testTransactionReadsWithShardNotInitiallyReady";
407 String shardName = "test-1";
409 // Setup the InMemoryJournal to block shard recovery to ensure the shard isn't
410 // initialized until we create the Tx.
411 String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
412 CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
413 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
415 DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
417 // Create the read-write Tx
419 final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
420 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
422 // Do some reads on the Tx on a separate thread.
// The futures are handed back through AtomicReferences so the main thread can wait on them
// after recovery has been released.
424 final AtomicReference<CheckedFuture<Boolean, ReadFailedException>> txExistsFuture =
425 new AtomicReference<>();
426 final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
427 txReadFuture = new AtomicReference<>();
428 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
429 final CountDownLatch txReadsDone = new CountDownLatch(1);
430 Thread txThread = new Thread() {
434 readWriteTx.write(TestModel.TEST_PATH,
435 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
437 txExistsFuture.set(readWriteTx.exists(TestModel.TEST_PATH));
439 txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
440 } catch(Exception e) {
444 txReadsDone.countDown();
451 // Wait for the Tx operations to complete.
453 boolean done = Uninterruptibles.awaitUninterruptibly(txReadsDone, 5, TimeUnit.SECONDS);
454 if(caughtEx.get() != null) {
455 throw caughtEx.get();
458 assertEquals("Tx reads done", true, done);
460 // At this point the Tx operations should be waiting for the shard to initialize so
461 // trigger the latch to let the shard recovery continue.
463 blockRecoveryLatch.countDown();
465 // Wait for the reads to complete and verify.
467 assertEquals("exists", true, txExistsFuture.get().checkedGet(5, TimeUnit.SECONDS));
468 assertEquals("read", true, txReadFuture.get().checkedGet(5, TimeUnit.SECONDS).isPresent());
/**
 * Verifies that committing against a shard that never finishes initializing fails with
 * NotInitializedException. The shard initialization timeout is lowered to 300 ms, recovery is
 * blocked with an InMemoryJournal latch, a write-only transaction is written and readied on a
 * separate thread, and canCommit() is then awaited; the latch is released afterwards.
 *
 * @throws Throwable the failure expected by the test annotation, or any unexpected error
 */
476 @Test(expected=NotInitializedException.class)
477 public void testTransactionCommitFailureWithShardNotInitialized() throws Throwable{
478 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
479 String testName = "testTransactionCommitFailureWithShardNotInitialized";
480 String shardName = "test-1";
482 // Set the shard initialization timeout low for the test.
484 datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
486 // Setup the InMemoryJournal to block shard recovery indefinitely.
488 String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
489 CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
490 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
492 DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
494 // Create the write Tx
496 final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
// The message names the factory method that was actually called above.
497 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
499 // Do some modifications and ready the Tx on a separate thread.
501 final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
502 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
503 final CountDownLatch txReady = new CountDownLatch(1);
504 Thread txThread = new Thread() {
508 writeTx.write(TestModel.TEST_PATH,
509 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
511 txCohort.set(writeTx.ready());
512 } catch(Exception e) {
523 // Wait for the Tx operations to complete.
525 boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
526 if(caughtEx.get() != null) {
527 throw caughtEx.get();
530 assertEquals("Tx ready", true, done);
532 // Wait for the commit to complete. Since the shard never initialized, the Tx should
533 // have timed out and throw an appropriate exception cause.
536 txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
537 } catch(ExecutionException e) {
// Release the blocked recovery so the shard is not left waiting on the latch.
540 blockRecoveryLatch.countDown();
/**
 * Verifies that reading through a read-write transaction fails with NotInitializedException
 * when the shard never finishes initializing. The shard initialization timeout is lowered to
 * 300 ms, recovery is blocked with an InMemoryJournal latch, a write and a read are issued on a
 * separate thread, and the read future is then awaited; the latch is released afterwards.
 */
546 @Test(expected=NotInitializedException.class)
547 public void testTransactionReadFailureWithShardNotInitialized() throws Throwable{
548 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
549 String testName = "testTransactionReadFailureWithShardNotInitialized";
550 String shardName = "test-1";
552 // Set the shard initialization timeout low for the test.
554 datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
556 // Setup the InMemoryJournal to block shard recovery indefinitely.
558 String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
559 CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
560 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
562 DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
564 // Create the read-write Tx
566 final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
567 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
569 // Do a read on the Tx on a separate thread.
571 final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
572 txReadFuture = new AtomicReference<>();
573 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
574 final CountDownLatch txReadDone = new CountDownLatch(1);
575 Thread txThread = new Thread() {
579 readWriteTx.write(TestModel.TEST_PATH,
580 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
582 txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
585 } catch(Exception e) {
589 txReadDone.countDown();
596 // Wait for the Tx operations to complete.
598 boolean done = Uninterruptibles.awaitUninterruptibly(txReadDone, 5, TimeUnit.SECONDS);
599 if(caughtEx.get() != null) {
600 throw caughtEx.get();
603 assertEquals("Tx read done", true, done);
605 // Wait for the read to complete. Since the shard never initialized, the Tx should
606 // have timed out and throw an appropriate exception cause.
609 txReadFuture.get().checkedGet(5, TimeUnit.SECONDS);
610 } catch(ReadFailedException e) {
// Release the blocked recovery so the shard is not left waiting on the latch.
613 blockRecoveryLatch.countDown();
/**
 * Shared scenario for the "no shard leader" commit-failure tests. Elections are disabled through
 * a custom raft policy so the "default" shard never becomes leader, the timeouts are lowered,
 * and a transaction is written and readied on a separate thread before canCommit() is awaited.
 *
 * @param writeOnly true to use a write-only transaction, false to use a read-write transaction
 * @param testName used as the data store type name passed to setupDistributedDataStore
 * @throws Throwable the commit failure (callers expect NoShardLeaderException) or any
 *         unexpected error
 */
619 private void testTransactionCommitFailureWithNoShardLeader(final boolean writeOnly, final String testName) throws Throwable {
620 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
621 String shardName = "default";
623 // We don't want the shard to become the leader so prevent shard elections.
624 datastoreContextBuilder.customRaftPolicyImplementation(
625 "org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy");
627 // The ShardManager uses the election timeout for FindPrimary so reset it low so it will timeout quickly.
628 datastoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(1).
629 shardInitializationTimeout(200, TimeUnit.MILLISECONDS);
631 DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
// Make sure the local shard exists before continuing, so the later failure is about the
// missing leader rather than a missing shard.
633 Object result = dataStore.getActorContext().executeOperation(dataStore.getActorContext().getShardManager(),
634 new FindLocalShard(shardName, true));
635 assertTrue("Expected LocalShardFound. Actual: " + result, result instanceof LocalShardFound);
637 // Create the write Tx.
639 final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() :
640 dataStore.newReadWriteTransaction();
// Name the factory method that was actually used so a failure message is not misleading.
641 assertNotNull((writeOnly ? "newWriteOnlyTransaction" : "newReadWriteTransaction") + " returned null", writeTx);
643 // Do some modifications and ready the Tx on a separate thread.
645 final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
646 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
647 final CountDownLatch txReady = new CountDownLatch(1);
648 Thread txThread = new Thread() {
652 writeTx.write(TestModel.JUNK_PATH,
653 ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
655 txCohort.set(writeTx.ready());
656 } catch(Exception e) {
667 // Wait for the Tx operations to complete.
669 boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
670 if(caughtEx.get() != null) {
671 throw caughtEx.get();
674 assertEquals("Tx ready", true, done);
676 // Wait for the commit to complete. Since no shard leader was elected in time, the Tx
677 // should have timed out and throw an appropriate exception cause.
680 txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
681 } catch(ExecutionException e) {
/**
 * Runs the shared "no shard leader" scenario with a write-only transaction and write-only
 * optimizations enabled; NoShardLeaderException is the expected outcome.
 */
689 @Test(expected=NoShardLeaderException.class)
690 public void testWriteOnlyTransactionCommitFailureWithNoShardLeader() throws Throwable {
691 datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
692 testTransactionCommitFailureWithNoShardLeader(true, "testWriteOnlyTransactionCommitFailureWithNoShardLeader");
/**
 * Runs the shared "no shard leader" scenario with a read-write transaction;
 * NoShardLeaderException is the expected outcome.
 */
695 @Test(expected=NoShardLeaderException.class)
696 public void testReadWriteTransactionCommitFailureWithNoShardLeader() throws Throwable {
697 testTransactionCommitFailureWithNoShardLeader(false, "testReadWriteTransactionCommitFailureWithNoShardLeader");
/**
 * Writes the test container in a write-only transaction, runs canCommit() and then abort() on
 * its cohort, and afterwards calls testWriteTransaction for the same path to show that the data
 * store still accepts a write after the abort.
 */
701 public void testTransactionAbort() throws Exception{
702 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
703 DistributedDataStore dataStore =
704 setupDistributedDataStore("transactionAbortIntegrationTest", "test-1");
706 DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
707 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
709 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
711 DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
// Run only the canCommit phase, then abort instead of proceeding with the commit.
713 cohort.canCommit().get(5, TimeUnit.SECONDS);
715 cohort.abort().get(5, TimeUnit.SECONDS);
717 testWriteTransaction(dataStore, TestModel.TEST_PATH,
718 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
/**
 * Exercises a transaction chain on the single shard "test-1": data written by a readied but not
 * yet committed transaction must be visible to later transactions created from the same chain,
 * and must be readable from the data store once the commits have completed.
 */
725 public void testTransactionChainWithSingleShard() throws Exception{
726 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
727 DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChainWithSingleShard", "test-1");
729 // 1. Create a Tx chain and write-only Tx
731 DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
733 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
734 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
736 // 2. Write some data
738 NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
739 writeTx.write(TestModel.TEST_PATH, testNode);
741 // 3. Ready the Tx for commit
743 final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
745 // 4. Commit the Tx on another thread that first waits for the second read Tx.
// continueCommit1 gates the start of the first commit, commit1Done signals that it finished,
// and commit1Error carries any exception back to the test thread.
747 final CountDownLatch continueCommit1 = new CountDownLatch(1);
748 final CountDownLatch commit1Done = new CountDownLatch(1);
749 final AtomicReference<Exception> commit1Error = new AtomicReference<>();
754 continueCommit1.await();
756 } catch (Exception e) {
759 commit1Done.countDown();
764 // 5. Create a new read Tx from the chain to read and verify the data from the first
765 // Tx is visible after being readied.
767 DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction();
768 Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
769 assertEquals("isPresent", true, optional.isPresent());
770 assertEquals("Data node", testNode, optional.get());
772 // 6. Create a new RW Tx from the chain, write more data, and ready it
774 DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
775 MapNode outerNode = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
776 rwTx.write(TestModel.OUTER_LIST_PATH, outerNode);
778 DOMStoreThreePhaseCommitCohort cohort2 = rwTx.ready();
780 // 7. Create a new read Tx from the chain to read the data from the last RW Tx to
781 // verify it is visible.
// NOTE(review): the comment above says "read Tx" but the call below creates a read-write
// transaction - confirm that this is intentional.
783 readTx = txChain.newReadWriteTransaction();
784 optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
785 assertEquals("isPresent", true, optional.isPresent());
786 assertEquals("Data node", outerNode, optional.get());
788 // 8. Wait for the 2 commits to complete and close the chain.
790 continueCommit1.countDown();
// NOTE(review): the boolean result of this timed wait is ignored, so a timeout here is not
// reported by this statement.
791 Uninterruptibles.awaitUninterruptibly(commit1Done, 5, TimeUnit.SECONDS);
793 if(commit1Error.get() != null) {
794 throw commit1Error.get();
801 // 9. Create a new read Tx from the data store and verify committed data.
803 readTx = dataStore.newReadOnlyTransaction();
804 optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
805 assertEquals("isPresent", true, optional.isPresent());
806 assertEquals("Data node", outerNode, optional.get());
/**
 * Exercises a transaction chain spanning the "cars-1" and "people-1" shards. Three chained
 * transactions are readied before any commit starts: a write-only one that creates the
 * containers and lists, a read-write one that adds a car and a person and reads them back, and a
 * write-only one that deletes the car. The final read-only transaction expects the car to be
 * absent and the person to be present.
 */
813 public void testTransactionChainWithMultipleShards() throws Exception{
814 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
815 DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChainWithMultipleShards",
816 "cars-1", "people-1");
818 DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
820 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
821 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
823 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
824 writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
826 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
827 writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
829 DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
831 DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction();
833 MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
834 YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
835 readWriteTx.write(carPath, car);
837 MapEntryNode person = PeopleModel.newPersonEntry("jack");
838 YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
839 readWriteTx.merge(personPath, person);
// Both entries must be readable inside the chained transaction before anything is committed.
841 Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
842 assertEquals("isPresent", true, optional.isPresent());
843 assertEquals("Data node", car, optional.get());
845 optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS);
846 assertEquals("isPresent", true, optional.isPresent());
847 assertEquals("Data node", person, optional.get());
849 DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready();
851 writeTx = txChain.newWriteOnlyTransaction();
853 writeTx.delete(carPath);
855 DOMStoreThreePhaseCommitCohort cohort3 = writeTx.ready();
// canCommit is started on the first two cohorts before either commit is driven to completion.
857 ListenableFuture<Boolean> canCommit1 = cohort1.canCommit();
858 ListenableFuture<Boolean> canCommit2 = cohort2.canCommit();
860 doCommit(canCommit1, cohort1);
861 doCommit(canCommit2, cohort2);
866 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
868 optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
869 assertEquals("isPresent", false, optional.isPresent());
871 optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
872 assertEquals("isPresent", true, optional.isPresent());
873 assertEquals("Data node", person, optional.get());
/**
 * Drives the data store through a ConcurrentDOMDataBroker transaction chain: one write-only
 * transaction creates the cars container and list, then nCars read-write transactions are
 * created and submitted back to back, each merging one car entry. The submit futures are
 * collected and the car list is finally read from the chain and checked to contain nCars
 * entries.
 */
880 public void testCreateChainedTransactionsInQuickSuccession() throws Exception{
881 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
882 DistributedDataStore dataStore = setupDistributedDataStore(
883 "testCreateChainedTransactionsInQuickSuccession", "cars-1");
// The broker maps the CONFIGURATION datastore type to this data store and is given a direct
// executor.
885 ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
886 ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
887 LogicalDatastoreType.CONFIGURATION, dataStore).build(), MoreExecutors.directExecutor());
889 TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
890 DOMTransactionChain txChain = broker.createTransactionChain(listener);
892 List<CheckedFuture<Void, TransactionCommitFailedException>> futures = new ArrayList<>();
894 DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
895 writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, CarsModel.emptyContainer());
896 writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
897 futures.add(writeTx.submit());
// Each iteration creates and submits its own chained transaction without waiting for the
// previous submit future.
900 for(int i = 0; i < nCars; i++) {
901 DOMDataReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
903 rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.newCarPath("car" + i),
904 CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
906 futures.add(rwTx.submit());
909 for(CheckedFuture<Void, TransactionCommitFailedException> f: futures) {
913 Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction().read(
914 LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
915 assertEquals("isPresent", true, optional.isPresent());
916 assertEquals("# cars", nCars, ((Collection<?>)optional.get().getValue()).size());
// Checks that a second read-write Tx can be created on a chain after a first read-write Tx
// that wrote nothing, and that reading TestModel.TEST_PATH through the second Tx reports
// the node as absent.
927 public void testCreateChainedTransactionAfterEmptyTxReadied() throws Exception{
928 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
929 DistributedDataStore dataStore = setupDistributedDataStore(
930 "testCreateChainedTransactionAfterEmptyTxReadied", "test-1");
932 DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
// rwTx1 performs no reads or writes (per the test name, it is readied while still empty).
934 DOMStoreReadWriteTransaction rwTx1 = txChain.newReadWriteTransaction();
938 DOMStoreReadWriteTransaction rwTx2 = txChain.newReadWriteTransaction();
// Nothing was written to TEST_PATH in this test, so the read (5 second limit) must be absent.
940 Optional<NormalizedNode<?, ?>> optional = rwTx2.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
941 assertEquals("isPresent", false, optional.isPresent());
// Checks that a transaction chain refuses to hand out a new Tx while its previous Tx is still
// open: after writing through a write-only Tx, further creates on the chain are expected to
// raise IllegalStateException.
950 public void testCreateChainedTransactionWhenPreviousNotReady() throws Throwable {
951 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
952 DistributedDataStore dataStore = setupDistributedDataStore(
953 "testCreateChainedTransactionWhenPreviousNotReady", "test-1");
955 final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
957 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
958 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
// Write the test container through the open Tx so the chain has an in-progress Tx.
960 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
962 // Try to create another Tx of each type - each should fail b/c the previous Tx wasn't
// NOTE(review): assertExceptionOnTxChainCreates is defined elsewhere; presumably it attempts
// each Tx-creation call on the chain and asserts the given exception type - confirm.
965 assertExceptionOnTxChainCreates(txChain, IllegalStateException.class);
// Checks that creating transactions on a transaction chain is rejected with
// TransactionChainClosedException; per the test name and the comment below, the scenario is a
// chain that has already been closed.
970 public void testCreateChainedTransactionAfterClose() throws Throwable {
971 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
972 DistributedDataStore dataStore = setupDistributedDataStore(
973 "testCreateChainedTransactionAfterClose", "test-1");
975 DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
979 // Try to create another Tx of each type - should fail b/c the previous Tx was closed.
// NOTE(review): assertExceptionOnTxChainCreates is defined elsewhere; presumably it attempts
// each Tx-creation call on the chain and asserts the given exception type - confirm.
981 assertExceptionOnTxChainCreates(txChain, TransactionChainClosedException.class);
// Checks failure reporting for a chained Tx against a datastore set up with the single shard
// "cars-1": merging invalid data must make submit() fail with TransactionCommitFailedException
// and the chain listener must be notified through onTransactionChainFailed.
986 public void testChainedTransactionFailureWithSingleShard() throws Exception{
987 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
988 DistributedDataStore dataStore = setupDistributedDataStore(
989 "testChainedTransactionFailureWithSingleShard", "cars-1");
// The broker is built over the CONFIGURATION datastore only and is handed
// MoreExecutors.directExecutor() as its executor.
991 ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
992 ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
993 LogicalDatastoreType.CONFIGURATION, dataStore).build(), MoreExecutors.directExecutor());
// Mocked listener so the failure callback can be verified at the end.
995 TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
996 DOMTransactionChain txChain = broker.createTransactionChain(listener);
998 DOMDataReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
// Build a container identified by CarsModel.BASE_QNAME whose only child is a leaf named
// TestModel.JUNK_QNAME; this is the data the commit is expected to reject.
1000 ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
1001 new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)).
1002 withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
1004 rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
// The submit must fail (waiting at most 5 seconds); reaching fail() means the commit
// unexpectedly succeeded.
1007 rwTx.submit().checkedGet(5, TimeUnit.SECONDS);
1008 fail("Expected TransactionCommitFailedException");
1009 } catch (TransactionCommitFailedException e) {
// Within 5000 ms the listener must be told that this chain failed on rwTx; any Throwable
// is accepted as the reported cause.
1013 verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(rwTx), any(Throwable.class));
// Checks failure reporting for a chained Tx against a datastore set up with two shards,
// "cars-1" and "people-1": one write-only Tx puts a people container and merges invalid cars
// data, so submit() must fail with TransactionCommitFailedException and the chain listener
// must be notified through onTransactionChainFailed.
1022 public void testChainedTransactionFailureWithMultipleShards() throws Exception{
1023 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
1024 DistributedDataStore dataStore = setupDistributedDataStore(
1025 "testChainedTransactionFailureWithMultipleShards", "cars-1", "people-1");
// The broker is built over the CONFIGURATION datastore only and is handed
// MoreExecutors.directExecutor() as its executor.
1027 ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
1028 ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
1029 LogicalDatastoreType.CONFIGURATION, dataStore).build(), MoreExecutors.directExecutor());
// Mocked listener so the failure callback can be verified at the end.
1031 TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
1032 DOMTransactionChain txChain = broker.createTransactionChain(listener);
1034 DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
// Put of the empty people container at the people base path, in the same Tx as the
// cars merge below.
1036 writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
// Build a container identified by CarsModel.BASE_QNAME whose only child is a leaf named
// TestModel.JUNK_QNAME; this is the data the commit is expected to reject.
1038 ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
1039 new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)).
1040 withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
1042 // Note that merge will validate the data and fail but put succeeds b/c deep validation is not
1043 // done for put for performance reasons.
1044 writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
// The submit must fail (waiting at most 5 seconds); reaching fail() means the commit
// unexpectedly succeeded.
1047 writeTx.submit().checkedGet(5, TimeUnit.SECONDS);
1048 fail("Expected TransactionCommitFailedException");
1049 } catch (TransactionCommitFailedException e) {
// Within 5000 ms the listener must be told that this chain failed on writeTx; any
// Throwable is accepted as the reported cause.
1053 verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx), any(Throwable.class));
// Checks data-change listener registration on the distributed datastore: a listener registered
// on TestModel.TEST_PATH with SUBTREE scope is waited on for the paths written while it is
// registered, and must see no further changes once its registration has been closed.
1062 public void testChangeListenerRegistration() throws Exception{
1063 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
1064 DistributedDataStore dataStore =
1065 setupDistributedDataStore("testChangeListenerRegistration", "test-1");
// Write the test container before the listener is registered.
1067 testWriteTransaction(dataStore, TestModel.TEST_PATH,
1068 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
// NOTE(review): the constructor argument 1 is presumably the number of change events the
// mock expects - confirm against MockDataChangeListener.
1070 MockDataChangeListener listener = new MockDataChangeListener(1);
1072 ListenerRegistration<MockDataChangeListener>
1073 listenerReg = dataStore.registerChangeListener(TestModel.TEST_PATH, listener,
1074 DataChangeScope.SUBTREE);
1076 assertNotNull("registerChangeListener returned null", listenerReg);
1078 // Wait for the initial notification
1080 listener.waitForChangeEvents(TestModel.TEST_PATH);
// Two further writes while the listener is registered: the outer list node itself, then
// the list entry keyed by id 1 underneath it.
1086 testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
1087 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1089 YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
1090 nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
1091 testWriteTransaction(dataStore, listPath,
1092 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
1094 // Wait for the 2 updates.
1096 listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath);
// Close the registration, then write another list entry (id 2); the closed listener must
// not be notified of it.
1098 listenerReg.close();
1100 testWriteTransaction(dataStore, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
1101 nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(),
1102 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2));
1104 listener.expectNoMoreChanges("Received unexpected change after close");