2 * Copyright (c) 2014, 2017 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertTrue;
14 import static org.junit.Assert.fail;
15 import static org.mockito.Matchers.any;
16 import static org.mockito.Matchers.eq;
17 import static org.mockito.Mockito.timeout;
18 import static org.mockito.Mockito.verify;
20 import akka.actor.ActorSystem;
21 import akka.actor.Address;
22 import akka.actor.AddressFromURIString;
23 import akka.cluster.Cluster;
24 import akka.testkit.javadsl.TestKit;
25 import com.google.common.base.Throwables;
26 import com.google.common.collect.ImmutableMap;
27 import com.google.common.util.concurrent.FluentFuture;
28 import com.google.common.util.concurrent.ListenableFuture;
29 import com.google.common.util.concurrent.MoreExecutors;
30 import com.google.common.util.concurrent.Uninterruptibles;
31 import com.typesafe.config.ConfigFactory;
32 import java.io.IOException;
33 import java.math.BigInteger;
34 import java.util.ArrayList;
35 import java.util.Arrays;
36 import java.util.Collection;
37 import java.util.Collections;
38 import java.util.List;
39 import java.util.Optional;
40 import java.util.concurrent.CountDownLatch;
41 import java.util.concurrent.ExecutionException;
42 import java.util.concurrent.TimeUnit;
43 import java.util.concurrent.atomic.AtomicReference;
44 import org.junit.After;
45 import org.junit.Before;
46 import org.junit.Test;
47 import org.junit.runner.RunWith;
48 import org.junit.runners.Parameterized;
49 import org.junit.runners.Parameterized.Parameter;
50 import org.junit.runners.Parameterized.Parameters;
51 import org.mockito.Mockito;
52 import org.opendaylight.controller.cluster.access.client.RequestTimeoutException;
53 import org.opendaylight.controller.cluster.databroker.ClientBackedDataStore;
54 import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker;
55 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
56 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
57 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
58 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
59 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
60 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
61 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
62 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
63 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
64 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
65 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
66 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
67 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
68 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
69 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
70 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
71 import org.opendaylight.mdsal.common.api.ReadFailedException;
72 import org.opendaylight.mdsal.common.api.TransactionChainClosedException;
73 import org.opendaylight.mdsal.common.api.TransactionChainListener;
74 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction;
75 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
76 import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
77 import org.opendaylight.mdsal.dom.spi.store.DOMStore;
78 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
79 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
80 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
81 import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain;
82 import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
83 import org.opendaylight.yangtools.concepts.ListenerRegistration;
84 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
85 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
86 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
87 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
88 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
89 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
90 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration;
91 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
92 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
93 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
95 @RunWith(Parameterized.class)
96 public class DistributedDataStoreIntegrationTest {
98 @Parameters(name = "{0}")
99 public static Collection<Object[]> data() {
100 return Arrays.asList(new Object[][] {
101 { DistributedDataStore.class }, { ClientBackedDataStore.class }
// Datastore implementation class exercised by the current run; passed to setupAbstractDataStore() by the tests.
// NOTE(review): presumably injected by the Parameterized runner from data() - the annotation is not visible here.
106 public Class<? extends AbstractDataStore> testParameter;
// Actor system hosting the datastore under test; created in setUp() and shut down in tearDown().
108 private ActorSystem system;
// Datastore context builder handed to every IntegrationTestKit. It starts with a shard heartbeat interval
// of 100 ms and is further adjusted by individual tests before the datastore is set up.
110 private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder()
111 .shardHeartbeatIntervalInMillis(100);
114 public void setUp() throws IOException {
115 InMemorySnapshotStore.clear();
116 InMemoryJournal.clear();
117 system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
118 Address member1Address = AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558");
119 Cluster.get(system).join(member1Address);
123 public void tearDown() throws IOException {
124 TestKit.shutdownActorSystem(system, Boolean.TRUE);
// Accessor for the actor system used by the tests when constructing an IntegrationTestKit.
// NOTE(review): the method body is not visible in this extract; presumably it returns the `system` field - confirm.
128 protected ActorSystem getSystem() {
133 public void testWriteTransactionWithSingleShard() throws Exception {
134 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
136 try (AbstractDataStore dataStore = setupAbstractDataStore(
137 testParameter, "transactionIntegrationTest", "test-1")) {
139 testWriteTransaction(dataStore, TestModel.TEST_PATH,
140 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
142 testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
143 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
150 public void testWriteTransactionWithMultipleShards() throws Exception {
151 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
153 try (AbstractDataStore dataStore = setupAbstractDataStore(
154 testParameter, "testWriteTransactionWithMultipleShards", "cars-1", "people-1")) {
156 DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
157 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
159 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
160 writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
162 doCommit(writeTx.ready());
164 writeTx = dataStore.newWriteOnlyTransaction();
166 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
167 writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
169 doCommit(writeTx.ready());
171 writeTx = dataStore.newWriteOnlyTransaction();
173 final MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
174 final YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
175 writeTx.write(carPath, car);
177 final MapEntryNode person = PeopleModel.newPersonEntry("jack");
178 final YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
179 writeTx.write(personPath, person);
181 doCommit(writeTx.ready());
183 // Verify the data in the store
184 final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
186 Optional<NormalizedNode<?, ?>> optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
187 assertEquals("isPresent", true, optional.isPresent());
188 assertEquals("Data node", car, optional.get());
190 optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
191 assertEquals("isPresent", true, optional.isPresent());
192 assertEquals("Data node", person, optional.get());
199 public void testReadWriteTransactionWithSingleShard() throws Exception {
200 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
202 try (AbstractDataStore dataStore = setupAbstractDataStore(
203 testParameter, "testReadWriteTransactionWithSingleShard", "test-1")) {
205 // 1. Create a read-write Tx
206 final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
207 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
209 // 2. Write some data
210 final YangInstanceIdentifier nodePath = TestModel.TEST_PATH;
211 final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
212 readWriteTx.write(nodePath, nodeToWrite);
214 // 3. Read the data from Tx
215 final Boolean exists = readWriteTx.exists(nodePath).get(5, TimeUnit.SECONDS);
216 assertEquals("exists", true, exists);
218 Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS);
219 assertEquals("isPresent", true, optional.isPresent());
220 assertEquals("Data node", nodeToWrite, optional.get());
222 // 4. Ready the Tx for commit
223 final DOMStoreThreePhaseCommitCohort cohort = readWriteTx.ready();
228 // 6. Verify the data in the store
229 final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
231 optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
232 assertEquals("isPresent", true, optional.isPresent());
233 assertEquals("Data node", nodeToWrite, optional.get());
240 public void testReadWriteTransactionWithMultipleShards() throws Exception {
241 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
243 try (AbstractDataStore dataStore = setupAbstractDataStore(
244 testParameter, "testReadWriteTransactionWithMultipleShards", "cars-1", "people-1")) {
246 DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
247 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
249 readWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
250 readWriteTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
252 doCommit(readWriteTx.ready());
254 readWriteTx = dataStore.newReadWriteTransaction();
256 readWriteTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
257 readWriteTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
259 doCommit(readWriteTx.ready());
261 readWriteTx = dataStore.newReadWriteTransaction();
263 final MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
264 final YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
265 readWriteTx.write(carPath, car);
267 final MapEntryNode person = PeopleModel.newPersonEntry("jack");
268 final YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
269 readWriteTx.write(personPath, person);
271 final Boolean exists = readWriteTx.exists(carPath).get(5, TimeUnit.SECONDS);
272 assertEquals("exists", true, exists);
274 Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
275 assertEquals("isPresent", true, optional.isPresent());
276 assertEquals("Data node", car, optional.get());
278 doCommit(readWriteTx.ready());
280 // Verify the data in the store
281 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
283 optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
284 assertEquals("isPresent", true, optional.isPresent());
285 assertEquals("Data node", car, optional.get());
287 optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
288 assertEquals("isPresent", true, optional.isPresent());
289 assertEquals("Data node", person, optional.get());
/**
 * Uses one transaction chain on the "cars-1" shard to commit the cars container and car list, then a
 * batch of car entries in a second write-only transaction, and finally checks the number of entries read
 * back through the chain.
 */
297 public void testSingleTransactionsWritesInQuickSuccession() throws Exception {
298 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
300 try (AbstractDataStore dataStore = setupAbstractDataStore(
301 testParameter, "testSingleTransactionsWritesInQuickSuccession", "cars-1")) {
303 final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
// First chained Tx: create the cars container and the car list node, then commit.
305 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
306 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
307 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
308 doCommit(writeTx.ready());
// Second chained Tx: write numCars entries named "car0", "car1", ... in one transaction.
// NOTE(review): the declaration of numCars is not visible in this extract - confirm its value in the full file.
310 writeTx = txChain.newWriteOnlyTransaction();
313 for (int i = 0; i < numCars; i++) {
314 writeTx.write(CarsModel.newCarPath("car" + i),
315 CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
318 doCommit(writeTx.ready());
// Read the car list through the chain and check it holds exactly numCars entries.
320 final Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction()
321 .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
322 assertEquals("isPresent", true, optional.isPresent());
323 assertEquals("# cars", numCars, ((Collection<?>) optional.get().getValue()).size());
329 @SuppressWarnings("checkstyle:IllegalCatch")
330 private void testTransactionWritesWithShardNotInitiallyReady(final String testName, final boolean writeOnly)
332 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
334 final String shardName = "test-1";
336 // Setup the InMemoryJournal to block shard recovery to ensure
338 // initialized until we create and submit the write the Tx.
339 final String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
340 final CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
341 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
343 try (AbstractDataStore dataStore = setupAbstractDataStore(
344 testParameter, testName, false, shardName)) {
346 // Create the write Tx
347 final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction()
348 : dataStore.newReadWriteTransaction();
349 assertNotNull("newReadWriteTransaction returned null", writeTx);
351 // Do some modification operations and ready the Tx on a
353 final YangInstanceIdentifier listEntryPath = YangInstanceIdentifier
354 .builder(TestModel.OUTER_LIST_PATH)
355 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
357 final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
358 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
359 final CountDownLatch txReady = new CountDownLatch(1);
360 final Thread txThread = new Thread(() -> {
362 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
364 writeTx.merge(TestModel.OUTER_LIST_PATH,
365 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
367 writeTx.write(listEntryPath,
368 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
370 writeTx.delete(listEntryPath);
372 txCohort.set(writeTx.ready());
373 } catch (Exception e) {
382 // Wait for the Tx operations to complete.
383 final boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
384 if (caughtEx.get() != null) {
385 throw caughtEx.get();
388 assertEquals("Tx ready", true, done);
390 // At this point the Tx operations should be waiting for the
391 // shard to initialize so
392 // trigger the latch to let the shard recovery to continue.
393 blockRecoveryLatch.countDown();
395 // Wait for the Tx commit to complete.
396 doCommit(txCohort.get());
398 // Verify the data in the store
399 final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
401 Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
402 assertEquals("isPresent", true, optional.isPresent());
404 optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
405 assertEquals("isPresent", true, optional.isPresent());
407 optional = readTx.read(listEntryPath).get(5, TimeUnit.SECONDS);
408 assertEquals("isPresent", false, optional.isPresent());
415 public void testWriteOnlyTransactionWithShardNotInitiallyReady() throws Exception {
416 datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
417 testTransactionWritesWithShardNotInitiallyReady("testWriteOnlyTransactionWithShardNotInitiallyReady", true);
421 public void testReadWriteTransactionWithShardNotInitiallyReady() throws Exception {
422 testTransactionWritesWithShardNotInitiallyReady("testReadWriteTransactionWithShardNotInitiallyReady", false);
/**
 * Verifies that exists() and read() issued on a read-write transaction while shard recovery is still
 * blocked complete successfully once recovery is allowed to continue.
 */
426 @SuppressWarnings("checkstyle:IllegalCatch")
427 public void testTransactionReadsWithShardNotInitiallyReady() throws Exception {
428 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
430 final String testName = "testTransactionReadsWithShardNotInitiallyReady";
431 final String shardName = "test-1";
// The latch registered below is what keeps the shard's journal reads (and hence recovery) blocked.
433 // Setup the InMemoryJournal to block shard recovery to ensure
435 // initialized until we create the Tx.
436 final String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
437 final CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
438 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
440 try (AbstractDataStore dataStore = setupAbstractDataStore(
441 testParameter, testName, false, shardName)) {
443 // Create the read-write Tx
444 final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
445 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
// The futures, any exception and the completion signal are handed back from the thread through
// these references and the txReadsDone latch.
447 // Do some reads on the Tx on a separate thread.
448 final AtomicReference<FluentFuture<Boolean>> txExistsFuture = new AtomicReference<>();
449 final AtomicReference<FluentFuture<Optional<NormalizedNode<?, ?>>>>
450 txReadFuture = new AtomicReference<>();
451 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
452 final CountDownLatch txReadsDone = new CountDownLatch(1);
453 final Thread txThread = new Thread(() -> {
// Write the test container first so that the subsequent exists/read have something to find.
455 readWriteTx.write(TestModel.TEST_PATH,
456 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
458 txExistsFuture.set(readWriteTx.exists(TestModel.TEST_PATH));
460 txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
// NOTE(review): the catch body and the start of the thread are not visible in this extract.
461 } catch (Exception e) {
464 txReadsDone.countDown();
470 // Wait for the Tx operations to complete.
471 boolean done = Uninterruptibles.awaitUninterruptibly(txReadsDone, 5, TimeUnit.SECONDS);
// Re-throw on the test thread whatever the Tx thread recorded.
472 if (caughtEx.get() != null) {
473 throw caughtEx.get();
476 assertEquals("Tx reads done", true, done);
478 // At this point the Tx operations should be waiting for the
479 // shard to initialize so
480 // trigger the latch to let the shard recovery to continue.
481 blockRecoveryLatch.countDown();
483 // Wait for the reads to complete and verify.
484 assertEquals("exists", true, txExistsFuture.get().get(5, TimeUnit.SECONDS));
485 assertEquals("read", true, txReadFuture.get().get(5, TimeUnit.SECONDS).isPresent());
493 @Test(expected = NotInitializedException.class)
494 @SuppressWarnings("checkstyle:IllegalCatch")
495 public void testTransactionCommitFailureWithShardNotInitialized() throws Exception {
496 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
498 final String testName = "testTransactionCommitFailureWithShardNotInitialized";
499 final String shardName = "test-1";
501 // Set the shard initialization timeout low for the test.
502 datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
504 // Setup the InMemoryJournal to block shard recovery
506 final String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
507 final CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
508 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
510 InMemoryJournal.addEntry(persistentID, 1, "Dummy data so akka will read from persistence");
512 final AbstractDataStore dataStore =
513 setupAbstractDataStore(testParameter, testName, false, shardName);
515 // Create the write Tx
516 final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
517 assertNotNull("newReadWriteTransaction returned null", writeTx);
519 // Do some modifications and ready the Tx on a separate
521 final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
522 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
523 final CountDownLatch txReady = new CountDownLatch(1);
524 final Thread txThread = new Thread(() -> {
526 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
528 txCohort.set(writeTx.ready());
529 } catch (Exception e) {
538 // Wait for the Tx operations to complete.
539 boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
540 if (caughtEx.get() != null) {
541 throw caughtEx.get();
544 assertEquals("Tx ready", true, done);
546 // Wait for the commit to complete. Since the shard never
547 // initialized, the Tx should
548 // have timed out and throw an appropriate exception cause.
550 txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
551 fail("Expected NotInitializedException");
552 } catch (final Exception e) {
553 final Throwable root = Throwables.getRootCause(e);
554 Throwables.throwIfUnchecked(root);
555 throw new RuntimeException(root);
557 blockRecoveryLatch.countDown();
/**
 * Verifies that a read issued on a read-write transaction fails when the shard does not finish recovery
 * within the (shortened) shard initialization timeout: the future must fail with a ReadFailedException
 * cause whose root cause is re-thrown to satisfy the expected NotInitializedException.
 */
563 @Test(expected = NotInitializedException.class)
564 @SuppressWarnings("checkstyle:IllegalCatch")
565 public void testTransactionReadFailureWithShardNotInitialized() throws Exception {
566 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
568 final String testName = "testTransactionReadFailureWithShardNotInitialized";
569 final String shardName = "test-1";
571 // Set the shard initialization timeout low for the test.
572 datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
// Recovery stays blocked on this latch until it is counted down at the end of the test.
574 // Setup the InMemoryJournal to block shard recovery
576 final String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
577 final CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
578 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
580 InMemoryJournal.addEntry(persistentID, 1, "Dummy data so akka will read from persistence");
582 try (AbstractDataStore dataStore = setupAbstractDataStore(
583 testParameter, testName, false, shardName)) {
585 // Create the read-write Tx
586 final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
587 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
// The read future, any exception and the completion signal are handed back from the thread
// through these references and the txReadDone latch.
589 // Do a read on the Tx on a separate thread.
590 final AtomicReference<FluentFuture<Optional<NormalizedNode<?, ?>>>>
591 txReadFuture = new AtomicReference<>();
592 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
593 final CountDownLatch txReadDone = new CountDownLatch(1);
594 final Thread txThread = new Thread(() -> {
596 readWriteTx.write(TestModel.TEST_PATH,
597 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
599 txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
// NOTE(review): lines between the read and this catch, the catch body and the thread start are
// not visible in this extract.
602 } catch (Exception e) {
605 txReadDone.countDown();
611 // Wait for the Tx operations to complete.
612 boolean done = Uninterruptibles.awaitUninterruptibly(txReadDone, 5, TimeUnit.SECONDS);
// Re-throw on the test thread whatever the Tx thread recorded.
613 if (caughtEx.get() != null) {
614 throw caughtEx.get();
617 assertEquals("Tx read done", true, done);
619 // Wait for the read to complete. Since the shard never
620 // initialized, the Tx should
621 // have timed out and throw an appropriate exception cause.
623 txReadFuture.get().get(5, TimeUnit.SECONDS);
// The direct cause must be a ReadFailedException; the root cause is then re-thrown as-is when it is
// unchecked, otherwise wrapped in a RuntimeException.
624 } catch (ExecutionException e) {
625 assertTrue("Expected ReadFailedException cause: " + e.getCause(),
626 e.getCause() instanceof ReadFailedException);
627 final Throwable root = Throwables.getRootCause(e);
628 Throwables.throwIfUnchecked(root);
629 throw new RuntimeException(root);
// Release shard recovery so nothing stays blocked after the test.
631 blockRecoveryLatch.countDown();
638 @SuppressWarnings("checkstyle:IllegalCatch")
639 private void testTransactionCommitFailureWithNoShardLeader(final boolean writeOnly, final String testName)
641 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
643 final String shardName = "default";
645 // We don't want the shard to become the leader so prevent shard
647 datastoreContextBuilder.customRaftPolicyImplementation(
648 "org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy");
650 // The ShardManager uses the election timeout for FindPrimary so
651 // reset it low so it will timeout quickly.
652 datastoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(1)
653 .shardInitializationTimeout(200, TimeUnit.MILLISECONDS).frontendRequestTimeoutInSeconds(2);
655 try (AbstractDataStore dataStore = setupAbstractDataStore(
656 testParameter, testName, false, shardName)) {
658 final Object result = dataStore.getActorContext().executeOperation(
659 dataStore.getActorContext().getShardManager(), new FindLocalShard(shardName, true));
660 assertTrue("Expected LocalShardFound. Actual: " + result, result instanceof LocalShardFound);
662 // Create the write Tx.
663 DOMStoreWriteTransaction writeTxToClose = null;
665 writeTxToClose = writeOnly ? dataStore.newWriteOnlyTransaction()
666 : dataStore.newReadWriteTransaction();
667 final DOMStoreWriteTransaction writeTx = writeTxToClose;
668 assertNotNull("newReadWriteTransaction returned null", writeTx);
670 // Do some modifications and ready the Tx on a separate
672 final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
673 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
674 final CountDownLatch txReady = new CountDownLatch(1);
675 final Thread txThread = new Thread(() -> {
677 writeTx.write(TestModel.JUNK_PATH,
678 ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
680 txCohort.set(writeTx.ready());
681 } catch (Exception e) {
690 // Wait for the Tx operations to complete.
691 boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
692 if (caughtEx.get() != null) {
693 throw caughtEx.get();
696 assertEquals("Tx ready", true, done);
698 // Wait for the commit to complete. Since no shard
699 // leader was elected in time, the Tx
700 // should have timed out and throw an appropriate
703 txCohort.get().canCommit().get(10, TimeUnit.SECONDS);
704 fail("Expected NoShardLeaderException");
705 } catch (final ExecutionException e) {
706 final String msg = "Unexpected exception: "
707 + Throwables.getStackTraceAsString(e.getCause());
708 if (DistributedDataStore.class.equals(testParameter)) {
709 assertTrue(Throwables.getRootCause(e) instanceof NoShardLeaderException);
711 assertTrue(msg, Throwables.getRootCause(e) instanceof RequestTimeoutException);
716 if (writeTxToClose != null) {
717 writeTxToClose.close();
719 } catch (Exception e) {
720 // FIXME TransactionProxy.close throws IllegalStateException:
721 // Transaction is ready, it cannot be closed
730 public void testWriteOnlyTransactionCommitFailureWithNoShardLeader() throws Exception {
731 datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
732 testTransactionCommitFailureWithNoShardLeader(true, "testWriteOnlyTransactionCommitFailureWithNoShardLeader");
736 public void testReadWriteTransactionCommitFailureWithNoShardLeader() throws Exception {
737 testTransactionCommitFailureWithNoShardLeader(false, "testReadWriteTransactionCommitFailureWithNoShardLeader");
741 public void testTransactionAbort() throws Exception {
742 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
744 try (AbstractDataStore dataStore = setupAbstractDataStore(
745 testParameter, "transactionAbortIntegrationTest", "test-1")) {
747 final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
748 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
750 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
752 final DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
754 cohort.canCommit().get(5, TimeUnit.SECONDS);
756 cohort.abort().get(5, TimeUnit.SECONDS);
758 testWriteTransaction(dataStore, TestModel.TEST_PATH,
759 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
/**
 * Exercises a transaction chain on a single shard ("test-1"): data written by a readied but not yet
 * committed transaction must be visible to later transactions of the same chain, and must be readable
 * from the datastore at the end of the test.
 */
766 @SuppressWarnings("checkstyle:IllegalCatch")
767 public void testTransactionChainWithSingleShard() throws Exception {
768 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
770 try (AbstractDataStore dataStore = setupAbstractDataStore(
771 testParameter, "testTransactionChainWithSingleShard", "test-1")) {
773 // 1. Create a Tx chain and write-only Tx
774 final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
776 final DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
777 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
779 // 2. Write some data
780 final NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
781 writeTx.write(TestModel.TEST_PATH, testNode);
783 // 3. Ready the Tx for commit
784 final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
// continueCommit1 gates the background commit, commit1Done signals that it finished, and commit1Error
// carries any exception back to the test thread.
786 // 4. Commit the Tx on another thread that first waits for
787 // the second read Tx.
788 final CountDownLatch continueCommit1 = new CountDownLatch(1);
789 final CountDownLatch commit1Done = new CountDownLatch(1);
790 final AtomicReference<Exception> commit1Error = new AtomicReference<>();
// NOTE(review): the statements that create and start the committing thread, the commit call itself and
// the catch body are not visible in this extract.
793 continueCommit1.await();
795 } catch (Exception e) {
798 commit1Done.countDown();
802 // 5. Create a new read Tx from the chain to read and verify
803 // the data from the first
804 // Tx is visible after being readied.
805 DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction();
806 Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
807 assertEquals("isPresent", true, optional.isPresent());
808 assertEquals("Data node", testNode, optional.get());
810 // 6. Create a new RW Tx from the chain, write more data,
812 final DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
813 final MapNode outerNode = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
814 rwTx.write(TestModel.OUTER_LIST_PATH, outerNode);
816 final DOMStoreThreePhaseCommitCohort cohort2 = rwTx.ready();
// NOTE(review): the comment below says "read Tx" but the code obtains a read-write Tx from the chain.
818 // 7. Create a new read Tx from the chain to read the data
819 // from the last RW Tx to
820 // verify it is visible.
821 readTx = txChain.newReadWriteTransaction();
822 optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
823 assertEquals("isPresent", true, optional.isPresent());
824 assertEquals("Data node", outerNode, optional.get());
// Releasing continueCommit1 lets the background commit of the first Tx proceed; the test then waits up
// to 5 seconds for it and re-throws any exception it recorded.
826 // 8. Wait for the 2 commits to complete and close the
828 continueCommit1.countDown();
829 Uninterruptibles.awaitUninterruptibly(commit1Done, 5, TimeUnit.SECONDS);
831 if (commit1Error.get() != null) {
832 throw commit1Error.get();
// NOTE(review): the commit of cohort2 and the closing of the chain announced in step 8 are not visible
// in this extract.
839 // 9. Create a new read Tx from the data store and verify
841 readTx = dataStore.newReadOnlyTransaction();
842 optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
843 assertEquals("isPresent", true, optional.isPresent());
844 assertEquals("Data node", outerNode, optional.get());
/**
 * Exercises a transaction chain spanning the cars and people shards: three chained transactions are
 * readied back to back (initial containers and lists, a car and a person entry, deletion of the car),
 * and the final store content is checked through a read-only transaction on the datastore.
 */
851 public void testTransactionChainWithMultipleShards() throws Exception {
852 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
854 try (AbstractDataStore dataStore = setupAbstractDataStore(
855 testParameter, "testTransactionChainWithMultipleShards", "cars-1", "people-1")) {
857 final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
// Chained Tx 1 (write-only): containers and list nodes of both models; readied but not yet committed.
859 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
860 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
862 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
863 writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
865 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
866 writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
868 final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
// Chained Tx 2 (read-write): write a car, merge a person, and read both back through the same Tx.
870 final DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction();
872 final MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
873 final YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
874 readWriteTx.write(carPath, car);
876 final MapEntryNode person = PeopleModel.newPersonEntry("jack");
877 final YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
878 readWriteTx.merge(personPath, person);
880 Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
881 assertEquals("isPresent", true, optional.isPresent());
882 assertEquals("Data node", car, optional.get());
884 optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS);
885 assertEquals("isPresent", true, optional.isPresent());
886 assertEquals("Data node", person, optional.get());
888 final DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready();
// Chained Tx 3 (write-only): delete the car written by Tx 2.
890 writeTx = txChain.newWriteOnlyTransaction();
892 writeTx.delete(carPath);
894 final DOMStoreThreePhaseCommitCohort cohort3 = writeTx.ready();
// canCommit is started on the first two cohorts before either commit is driven to completion.
896 final ListenableFuture<Boolean> canCommit1 = cohort1.canCommit();
897 final ListenableFuture<Boolean> canCommit2 = cohort2.canCommit();
899 doCommit(canCommit1, cohort1);
900 doCommit(canCommit2, cohort2);
// NOTE(review): lines between here and the final reads are not visible in this extract; presumably
// cohort3 is committed there - confirm against the full file.
// Final state: the car must be gone and the person must still be present.
905 final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
907 optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
908 assertEquals("isPresent", false, optional.isPresent());
910 optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
911 assertEquals("isPresent", true, optional.isPresent());
912 assertEquals("Data node", person, optional.get());
// Creates a series of chained transactions through a ConcurrentDOMDataBroker without waiting for
// the preceding commits, then checks that every commit future completes and that the car list
// holds the expected number of entries.
919 public void testCreateChainedTransactionsInQuickSuccession() throws Exception {
920 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
922 try (AbstractDataStore dataStore = setupAbstractDataStore(
923 testParameter, "testCreateChainedTransactionsInQuickSuccession", "cars-1")) {
// The data store under test is registered as the CONFIGURATION store of the broker; the broker is
// given a direct (caller-thread) executor.
925 final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
926 ImmutableMap.<LogicalDatastoreType, DOMStore>builder()
927 .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(),
928 MoreExecutors.directExecutor());
930 final TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
931 DOMTransactionChain txChain = broker.createTransactionChain(listener);
// Commit futures are collected here and only awaited after all transactions have been submitted.
933 final List<ListenableFuture<?>> futures = new ArrayList<>();
// Seed the cars container and the car list.
935 final DOMDataTreeWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
936 writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, CarsModel.emptyContainer());
937 writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
938 futures.add(writeTx.commit());
// One read-write tx per car ("car0", "car1", ...), each merging a single entry and committing.
941 for (int i = 0; i < numCars; i++) {
942 final DOMDataTreeReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
944 rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.newCarPath("car" + i),
945 CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
947 futures.add(rwTx.commit());
// Wait for each commit, allowing up to 5 seconds per future.
950 for (final ListenableFuture<?> f : futures) {
951 f.get(5, TimeUnit.SECONDS);
// Read the car list back through the chain and compare its size with numCars.
954 final Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction()
955 .read(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
956 assertEquals("isPresent", true, optional.isPresent());
957 assertEquals("# cars", numCars, ((Collection<?>) optional.get().getValue()).size());
// Checks that a second read-write transaction can be obtained from the chain after a first one
// that made no modifications, and that a read of TEST_PATH through it reports no data.
968 public void testCreateChainedTransactionAfterEmptyTxReadied() throws Exception {
969 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
971 try (AbstractDataStore dataStore = setupAbstractDataStore(
972 testParameter, "testCreateChainedTransactionAfterEmptyTxReadied", "test-1")) {
974 final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
// First tx of the chain; nothing is written through it.
976 final DOMStoreReadWriteTransaction rwTx1 = txChain.newReadWriteTransaction();
// Second tx of the chain; its read must complete within 5 seconds and find nothing at TEST_PATH.
980 final DOMStoreReadWriteTransaction rwTx2 = txChain.newReadWriteTransaction();
982 final Optional<NormalizedNode<?, ?>> optional = rwTx2.read(TestModel.TEST_PATH).get(
983 5, TimeUnit.SECONDS);
984 assertEquals("isPresent", false, optional.isPresent());
// Checks that the chain rejects the creation of further transactions with IllegalStateException
// while an earlier write transaction has been written to but is still open.
993 public void testCreateChainedTransactionWhenPreviousNotReady() throws Exception {
994 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
996 try (AbstractDataStore dataStore = setupAbstractDataStore(
997 testParameter, "testCreateChainedTransactionWhenPreviousNotReady", "test-1")) {
999 final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
// Open a write tx and write to it; ready() is not called on it in the statements shown here.
1001 final DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
1002 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
1004 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1006 // Try to create another Tx of each type - each should fail
1007 // b/c the previous Tx wasn't
1009 assertExceptionOnTxChainCreates(txChain, IllegalStateException.class);
// Checks that every attempt to create a transaction on the chain is rejected with
// TransactionChainClosedException.
1016 public void testCreateChainedTransactionAfterClose() throws Exception {
1017 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
1019 try (AbstractDataStore dataStore = setupAbstractDataStore(
1020 testParameter, "testCreateChainedTransactionAfterClose", "test-1")) {
1022 final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
1025 // Try to create another Tx of each type - should fail b/c
1026 // the previous Tx was closed.
1027 assertExceptionOnTxChainCreates(txChain, TransactionChainClosedException.class);
// Checks that read-only transactions created on a chain after a write transaction was readied
// can read that transaction's data, and that a further write transaction can still be created,
// written to and readied on the same chain.
1034 public void testChainWithReadOnlyTxAfterPreviousReady() throws Exception {
1035 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
1037 try (AbstractDataStore dataStore = setupAbstractDataStore(
1038 testParameter, "testChainWithReadOnlyTxAfterPreviousReady", "test-1")) {
1040 final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
1042 // Create a write tx and submit.
1043 final DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
1044 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1045 final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
1047 // Create read-only tx's and issue a read.
// The two read futures are only awaited further down, after the second write tx has been created.
1048 FluentFuture<Optional<NormalizedNode<?, ?>>> readFuture1 = txChain
1049 .newReadOnlyTransaction().read(TestModel.TEST_PATH);
1051 FluentFuture<Optional<NormalizedNode<?, ?>>> readFuture2 = txChain
1052 .newReadOnlyTransaction().read(TestModel.TEST_PATH);
1054 // Create another write tx and issue the write.
1055 DOMStoreWriteTransaction writeTx2 = txChain.newWriteOnlyTransaction();
1056 writeTx2.write(TestModel.OUTER_LIST_PATH,
1057 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1059 // Ensure the reads succeed.
1061 assertEquals("isPresent", true, readFuture1.get(5, TimeUnit.SECONDS).isPresent());
1062 assertEquals("isPresent", true, readFuture2.get(5, TimeUnit.SECONDS).isPresent());
1064 // Ensure the writes succeed.
1065 DOMStoreThreePhaseCommitCohort cohort2 = writeTx2.ready();
// The outer list written by the second write tx is expected to be readable through the chain.
1070 assertEquals("isPresent", true, txChain.newReadOnlyTransaction().read(TestModel.OUTER_LIST_PATH)
1071 .get(5, TimeUnit.SECONDS).isPresent());
// Checks that committing a chained transaction carrying invalid data fails with an
// ExecutionException and that the chain's listener is told about the failure. The data store is
// set up with "cars-1" only.
1078 public void testChainedTransactionFailureWithSingleShard() throws Exception {
1079 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
1081 try (AbstractDataStore dataStore = setupAbstractDataStore(
1082 testParameter, "testChainedTransactionFailureWithSingleShard", "cars-1")) {
// The data store under test is registered as the CONFIGURATION store of the broker.
1084 final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
1085 ImmutableMap.<LogicalDatastoreType, DOMStore>builder()
1086 .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(),
1087 MoreExecutors.directExecutor());
// Mocked listener so that the failure callback can be verified at the end of the test.
1089 final TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
1090 final DOMTransactionChain txChain = broker.createTransactionChain(listener);
1092 final DOMDataTreeReadWriteTransaction writeTx = txChain.newReadWriteTransaction();
1094 writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH,
1095 PeopleModel.emptyContainer());
// Container identified by the cars base QName but holding a leaf named TestModel.JUNK_QNAME.
1097 final ContainerNode invalidData = ImmutableContainerNodeBuilder.create()
1098 .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME))
1099 .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
1101 writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
// A commit that completes normally is a test failure; an ExecutionException is the expected outcome.
1104 writeTx.commit().get(5, TimeUnit.SECONDS);
1105 fail("Expected TransactionCommitFailedException");
1106 } catch (final ExecutionException e) {
// Allow up to 5000 ms for the listener to be notified of the failure for this chain and tx.
1110 verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx),
1111 any(Throwable.class));
// Same scenario as the single-shard failure test, but with the data store set up with both
// "cars-1" and "people-1": the commit of invalid data must fail with an ExecutionException and
// the chain's listener must be told about the failure.
1121 public void testChainedTransactionFailureWithMultipleShards() throws Exception {
1122 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
1124 try (AbstractDataStore dataStore = setupAbstractDataStore(
1125 testParameter, "testChainedTransactionFailureWithMultipleShards", "cars-1", "people-1")) {
// The data store under test is registered as the CONFIGURATION store of the broker.
1127 final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
1128 ImmutableMap.<LogicalDatastoreType, DOMStore>builder()
1129 .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(),
1130 MoreExecutors.directExecutor());
// Mocked listener so that the failure callback can be verified at the end of the test.
1132 final TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
1133 final DOMTransactionChain txChain = broker.createTransactionChain(listener);
// The tx is obtained via newReadWriteTransaction() but only used through its write interface.
1135 final DOMDataTreeWriteTransaction writeTx = txChain.newReadWriteTransaction();
1137 writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH,
1138 PeopleModel.emptyContainer());
// Container identified by the cars base QName but holding a leaf named TestModel.JUNK_QNAME.
1140 final ContainerNode invalidData = ImmutableContainerNodeBuilder.create()
1141 .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME))
1142 .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
1144 writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
1146 // Note that merge will validate the data and fail but put
1147 // succeeds b/c deep validation is not
1148 // done for put for performance reasons.
// A commit that completes normally is a test failure; an ExecutionException is the expected outcome.
1150 writeTx.commit().get(5, TimeUnit.SECONDS);
1151 fail("Expected TransactionCommitFailedException");
1152 } catch (final ExecutionException e) {
// Allow up to 5000 ms for the listener to be notified of the failure for this chain and tx.
1156 verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx),
1157 any(Throwable.class));
// Registers a data tree change listener on TEST_PATH and checks its life cycle: it receives an
// initial notification and later change events, the shard reports one listener actor while it is
// registered and none after the registration is closed, and no changes arrive after the close.
1167 public void testDataTreeChangeListenerRegistration() throws Exception {
1168 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
1170 try (AbstractDataStore dataStore = setupAbstractDataStore(
1171 testParameter, "testDataTreeChangeListenerRegistration", "test-1")) {
// Populate TEST_PATH before the listener is registered.
1173 testWriteTransaction(dataStore, TestModel.TEST_PATH,
1174 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
// NOTE(review): the constructor argument is presumably the number of expected change events - confirm.
1176 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
1178 ListenerRegistration<MockDataTreeChangeListener> listenerReg = dataStore
1179 .registerTreeChangeListener(TestModel.TEST_PATH, listener);
1181 assertNotNull("registerTreeChangeListener returned null", listenerReg);
// While registered, the "test-1" shard state must report exactly one tree change listener actor.
1183 IntegrationTestKit.verifyShardState(dataStore, "test-1",
1184 state -> assertEquals("getTreeChangeListenerActors", 1,
1185 state.getTreeChangeListenerActors().size()));
1187 // Wait for the initial notification
1188 listener.waitForChangeEvents(TestModel.TEST_PATH);
// Two further writes: the outer list itself, then the entry with id 1 inside it.
1192 testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
1193 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1195 YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1196 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
1197 testWriteTransaction(dataStore, listPath,
1198 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
1200 // Wait for the 2 updates.
1201 listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath);
1202 listenerReg.close();
// After close() the shard state must report no tree change listener actors.
1204 IntegrationTestKit.verifyShardState(dataStore, "test-1",
1205 state -> assertEquals("getTreeChangeListenerActors", 0,
1206 state.getTreeChangeListenerActors().size()));
// A write made after the registration was closed (entry id 2) must not reach the listener.
1208 testWriteTransaction(dataStore,
1209 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1210 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(),
1211 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2));
1213 listener.expectNoMoreChanges("Received unexpected change after close");
1220 public void testRestoreFromDatastoreSnapshot() throws Exception {
1221 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
1223 final String name = "transactionIntegrationTest";
1225 final ContainerNode carsNode = CarsModel.newCarsNode(
1226 CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", BigInteger.valueOf(20000L)),
1227 CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000L))));
1229 DataTree dataTree = new InMemoryDataTreeFactory().create(
1230 DataTreeConfiguration.DEFAULT_OPERATIONAL, SchemaContextHelper.full());
1231 AbstractShardTest.writeToStore(dataTree, CarsModel.BASE_PATH, carsNode);
1232 NormalizedNode<?, ?> root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY);
1234 final Snapshot carsSnapshot = Snapshot.create(
1235 new ShardSnapshotState(new MetadataShardDataTreeSnapshot(root)),
1236 Collections.emptyList(), 2, 1, 2, 1, 1, "member-1", null);
1238 dataTree = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL,
1239 SchemaContextHelper.full());
1241 final NormalizedNode<?, ?> peopleNode = PeopleModel.create();
1242 AbstractShardTest.writeToStore(dataTree, PeopleModel.BASE_PATH, peopleNode);
1244 root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY);
1246 final Snapshot peopleSnapshot = Snapshot.create(
1247 new ShardSnapshotState(new MetadataShardDataTreeSnapshot(root)),
1248 Collections.emptyList(), 2, 1, 2, 1, 1, "member-1", null);
1250 restoreFromSnapshot = new DatastoreSnapshot(name, null, Arrays.asList(
1251 new DatastoreSnapshot.ShardSnapshot("cars", carsSnapshot),
1252 new DatastoreSnapshot.ShardSnapshot("people", peopleSnapshot)));
1254 try (AbstractDataStore dataStore = setupAbstractDataStore(
1255 testParameter, name, "module-shards-member1.conf", true, "cars", "people")) {
1257 final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
1260 Optional<NormalizedNode<?, ?>> optional = readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS);
1261 assertEquals("isPresent", true, optional.isPresent());
1262 assertEquals("Data node", carsNode, optional.get());
1264 optional = readTx.read(PeopleModel.BASE_PATH).get(5, TimeUnit.SECONDS);
1265 assertEquals("isPresent", true, optional.isPresent());
1266 assertEquals("Data node", peopleNode, optional.get());