2 * Copyright (c) 2014, 2017 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertTrue;
14 import static org.junit.Assert.fail;
15 import static org.junit.runners.Parameterized.Parameter;
16 import static org.mockito.ArgumentMatchers.any;
17 import static org.mockito.ArgumentMatchers.eq;
18 import static org.mockito.Mockito.timeout;
19 import static org.mockito.Mockito.verify;
21 import akka.actor.ActorSystem;
22 import com.google.common.base.Throwables;
23 import com.google.common.collect.ImmutableMap;
24 import com.google.common.util.concurrent.FluentFuture;
25 import com.google.common.util.concurrent.ListenableFuture;
26 import com.google.common.util.concurrent.MoreExecutors;
27 import com.google.common.util.concurrent.Uninterruptibles;
28 import java.math.BigInteger;
29 import java.util.ArrayList;
30 import java.util.Arrays;
31 import java.util.Collection;
32 import java.util.Collections;
33 import java.util.List;
34 import java.util.Optional;
35 import java.util.concurrent.CountDownLatch;
36 import java.util.concurrent.ExecutionException;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.atomic.AtomicReference;
39 import org.junit.Test;
40 import org.mockito.Mockito;
41 import org.opendaylight.controller.cluster.access.client.RequestTimeoutException;
42 import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker;
43 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
44 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
45 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
46 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
47 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
48 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
49 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
50 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
51 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
52 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
53 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
54 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
55 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
56 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction;
57 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
58 import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
59 import org.opendaylight.mdsal.dom.api.DOMTransactionChainClosedException;
60 import org.opendaylight.mdsal.dom.api.DOMTransactionChainListener;
61 import org.opendaylight.mdsal.dom.spi.store.DOMStore;
62 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
63 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
64 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
65 import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain;
66 import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
67 import org.opendaylight.yangtools.concepts.ListenerRegistration;
68 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
69 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
70 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
71 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
72 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
73 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
74 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration;
75 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
76 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
77 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
79 public abstract class AbstractDistributedDataStoreIntegrationTest {
// Concrete AbstractDataStore class that every test below passes to
// IntegrationTestKit.setupAbstractDataStore.
// NOTE(review): presumably injected by the JUnit Parameterized runner (Parameter is
// statically imported) - the annotation line is not visible in this listing; confirm.
82 public Class<? extends AbstractDataStore> testParameter;
// Actor system field. NOTE(review): presumably initialised by subclasses and returned by
// getSystem() - neither is visible here; confirm.
84 protected ActorSystem system;
// Context builder shared by all tests; it is handed to each IntegrationTestKit and starts
// with a 100 ms shard heartbeat interval. Some tests mutate it further (raft policy,
// timeouts, write-only optimizations) before creating their datastore.
86 protected final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder()
87 .shardHeartbeatIntervalInMillis(100);
// Supplies the ActorSystem that each test passes to the IntegrationTestKit constructor.
// NOTE(review): the method body is not visible in this listing - presumably it returns the
// 'system' field; confirm.
89 protected ActorSystem getSystem() {
// Creates a datastore backed by the single shard "test-1" and runs
// IntegrationTestKit.testWriteTransaction twice: first for the TestModel.TEST_PATH
// container, then for an outer-list map node holding one entry with id 42.
// The datastore is opened in try-with-resources, so it is closed when the test finishes.
94 public void testWriteTransactionWithSingleShard() throws Exception {
95 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
96 try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
97 testParameter, "transactionIntegrationTest", "test-1")) {
// First write: the top-level test container.
99 testKit.testWriteTransaction(dataStore, TestModel.TEST_PATH,
100 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
// Second write: the outer list with a single entry keyed by ID_QNAME = 42.
102 testKit.testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
103 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME)
104 .withChild(ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 42))
// Commits three successive write-only transactions that each write to both the cars and
// people models (datastore shards "cars-1" and "people-1"), then reads the final car and
// person entries back through a read-only transaction and checks they equal what was written.
110 public void testWriteTransactionWithMultipleShards() throws Exception {
111 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
112 try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
113 testParameter, "testWriteTransactionWithMultipleShards", "cars-1", "people-1")) {
// Tx 1: create the empty top-level containers in both models.
115 DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
116 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
118 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
119 writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
121 testKit.doCommit(writeTx.ready());
// Tx 2: create the (empty) car and person list nodes.
123 writeTx = dataStore.newWriteOnlyTransaction();
125 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
126 writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
128 testKit.doCommit(writeTx.ready());
// Tx 3: add one car ("optima") and one person ("jack").
130 writeTx = dataStore.newWriteOnlyTransaction();
132 final MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
133 final YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
134 writeTx.write(carPath, car);
136 final MapEntryNode person = PeopleModel.newPersonEntry("jack");
137 final YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
138 writeTx.write(personPath, person);
140 testKit.doCommit(writeTx.ready());
142 // Verify the data in the store
143 final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
// Each read waits at most 5 seconds for its future to complete.
145 Optional<NormalizedNode<?, ?>> optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
146 assertTrue("isPresent", optional.isPresent());
147 assertEquals("Data node", car, optional.get());
149 optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
150 assertTrue("isPresent", optional.isPresent());
151 assertEquals("Data node", person, optional.get());
// Exercises a read-write transaction against the single shard "test-1": writes the test
// container, checks that exists() and read() on the same transaction see the uncommitted
// write, commits, and then verifies the data with a separate read-only transaction.
156 public void testReadWriteTransactionWithSingleShard() throws Exception {
157 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
158 try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
159 testParameter, "testReadWriteTransactionWithSingleShard", "test-1")) {
161 // 1. Create a read-write Tx
162 final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
163 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
165 // 2. Write some data
166 final YangInstanceIdentifier nodePath = TestModel.TEST_PATH;
167 final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
168 readWriteTx.write(nodePath, nodeToWrite);
170 // 3. Read the data from Tx
171 final Boolean exists = readWriteTx.exists(nodePath).get(5, TimeUnit.SECONDS);
172 assertEquals("exists", Boolean.TRUE, exists);
174 Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS);
175 assertTrue("isPresent", optional.isPresent());
176 assertEquals("Data node", nodeToWrite, optional.get());
178 // 4. Ready the Tx for commit
179 final DOMStoreThreePhaseCommitCohort cohort = readWriteTx.ready();
// Step 5 (its comment line is not visible in this listing): commit the readied cohort.
182 testKit.doCommit(cohort);
184 // 6. Verify the data in the store
185 final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
187 optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
188 assertTrue("isPresent", optional.isPresent());
189 assertEquals("Data node", nodeToWrite, optional.get());
// Read-write counterpart of the multi-shard write test: three read-write transactions write
// to both the cars and people models (shards "cars-1" and "people-1"). The third one also
// reads its own uncommitted car entry before committing; afterwards a read-only transaction
// verifies both the car and the person entry.
194 public void testReadWriteTransactionWithMultipleShards() throws Exception {
195 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
196 try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
197 testParameter, "testReadWriteTransactionWithMultipleShards", "cars-1", "people-1")) {
// Tx 1: empty top-level containers.
199 DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
200 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
202 readWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
203 readWriteTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
205 testKit.doCommit(readWriteTx.ready());
// Tx 2: list nodes.
207 readWriteTx = dataStore.newReadWriteTransaction();
209 readWriteTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
210 readWriteTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
212 testKit.doCommit(readWriteTx.ready());
// Tx 3: one car and one person entry.
214 readWriteTx = dataStore.newReadWriteTransaction();
216 final MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
217 final YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
218 readWriteTx.write(carPath, car);
220 final MapEntryNode person = PeopleModel.newPersonEntry("jack");
221 final YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
222 readWriteTx.write(personPath, person);
// The still-open transaction must see its own write to the car path.
224 final Boolean exists = readWriteTx.exists(carPath).get(5, TimeUnit.SECONDS);
225 assertEquals("exists", Boolean.TRUE, exists);
227 Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
228 assertTrue("isPresent", optional.isPresent());
229 assertEquals("Data node", car, optional.get());
231 testKit.doCommit(readWriteTx.ready());
233 // Verify the data in the store
234 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
236 optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
237 assertTrue("isPresent", optional.isPresent());
238 assertEquals("Data node", car, optional.get());
240 optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
241 assertTrue("isPresent", optional.isPresent());
242 assertEquals("Data node", person, optional.get());
// Uses a transaction chain on the single shard "cars-1": the first write-only transaction
// creates the cars container and list, the second writes numCars car entries back to back in
// one transaction, and a chained read-only transaction then checks the list size.
247 public void testSingleTransactionsWritesInQuickSuccession() throws Exception {
248 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
249 try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
250 testParameter, "testSingleTransactionsWritesInQuickSuccession", "cars-1")) {
252 final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
254 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
255 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
256 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
257 testKit.doCommit(writeTx.ready());
259 writeTx = txChain.newWriteOnlyTransaction();
// NOTE(review): the declaration of numCars is not visible in this listing - confirm its value.
// Entries are named "car0" .. "car<numCars-1>", all with the same price value 20000.
262 for (int i = 0; i < numCars; i++) {
263 writeTx.write(CarsModel.newCarPath("car" + i),
264 CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
267 testKit.doCommit(writeTx.ready());
// Read the whole car list via the chain and compare its child count with numCars.
269 final Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction()
270 .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
271 assertTrue("isPresent", optional.isPresent());
272 assertEquals("# cars", numCars, ((Collection<?>) optional.get().getValue()).size());
// Shared body for the two "commit failure with no shard leader" tests.
//
// writeOnly - true to create a write-only transaction, false for a read-write one.
// testName  - passed to setupAbstractDataStore as the datastore/test name.
//
// The datastore is configured with the DisableElectionsRaftPolicy and short timeouts, a write
// to TestModel.JUNK_PATH is readied on a separate thread, and canCommit() is expected to
// fail with an ExecutionException whose root cause depends on the datastore class under test.
// NOTE(review): several short lines (try/finally scaffolding, thread start, latch count-down)
// are not visible in this listing; the comments below describe only what is shown.
276 @SuppressWarnings("checkstyle:IllegalCatch")
277 private void testTransactionCommitFailureWithNoShardLeader(final boolean writeOnly, final String testName)
279 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
280 final String shardName = "default";
282 // We don't want the shard to become the leader so prevent shard
284 datastoreContextBuilder.customRaftPolicyImplementation(
285 "org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy");
287 // The ShardManager uses the election timeout for FindPrimary so
288 // reset it low so it will timeout quickly.
289 datastoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(1)
290 .shardInitializationTimeout(200, TimeUnit.MILLISECONDS).frontendRequestTimeoutInSeconds(2);
// NOTE(review): the boolean 'false' argument presumably disables waiting for a shard
// leader during setup - confirm against IntegrationTestKit.
292 try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(testParameter, testName, false, shardName)) {
// Sanity check: the local shard must exist even though it never becomes leader.
294 final Object result = dataStore.getActorUtils().executeOperation(
295 dataStore.getActorUtils().getShardManager(), new FindLocalShard(shardName, true));
296 assertTrue("Expected LocalShardFound. Actual: " + result, result instanceof LocalShardFound);
298 // Create the write Tx.
299 DOMStoreWriteTransaction writeTxToClose = null;
301 writeTxToClose = writeOnly ? dataStore.newWriteOnlyTransaction()
302 : dataStore.newReadWriteTransaction();
// Effectively-final alias so the lambda below can capture the transaction.
303 final DOMStoreWriteTransaction writeTx = writeTxToClose;
// NOTE(review): the message mentions newReadWriteTransaction even when writeOnly is true.
304 assertNotNull("newReadWriteTransaction returned null", writeTx);
306 // Do some modifications and ready the Tx on a separate
308 final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
309 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
310 final CountDownLatch txReady = new CountDownLatch(1);
311 final Thread txThread = new Thread(() -> {
313 writeTx.write(TestModel.JUNK_PATH,
314 ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
316 txCohort.set(writeTx.ready());
317 } catch (Exception e) {
326 // Wait for the Tx operations to complete.
327 boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
// Re-throw on the test thread anything the worker thread recorded in caughtEx.
328 if (caughtEx.get() != null) {
329 throw caughtEx.get();
332 assertTrue("Tx ready", done);
334 // Wait for the commit to complete. Since no shard
335 // leader was elected in time, the Tx
336 // should have timed out and throw an appropriate
339 txCohort.get().canCommit().get(10, TimeUnit.SECONDS);
340 fail("Expected NoShardLeaderException");
341 } catch (final ExecutionException e) {
342 final String msg = "Unexpected exception: "
343 + Throwables.getStackTraceAsString(e.getCause());
// DistributedDataStore must fail with NoShardLeaderException as root cause; any other
// datastore class is expected to fail with RequestTimeoutException.
// NOTE(review): the first assertion does not pass 'msg', unlike the second one.
344 if (DistributedDataStore.class.equals(testParameter)) {
345 assertTrue(Throwables.getRootCause(e) instanceof NoShardLeaderException);
347 assertTrue(msg, Throwables.getRootCause(e) instanceof RequestTimeoutException);
// Best-effort close of the transaction; exceptions from close() are caught (see FIXME).
352 if (writeTxToClose != null) {
353 writeTxToClose.close();
355 } catch (Exception e) {
356 // FIXME TransactionProxy.close throws IllegalStateException:
357 // Transaction is ready, it cannot be closed
// Runs the no-shard-leader commit-failure scenario with a write-only transaction, after
// enabling write-only transaction optimizations on the shared context builder.
364 public void testWriteOnlyTransactionCommitFailureWithNoShardLeader() throws Exception {
365 datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
366 testTransactionCommitFailureWithNoShardLeader(true, "testWriteOnlyTransactionCommitFailureWithNoShardLeader");
// Runs the no-shard-leader commit-failure scenario with a read-write transaction
// (writeOnly = false).
370 public void testReadWriteTransactionCommitFailureWithNoShardLeader() throws Exception {
371 testTransactionCommitFailureWithNoShardLeader(false, "testReadWriteTransactionCommitFailureWithNoShardLeader");
// Readies a write-only transaction on shard "test-1", runs canCommit() and then abort()
// on its cohort, and finally checks that a fresh write to the same path still goes
// through via IntegrationTestKit.testWriteTransaction.
375 public void testTransactionAbort() throws Exception {
376 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
377 try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
378 testParameter, "transactionAbortIntegrationTest", "test-1")) {
380 final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
381 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
383 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
385 final DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
// Both phases are given up to 5 seconds; a timeout or failure propagates as a test error.
387 cohort.canCommit().get(5, TimeUnit.SECONDS);
389 cohort.abort().get(5, TimeUnit.SECONDS);
// A new transaction writing the same container must succeed after the abort.
391 testKit.testWriteTransaction(dataStore, TestModel.TEST_PATH,
392 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
// Verifies visibility rules inside a transaction chain on the single shard "test-1":
// data from a readied-but-uncommitted transaction must be readable by later transactions
// of the same chain, and after both commits it must be readable from the datastore directly.
// The first commit is deliberately delayed behind the continueCommit1 latch so that the
// chained reads happen before it runs.
// NOTE(review): the lines that create/start the committing thread and close the chain are
// not visible in this listing.
397 @SuppressWarnings("checkstyle:IllegalCatch")
398 public void testTransactionChainWithSingleShard() throws Exception {
399 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
400 try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
401 testParameter, "testTransactionChainWithSingleShard", "test-1")) {
403 // 1. Create a Tx chain and write-only Tx
404 final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
406 final DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
407 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
409 // 2. Write some data
410 final NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
411 writeTx.write(TestModel.TEST_PATH, testNode);
413 // 3. Ready the Tx for commit
414 final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
416 // 4. Commit the Tx on another thread that first waits for
417 // the second read Tx.
418 final CountDownLatch continueCommit1 = new CountDownLatch(1);
419 final CountDownLatch commit1Done = new CountDownLatch(1);
420 final AtomicReference<Exception> commit1Error = new AtomicReference<>();
// Worker body: block until released, commit cohort1, then signal completion.
423 continueCommit1.await();
424 testKit.doCommit(cohort1);
425 } catch (Exception e) {
428 commit1Done.countDown();
432 // 5. Create a new read Tx from the chain to read and verify
433 // the data from the first
434 // Tx is visible after being readied.
435 DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction();
436 Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
437 assertTrue("isPresent", optional.isPresent());
438 assertEquals("Data node", testNode, optional.get());
440 // 6. Create a new RW Tx from the chain, write more data,
442 final DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
443 final MapNode outerNode = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME)
444 .withChild(ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 42))
446 rwTx.write(TestModel.OUTER_LIST_PATH, outerNode);
448 final DOMStoreThreePhaseCommitCohort cohort2 = rwTx.ready();
450 // 7. Create a new read Tx from the chain to read the data
451 // from the last RW Tx to
452 // verify it is visible.
// NOTE(review): despite the "read Tx" wording this creates a read-write transaction.
453 readTx = txChain.newReadWriteTransaction();
454 optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
455 assertTrue("isPresent", optional.isPresent());
456 assertEquals("Data node", outerNode, optional.get());
458 // 8. Wait for the 2 commits to complete and close the
460 continueCommit1.countDown();
// NOTE(review): the boolean result of the 5 s wait is ignored, so a timeout here is not
// reported by itself.
461 Uninterruptibles.awaitUninterruptibly(commit1Done, 5, TimeUnit.SECONDS);
// Surface any exception recorded by the committing thread on the test thread.
463 if (commit1Error.get() != null) {
464 throw commit1Error.get();
467 testKit.doCommit(cohort2);
471 // 9. Create a new read Tx from the data store and verify
473 readTx = dataStore.newReadOnlyTransaction();
474 optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
475 assertTrue("isPresent", optional.isPresent());
476 assertEquals("Data node", outerNode, optional.get());
// Chains three transactions across shards "cars-1" and "people-1" before committing any
// of them: (1) write the containers and lists, (2) add a car and a person and read them
// back within the same transaction, (3) delete the car. After all three are committed, a
// read-only transaction on the datastore must find the person but not the car.
481 public void testTransactionChainWithMultipleShards() throws Exception {
482 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
483 try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
484 testParameter, "testTransactionChainWithMultipleShards", "cars-1", "people-1")) {
486 final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
// Tx 1 (write-only): base containers and list nodes for both models.
488 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
489 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
491 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
492 writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
494 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
495 writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
497 final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
// Tx 2 (read-write): the car is written, the person is merged.
499 final DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction();
501 final MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
502 final YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
503 readWriteTx.write(carPath, car);
505 final MapEntryNode person = PeopleModel.newPersonEntry("jack");
506 final YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
507 readWriteTx.merge(personPath, person);
// Tx 2 must observe its own modifications even though Tx 1 is not yet committed.
509 Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
510 assertTrue("isPresent", optional.isPresent());
511 assertEquals("Data node", car, optional.get());
513 optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS);
514 assertTrue("isPresent", optional.isPresent());
515 assertEquals("Data node", person, optional.get());
517 final DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready();
// Tx 3 (write-only): remove the car added by Tx 2.
519 writeTx = txChain.newWriteOnlyTransaction();
521 writeTx.delete(carPath);
523 final DOMStoreThreePhaseCommitCohort cohort3 = writeTx.ready();
// canCommit is issued for cohorts 1 and 2 up front, before either is committed.
525 final ListenableFuture<Boolean> canCommit1 = cohort1.canCommit();
526 final ListenableFuture<Boolean> canCommit2 = cohort2.canCommit();
528 testKit.doCommit(canCommit1, cohort1);
529 testKit.doCommit(canCommit2, cohort2);
530 testKit.doCommit(cohort3);
// Final state as seen from outside the chain: car deleted, person present.
534 final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
536 optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
537 assertFalse("isPresent", optional.isPresent());
539 optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
540 assertTrue("isPresent", optional.isPresent());
541 assertEquals("Data node", person, optional.get());
// Wraps the datastore (single shard "cars-1") in a ConcurrentDOMDataBroker registered for
// the CONFIGURATION datastore and uses a broker-level transaction chain to submit one setup
// transaction followed by numCars read-write transactions without waiting in between.
// All commit futures are then awaited and the resulting car list size is checked.
546 public void testCreateChainedTransactionsInQuickSuccession() throws Exception {
547 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
548 try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
549 testParameter, "testCreateChainedTransactionsInQuickSuccession", "cars-1")) {
// The broker is given a direct executor as its second constructor argument.
551 final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
552 ImmutableMap.<LogicalDatastoreType, DOMStore>builder()
553 .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(),
554 MoreExecutors.directExecutor());
556 final DOMTransactionChainListener listener = Mockito.mock(DOMTransactionChainListener.class);
557 DOMTransactionChain txChain = broker.createTransactionChain(listener);
// Collects every commit future so they can all be awaited after submission.
559 final List<ListenableFuture<?>> futures = new ArrayList<>();
561 final DOMDataTreeWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
562 writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, CarsModel.emptyContainer());
563 writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
564 futures.add(writeTx.commit());
// NOTE(review): the declaration of numCars is not visible in this listing - confirm its value.
// One chained read-write transaction per car, each committed immediately after its merge.
567 for (int i = 0; i < numCars; i++) {
568 final DOMDataTreeReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
570 rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.newCarPath("car" + i),
571 CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
573 futures.add(rwTx.commit());
// Each commit gets up to 5 seconds; any failure propagates out of get().
576 for (final ListenableFuture<?> f : futures) {
577 f.get(5, TimeUnit.SECONDS);
580 final Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction()
581 .read(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
582 assertTrue("isPresent", optional.isPresent());
583 assertEquals("# cars", numCars, ((Collection<?>) optional.get().getValue()).size());
// Creates two read-write transactions in sequence on the same chain (shard "test-1") and
// checks that a read of TestModel.TEST_PATH through the second one completes within
// 5 seconds and reports no data.
// NOTE(review): the step between the two creations is not visible in this listing; going by
// the test name, rwTx1 is presumably readied there without any modifications - confirm.
592 public void testCreateChainedTransactionAfterEmptyTxReadied() throws Exception {
593 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
594 try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
595 testParameter, "testCreateChainedTransactionAfterEmptyTxReadied", "test-1")) {
597 final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
599 final DOMStoreReadWriteTransaction rwTx1 = txChain.newReadWriteTransaction();
603 final DOMStoreReadWriteTransaction rwTx2 = txChain.newReadWriteTransaction();
// Nothing was written in this test, so the path is expected to be absent.
605 final Optional<NormalizedNode<?, ?>> optional = rwTx2.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
606 assertFalse("isPresent", optional.isPresent());
// Opens a write-only transaction on a chain (shard "test-1"), writes to it without readying
// it, and then expects every attempt to create another chained transaction to fail with
// IllegalStateException (checked by IntegrationTestKit.assertExceptionOnTxChainCreates).
613 public void testCreateChainedTransactionWhenPreviousNotReady() throws Exception {
614 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
615 try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
616 testParameter, "testCreateChainedTransactionWhenPreviousNotReady", "test-1")) {
618 final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
620 final DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
621 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
// The transaction is intentionally left open (no ready() call is shown after this write).
623 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
625 // Try to create another Tx of each type - each should fail
626 // b/c the previous Tx wasn't
628 testKit.assertExceptionOnTxChainCreates(txChain, IllegalStateException.class);
// Expects every attempt to create a transaction on a chain (shard "test-1") to fail with
// DOMTransactionChainClosedException.
// NOTE(review): the line that closes the chain is not visible in this listing; presumably
// txChain.close() is called right after the chain is created - confirm.
633 public void testCreateChainedTransactionAfterClose() throws Exception {
634 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
635 try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
636 testParameter, "testCreateChainedTransactionAfterClose", "test-1")) {
638 final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
641 // Try to create another Tx of each type - should fail b/c
642 // the previous Tx was closed.
643 testKit.assertExceptionOnTxChainCreates(txChain, DOMTransactionChainClosedException.class);
// Checks that read-only transactions created on a chain after a write transaction has been
// readied (but not yet committed) can read that transaction's data, and that a further
// write transaction on the chain can be created and committed afterwards (shard "test-1").
648 public void testChainWithReadOnlyTxAfterPreviousReady() throws Exception {
649 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
650 try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
651 testParameter, "testChainWithReadOnlyTxAfterPreviousReady", "test-1")) {
653 final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
655 // Create a write tx and submit.
656 final DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
657 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
658 final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
660 // Create read-only tx's and issue a read.
// The read futures are only awaited further down, after the second write Tx is created.
661 FluentFuture<Optional<NormalizedNode<?, ?>>> readFuture1 = txChain
662 .newReadOnlyTransaction().read(TestModel.TEST_PATH);
664 FluentFuture<Optional<NormalizedNode<?, ?>>> readFuture2 = txChain
665 .newReadOnlyTransaction().read(TestModel.TEST_PATH);
667 // Create another write tx and issue the write.
668 DOMStoreWriteTransaction writeTx2 = txChain.newWriteOnlyTransaction();
669 writeTx2.write(TestModel.OUTER_LIST_PATH,
670 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME)
671 .withChild(ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 42))
674 // Ensure the reads succeed.
676 assertTrue("isPresent", readFuture1.get(5, TimeUnit.SECONDS).isPresent());
677 assertTrue("isPresent", readFuture2.get(5, TimeUnit.SECONDS).isPresent());
679 // Ensure the writes succeed.
680 DOMStoreThreePhaseCommitCohort cohort2 = writeTx2.ready();
// Cohorts are committed in the order their transactions were created on the chain.
682 testKit.doCommit(cohort1);
683 testKit.doCommit(cohort2);
685 assertTrue("isPresent", txChain.newReadOnlyTransaction().read(TestModel.OUTER_LIST_PATH)
686 .get(5, TimeUnit.SECONDS).isPresent());
// Commits a chained broker transaction containing invalid data (a cars container carrying
// a "junk" leaf from TestModel) against a datastore set up with the single shard "cars-1".
// The commit future is expected to fail with an ExecutionException, and the mocked chain
// listener must be told about the failure within 5 seconds.
691 public void testChainedTransactionFailureWithSingleShard() throws Exception {
692 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
693 try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
694 testParameter, "testChainedTransactionFailureWithSingleShard", "cars-1")) {
696 final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
697 ImmutableMap.<LogicalDatastoreType, DOMStore>builder()
698 .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(),
699 MoreExecutors.directExecutor());
701 final DOMTransactionChainListener listener = Mockito.mock(DOMTransactionChainListener.class);
702 final DOMTransactionChain txChain = broker.createTransactionChain(listener);
704 final DOMDataTreeReadWriteTransaction writeTx = txChain.newReadWriteTransaction();
706 writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH,
707 PeopleModel.emptyContainer());
// Container identified as CarsModel.BASE_QNAME but holding a leaf named by
// TestModel.JUNK_QNAME - the payload the commit is expected to reject.
709 final ContainerNode invalidData = ImmutableContainerNodeBuilder.create()
710 .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME))
711 .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
713 writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
// Reaching fail() means the commit unexpectedly succeeded.
716 writeTx.commit().get(5, TimeUnit.SECONDS);
717 fail("Expected TransactionCommitFailedException");
718 } catch (final ExecutionException e) {
// The listener callback may arrive asynchronously, hence the 5000 ms Mockito timeout.
722 verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx),
723 any(Throwable.class));
// Same failure scenario as the single-shard variant, but the datastore is set up with both
// "cars-1" and "people-1": a valid put to the people model and an invalid merge to the cars
// model are committed in one chained transaction. The commit future is expected to fail with
// an ExecutionException, and the mocked chain listener must be notified within 5 seconds.
731 public void testChainedTransactionFailureWithMultipleShards() throws Exception {
732 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
733 try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
734 testParameter, "testChainedTransactionFailureWithMultipleShards", "cars-1", "people-1")) {
736 final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
737 ImmutableMap.<LogicalDatastoreType, DOMStore>builder()
738 .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(),
739 MoreExecutors.directExecutor());
741 final DOMTransactionChainListener listener = Mockito.mock(DOMTransactionChainListener.class);
742 final DOMTransactionChain txChain = broker.createTransactionChain(listener);
// Created as a read-write transaction but held through the write-transaction interface.
744 final DOMDataTreeWriteTransaction writeTx = txChain.newReadWriteTransaction();
746 writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH,
747 PeopleModel.emptyContainer());
// Container identified as CarsModel.BASE_QNAME but holding a leaf named by
// TestModel.JUNK_QNAME - the payload the commit is expected to reject.
749 final ContainerNode invalidData = ImmutableContainerNodeBuilder.create()
750 .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME))
751 .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
753 writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
755 // Note that merge will validate the data and fail but put
756 // succeeds b/c deep validation is not
757 // done for put for performance reasons.
759 writeTx.commit().get(5, TimeUnit.SECONDS);
760 fail("Expected TransactionCommitFailedException");
761 } catch (final ExecutionException e) {
// The listener callback may arrive asynchronously, hence the 5000 ms Mockito timeout.
765 verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx),
766 any(Throwable.class));
// Registers a MockDataTreeChangeListener on TestModel.TEST_PATH (shard "test-1") and checks:
// the shard reports one listener actor, the listener receives the initial notification and
// the two later list updates, and - once the shard reports zero listener actors - no
// further changes are delivered for a subsequent write.
// NOTE(review): the lines between the "2 updates" wait and the zero-listener check are not
// visible in this listing; presumably the registration is closed there - confirm.
774 public void testDataTreeChangeListenerRegistration() throws Exception {
775 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
776 try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
777 testParameter, "testDataTreeChangeListenerRegistration", "test-1")) {
// Seed the store so the listener has existing data to be notified about on registration.
779 testKit.testWriteTransaction(dataStore, TestModel.TEST_PATH,
780 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
782 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
784 ListenerRegistration<MockDataTreeChangeListener> listenerReg = dataStore
785 .registerTreeChangeListener(TestModel.TEST_PATH, listener);
787 assertNotNull("registerTreeChangeListener returned null", listenerReg);
// The shard's state must now list exactly one tree-change listener actor.
789 IntegrationTestKit.verifyShardState(dataStore, "test-1",
790 state -> assertEquals("getTreeChangeListenerActors", 1,
791 state.getTreeChangeListenerActors().size()));
793 // Wait for the initial notification
794 listener.waitForChangeEvents(TestModel.TEST_PATH);
// Two further writes below the listened path: the outer list (entry 42) and entry 1.
798 testKit.testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
799 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME)
800 .withChild(ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 42))
803 YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
804 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
805 testKit.testWriteTransaction(dataStore, listPath,
806 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
808 // Wait for the 2 updates.
809 listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath);
// After the (not shown) close, the shard must report no listener actors.
812 IntegrationTestKit.verifyShardState(dataStore, "test-1",
813 state -> assertEquals("getTreeChangeListenerActors", 0,
814 state.getTreeChangeListenerActors().size()));
// A write of entry 2 after that point must not reach the listener.
816 testKit.testWriteTransaction(dataStore,
817 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
818 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(),
819 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2));
821 listener.expectNoMoreChanges("Received unexpected change after close");
826 public void testRestoreFromDatastoreSnapshot() throws Exception {
827 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
828 final String name = "transactionIntegrationTest";
830 final ContainerNode carsNode = CarsModel.newCarsNode(
831 CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", BigInteger.valueOf(20000L)),
832 CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000L))));
834 DataTree dataTree = new InMemoryDataTreeFactory().create(
835 DataTreeConfiguration.DEFAULT_OPERATIONAL, SchemaContextHelper.full());
836 AbstractShardTest.writeToStore(dataTree, CarsModel.BASE_PATH, carsNode);
837 NormalizedNode<?, ?> root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY);
839 final Snapshot carsSnapshot = Snapshot.create(
840 new ShardSnapshotState(new MetadataShardDataTreeSnapshot(root)),
841 Collections.emptyList(), 2, 1, 2, 1, 1, "member-1", null);
843 dataTree = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL,
844 SchemaContextHelper.full());
846 final NormalizedNode<?, ?> peopleNode = PeopleModel.create();
847 AbstractShardTest.writeToStore(dataTree, PeopleModel.BASE_PATH, peopleNode);
849 root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY);
851 final Snapshot peopleSnapshot = Snapshot.create(
852 new ShardSnapshotState(new MetadataShardDataTreeSnapshot(root)),
853 Collections.emptyList(), 2, 1, 2, 1, 1, "member-1", null);
855 testKit.restoreFromSnapshot = new DatastoreSnapshot(name, null, Arrays.asList(
856 new DatastoreSnapshot.ShardSnapshot("cars", carsSnapshot),
857 new DatastoreSnapshot.ShardSnapshot("people", peopleSnapshot)));
859 try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
860 testParameter, name, "module-shards-member1.conf", true, "cars", "people")) {
862 final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
865 Optional<NormalizedNode<?, ?>> optional = readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS);
866 assertTrue("isPresent", optional.isPresent());
867 assertEquals("Data node", carsNode, optional.get());
869 optional = readTx.read(PeopleModel.BASE_PATH).get(5, TimeUnit.SECONDS);
870 assertTrue("isPresent", optional.isPresent());
871 assertEquals("Data node", peopleNode, optional.get());