2 * Copyright (c) 2014, 2017 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertNotNull;
12 import static org.junit.Assert.assertTrue;
13 import static org.junit.Assert.fail;
14 import static org.mockito.Matchers.any;
15 import static org.mockito.Matchers.eq;
16 import static org.mockito.Mockito.timeout;
17 import static org.mockito.Mockito.verify;
19 import akka.actor.ActorSystem;
20 import akka.actor.Address;
21 import akka.actor.AddressFromURIString;
22 import akka.cluster.Cluster;
23 import akka.testkit.javadsl.TestKit;
24 import com.google.common.base.Throwables;
25 import com.google.common.collect.ImmutableMap;
26 import com.google.common.util.concurrent.FluentFuture;
27 import com.google.common.util.concurrent.ListenableFuture;
28 import com.google.common.util.concurrent.MoreExecutors;
29 import com.google.common.util.concurrent.Uninterruptibles;
30 import com.typesafe.config.ConfigFactory;
31 import java.math.BigInteger;
32 import java.util.ArrayList;
33 import java.util.Arrays;
34 import java.util.Collection;
35 import java.util.Collections;
36 import java.util.List;
37 import java.util.Optional;
38 import java.util.concurrent.CountDownLatch;
39 import java.util.concurrent.ExecutionException;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.atomic.AtomicReference;
42 import org.junit.After;
43 import org.junit.Before;
44 import org.junit.Test;
45 import org.junit.runner.RunWith;
46 import org.junit.runners.Parameterized;
47 import org.junit.runners.Parameterized.Parameter;
48 import org.junit.runners.Parameterized.Parameters;
49 import org.mockito.Mockito;
50 import org.opendaylight.controller.cluster.access.client.RequestTimeoutException;
51 import org.opendaylight.controller.cluster.databroker.ClientBackedDataStore;
52 import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker;
53 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
54 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
55 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
56 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
57 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
58 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
59 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
60 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
61 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
62 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
63 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
64 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
65 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
66 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
67 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
68 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
69 import org.opendaylight.mdsal.common.api.ReadFailedException;
70 import org.opendaylight.mdsal.common.api.TransactionChainClosedException;
71 import org.opendaylight.mdsal.common.api.TransactionChainListener;
72 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction;
73 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
74 import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
75 import org.opendaylight.mdsal.dom.spi.store.DOMStore;
76 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
77 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
78 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
79 import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain;
80 import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
81 import org.opendaylight.yangtools.concepts.ListenerRegistration;
82 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
83 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
84 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
85 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
86 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
87 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
88 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration;
89 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
90 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
91 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
93 @RunWith(Parameterized.class)
94 public class DistributedDataStoreIntegrationTest {
96 @Parameters(name = "{0}")
97 public static Collection<Object[]> data() {
98 return Arrays.asList(new Object[][] {
99 { DistributedDataStore.class }, { ClientBackedDataStore.class }
104 public Class<? extends AbstractDataStore> testParameter;
106 private ActorSystem system;
108 private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder()
109 .shardHeartbeatIntervalInMillis(100);
112 public void setUp() {
113 InMemorySnapshotStore.clear();
114 InMemoryJournal.clear();
115 system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
116 Address member1Address = AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558");
117 Cluster.get(system).join(member1Address);
121 public void tearDown() {
122 TestKit.shutdownActorSystem(system, Boolean.TRUE);
126 protected ActorSystem getSystem() {
131 public void testWriteTransactionWithSingleShard() throws Exception {
132 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
133 try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
134 testParameter, "transactionIntegrationTest", "test-1")) {
136 testKit.testWriteTransaction(dataStore, TestModel.TEST_PATH,
137 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
139 testKit.testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
140 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME)
146 public void testWriteTransactionWithMultipleShards() throws Exception {
147 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
148 try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
149 testParameter, "testWriteTransactionWithMultipleShards", "cars-1", "people-1")) {
151 DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
152 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
154 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
155 writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
157 testKit.doCommit(writeTx.ready());
159 writeTx = dataStore.newWriteOnlyTransaction();
161 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
162 writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
164 testKit.doCommit(writeTx.ready());
166 writeTx = dataStore.newWriteOnlyTransaction();
168 final MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
169 final YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
170 writeTx.write(carPath, car);
172 final MapEntryNode person = PeopleModel.newPersonEntry("jack");
173 final YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
174 writeTx.write(personPath, person);
176 testKit.doCommit(writeTx.ready());
178 // Verify the data in the store
179 final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
181 Optional<NormalizedNode<?, ?>> optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
182 assertEquals("isPresent", true, optional.isPresent());
183 assertEquals("Data node", car, optional.get());
185 optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
186 assertEquals("isPresent", true, optional.isPresent());
187 assertEquals("Data node", person, optional.get());
192 public void testReadWriteTransactionWithSingleShard() throws Exception {
193 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
194 try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
195 testParameter, "testReadWriteTransactionWithSingleShard", "test-1")) {
197 // 1. Create a read-write Tx
198 final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
199 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
201 // 2. Write some data
202 final YangInstanceIdentifier nodePath = TestModel.TEST_PATH;
203 final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
204 readWriteTx.write(nodePath, nodeToWrite);
206 // 3. Read the data from Tx
207 final Boolean exists = readWriteTx.exists(nodePath).get(5, TimeUnit.SECONDS);
208 assertEquals("exists", true, exists);
210 Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS);
211 assertEquals("isPresent", true, optional.isPresent());
212 assertEquals("Data node", nodeToWrite, optional.get());
214 // 4. Ready the Tx for commit
215 final DOMStoreThreePhaseCommitCohort cohort = readWriteTx.ready();
218 testKit.doCommit(cohort);
220 // 6. Verify the data in the store
221 final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
223 optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
224 assertEquals("isPresent", true, optional.isPresent());
225 assertEquals("Data node", nodeToWrite, optional.get());
230 public void testReadWriteTransactionWithMultipleShards() throws Exception {
231 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
232 try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
233 testParameter, "testReadWriteTransactionWithMultipleShards", "cars-1", "people-1")) {
235 DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
236 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
238 readWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
239 readWriteTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
241 testKit.doCommit(readWriteTx.ready());
243 readWriteTx = dataStore.newReadWriteTransaction();
245 readWriteTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
246 readWriteTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
248 testKit.doCommit(readWriteTx.ready());
250 readWriteTx = dataStore.newReadWriteTransaction();
252 final MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
253 final YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
254 readWriteTx.write(carPath, car);
256 final MapEntryNode person = PeopleModel.newPersonEntry("jack");
257 final YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
258 readWriteTx.write(personPath, person);
260 final Boolean exists = readWriteTx.exists(carPath).get(5, TimeUnit.SECONDS);
261 assertEquals("exists", true, exists);
263 Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
264 assertEquals("isPresent", true, optional.isPresent());
265 assertEquals("Data node", car, optional.get());
267 testKit.doCommit(readWriteTx.ready());
269 // Verify the data in the store
270 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
272 optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
273 assertEquals("isPresent", true, optional.isPresent());
274 assertEquals("Data node", car, optional.get());
276 optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
277 assertEquals("isPresent", true, optional.isPresent());
278 assertEquals("Data node", person, optional.get());
283 public void testSingleTransactionsWritesInQuickSuccession() throws Exception {
284 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
285 try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
286 testParameter, "testSingleTransactionsWritesInQuickSuccession", "cars-1")) {
288 final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
290 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
291 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
292 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
293 testKit.doCommit(writeTx.ready());
295 writeTx = txChain.newWriteOnlyTransaction();
298 for (int i = 0; i < numCars; i++) {
299 writeTx.write(CarsModel.newCarPath("car" + i),
300 CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
303 testKit.doCommit(writeTx.ready());
305 final Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction()
306 .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
307 assertEquals("isPresent", true, optional.isPresent());
308 assertEquals("# cars", numCars, ((Collection<?>) optional.get().getValue()).size());
312 @SuppressWarnings("checkstyle:IllegalCatch")
313 private void testTransactionWritesWithShardNotInitiallyReady(final String testName, final boolean writeOnly)
315 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
316 final String shardName = "test-1";
318 // Setup the InMemoryJournal to block shard recovery to ensure
320 // initialized until we create and submit the write the Tx.
321 final String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
322 final CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
323 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
325 try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
326 testParameter, testName, false, shardName)) {
328 // Create the write Tx
329 final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction()
330 : dataStore.newReadWriteTransaction();
331 assertNotNull("newReadWriteTransaction returned null", writeTx);
333 // Do some modification operations and ready the Tx on a
335 final YangInstanceIdentifier listEntryPath = YangInstanceIdentifier
336 .builder(TestModel.OUTER_LIST_PATH)
337 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
339 final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
340 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
341 final CountDownLatch txReady = new CountDownLatch(1);
342 final Thread txThread = new Thread(() -> {
344 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
346 writeTx.merge(TestModel.OUTER_LIST_PATH,
347 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME)
350 writeTx.write(listEntryPath,
351 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
353 writeTx.delete(listEntryPath);
355 txCohort.set(writeTx.ready());
356 } catch (Exception e) {
365 // Wait for the Tx operations to complete.
366 final boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
367 if (caughtEx.get() != null) {
368 throw caughtEx.get();
371 assertEquals("Tx ready", true, done);
373 // At this point the Tx operations should be waiting for the
374 // shard to initialize so
375 // trigger the latch to let the shard recovery to continue.
376 blockRecoveryLatch.countDown();
378 // Wait for the Tx commit to complete.
379 testKit.doCommit(txCohort.get());
381 // Verify the data in the store
382 final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
384 Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
385 assertEquals("isPresent", true, optional.isPresent());
387 optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
388 assertEquals("isPresent", true, optional.isPresent());
390 optional = readTx.read(listEntryPath).get(5, TimeUnit.SECONDS);
391 assertEquals("isPresent", false, optional.isPresent());
396 public void testWriteOnlyTransactionWithShardNotInitiallyReady() throws Exception {
397 datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
398 testTransactionWritesWithShardNotInitiallyReady("testWriteOnlyTransactionWithShardNotInitiallyReady", true);
402 public void testReadWriteTransactionWithShardNotInitiallyReady() throws Exception {
403 testTransactionWritesWithShardNotInitiallyReady("testReadWriteTransactionWithShardNotInitiallyReady", false);
407 @SuppressWarnings("checkstyle:IllegalCatch")
408 public void testTransactionReadsWithShardNotInitiallyReady() throws Exception {
409 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
410 final String testName = "testTransactionReadsWithShardNotInitiallyReady";
411 final String shardName = "test-1";
413 // Setup the InMemoryJournal to block shard recovery to ensure
415 // initialized until we create the Tx.
416 final String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
417 final CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
418 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
420 try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
421 testParameter, testName, false, shardName)) {
423 // Create the read-write Tx
424 final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
425 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
427 // Do some reads on the Tx on a separate thread.
428 final AtomicReference<FluentFuture<Boolean>> txExistsFuture = new AtomicReference<>();
429 final AtomicReference<FluentFuture<Optional<NormalizedNode<?, ?>>>> txReadFuture = new AtomicReference<>();
430 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
431 final CountDownLatch txReadsDone = new CountDownLatch(1);
432 final Thread txThread = new Thread(() -> {
434 readWriteTx.write(TestModel.TEST_PATH,
435 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
437 txExistsFuture.set(readWriteTx.exists(TestModel.TEST_PATH));
439 txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
440 } catch (Exception e) {
443 txReadsDone.countDown();
449 // Wait for the Tx operations to complete.
450 boolean done = Uninterruptibles.awaitUninterruptibly(txReadsDone, 5, TimeUnit.SECONDS);
451 if (caughtEx.get() != null) {
452 throw caughtEx.get();
455 assertEquals("Tx reads done", true, done);
457 // At this point the Tx operations should be waiting for the
458 // shard to initialize so
459 // trigger the latch to let the shard recovery to continue.
460 blockRecoveryLatch.countDown();
462 // Wait for the reads to complete and verify.
463 assertEquals("exists", true, txExistsFuture.get().get(5, TimeUnit.SECONDS));
464 assertEquals("read", true, txReadFuture.get().get(5, TimeUnit.SECONDS).isPresent());
470 @Test(expected = NotInitializedException.class)
471 @SuppressWarnings("checkstyle:IllegalCatch")
472 public void testTransactionCommitFailureWithShardNotInitialized() throws Exception {
473 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
474 final String testName = "testTransactionCommitFailureWithShardNotInitialized";
475 final String shardName = "test-1";
477 // Set the shard initialization timeout low for the test.
478 datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
480 // Setup the InMemoryJournal to block shard recovery
482 final String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
483 final CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
484 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
486 InMemoryJournal.addEntry(persistentID, 1, "Dummy data so akka will read from persistence");
488 final AbstractDataStore dataStore = testKit.setupAbstractDataStore(testParameter, testName, false, shardName);
490 // Create the write Tx
491 final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
492 assertNotNull("newReadWriteTransaction returned null", writeTx);
494 // Do some modifications and ready the Tx on a separate
496 final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
497 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
498 final CountDownLatch txReady = new CountDownLatch(1);
499 final Thread txThread = new Thread(() -> {
501 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
503 txCohort.set(writeTx.ready());
504 } catch (Exception e) {
513 // Wait for the Tx operations to complete.
514 boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
515 if (caughtEx.get() != null) {
516 throw caughtEx.get();
519 assertEquals("Tx ready", true, done);
521 // Wait for the commit to complete. Since the shard never
522 // initialized, the Tx should
523 // have timed out and throw an appropriate exception cause.
525 txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
526 fail("Expected NotInitializedException");
527 } catch (final Exception e) {
528 final Throwable root = Throwables.getRootCause(e);
529 Throwables.throwIfUnchecked(root);
530 throw new RuntimeException(root);
532 blockRecoveryLatch.countDown();
536 @Test(expected = NotInitializedException.class)
537 @SuppressWarnings("checkstyle:IllegalCatch")
538 public void testTransactionReadFailureWithShardNotInitialized() throws Exception {
539 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
540 final String testName = "testTransactionReadFailureWithShardNotInitialized";
541 final String shardName = "test-1";
543 // Set the shard initialization timeout low for the test.
544 datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
546 // Setup the InMemoryJournal to block shard recovery
548 final String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
549 final CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
550 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
552 InMemoryJournal.addEntry(persistentID, 1, "Dummy data so akka will read from persistence");
554 try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(testParameter, testName, false, shardName)) {
556 // Create the read-write Tx
557 final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
558 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
560 // Do a read on the Tx on a separate thread.
561 final AtomicReference<FluentFuture<Optional<NormalizedNode<?, ?>>>> txReadFuture = new AtomicReference<>();
562 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
563 final CountDownLatch txReadDone = new CountDownLatch(1);
564 final Thread txThread = new Thread(() -> {
566 readWriteTx.write(TestModel.TEST_PATH,
567 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
569 txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
572 } catch (Exception e) {
575 txReadDone.countDown();
581 // Wait for the Tx operations to complete.
582 boolean done = Uninterruptibles.awaitUninterruptibly(txReadDone, 5, TimeUnit.SECONDS);
583 if (caughtEx.get() != null) {
584 throw caughtEx.get();
587 assertEquals("Tx read done", true, done);
589 // Wait for the read to complete. Since the shard never
590 // initialized, the Tx should
591 // have timed out and throw an appropriate exception cause.
593 txReadFuture.get().get(5, TimeUnit.SECONDS);
594 } catch (ExecutionException e) {
595 assertTrue("Expected ReadFailedException cause: " + e.getCause(),
596 e.getCause() instanceof ReadFailedException);
597 final Throwable root = Throwables.getRootCause(e);
598 Throwables.throwIfUnchecked(root);
599 throw new RuntimeException(root);
601 blockRecoveryLatch.countDown();
606 @SuppressWarnings("checkstyle:IllegalCatch")
607 private void testTransactionCommitFailureWithNoShardLeader(final boolean writeOnly, final String testName)
609 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
610 final String shardName = "default";
612 // We don't want the shard to become the leader so prevent shard
614 datastoreContextBuilder.customRaftPolicyImplementation(
615 "org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy");
617 // The ShardManager uses the election timeout for FindPrimary so
618 // reset it low so it will timeout quickly.
619 datastoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(1)
620 .shardInitializationTimeout(200, TimeUnit.MILLISECONDS).frontendRequestTimeoutInSeconds(2);
622 try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(testParameter, testName, false, shardName)) {
624 final Object result = dataStore.getActorContext().executeOperation(
625 dataStore.getActorContext().getShardManager(), new FindLocalShard(shardName, true));
626 assertTrue("Expected LocalShardFound. Actual: " + result, result instanceof LocalShardFound);
628 // Create the write Tx.
629 DOMStoreWriteTransaction writeTxToClose = null;
631 writeTxToClose = writeOnly ? dataStore.newWriteOnlyTransaction()
632 : dataStore.newReadWriteTransaction();
633 final DOMStoreWriteTransaction writeTx = writeTxToClose;
634 assertNotNull("newReadWriteTransaction returned null", writeTx);
636 // Do some modifications and ready the Tx on a separate
638 final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
639 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
640 final CountDownLatch txReady = new CountDownLatch(1);
641 final Thread txThread = new Thread(() -> {
643 writeTx.write(TestModel.JUNK_PATH,
644 ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
646 txCohort.set(writeTx.ready());
647 } catch (Exception e) {
656 // Wait for the Tx operations to complete.
657 boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
658 if (caughtEx.get() != null) {
659 throw caughtEx.get();
662 assertEquals("Tx ready", true, done);
664 // Wait for the commit to complete. Since no shard
665 // leader was elected in time, the Tx
666 // should have timed out and throw an appropriate
669 txCohort.get().canCommit().get(10, TimeUnit.SECONDS);
670 fail("Expected NoShardLeaderException");
671 } catch (final ExecutionException e) {
672 final String msg = "Unexpected exception: "
673 + Throwables.getStackTraceAsString(e.getCause());
674 if (DistributedDataStore.class.equals(testParameter)) {
675 assertTrue(Throwables.getRootCause(e) instanceof NoShardLeaderException);
677 assertTrue(msg, Throwables.getRootCause(e) instanceof RequestTimeoutException);
682 if (writeTxToClose != null) {
683 writeTxToClose.close();
685 } catch (Exception e) {
686 // FIXME TransactionProxy.close throws IllegalStateException:
687 // Transaction is ready, it cannot be closed
694 public void testWriteOnlyTransactionCommitFailureWithNoShardLeader() throws Exception {
695 datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
696 testTransactionCommitFailureWithNoShardLeader(true, "testWriteOnlyTransactionCommitFailureWithNoShardLeader");
700 public void testReadWriteTransactionCommitFailureWithNoShardLeader() throws Exception {
701 testTransactionCommitFailureWithNoShardLeader(false, "testReadWriteTransactionCommitFailureWithNoShardLeader");
705 public void testTransactionAbort() throws Exception {
706 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
707 try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
708 testParameter, "transactionAbortIntegrationTest", "test-1")) {
710 final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
711 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
713 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
715 final DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
717 cohort.canCommit().get(5, TimeUnit.SECONDS);
719 cohort.abort().get(5, TimeUnit.SECONDS);
721 testKit.testWriteTransaction(dataStore, TestModel.TEST_PATH,
722 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
727 @SuppressWarnings("checkstyle:IllegalCatch")
728 public void testTransactionChainWithSingleShard() throws Exception {
729 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
730 try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
731 testParameter, "testTransactionChainWithSingleShard", "test-1")) {
733 // 1. Create a Tx chain and write-only Tx
734 final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
736 final DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
737 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
739 // 2. Write some data
740 final NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
741 writeTx.write(TestModel.TEST_PATH, testNode);
743 // 3. Ready the Tx for commit
744 final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
746 // 4. Commit the Tx on another thread that first waits for
747 // the second read Tx.
748 final CountDownLatch continueCommit1 = new CountDownLatch(1);
749 final CountDownLatch commit1Done = new CountDownLatch(1);
750 final AtomicReference<Exception> commit1Error = new AtomicReference<>();
753 continueCommit1.await();
754 testKit.doCommit(cohort1);
755 } catch (Exception e) {
758 commit1Done.countDown();
762 // 5. Create a new read Tx from the chain to read and verify
763 // the data from the first
764 // Tx is visible after being readied.
765 DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction();
766 Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
767 assertEquals("isPresent", true, optional.isPresent());
768 assertEquals("Data node", testNode, optional.get());
770 // 6. Create a new RW Tx from the chain, write more data,
772 final DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
773 final MapNode outerNode = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME)
775 rwTx.write(TestModel.OUTER_LIST_PATH, outerNode);
777 final DOMStoreThreePhaseCommitCohort cohort2 = rwTx.ready();
779 // 7. Create a new read Tx from the chain to read the data
780 // from the last RW Tx to
781 // verify it is visible.
782 readTx = txChain.newReadWriteTransaction();
783 optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
784 assertEquals("isPresent", true, optional.isPresent());
785 assertEquals("Data node", outerNode, optional.get());
787 // 8. Wait for the 2 commits to complete and close the
789 continueCommit1.countDown();
790 Uninterruptibles.awaitUninterruptibly(commit1Done, 5, TimeUnit.SECONDS);
792 if (commit1Error.get() != null) {
793 throw commit1Error.get();
796 testKit.doCommit(cohort2);
800 // 9. Create a new read Tx from the data store and verify
802 readTx = dataStore.newReadOnlyTransaction();
803 optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
804 assertEquals("isPresent", true, optional.isPresent());
805 assertEquals("Data node", outerNode, optional.get());
810 public void testTransactionChainWithMultipleShards() throws Exception {
811 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
812 try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
813 testParameter, "testTransactionChainWithMultipleShards", "cars-1", "people-1")) {
815 final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
817 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
818 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
820 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
821 writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
823 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
824 writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
826 final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
828 final DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction();
830 final MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
831 final YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
832 readWriteTx.write(carPath, car);
834 final MapEntryNode person = PeopleModel.newPersonEntry("jack");
835 final YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
836 readWriteTx.merge(personPath, person);
838 Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
839 assertEquals("isPresent", true, optional.isPresent());
840 assertEquals("Data node", car, optional.get());
842 optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS);
843 assertEquals("isPresent", true, optional.isPresent());
844 assertEquals("Data node", person, optional.get());
846 final DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready();
848 writeTx = txChain.newWriteOnlyTransaction();
850 writeTx.delete(carPath);
852 final DOMStoreThreePhaseCommitCohort cohort3 = writeTx.ready();
854 final ListenableFuture<Boolean> canCommit1 = cohort1.canCommit();
855 final ListenableFuture<Boolean> canCommit2 = cohort2.canCommit();
857 testKit.doCommit(canCommit1, cohort1);
858 testKit.doCommit(canCommit2, cohort2);
859 testKit.doCommit(cohort3);
863 final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
865 optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
866 assertEquals("isPresent", false, optional.isPresent());
868 optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
869 assertEquals("isPresent", true, optional.isPresent());
870 assertEquals("Data node", person, optional.get());
875 public void testCreateChainedTransactionsInQuickSuccession() throws Exception {
876 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
877 try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
878 testParameter, "testCreateChainedTransactionsInQuickSuccession", "cars-1")) {
880 final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
881 ImmutableMap.<LogicalDatastoreType, DOMStore>builder()
882 .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(),
883 MoreExecutors.directExecutor());
885 final TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
886 DOMTransactionChain txChain = broker.createTransactionChain(listener);
888 final List<ListenableFuture<?>> futures = new ArrayList<>();
890 final DOMDataTreeWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
891 writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, CarsModel.emptyContainer());
892 writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
893 futures.add(writeTx.commit());
896 for (int i = 0; i < numCars; i++) {
897 final DOMDataTreeReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
899 rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.newCarPath("car" + i),
900 CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
902 futures.add(rwTx.commit());
905 for (final ListenableFuture<?> f : futures) {
906 f.get(5, TimeUnit.SECONDS);
909 final Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction()
910 .read(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
911 assertEquals("isPresent", true, optional.isPresent());
912 assertEquals("# cars", numCars, ((Collection<?>) optional.get().getValue()).size());
921 public void testCreateChainedTransactionAfterEmptyTxReadied() throws Exception {
922 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
923 try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
924 testParameter, "testCreateChainedTransactionAfterEmptyTxReadied", "test-1")) {
926 final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
928 final DOMStoreReadWriteTransaction rwTx1 = txChain.newReadWriteTransaction();
932 final DOMStoreReadWriteTransaction rwTx2 = txChain.newReadWriteTransaction();
934 final Optional<NormalizedNode<?, ?>> optional = rwTx2.read(TestModel.TEST_PATH).get(
935 5, TimeUnit.SECONDS);
936 assertEquals("isPresent", false, optional.isPresent());
943 public void testCreateChainedTransactionWhenPreviousNotReady() throws Exception {
944 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
945 try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
946 testParameter, "testCreateChainedTransactionWhenPreviousNotReady", "test-1")) {
948 final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
950 final DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
951 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
953 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
955 // Try to create another Tx of each type - each should fail
956 // b/c the previous Tx wasn't
958 testKit.assertExceptionOnTxChainCreates(txChain, IllegalStateException.class);
963 public void testCreateChainedTransactionAfterClose() throws Exception {
964 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
965 try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
966 testParameter, "testCreateChainedTransactionAfterClose", "test-1")) {
968 final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
971 // Try to create another Tx of each type - should fail b/c
972 // the previous Tx was closed.
973 testKit.assertExceptionOnTxChainCreates(txChain, TransactionChainClosedException.class);
978 public void testChainWithReadOnlyTxAfterPreviousReady() throws Exception {
979 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
980 try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
981 testParameter, "testChainWithReadOnlyTxAfterPreviousReady", "test-1")) {
983 final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
985 // Create a write tx and submit.
986 final DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
987 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
988 final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
990 // Create read-only tx's and issue a read.
991 FluentFuture<Optional<NormalizedNode<?, ?>>> readFuture1 = txChain
992 .newReadOnlyTransaction().read(TestModel.TEST_PATH);
994 FluentFuture<Optional<NormalizedNode<?, ?>>> readFuture2 = txChain
995 .newReadOnlyTransaction().read(TestModel.TEST_PATH);
997 // Create another write tx and issue the write.
998 DOMStoreWriteTransaction writeTx2 = txChain.newWriteOnlyTransaction();
999 writeTx2.write(TestModel.OUTER_LIST_PATH,
1000 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME)
1003 // Ensure the reads succeed.
1005 assertEquals("isPresent", true, readFuture1.get(5, TimeUnit.SECONDS).isPresent());
1006 assertEquals("isPresent", true, readFuture2.get(5, TimeUnit.SECONDS).isPresent());
1008 // Ensure the writes succeed.
1009 DOMStoreThreePhaseCommitCohort cohort2 = writeTx2.ready();
1011 testKit.doCommit(cohort1);
1012 testKit.doCommit(cohort2);
1014 assertEquals("isPresent", true, txChain.newReadOnlyTransaction().read(TestModel.OUTER_LIST_PATH)
1015 .get(5, TimeUnit.SECONDS).isPresent());
1020 public void testChainedTransactionFailureWithSingleShard() throws Exception {
1021 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
1022 try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
1023 testParameter, "testChainedTransactionFailureWithSingleShard", "cars-1")) {
1025 final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
1026 ImmutableMap.<LogicalDatastoreType, DOMStore>builder()
1027 .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(),
1028 MoreExecutors.directExecutor());
1030 final TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
1031 final DOMTransactionChain txChain = broker.createTransactionChain(listener);
1033 final DOMDataTreeReadWriteTransaction writeTx = txChain.newReadWriteTransaction();
1035 writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH,
1036 PeopleModel.emptyContainer());
1038 final ContainerNode invalidData = ImmutableContainerNodeBuilder.create()
1039 .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME))
1040 .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
1042 writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
1045 writeTx.commit().get(5, TimeUnit.SECONDS);
1046 fail("Expected TransactionCommitFailedException");
1047 } catch (final ExecutionException e) {
1051 verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx),
1052 any(Throwable.class));
1060 public void testChainedTransactionFailureWithMultipleShards() throws Exception {
1061 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
1062 try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
1063 testParameter, "testChainedTransactionFailureWithMultipleShards", "cars-1", "people-1")) {
1065 final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
1066 ImmutableMap.<LogicalDatastoreType, DOMStore>builder()
1067 .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(),
1068 MoreExecutors.directExecutor());
1070 final TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
1071 final DOMTransactionChain txChain = broker.createTransactionChain(listener);
1073 final DOMDataTreeWriteTransaction writeTx = txChain.newReadWriteTransaction();
1075 writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH,
1076 PeopleModel.emptyContainer());
1078 final ContainerNode invalidData = ImmutableContainerNodeBuilder.create()
1079 .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME))
1080 .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
1082 writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
1084 // Note that merge will validate the data and fail but put
1085 // succeeds b/c deep validation is not
1086 // done for put for performance reasons.
1088 writeTx.commit().get(5, TimeUnit.SECONDS);
1089 fail("Expected TransactionCommitFailedException");
1090 } catch (final ExecutionException e) {
1094 verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx),
1095 any(Throwable.class));
1103 public void testDataTreeChangeListenerRegistration() throws Exception {
1104 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
1105 try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
1106 testParameter, "testDataTreeChangeListenerRegistration", "test-1")) {
1108 testKit.testWriteTransaction(dataStore, TestModel.TEST_PATH,
1109 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1111 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
1113 ListenerRegistration<MockDataTreeChangeListener> listenerReg = dataStore
1114 .registerTreeChangeListener(TestModel.TEST_PATH, listener);
1116 assertNotNull("registerTreeChangeListener returned null", listenerReg);
1118 IntegrationTestKit.verifyShardState(dataStore, "test-1",
1119 state -> assertEquals("getTreeChangeListenerActors", 1,
1120 state.getTreeChangeListenerActors().size()));
1122 // Wait for the initial notification
1123 listener.waitForChangeEvents(TestModel.TEST_PATH);
1127 testKit.testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
1128 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME)
1131 YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1132 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
1133 testKit.testWriteTransaction(dataStore, listPath,
1134 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
1136 // Wait for the 2 updates.
1137 listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath);
1138 listenerReg.close();
1140 IntegrationTestKit.verifyShardState(dataStore, "test-1",
1141 state -> assertEquals("getTreeChangeListenerActors", 0,
1142 state.getTreeChangeListenerActors().size()));
1144 testKit.testWriteTransaction(dataStore,
1145 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1146 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(),
1147 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2));
1149 listener.expectNoMoreChanges("Received unexpected change after close");
1154 public void testRestoreFromDatastoreSnapshot() throws Exception {
1155 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
1156 final String name = "transactionIntegrationTest";
1158 final ContainerNode carsNode = CarsModel.newCarsNode(
1159 CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", BigInteger.valueOf(20000L)),
1160 CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000L))));
1162 DataTree dataTree = new InMemoryDataTreeFactory().create(
1163 DataTreeConfiguration.DEFAULT_OPERATIONAL, SchemaContextHelper.full());
1164 AbstractShardTest.writeToStore(dataTree, CarsModel.BASE_PATH, carsNode);
1165 NormalizedNode<?, ?> root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY);
1167 final Snapshot carsSnapshot = Snapshot.create(
1168 new ShardSnapshotState(new MetadataShardDataTreeSnapshot(root)),
1169 Collections.emptyList(), 2, 1, 2, 1, 1, "member-1", null);
1171 dataTree = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL,
1172 SchemaContextHelper.full());
1174 final NormalizedNode<?, ?> peopleNode = PeopleModel.create();
1175 AbstractShardTest.writeToStore(dataTree, PeopleModel.BASE_PATH, peopleNode);
1177 root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY);
1179 final Snapshot peopleSnapshot = Snapshot.create(
1180 new ShardSnapshotState(new MetadataShardDataTreeSnapshot(root)),
1181 Collections.emptyList(), 2, 1, 2, 1, 1, "member-1", null);
1183 testKit.restoreFromSnapshot = new DatastoreSnapshot(name, null, Arrays.asList(
1184 new DatastoreSnapshot.ShardSnapshot("cars", carsSnapshot),
1185 new DatastoreSnapshot.ShardSnapshot("people", peopleSnapshot)));
1187 try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
1188 testParameter, name, "module-shards-member1.conf", true, "cars", "people")) {
1190 final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
1193 Optional<NormalizedNode<?, ?>> optional = readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS);
1194 assertEquals("isPresent", true, optional.isPresent());
1195 assertEquals("Data node", carsNode, optional.get());
1197 optional = readTx.read(PeopleModel.BASE_PATH).get(5, TimeUnit.SECONDS);
1198 assertEquals("isPresent", true, optional.isPresent());
1199 assertEquals("Data node", peopleNode, optional.get());