2 * Copyright (c) 2014, 2017 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertTrue;
14 import static org.junit.Assert.fail;
15 import static org.mockito.ArgumentMatchers.any;
16 import static org.mockito.ArgumentMatchers.eq;
17 import static org.mockito.Mockito.timeout;
18 import static org.mockito.Mockito.verify;
20 import akka.actor.ActorSystem;
21 import akka.actor.Address;
22 import akka.actor.AddressFromURIString;
23 import akka.cluster.Cluster;
24 import akka.testkit.javadsl.TestKit;
25 import com.google.common.base.Throwables;
26 import com.google.common.collect.ImmutableMap;
27 import com.google.common.util.concurrent.FluentFuture;
28 import com.google.common.util.concurrent.ListenableFuture;
29 import com.google.common.util.concurrent.MoreExecutors;
30 import com.google.common.util.concurrent.Uninterruptibles;
31 import com.typesafe.config.ConfigFactory;
32 import java.math.BigInteger;
33 import java.util.ArrayList;
34 import java.util.Arrays;
35 import java.util.Collection;
36 import java.util.Collections;
37 import java.util.List;
38 import java.util.Optional;
39 import java.util.concurrent.CountDownLatch;
40 import java.util.concurrent.ExecutionException;
41 import java.util.concurrent.TimeUnit;
42 import java.util.concurrent.atomic.AtomicReference;
43 import org.junit.After;
44 import org.junit.Before;
45 import org.junit.Test;
46 import org.junit.runner.RunWith;
47 import org.junit.runners.Parameterized;
48 import org.junit.runners.Parameterized.Parameter;
49 import org.junit.runners.Parameterized.Parameters;
50 import org.mockito.Mockito;
51 import org.opendaylight.controller.cluster.access.client.RequestTimeoutException;
52 import org.opendaylight.controller.cluster.databroker.ClientBackedDataStore;
53 import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker;
54 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
55 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
56 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
57 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
58 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
59 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
60 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
61 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
62 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
63 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
64 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
65 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
66 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
67 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
68 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
69 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
70 import org.opendaylight.mdsal.common.api.ReadFailedException;
71 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction;
72 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
73 import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
74 import org.opendaylight.mdsal.dom.api.DOMTransactionChainClosedException;
75 import org.opendaylight.mdsal.dom.api.DOMTransactionChainListener;
76 import org.opendaylight.mdsal.dom.spi.store.DOMStore;
77 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
78 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
79 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
80 import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain;
81 import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
82 import org.opendaylight.yangtools.concepts.ListenerRegistration;
83 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
84 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
85 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
86 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
87 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
88 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
89 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration;
90 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
91 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
92 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
94 @RunWith(Parameterized.class)
95 public class DistributedDataStoreIntegrationTest {
97 @Parameters(name = "{0}")
98 public static Collection<Object[]> data() {
99 return Arrays.asList(new Object[][] {
100 { DistributedDataStore.class }, { ClientBackedDataStore.class }
105 public Class<? extends AbstractDataStore> testParameter;
107 private ActorSystem system;
109 private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder()
110 .shardHeartbeatIntervalInMillis(100);
113 public void setUp() {
114 InMemorySnapshotStore.clear();
115 InMemoryJournal.clear();
116 system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
117 Address member1Address = AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558");
118 Cluster.get(system).join(member1Address);
122 public void tearDown() {
123 TestKit.shutdownActorSystem(system, true);
127 protected ActorSystem getSystem() {
132 public void testWriteTransactionWithSingleShard() throws Exception {
133 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
134 try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
135 testParameter, "transactionIntegrationTest", "test-1")) {
137 testKit.testWriteTransaction(dataStore, TestModel.TEST_PATH,
138 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
140 testKit.testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
141 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME)
142 .withChild(ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 42))
148 public void testWriteTransactionWithMultipleShards() throws Exception {
149 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
150 try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
151 testParameter, "testWriteTransactionWithMultipleShards", "cars-1", "people-1")) {
153 DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
154 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
156 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
157 writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
159 testKit.doCommit(writeTx.ready());
161 writeTx = dataStore.newWriteOnlyTransaction();
163 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
164 writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
166 testKit.doCommit(writeTx.ready());
168 writeTx = dataStore.newWriteOnlyTransaction();
170 final MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
171 final YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
172 writeTx.write(carPath, car);
174 final MapEntryNode person = PeopleModel.newPersonEntry("jack");
175 final YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
176 writeTx.write(personPath, person);
178 testKit.doCommit(writeTx.ready());
180 // Verify the data in the store
181 final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
183 Optional<NormalizedNode<?, ?>> optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
184 assertTrue("isPresent", optional.isPresent());
185 assertEquals("Data node", car, optional.get());
187 optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
188 assertTrue("isPresent", optional.isPresent());
189 assertEquals("Data node", person, optional.get());
194 public void testReadWriteTransactionWithSingleShard() throws Exception {
195 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
196 try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
197 testParameter, "testReadWriteTransactionWithSingleShard", "test-1")) {
199 // 1. Create a read-write Tx
200 final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
201 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
203 // 2. Write some data
204 final YangInstanceIdentifier nodePath = TestModel.TEST_PATH;
205 final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
206 readWriteTx.write(nodePath, nodeToWrite);
208 // 3. Read the data from Tx
209 final Boolean exists = readWriteTx.exists(nodePath).get(5, TimeUnit.SECONDS);
210 assertEquals("exists", Boolean.TRUE, exists);
212 Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS);
213 assertTrue("isPresent", optional.isPresent());
214 assertEquals("Data node", nodeToWrite, optional.get());
216 // 4. Ready the Tx for commit
217 final DOMStoreThreePhaseCommitCohort cohort = readWriteTx.ready();
220 testKit.doCommit(cohort);
222 // 6. Verify the data in the store
223 final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
225 optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
226 assertTrue("isPresent", optional.isPresent());
227 assertEquals("Data node", nodeToWrite, optional.get());
232 public void testReadWriteTransactionWithMultipleShards() throws Exception {
233 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
234 try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
235 testParameter, "testReadWriteTransactionWithMultipleShards", "cars-1", "people-1")) {
237 DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
238 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
240 readWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
241 readWriteTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
243 testKit.doCommit(readWriteTx.ready());
245 readWriteTx = dataStore.newReadWriteTransaction();
247 readWriteTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
248 readWriteTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
250 testKit.doCommit(readWriteTx.ready());
252 readWriteTx = dataStore.newReadWriteTransaction();
254 final MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
255 final YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
256 readWriteTx.write(carPath, car);
258 final MapEntryNode person = PeopleModel.newPersonEntry("jack");
259 final YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
260 readWriteTx.write(personPath, person);
262 final Boolean exists = readWriteTx.exists(carPath).get(5, TimeUnit.SECONDS);
263 assertEquals("exists", Boolean.TRUE, exists);
265 Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
266 assertTrue("isPresent", optional.isPresent());
267 assertEquals("Data node", car, optional.get());
269 testKit.doCommit(readWriteTx.ready());
271 // Verify the data in the store
272 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
274 optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
275 assertTrue("isPresent", optional.isPresent());
276 assertEquals("Data node", car, optional.get());
278 optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
279 assertTrue("isPresent", optional.isPresent());
280 assertEquals("Data node", person, optional.get());
285 public void testSingleTransactionsWritesInQuickSuccession() throws Exception {
286 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
287 try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
288 testParameter, "testSingleTransactionsWritesInQuickSuccession", "cars-1")) {
290 final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
292 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
293 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
294 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
295 testKit.doCommit(writeTx.ready());
297 writeTx = txChain.newWriteOnlyTransaction();
300 for (int i = 0; i < numCars; i++) {
301 writeTx.write(CarsModel.newCarPath("car" + i),
302 CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
305 testKit.doCommit(writeTx.ready());
307 final Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction()
308 .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
309 assertTrue("isPresent", optional.isPresent());
310 assertEquals("# cars", numCars, ((Collection<?>) optional.get().getValue()).size());
314 @SuppressWarnings("checkstyle:IllegalCatch")
315 private void testTransactionWritesWithShardNotInitiallyReady(final String testName, final boolean writeOnly)
317 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
318 final String shardName = "test-1";
320 // Setup the InMemoryJournal to block shard recovery to ensure
322 // initialized until we create and submit the write the Tx.
323 final String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
324 final CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
325 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
327 try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
328 testParameter, testName, false, shardName)) {
330 // Create the write Tx
331 final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction()
332 : dataStore.newReadWriteTransaction();
333 assertNotNull("newReadWriteTransaction returned null", writeTx);
335 // Do some modification operations and ready the Tx on a
337 final YangInstanceIdentifier listEntryPath = YangInstanceIdentifier
338 .builder(TestModel.OUTER_LIST_PATH)
339 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
341 final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
342 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
343 final CountDownLatch txReady = new CountDownLatch(1);
344 final Thread txThread = new Thread(() -> {
346 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
348 writeTx.merge(TestModel.OUTER_LIST_PATH,
349 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME)
350 .withChild(ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 42))
353 writeTx.write(listEntryPath,
354 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
356 writeTx.delete(listEntryPath);
358 txCohort.set(writeTx.ready());
359 } catch (Exception e) {
368 // Wait for the Tx operations to complete.
369 final boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
370 if (caughtEx.get() != null) {
371 throw caughtEx.get();
374 assertTrue("Tx ready", done);
376 // At this point the Tx operations should be waiting for the
377 // shard to initialize so
378 // trigger the latch to let the shard recovery to continue.
379 blockRecoveryLatch.countDown();
381 // Wait for the Tx commit to complete.
382 testKit.doCommit(txCohort.get());
384 // Verify the data in the store
385 final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
387 Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
388 assertTrue("isPresent", optional.isPresent());
390 optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
391 assertTrue("isPresent", optional.isPresent());
393 optional = readTx.read(listEntryPath).get(5, TimeUnit.SECONDS);
394 assertFalse("isPresent", optional.isPresent());
399 public void testWriteOnlyTransactionWithShardNotInitiallyReady() throws Exception {
400 datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
401 testTransactionWritesWithShardNotInitiallyReady("testWriteOnlyTransactionWithShardNotInitiallyReady", true);
405 public void testReadWriteTransactionWithShardNotInitiallyReady() throws Exception {
406 testTransactionWritesWithShardNotInitiallyReady("testReadWriteTransactionWithShardNotInitiallyReady", false);
410 @SuppressWarnings("checkstyle:IllegalCatch")
411 public void testTransactionReadsWithShardNotInitiallyReady() throws Exception {
412 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
413 final String testName = "testTransactionReadsWithShardNotInitiallyReady";
414 final String shardName = "test-1";
416 // Setup the InMemoryJournal to block shard recovery to ensure
418 // initialized until we create the Tx.
419 final String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
420 final CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
421 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
423 try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
424 testParameter, testName, false, shardName)) {
426 // Create the read-write Tx
427 final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
428 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
430 // Do some reads on the Tx on a separate thread.
431 final AtomicReference<FluentFuture<Boolean>> txExistsFuture = new AtomicReference<>();
432 final AtomicReference<FluentFuture<Optional<NormalizedNode<?, ?>>>> txReadFuture = new AtomicReference<>();
433 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
434 final CountDownLatch txReadsDone = new CountDownLatch(1);
435 final Thread txThread = new Thread(() -> {
437 readWriteTx.write(TestModel.TEST_PATH,
438 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
440 txExistsFuture.set(readWriteTx.exists(TestModel.TEST_PATH));
442 txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
443 } catch (Exception e) {
446 txReadsDone.countDown();
452 // Wait for the Tx operations to complete.
453 boolean done = Uninterruptibles.awaitUninterruptibly(txReadsDone, 5, TimeUnit.SECONDS);
454 if (caughtEx.get() != null) {
455 throw caughtEx.get();
458 assertTrue("Tx reads done", done);
460 // At this point the Tx operations should be waiting for the
461 // shard to initialize so
462 // trigger the latch to let the shard recovery to continue.
463 blockRecoveryLatch.countDown();
465 // Wait for the reads to complete and verify.
466 assertEquals("exists", Boolean.TRUE, txExistsFuture.get().get(5, TimeUnit.SECONDS));
467 assertTrue("read", txReadFuture.get().get(5, TimeUnit.SECONDS).isPresent());
473 @Test(expected = NotInitializedException.class)
474 @SuppressWarnings("checkstyle:IllegalCatch")
475 public void testTransactionCommitFailureWithShardNotInitialized() throws Exception {
476 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
477 final String testName = "testTransactionCommitFailureWithShardNotInitialized";
478 final String shardName = "test-1";
480 // Set the shard initialization timeout low for the test.
481 datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
483 // Setup the InMemoryJournal to block shard recovery
485 final String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
486 final CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
487 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
489 InMemoryJournal.addEntry(persistentID, 1, "Dummy data so akka will read from persistence");
491 final AbstractDataStore dataStore = testKit.setupAbstractDataStore(testParameter, testName, false, shardName);
493 // Create the write Tx
494 final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
495 assertNotNull("newReadWriteTransaction returned null", writeTx);
497 // Do some modifications and ready the Tx on a separate
499 final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
500 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
501 final CountDownLatch txReady = new CountDownLatch(1);
502 final Thread txThread = new Thread(() -> {
504 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
506 txCohort.set(writeTx.ready());
507 } catch (Exception e) {
516 // Wait for the Tx operations to complete.
517 boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
518 if (caughtEx.get() != null) {
519 throw caughtEx.get();
522 assertTrue("Tx ready", done);
524 // Wait for the commit to complete. Since the shard never
525 // initialized, the Tx should
526 // have timed out and throw an appropriate exception cause.
528 txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
529 fail("Expected NotInitializedException");
530 } catch (final Exception e) {
531 final Throwable root = Throwables.getRootCause(e);
532 Throwables.throwIfUnchecked(root);
533 throw new RuntimeException(root);
535 blockRecoveryLatch.countDown();
539 @Test(expected = NotInitializedException.class)
540 @SuppressWarnings("checkstyle:IllegalCatch")
541 public void testTransactionReadFailureWithShardNotInitialized() throws Exception {
542 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
543 final String testName = "testTransactionReadFailureWithShardNotInitialized";
544 final String shardName = "test-1";
546 // Set the shard initialization timeout low for the test.
547 datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
549 // Setup the InMemoryJournal to block shard recovery
551 final String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
552 final CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
553 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
555 InMemoryJournal.addEntry(persistentID, 1, "Dummy data so akka will read from persistence");
557 try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(testParameter, testName, false, shardName)) {
559 // Create the read-write Tx
560 final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
561 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
563 // Do a read on the Tx on a separate thread.
564 final AtomicReference<FluentFuture<Optional<NormalizedNode<?, ?>>>> txReadFuture = new AtomicReference<>();
565 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
566 final CountDownLatch txReadDone = new CountDownLatch(1);
567 final Thread txThread = new Thread(() -> {
569 readWriteTx.write(TestModel.TEST_PATH,
570 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
572 txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
575 } catch (Exception e) {
578 txReadDone.countDown();
584 // Wait for the Tx operations to complete.
585 boolean done = Uninterruptibles.awaitUninterruptibly(txReadDone, 5, TimeUnit.SECONDS);
586 if (caughtEx.get() != null) {
587 throw caughtEx.get();
590 assertTrue("Tx read done", done);
592 // Wait for the read to complete. Since the shard never
593 // initialized, the Tx should
594 // have timed out and throw an appropriate exception cause.
596 txReadFuture.get().get(5, TimeUnit.SECONDS);
597 } catch (ExecutionException e) {
598 assertTrue("Expected ReadFailedException cause: " + e.getCause(),
599 e.getCause() instanceof ReadFailedException);
600 final Throwable root = Throwables.getRootCause(e);
601 Throwables.throwIfUnchecked(root);
602 throw new RuntimeException(root);
604 blockRecoveryLatch.countDown();
609 @SuppressWarnings("checkstyle:IllegalCatch")
610 private void testTransactionCommitFailureWithNoShardLeader(final boolean writeOnly, final String testName)
612 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
613 final String shardName = "default";
615 // We don't want the shard to become the leader so prevent shard
617 datastoreContextBuilder.customRaftPolicyImplementation(
618 "org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy");
620 // The ShardManager uses the election timeout for FindPrimary so
621 // reset it low so it will timeout quickly.
622 datastoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(1)
623 .shardInitializationTimeout(200, TimeUnit.MILLISECONDS).frontendRequestTimeoutInSeconds(2);
625 try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(testParameter, testName, false, shardName)) {
627 final Object result = dataStore.getActorContext().executeOperation(
628 dataStore.getActorContext().getShardManager(), new FindLocalShard(shardName, true));
629 assertTrue("Expected LocalShardFound. Actual: " + result, result instanceof LocalShardFound);
631 // Create the write Tx.
632 DOMStoreWriteTransaction writeTxToClose = null;
634 writeTxToClose = writeOnly ? dataStore.newWriteOnlyTransaction()
635 : dataStore.newReadWriteTransaction();
636 final DOMStoreWriteTransaction writeTx = writeTxToClose;
637 assertNotNull("newReadWriteTransaction returned null", writeTx);
639 // Do some modifications and ready the Tx on a separate
641 final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
642 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
643 final CountDownLatch txReady = new CountDownLatch(1);
644 final Thread txThread = new Thread(() -> {
646 writeTx.write(TestModel.JUNK_PATH,
647 ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
649 txCohort.set(writeTx.ready());
650 } catch (Exception e) {
659 // Wait for the Tx operations to complete.
660 boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
661 if (caughtEx.get() != null) {
662 throw caughtEx.get();
665 assertTrue("Tx ready", done);
667 // Wait for the commit to complete. Since no shard
668 // leader was elected in time, the Tx
669 // should have timed out and throw an appropriate
672 txCohort.get().canCommit().get(10, TimeUnit.SECONDS);
673 fail("Expected NoShardLeaderException");
674 } catch (final ExecutionException e) {
675 final String msg = "Unexpected exception: "
676 + Throwables.getStackTraceAsString(e.getCause());
677 if (DistributedDataStore.class.equals(testParameter)) {
678 assertTrue(Throwables.getRootCause(e) instanceof NoShardLeaderException);
680 assertTrue(msg, Throwables.getRootCause(e) instanceof RequestTimeoutException);
685 if (writeTxToClose != null) {
686 writeTxToClose.close();
688 } catch (Exception e) {
689 // FIXME TransactionProxy.close throws IllegalStateException:
690 // Transaction is ready, it cannot be closed
697 public void testWriteOnlyTransactionCommitFailureWithNoShardLeader() throws Exception {
698 datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
699 testTransactionCommitFailureWithNoShardLeader(true, "testWriteOnlyTransactionCommitFailureWithNoShardLeader");
703 public void testReadWriteTransactionCommitFailureWithNoShardLeader() throws Exception {
704 testTransactionCommitFailureWithNoShardLeader(false, "testReadWriteTransactionCommitFailureWithNoShardLeader");
708 public void testTransactionAbort() throws Exception {
709 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
710 try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
711 testParameter, "transactionAbortIntegrationTest", "test-1")) {
713 final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
714 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
716 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
718 final DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
720 cohort.canCommit().get(5, TimeUnit.SECONDS);
722 cohort.abort().get(5, TimeUnit.SECONDS);
724 testKit.testWriteTransaction(dataStore, TestModel.TEST_PATH,
725 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
730 @SuppressWarnings("checkstyle:IllegalCatch")
731 public void testTransactionChainWithSingleShard() throws Exception {
732 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
733 try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
734 testParameter, "testTransactionChainWithSingleShard", "test-1")) {
736 // 1. Create a Tx chain and write-only Tx
737 final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
739 final DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
740 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
742 // 2. Write some data
743 final NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
744 writeTx.write(TestModel.TEST_PATH, testNode);
746 // 3. Ready the Tx for commit
747 final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
749 // 4. Commit the Tx on another thread that first waits for
750 // the second read Tx.
751 final CountDownLatch continueCommit1 = new CountDownLatch(1);
752 final CountDownLatch commit1Done = new CountDownLatch(1);
753 final AtomicReference<Exception> commit1Error = new AtomicReference<>();
756 continueCommit1.await();
757 testKit.doCommit(cohort1);
758 } catch (Exception e) {
761 commit1Done.countDown();
765 // 5. Create a new read Tx from the chain to read and verify
766 // the data from the first
767 // Tx is visible after being readied.
768 DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction();
769 Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
770 assertTrue("isPresent", optional.isPresent());
771 assertEquals("Data node", testNode, optional.get());
773 // 6. Create a new RW Tx from the chain, write more data,
775 final DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
776 final MapNode outerNode = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME)
777 .withChild(ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 42))
779 rwTx.write(TestModel.OUTER_LIST_PATH, outerNode);
781 final DOMStoreThreePhaseCommitCohort cohort2 = rwTx.ready();
783 // 7. Create a new read Tx from the chain to read the data
784 // from the last RW Tx to
785 // verify it is visible.
786 readTx = txChain.newReadWriteTransaction();
787 optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
788 assertTrue("isPresent", optional.isPresent());
789 assertEquals("Data node", outerNode, optional.get());
791 // 8. Wait for the 2 commits to complete and close the
793 continueCommit1.countDown();
794 Uninterruptibles.awaitUninterruptibly(commit1Done, 5, TimeUnit.SECONDS);
796 if (commit1Error.get() != null) {
797 throw commit1Error.get();
800 testKit.doCommit(cohort2);
804 // 9. Create a new read Tx from the data store and verify
806 readTx = dataStore.newReadOnlyTransaction();
807 optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
808 assertTrue("isPresent", optional.isPresent());
809 assertEquals("Data node", outerNode, optional.get());
814 public void testTransactionChainWithMultipleShards() throws Exception {
815 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
816 try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
817 testParameter, "testTransactionChainWithMultipleShards", "cars-1", "people-1")) {
819 final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
821 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
822 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
824 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
825 writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
827 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
828 writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
830 final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
832 final DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction();
834 final MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
835 final YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
836 readWriteTx.write(carPath, car);
838 final MapEntryNode person = PeopleModel.newPersonEntry("jack");
839 final YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
840 readWriteTx.merge(personPath, person);
842 Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
843 assertTrue("isPresent", optional.isPresent());
844 assertEquals("Data node", car, optional.get());
846 optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS);
847 assertTrue("isPresent", optional.isPresent());
848 assertEquals("Data node", person, optional.get());
850 final DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready();
852 writeTx = txChain.newWriteOnlyTransaction();
854 writeTx.delete(carPath);
856 final DOMStoreThreePhaseCommitCohort cohort3 = writeTx.ready();
858 final ListenableFuture<Boolean> canCommit1 = cohort1.canCommit();
859 final ListenableFuture<Boolean> canCommit2 = cohort2.canCommit();
861 testKit.doCommit(canCommit1, cohort1);
862 testKit.doCommit(canCommit2, cohort2);
863 testKit.doCommit(cohort3);
867 final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
869 optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
870 assertFalse("isPresent", optional.isPresent());
872 optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
873 assertTrue("isPresent", optional.isPresent());
874 assertEquals("Data node", person, optional.get());
879 public void testCreateChainedTransactionsInQuickSuccession() throws Exception {
880 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
881 try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
882 testParameter, "testCreateChainedTransactionsInQuickSuccession", "cars-1")) {
884 final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
885 ImmutableMap.<LogicalDatastoreType, DOMStore>builder()
886 .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(),
887 MoreExecutors.directExecutor());
889 final DOMTransactionChainListener listener = Mockito.mock(DOMTransactionChainListener.class);
890 DOMTransactionChain txChain = broker.createTransactionChain(listener);
892 final List<ListenableFuture<?>> futures = new ArrayList<>();
894 final DOMDataTreeWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
895 writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, CarsModel.emptyContainer());
896 writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
897 futures.add(writeTx.commit());
900 for (int i = 0; i < numCars; i++) {
901 final DOMDataTreeReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
903 rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.newCarPath("car" + i),
904 CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
906 futures.add(rwTx.commit());
909 for (final ListenableFuture<?> f : futures) {
910 f.get(5, TimeUnit.SECONDS);
913 final Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction()
914 .read(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
915 assertTrue("isPresent", optional.isPresent());
916 assertEquals("# cars", numCars, ((Collection<?>) optional.get().getValue()).size());
925 public void testCreateChainedTransactionAfterEmptyTxReadied() throws Exception {
926 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
927 try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
928 testParameter, "testCreateChainedTransactionAfterEmptyTxReadied", "test-1")) {
930 final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
932 final DOMStoreReadWriteTransaction rwTx1 = txChain.newReadWriteTransaction();
936 final DOMStoreReadWriteTransaction rwTx2 = txChain.newReadWriteTransaction();
938 final Optional<NormalizedNode<?, ?>> optional = rwTx2.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
939 assertFalse("isPresent", optional.isPresent());
946 public void testCreateChainedTransactionWhenPreviousNotReady() throws Exception {
947 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
948 try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
949 testParameter, "testCreateChainedTransactionWhenPreviousNotReady", "test-1")) {
951 final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
953 final DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
954 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
956 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
958 // Try to create another Tx of each type - each should fail
959 // b/c the previous Tx wasn't
961 testKit.assertExceptionOnTxChainCreates(txChain, IllegalStateException.class);
966 public void testCreateChainedTransactionAfterClose() throws Exception {
967 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
968 try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
969 testParameter, "testCreateChainedTransactionAfterClose", "test-1")) {
971 final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
974 // Try to create another Tx of each type - should fail b/c
975 // the previous Tx was closed.
976 testKit.assertExceptionOnTxChainCreates(txChain, DOMTransactionChainClosedException.class);
981 public void testChainWithReadOnlyTxAfterPreviousReady() throws Exception {
982 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
983 try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
984 testParameter, "testChainWithReadOnlyTxAfterPreviousReady", "test-1")) {
986 final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
988 // Create a write tx and submit.
989 final DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
990 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
991 final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
993 // Create read-only tx's and issue a read.
994 FluentFuture<Optional<NormalizedNode<?, ?>>> readFuture1 = txChain
995 .newReadOnlyTransaction().read(TestModel.TEST_PATH);
997 FluentFuture<Optional<NormalizedNode<?, ?>>> readFuture2 = txChain
998 .newReadOnlyTransaction().read(TestModel.TEST_PATH);
1000 // Create another write tx and issue the write.
1001 DOMStoreWriteTransaction writeTx2 = txChain.newWriteOnlyTransaction();
1002 writeTx2.write(TestModel.OUTER_LIST_PATH,
1003 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME)
1004 .withChild(ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 42))
1007 // Ensure the reads succeed.
1009 assertTrue("isPresent", readFuture1.get(5, TimeUnit.SECONDS).isPresent());
1010 assertTrue("isPresent", readFuture2.get(5, TimeUnit.SECONDS).isPresent());
1012 // Ensure the writes succeed.
1013 DOMStoreThreePhaseCommitCohort cohort2 = writeTx2.ready();
1015 testKit.doCommit(cohort1);
1016 testKit.doCommit(cohort2);
1018 assertTrue("isPresent", txChain.newReadOnlyTransaction().read(TestModel.OUTER_LIST_PATH)
1019 .get(5, TimeUnit.SECONDS).isPresent());
1024 public void testChainedTransactionFailureWithSingleShard() throws Exception {
1025 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
1026 try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
1027 testParameter, "testChainedTransactionFailureWithSingleShard", "cars-1")) {
1029 final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
1030 ImmutableMap.<LogicalDatastoreType, DOMStore>builder()
1031 .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(),
1032 MoreExecutors.directExecutor());
1034 final DOMTransactionChainListener listener = Mockito.mock(DOMTransactionChainListener.class);
1035 final DOMTransactionChain txChain = broker.createTransactionChain(listener);
1037 final DOMDataTreeReadWriteTransaction writeTx = txChain.newReadWriteTransaction();
1039 writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH,
1040 PeopleModel.emptyContainer());
1042 final ContainerNode invalidData = ImmutableContainerNodeBuilder.create()
1043 .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME))
1044 .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
1046 writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
1049 writeTx.commit().get(5, TimeUnit.SECONDS);
1050 fail("Expected TransactionCommitFailedException");
1051 } catch (final ExecutionException e) {
1055 verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx),
1056 any(Throwable.class));
1064 public void testChainedTransactionFailureWithMultipleShards() throws Exception {
1065 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
1066 try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
1067 testParameter, "testChainedTransactionFailureWithMultipleShards", "cars-1", "people-1")) {
1069 final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
1070 ImmutableMap.<LogicalDatastoreType, DOMStore>builder()
1071 .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(),
1072 MoreExecutors.directExecutor());
1074 final DOMTransactionChainListener listener = Mockito.mock(DOMTransactionChainListener.class);
1075 final DOMTransactionChain txChain = broker.createTransactionChain(listener);
1077 final DOMDataTreeWriteTransaction writeTx = txChain.newReadWriteTransaction();
1079 writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH,
1080 PeopleModel.emptyContainer());
1082 final ContainerNode invalidData = ImmutableContainerNodeBuilder.create()
1083 .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME))
1084 .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
1086 writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
1088 // Note that merge will validate the data and fail but put
1089 // succeeds b/c deep validation is not
1090 // done for put for performance reasons.
1092 writeTx.commit().get(5, TimeUnit.SECONDS);
1093 fail("Expected TransactionCommitFailedException");
1094 } catch (final ExecutionException e) {
1098 verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx),
1099 any(Throwable.class));
1107 public void testDataTreeChangeListenerRegistration() throws Exception {
1108 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
1109 try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
1110 testParameter, "testDataTreeChangeListenerRegistration", "test-1")) {
1112 testKit.testWriteTransaction(dataStore, TestModel.TEST_PATH,
1113 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1115 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
1117 ListenerRegistration<MockDataTreeChangeListener> listenerReg = dataStore
1118 .registerTreeChangeListener(TestModel.TEST_PATH, listener);
1120 assertNotNull("registerTreeChangeListener returned null", listenerReg);
1122 IntegrationTestKit.verifyShardState(dataStore, "test-1",
1123 state -> assertEquals("getTreeChangeListenerActors", 1,
1124 state.getTreeChangeListenerActors().size()));
1126 // Wait for the initial notification
1127 listener.waitForChangeEvents(TestModel.TEST_PATH);
1131 testKit.testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
1132 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME)
1133 .withChild(ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 42))
1136 YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1137 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
1138 testKit.testWriteTransaction(dataStore, listPath,
1139 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
1141 // Wait for the 2 updates.
1142 listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath);
1143 listenerReg.close();
1145 IntegrationTestKit.verifyShardState(dataStore, "test-1",
1146 state -> assertEquals("getTreeChangeListenerActors", 0,
1147 state.getTreeChangeListenerActors().size()));
1149 testKit.testWriteTransaction(dataStore,
1150 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1151 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(),
1152 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2));
1154 listener.expectNoMoreChanges("Received unexpected change after close");
1159 public void testRestoreFromDatastoreSnapshot() throws Exception {
1160 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
1161 final String name = "transactionIntegrationTest";
1163 final ContainerNode carsNode = CarsModel.newCarsNode(
1164 CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", BigInteger.valueOf(20000L)),
1165 CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000L))));
1167 DataTree dataTree = new InMemoryDataTreeFactory().create(
1168 DataTreeConfiguration.DEFAULT_OPERATIONAL, SchemaContextHelper.full());
1169 AbstractShardTest.writeToStore(dataTree, CarsModel.BASE_PATH, carsNode);
1170 NormalizedNode<?, ?> root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY);
1172 final Snapshot carsSnapshot = Snapshot.create(
1173 new ShardSnapshotState(new MetadataShardDataTreeSnapshot(root)),
1174 Collections.emptyList(), 2, 1, 2, 1, 1, "member-1", null);
1176 dataTree = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL,
1177 SchemaContextHelper.full());
1179 final NormalizedNode<?, ?> peopleNode = PeopleModel.create();
1180 AbstractShardTest.writeToStore(dataTree, PeopleModel.BASE_PATH, peopleNode);
1182 root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY);
1184 final Snapshot peopleSnapshot = Snapshot.create(
1185 new ShardSnapshotState(new MetadataShardDataTreeSnapshot(root)),
1186 Collections.emptyList(), 2, 1, 2, 1, 1, "member-1", null);
1188 testKit.restoreFromSnapshot = new DatastoreSnapshot(name, null, Arrays.asList(
1189 new DatastoreSnapshot.ShardSnapshot("cars", carsSnapshot),
1190 new DatastoreSnapshot.ShardSnapshot("people", peopleSnapshot)));
1192 try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
1193 testParameter, name, "module-shards-member1.conf", true, "cars", "people")) {
1195 final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
1198 Optional<NormalizedNode<?, ?>> optional = readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS);
1199 assertTrue("isPresent", optional.isPresent());
1200 assertEquals("Data node", carsNode, optional.get());
1202 optional = readTx.read(PeopleModel.BASE_PATH).get(5, TimeUnit.SECONDS);
1203 assertTrue("isPresent", optional.isPresent());
1204 assertEquals("Data node", peopleNode, optional.get());