3d59ebbb01c0cd822521c1da71aa29ea5bd6bb9e
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / DistributedDataStoreIntegrationTest.java
1 /*
2  * Copyright (c) 2014, 2017 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertTrue;
14 import static org.junit.Assert.fail;
15 import static org.mockito.Matchers.any;
16 import static org.mockito.Matchers.eq;
17 import static org.mockito.Mockito.timeout;
18 import static org.mockito.Mockito.verify;
19
20 import akka.actor.ActorSystem;
21 import akka.actor.Address;
22 import akka.actor.AddressFromURIString;
23 import akka.cluster.Cluster;
24 import akka.testkit.javadsl.TestKit;
25 import com.google.common.base.Optional;
26 import com.google.common.base.Throwables;
27 import com.google.common.collect.ImmutableMap;
28 import com.google.common.util.concurrent.CheckedFuture;
29 import com.google.common.util.concurrent.ListenableFuture;
30 import com.google.common.util.concurrent.MoreExecutors;
31 import com.google.common.util.concurrent.Uninterruptibles;
32 import com.typesafe.config.ConfigFactory;
33 import java.io.IOException;
34 import java.math.BigInteger;
35 import java.util.ArrayList;
36 import java.util.Arrays;
37 import java.util.Collection;
38 import java.util.Collections;
39 import java.util.List;
40 import java.util.concurrent.CountDownLatch;
41 import java.util.concurrent.ExecutionException;
42 import java.util.concurrent.TimeUnit;
43 import java.util.concurrent.atomic.AtomicReference;
44 import org.junit.After;
45 import org.junit.Before;
46 import org.junit.Test;
47 import org.junit.runner.RunWith;
48 import org.junit.runners.Parameterized;
49 import org.junit.runners.Parameterized.Parameter;
50 import org.junit.runners.Parameterized.Parameters;
51 import org.mockito.Mockito;
52 import org.opendaylight.controller.cluster.access.client.RequestTimeoutException;
53 import org.opendaylight.controller.cluster.databroker.ClientBackedDataStore;
54 import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker;
55 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
56 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
57 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
58 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
59 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
60 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
61 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
62 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
63 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
64 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
65 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
66 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
67 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
68 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
69 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
70 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
71 import org.opendaylight.mdsal.common.api.ReadFailedException;
72 import org.opendaylight.mdsal.common.api.TransactionChainClosedException;
73 import org.opendaylight.mdsal.common.api.TransactionChainListener;
74 import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
75 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction;
76 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
77 import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
78 import org.opendaylight.mdsal.dom.spi.store.DOMStore;
79 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
80 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
81 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
82 import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain;
83 import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
84 import org.opendaylight.yangtools.concepts.ListenerRegistration;
85 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
86 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
87 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
88 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
89 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
90 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
91 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration;
92 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
93 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
94 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
95
96 @RunWith(Parameterized.class)
97 public class DistributedDataStoreIntegrationTest {
98
99     @Parameters(name = "{0}")
100     public static Collection<Object[]> data() {
101         return Arrays.asList(new Object[][] {
102                 { DistributedDataStore.class }, { ClientBackedDataStore.class }
103         });
104     }
105
    // Datastore implementation class for the current run; populated by the Parameterized runner
    // from the rows returned by data().
    @Parameter
    public Class<? extends AbstractDataStore> testParameter;

    // Actor system for the current test; assigned in setUp() and reset to null in tearDown().
    private ActorSystem system;

    // Context builder handed to every IntegrationTestKit in this class. The shard heartbeat
    // interval is set to 100 ms here; some tests adjust further settings on it before use.
    private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder()
            .shardHeartbeatIntervalInMillis(100);
113
114     @Before
115     public void setUp() throws IOException {
116         InMemorySnapshotStore.clear();
117         InMemoryJournal.clear();
118         system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
119         Address member1Address = AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558");
120         Cluster.get(system).join(member1Address);
121     }
122
123     @After
124     public void tearDown() throws IOException {
125         TestKit.shutdownActorSystem(system, Boolean.TRUE);
126         system = null;
127     }
128
    /**
     * Returns the actor system of the current test.
     *
     * @return the system assigned in {@code setUp()}, or {@code null} before {@code setUp()} has run
     *         or after {@code tearDown()} has cleared it
     */
    protected ActorSystem getSystem() {
        return system;
    }
132
133     @Test
134     public void testWriteTransactionWithSingleShard() throws Exception {
135         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
136             {
137                 try (AbstractDataStore dataStore = setupAbstractDataStore(
138                         testParameter, "transactionIntegrationTest", "test-1")) {
139
140                     testWriteTransaction(dataStore, TestModel.TEST_PATH,
141                             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
142
143                     testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
144                             ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
145                 }
146             }
147         };
148     }
149
150     @Test
151     public void testWriteTransactionWithMultipleShards() throws Exception {
152         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
153             {
154                 try (AbstractDataStore dataStore = setupAbstractDataStore(
155                         testParameter, "testWriteTransactionWithMultipleShards", "cars-1", "people-1")) {
156
157                     DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
158                     assertNotNull("newWriteOnlyTransaction returned null", writeTx);
159
160                     writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
161                     writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
162
163                     doCommit(writeTx.ready());
164
165                     writeTx = dataStore.newWriteOnlyTransaction();
166
167                     writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
168                     writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
169
170                     doCommit(writeTx.ready());
171
172                     writeTx = dataStore.newWriteOnlyTransaction();
173
174                     final MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
175                     final YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
176                     writeTx.write(carPath, car);
177
178                     final MapEntryNode person = PeopleModel.newPersonEntry("jack");
179                     final YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
180                     writeTx.write(personPath, person);
181
182                     doCommit(writeTx.ready());
183
184                     // Verify the data in the store
185                     final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
186
187                     Optional<NormalizedNode<?, ?>> optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
188                     assertEquals("isPresent", true, optional.isPresent());
189                     assertEquals("Data node", car, optional.get());
190
191                     optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
192                     assertEquals("isPresent", true, optional.isPresent());
193                     assertEquals("Data node", person, optional.get());
194                 }
195             }
196         };
197     }
198
199     @Test
200     public void testReadWriteTransactionWithSingleShard() throws Exception {
201         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
202             {
203                 try (AbstractDataStore dataStore = setupAbstractDataStore(
204                         testParameter, "testReadWriteTransactionWithSingleShard", "test-1")) {
205
206                     // 1. Create a read-write Tx
207                     final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
208                     assertNotNull("newReadWriteTransaction returned null", readWriteTx);
209
210                     // 2. Write some data
211                     final YangInstanceIdentifier nodePath = TestModel.TEST_PATH;
212                     final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
213                     readWriteTx.write(nodePath, nodeToWrite);
214
215                     // 3. Read the data from Tx
216                     final Boolean exists = readWriteTx.exists(nodePath).checkedGet(5, TimeUnit.SECONDS);
217                     assertEquals("exists", true, exists);
218
219                     Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS);
220                     assertEquals("isPresent", true, optional.isPresent());
221                     assertEquals("Data node", nodeToWrite, optional.get());
222
223                     // 4. Ready the Tx for commit
224                     final DOMStoreThreePhaseCommitCohort cohort = readWriteTx.ready();
225
226                     // 5. Commit the Tx
227                     doCommit(cohort);
228
229                     // 6. Verify the data in the store
230                     final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
231
232                     optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
233                     assertEquals("isPresent", true, optional.isPresent());
234                     assertEquals("Data node", nodeToWrite, optional.get());
235                 }
236             }
237         };
238     }
239
240     @Test
241     public void testReadWriteTransactionWithMultipleShards() throws Exception {
242         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
243             {
244                 try (AbstractDataStore dataStore = setupAbstractDataStore(
245                         testParameter, "testReadWriteTransactionWithMultipleShards", "cars-1", "people-1")) {
246
247                     DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
248                     assertNotNull("newReadWriteTransaction returned null", readWriteTx);
249
250                     readWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
251                     readWriteTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
252
253                     doCommit(readWriteTx.ready());
254
255                     readWriteTx = dataStore.newReadWriteTransaction();
256
257                     readWriteTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
258                     readWriteTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
259
260                     doCommit(readWriteTx.ready());
261
262                     readWriteTx = dataStore.newReadWriteTransaction();
263
264                     final MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
265                     final YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
266                     readWriteTx.write(carPath, car);
267
268                     final MapEntryNode person = PeopleModel.newPersonEntry("jack");
269                     final YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
270                     readWriteTx.write(personPath, person);
271
272                     final Boolean exists = readWriteTx.exists(carPath).checkedGet(5, TimeUnit.SECONDS);
273                     assertEquals("exists", true, exists);
274
275                     Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
276                     assertEquals("isPresent", true, optional.isPresent());
277                     assertEquals("Data node", car, optional.get());
278
279                     doCommit(readWriteTx.ready());
280
281                     // Verify the data in the store
282                     DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
283
284                     optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
285                     assertEquals("isPresent", true, optional.isPresent());
286                     assertEquals("Data node", car, optional.get());
287
288                     optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
289                     assertEquals("isPresent", true, optional.isPresent());
290                     assertEquals("Data node", person, optional.get());
291
292                 }
293             }
294         };
295     }
296
297     @Test
298     public void testSingleTransactionsWritesInQuickSuccession() throws Exception {
299         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
300             {
301                 try (AbstractDataStore dataStore = setupAbstractDataStore(
302                         testParameter, "testSingleTransactionsWritesInQuickSuccession", "cars-1")) {
303
304                     final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
305
306                     DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
307                     writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
308                     writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
309                     doCommit(writeTx.ready());
310
311                     writeTx = txChain.newWriteOnlyTransaction();
312
313                     int numCars = 5;
314                     for (int i = 0; i < numCars; i++) {
315                         writeTx.write(CarsModel.newCarPath("car" + i),
316                                 CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
317                     }
318
319                     doCommit(writeTx.ready());
320
321                     final Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction()
322                             .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
323                     assertEquals("isPresent", true, optional.isPresent());
324                     assertEquals("# cars", numCars, ((Collection<?>) optional.get().getValue()).size());
325                 }
326             }
327         };
328     }
329
330     @SuppressWarnings("checkstyle:IllegalCatch")
331     private void testTransactionWritesWithShardNotInitiallyReady(final String testName, final boolean writeOnly)
332             throws Exception {
333         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
334             {
335                 final String shardName = "test-1";
336
337                 // Setup the InMemoryJournal to block shard recovery to ensure
338                 // the shard isn't
339                 // initialized until we create and submit the write the Tx.
340                 final String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
341                 final CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
342                 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
343
344                 try (AbstractDataStore dataStore = setupAbstractDataStore(
345                         testParameter, testName, false, shardName)) {
346
347                     // Create the write Tx
348                     final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction()
349                             : dataStore.newReadWriteTransaction();
350                     assertNotNull("newReadWriteTransaction returned null", writeTx);
351
352                     // Do some modification operations and ready the Tx on a
353                     // separate thread.
354                     final YangInstanceIdentifier listEntryPath = YangInstanceIdentifier
355                             .builder(TestModel.OUTER_LIST_PATH)
356                             .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
357
358                     final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
359                     final AtomicReference<Exception> caughtEx = new AtomicReference<>();
360                     final CountDownLatch txReady = new CountDownLatch(1);
361                     final Thread txThread = new Thread(() -> {
362                         try {
363                             writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
364
365                             writeTx.merge(TestModel.OUTER_LIST_PATH,
366                                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
367
368                             writeTx.write(listEntryPath,
369                                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
370
371                             writeTx.delete(listEntryPath);
372
373                             txCohort.set(writeTx.ready());
374                         } catch (Exception e) {
375                             caughtEx.set(e);
376                         } finally {
377                             txReady.countDown();
378                         }
379                     });
380
381                     txThread.start();
382
383                     // Wait for the Tx operations to complete.
384                     final boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
385                     if (caughtEx.get() != null) {
386                         throw caughtEx.get();
387                     }
388
389                     assertEquals("Tx ready", true, done);
390
391                     // At this point the Tx operations should be waiting for the
392                     // shard to initialize so
393                     // trigger the latch to let the shard recovery to continue.
394                     blockRecoveryLatch.countDown();
395
396                     // Wait for the Tx commit to complete.
397                     doCommit(txCohort.get());
398
399                     // Verify the data in the store
400                     final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
401
402                     Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
403                     assertEquals("isPresent", true, optional.isPresent());
404
405                     optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
406                     assertEquals("isPresent", true, optional.isPresent());
407
408                     optional = readTx.read(listEntryPath).get(5, TimeUnit.SECONDS);
409                     assertEquals("isPresent", false, optional.isPresent());
410                 }
411             }
412         };
413     }
414
415     @Test
416     public void testWriteOnlyTransactionWithShardNotInitiallyReady() throws Exception {
417         datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
418         testTransactionWritesWithShardNotInitiallyReady("testWriteOnlyTransactionWithShardNotInitiallyReady", true);
419     }
420
421     @Test
422     public void testReadWriteTransactionWithShardNotInitiallyReady() throws Exception {
423         testTransactionWritesWithShardNotInitiallyReady("testReadWriteTransactionWithShardNotInitiallyReady", false);
424     }
425
426     @Test
427     @SuppressWarnings("checkstyle:IllegalCatch")
428     public void testTransactionReadsWithShardNotInitiallyReady() throws Exception {
429         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
430             {
431                 final String testName = "testTransactionReadsWithShardNotInitiallyReady";
432                 final String shardName = "test-1";
433
434                 // Setup the InMemoryJournal to block shard recovery to ensure
435                 // the shard isn't
436                 // initialized until we create the Tx.
437                 final String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
438                 final CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
439                 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
440
441                 try (AbstractDataStore dataStore = setupAbstractDataStore(
442                         testParameter, testName, false, shardName)) {
443
444                     // Create the read-write Tx
445                     final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
446                     assertNotNull("newReadWriteTransaction returned null", readWriteTx);
447
448                     // Do some reads on the Tx on a separate thread.
449                     final AtomicReference<CheckedFuture<Boolean, ReadFailedException>> txExistsFuture =
450                             new AtomicReference<>();
451                     final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
452                             txReadFuture = new AtomicReference<>();
453                     final AtomicReference<Exception> caughtEx = new AtomicReference<>();
454                     final CountDownLatch txReadsDone = new CountDownLatch(1);
455                     final Thread txThread = new Thread(() -> {
456                         try {
457                             readWriteTx.write(TestModel.TEST_PATH,
458                                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
459
460                             txExistsFuture.set(readWriteTx.exists(TestModel.TEST_PATH));
461
462                             txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
463                         } catch (Exception e) {
464                             caughtEx.set(e);
465                         } finally {
466                             txReadsDone.countDown();
467                         }
468                     });
469
470                     txThread.start();
471
472                     // Wait for the Tx operations to complete.
473                     boolean done = Uninterruptibles.awaitUninterruptibly(txReadsDone, 5, TimeUnit.SECONDS);
474                     if (caughtEx.get() != null) {
475                         throw caughtEx.get();
476                     }
477
478                     assertEquals("Tx reads done", true, done);
479
480                     // At this point the Tx operations should be waiting for the
481                     // shard to initialize so
482                     // trigger the latch to let the shard recovery to continue.
483                     blockRecoveryLatch.countDown();
484
485                     // Wait for the reads to complete and verify.
486                     assertEquals("exists", true, txExistsFuture.get().checkedGet(5, TimeUnit.SECONDS));
487                     assertEquals("read", true, txReadFuture.get().checkedGet(5, TimeUnit.SECONDS).isPresent());
488
489                     readWriteTx.close();
490                 }
491             }
492         };
493     }
494
495     @Test(expected = NotInitializedException.class)
496     @SuppressWarnings("checkstyle:IllegalCatch")
497     public void testTransactionCommitFailureWithShardNotInitialized() throws Exception {
498         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
499             {
500                 final String testName = "testTransactionCommitFailureWithShardNotInitialized";
501                 final String shardName = "test-1";
502
503                 // Set the shard initialization timeout low for the test.
504                 datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
505
506                 // Setup the InMemoryJournal to block shard recovery
507                 // indefinitely.
508                 final String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
509                 final CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
510                 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
511
512                 InMemoryJournal.addEntry(persistentID, 1, "Dummy data so akka will read from persistence");
513
514                 final AbstractDataStore dataStore =
515                         setupAbstractDataStore(testParameter, testName, false, shardName);
516
517                 // Create the write Tx
518                 final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
519                 assertNotNull("newReadWriteTransaction returned null", writeTx);
520
521                 // Do some modifications and ready the Tx on a separate
522                 // thread.
523                 final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
524                 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
525                 final CountDownLatch txReady = new CountDownLatch(1);
526                 final Thread txThread = new Thread(() -> {
527                     try {
528                         writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
529
530                         txCohort.set(writeTx.ready());
531                     } catch (Exception e) {
532                         caughtEx.set(e);
533                     } finally {
534                         txReady.countDown();
535                     }
536                 });
537
538                 txThread.start();
539
540                 // Wait for the Tx operations to complete.
541                 boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
542                 if (caughtEx.get() != null) {
543                     throw caughtEx.get();
544                 }
545
546                 assertEquals("Tx ready", true, done);
547
548                 // Wait for the commit to complete. Since the shard never
549                 // initialized, the Tx should
550                 // have timed out and throw an appropriate exception cause.
551                 try {
552                     txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
553                     fail("Expected NotInitializedException");
554                 } catch (final Exception e) {
555                     final Throwable root = Throwables.getRootCause(e);
556                     Throwables.throwIfUnchecked(root);
557                     throw new RuntimeException(root);
558                 } finally {
559                     blockRecoveryLatch.countDown();
560                 }
561             }
562         };
563     }
564
565     @Test(expected = NotInitializedException.class)
566     @SuppressWarnings("checkstyle:IllegalCatch")
567     public void testTransactionReadFailureWithShardNotInitialized() throws Exception {
568         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
569             {
570                 final String testName = "testTransactionReadFailureWithShardNotInitialized";
571                 final String shardName = "test-1";
572
573                 // Set the shard initialization timeout low for the test.
574                 datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
575
576                 // Setup the InMemoryJournal to block shard recovery
577                 // indefinitely.
578                 final String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
579                 final CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
580                 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
581
582                 InMemoryJournal.addEntry(persistentID, 1, "Dummy data so akka will read from persistence");
583
584                 try (AbstractDataStore dataStore = setupAbstractDataStore(
585                         testParameter, testName, false, shardName)) {
586
587                     // Create the read-write Tx
588                     final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
589                     assertNotNull("newReadWriteTransaction returned null", readWriteTx);
590
591                     // Do a read on the Tx on a separate thread.
592                     final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
593                             txReadFuture = new AtomicReference<>();
594                     final AtomicReference<Exception> caughtEx = new AtomicReference<>();
595                     final CountDownLatch txReadDone = new CountDownLatch(1);
596                     final Thread txThread = new Thread(() -> {
597                         try {
598                             readWriteTx.write(TestModel.TEST_PATH,
599                                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
600
601                             txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
602
603                             readWriteTx.close();
604                         } catch (Exception e) {
605                             caughtEx.set(e);
606                         } finally {
607                             txReadDone.countDown();
608                         }
609                     });
610
611                     txThread.start();
612
613                     // Wait for the Tx operations to complete.
614                     boolean done = Uninterruptibles.awaitUninterruptibly(txReadDone, 5, TimeUnit.SECONDS);
615                     if (caughtEx.get() != null) {
616                         throw caughtEx.get();
617                     }
618
619                     assertEquals("Tx read done", true, done);
620
621                     // Wait for the read to complete. Since the shard never
622                     // initialized, the Tx should
623                     // have timed out and throw an appropriate exception cause.
624                     try {
625                         txReadFuture.get().checkedGet(5, TimeUnit.SECONDS);
626                         fail("Expected NotInitializedException");
627                     } catch (final ReadFailedException e) {
628                         final Throwable root = Throwables.getRootCause(e);
629                         Throwables.throwIfUnchecked(root);
630                         throw new RuntimeException(root);
631                     } finally {
632                         blockRecoveryLatch.countDown();
633                     }
634                 }
635             }
636         };
637     }
638
639     @SuppressWarnings("checkstyle:IllegalCatch")
640     private void testTransactionCommitFailureWithNoShardLeader(final boolean writeOnly, final String testName)
641             throws Exception {
642         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
643             {
644                 final String shardName = "default";
645
646                 // We don't want the shard to become the leader so prevent shard
647                 // elections.
648                 datastoreContextBuilder.customRaftPolicyImplementation(
649                         "org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy");
650
651                 // The ShardManager uses the election timeout for FindPrimary so
652                 // reset it low so it will timeout quickly.
653                 datastoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(1)
654                         .shardInitializationTimeout(200, TimeUnit.MILLISECONDS).frontendRequestTimeoutInSeconds(2);
655
656                 try (AbstractDataStore dataStore = setupAbstractDataStore(
657                         testParameter, testName, false, shardName)) {
658
659                     final Object result = dataStore.getActorContext().executeOperation(
660                             dataStore.getActorContext().getShardManager(), new FindLocalShard(shardName, true));
661                     assertTrue("Expected LocalShardFound. Actual: " + result, result instanceof LocalShardFound);
662
663                     // Create the write Tx.
664                     DOMStoreWriteTransaction writeTxToClose = null;
665                     try {
666                         writeTxToClose = writeOnly ? dataStore.newWriteOnlyTransaction()
667                                 : dataStore.newReadWriteTransaction();
668                         final DOMStoreWriteTransaction writeTx = writeTxToClose;
669                         assertNotNull("newReadWriteTransaction returned null", writeTx);
670
671                         // Do some modifications and ready the Tx on a separate
672                         // thread.
673                         final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
674                         final AtomicReference<Exception> caughtEx = new AtomicReference<>();
675                         final CountDownLatch txReady = new CountDownLatch(1);
676                         final Thread txThread = new Thread(() -> {
677                             try {
678                                 writeTx.write(TestModel.JUNK_PATH,
679                                         ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
680
681                                 txCohort.set(writeTx.ready());
682                             } catch (Exception e) {
683                                 caughtEx.set(e);
684                             } finally {
685                                 txReady.countDown();
686                             }
687                         });
688
689                         txThread.start();
690
691                         // Wait for the Tx operations to complete.
692                         boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
693                         if (caughtEx.get() != null) {
694                             throw caughtEx.get();
695                         }
696
697                         assertEquals("Tx ready", true, done);
698
699                         // Wait for the commit to complete. Since no shard
700                         // leader was elected in time, the Tx
701                         // should have timed out and throw an appropriate
702                         // exception cause.
703                         try {
704                             txCohort.get().canCommit().get(10, TimeUnit.SECONDS);
705                             fail("Expected NoShardLeaderException");
706                         } catch (final ExecutionException e) {
707                             final String msg = "Unexpected exception: "
708                                     + Throwables.getStackTraceAsString(e.getCause());
709                             if (DistributedDataStore.class.equals(testParameter)) {
710                                 assertTrue(Throwables.getRootCause(e) instanceof NoShardLeaderException);
711                             } else {
712                                 assertTrue(msg, Throwables.getRootCause(e) instanceof RequestTimeoutException);
713                             }
714                         }
715                     } finally {
716                         try {
717                             if (writeTxToClose != null) {
718                                 writeTxToClose.close();
719                             }
720                         } catch (Exception e) {
721                             // FIXME TransactionProxy.close throws IllegalStateException:
722                             // Transaction is ready, it cannot be closed
723                         }
724                     }
725                 }
726             }
727         };
728     }
729
730     @Test
731     public void testWriteOnlyTransactionCommitFailureWithNoShardLeader() throws Exception {
732         datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
733         testTransactionCommitFailureWithNoShardLeader(true, "testWriteOnlyTransactionCommitFailureWithNoShardLeader");
734     }
735
736     @Test
737     public void testReadWriteTransactionCommitFailureWithNoShardLeader() throws Exception {
738         testTransactionCommitFailureWithNoShardLeader(false, "testReadWriteTransactionCommitFailureWithNoShardLeader");
739     }
740
741     @Test
742     public void testTransactionAbort() throws Exception {
743         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
744             {
745                 try (AbstractDataStore dataStore = setupAbstractDataStore(
746                         testParameter, "transactionAbortIntegrationTest", "test-1")) {
747
748                     final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
749                     assertNotNull("newWriteOnlyTransaction returned null", writeTx);
750
751                     writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
752
753                     final DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
754
755                     cohort.canCommit().get(5, TimeUnit.SECONDS);
756
757                     cohort.abort().get(5, TimeUnit.SECONDS);
758
759                     testWriteTransaction(dataStore, TestModel.TEST_PATH,
760                             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
761                 }
762             }
763         };
764     }
765
766     @Test
767     @SuppressWarnings("checkstyle:IllegalCatch")
768     public void testTransactionChainWithSingleShard() throws Exception {
769         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
770             {
771                 try (AbstractDataStore dataStore = setupAbstractDataStore(
772                         testParameter, "testTransactionChainWithSingleShard", "test-1")) {
773
774                     // 1. Create a Tx chain and write-only Tx
775                     final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
776
777                     final DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
778                     assertNotNull("newWriteOnlyTransaction returned null", writeTx);
779
780                     // 2. Write some data
781                     final NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
782                     writeTx.write(TestModel.TEST_PATH, testNode);
783
784                     // 3. Ready the Tx for commit
785                     final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
786
787                     // 4. Commit the Tx on another thread that first waits for
788                     // the second read Tx.
789                     final CountDownLatch continueCommit1 = new CountDownLatch(1);
790                     final CountDownLatch commit1Done = new CountDownLatch(1);
791                     final AtomicReference<Exception> commit1Error = new AtomicReference<>();
792                     new Thread(() -> {
793                         try {
794                             continueCommit1.await();
795                             doCommit(cohort1);
796                         } catch (Exception e) {
797                             commit1Error.set(e);
798                         } finally {
799                             commit1Done.countDown();
800                         }
801                     }).start();
802
803                     // 5. Create a new read Tx from the chain to read and verify
804                     // the data from the first
805                     // Tx is visible after being readied.
806                     DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction();
807                     Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
808                     assertEquals("isPresent", true, optional.isPresent());
809                     assertEquals("Data node", testNode, optional.get());
810
811                     // 6. Create a new RW Tx from the chain, write more data,
812                     // and ready it
813                     final DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
814                     final MapNode outerNode = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
815                     rwTx.write(TestModel.OUTER_LIST_PATH, outerNode);
816
817                     final DOMStoreThreePhaseCommitCohort cohort2 = rwTx.ready();
818
819                     // 7. Create a new read Tx from the chain to read the data
820                     // from the last RW Tx to
821                     // verify it is visible.
822                     readTx = txChain.newReadWriteTransaction();
823                     optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
824                     assertEquals("isPresent", true, optional.isPresent());
825                     assertEquals("Data node", outerNode, optional.get());
826
827                     // 8. Wait for the 2 commits to complete and close the
828                     // chain.
829                     continueCommit1.countDown();
830                     Uninterruptibles.awaitUninterruptibly(commit1Done, 5, TimeUnit.SECONDS);
831
832                     if (commit1Error.get() != null) {
833                         throw commit1Error.get();
834                     }
835
836                     doCommit(cohort2);
837
838                     txChain.close();
839
840                     // 9. Create a new read Tx from the data store and verify
841                     // committed data.
842                     readTx = dataStore.newReadOnlyTransaction();
843                     optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
844                     assertEquals("isPresent", true, optional.isPresent());
845                     assertEquals("Data node", outerNode, optional.get());
846                 }
847             }
848         };
849     }
850
851     @Test
852     public void testTransactionChainWithMultipleShards() throws Exception {
853         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
854             {
855                 try (AbstractDataStore dataStore = setupAbstractDataStore(
856                         testParameter, "testTransactionChainWithMultipleShards", "cars-1", "people-1")) {
857
858                     final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
859
860                     DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
861                     assertNotNull("newWriteOnlyTransaction returned null", writeTx);
862
863                     writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
864                     writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
865
866                     writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
867                     writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
868
869                     final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
870
871                     final DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction();
872
873                     final MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
874                     final YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
875                     readWriteTx.write(carPath, car);
876
877                     final MapEntryNode person = PeopleModel.newPersonEntry("jack");
878                     final YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
879                     readWriteTx.merge(personPath, person);
880
881                     Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
882                     assertEquals("isPresent", true, optional.isPresent());
883                     assertEquals("Data node", car, optional.get());
884
885                     optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS);
886                     assertEquals("isPresent", true, optional.isPresent());
887                     assertEquals("Data node", person, optional.get());
888
889                     final DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready();
890
891                     writeTx = txChain.newWriteOnlyTransaction();
892
893                     writeTx.delete(carPath);
894
895                     final DOMStoreThreePhaseCommitCohort cohort3 = writeTx.ready();
896
897                     final ListenableFuture<Boolean> canCommit1 = cohort1.canCommit();
898                     final ListenableFuture<Boolean> canCommit2 = cohort2.canCommit();
899
900                     doCommit(canCommit1, cohort1);
901                     doCommit(canCommit2, cohort2);
902                     doCommit(cohort3);
903
904                     txChain.close();
905
906                     final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
907
908                     optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
909                     assertEquals("isPresent", false, optional.isPresent());
910
911                     optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
912                     assertEquals("isPresent", true, optional.isPresent());
913                     assertEquals("Data node", person, optional.get());
914                 }
915             }
916         };
917     }
918
919     @Test
920     public void testCreateChainedTransactionsInQuickSuccession() throws Exception {
921         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
922             {
923                 try (AbstractDataStore dataStore = setupAbstractDataStore(
924                         testParameter, "testCreateChainedTransactionsInQuickSuccession", "cars-1")) {
925
926                     final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
927                             ImmutableMap.<LogicalDatastoreType, DOMStore>builder()
928                                     .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(),
929                             MoreExecutors.directExecutor());
930
931                     final TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
932                     DOMTransactionChain txChain = broker.createTransactionChain(listener);
933
934                     final List<CheckedFuture<Void, TransactionCommitFailedException>> futures = new ArrayList<>();
935
936                     final DOMDataTreeWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
937                     writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, CarsModel.emptyContainer());
938                     writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
939                     futures.add(writeTx.submit());
940
941                     int numCars = 100;
942                     for (int i = 0; i < numCars; i++) {
943                         final DOMDataTreeReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
944
945                         rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.newCarPath("car" + i),
946                                 CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
947
948                         futures.add(rwTx.submit());
949                     }
950
951                     for (final CheckedFuture<Void, TransactionCommitFailedException> f : futures) {
952                         f.checkedGet();
953                     }
954
955                     final Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction()
956                             .read(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
957                     assertEquals("isPresent", true, optional.isPresent());
958                     assertEquals("# cars", numCars, ((Collection<?>) optional.get().getValue()).size());
959
960                     txChain.close();
961
962                     broker.close();
963                 }
964             }
965         };
966     }
967
968     @Test
969     public void testCreateChainedTransactionAfterEmptyTxReadied() throws Exception {
970         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
971             {
972                 try (AbstractDataStore dataStore = setupAbstractDataStore(
973                         testParameter, "testCreateChainedTransactionAfterEmptyTxReadied", "test-1")) {
974
975                     final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
976
977                     final DOMStoreReadWriteTransaction rwTx1 = txChain.newReadWriteTransaction();
978
979                     rwTx1.ready();
980
981                     final DOMStoreReadWriteTransaction rwTx2 = txChain.newReadWriteTransaction();
982
983                     final Optional<NormalizedNode<?, ?>> optional = rwTx2.read(TestModel.TEST_PATH).get(
984                             5, TimeUnit.SECONDS);
985                     assertEquals("isPresent", false, optional.isPresent());
986
987                     txChain.close();
988                 }
989             }
990         };
991     }
992
993     @Test
994     public void testCreateChainedTransactionWhenPreviousNotReady() throws Exception {
995         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
996             {
997                 try (AbstractDataStore dataStore = setupAbstractDataStore(
998                         testParameter, "testCreateChainedTransactionWhenPreviousNotReady", "test-1")) {
999
1000                     final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
1001
1002                     final DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
1003                     assertNotNull("newWriteOnlyTransaction returned null", writeTx);
1004
1005                     writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1006
1007                     // Try to create another Tx of each type - each should fail
1008                     // b/c the previous Tx wasn't
1009                     // readied.
1010                     assertExceptionOnTxChainCreates(txChain, IllegalStateException.class);
1011                 }
1012             }
1013         };
1014     }
1015
1016     @Test
1017     public void testCreateChainedTransactionAfterClose() throws Exception {
1018         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
1019             {
1020                 try (AbstractDataStore dataStore = setupAbstractDataStore(
1021                         testParameter, "testCreateChainedTransactionAfterClose", "test-1")) {
1022
1023                     final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
1024                     txChain.close();
1025
1026                     // Try to create another Tx of each type - should fail b/c
1027                     // the previous Tx was closed.
1028                     assertExceptionOnTxChainCreates(txChain, TransactionChainClosedException.class);
1029                 }
1030             }
1031         };
1032     }
1033
1034     @Test
1035     public void testChainWithReadOnlyTxAfterPreviousReady() throws Exception {
1036         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
1037             {
1038                 try (AbstractDataStore dataStore = setupAbstractDataStore(
1039                         testParameter, "testChainWithReadOnlyTxAfterPreviousReady", "test-1")) {
1040
1041                     final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
1042
1043                     // Create a write tx and submit.
1044                     final DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
1045                     writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1046                     final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
1047
1048                     // Create read-only tx's and issue a read.
1049                     CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readFuture1 = txChain
1050                             .newReadOnlyTransaction().read(TestModel.TEST_PATH);
1051
1052                     CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readFuture2 = txChain
1053                             .newReadOnlyTransaction().read(TestModel.TEST_PATH);
1054
1055                     // Create another write tx and issue the write.
1056                     DOMStoreWriteTransaction writeTx2 = txChain.newWriteOnlyTransaction();
1057                     writeTx2.write(TestModel.OUTER_LIST_PATH,
1058                             ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1059
1060                     // Ensure the reads succeed.
1061
1062                     assertEquals("isPresent", true, readFuture1.checkedGet(5, TimeUnit.SECONDS).isPresent());
1063                     assertEquals("isPresent", true, readFuture2.checkedGet(5, TimeUnit.SECONDS).isPresent());
1064
1065                     // Ensure the writes succeed.
1066                     DOMStoreThreePhaseCommitCohort cohort2 = writeTx2.ready();
1067
1068                     doCommit(cohort1);
1069                     doCommit(cohort2);
1070
1071                     assertEquals("isPresent", true, txChain.newReadOnlyTransaction().read(TestModel.OUTER_LIST_PATH)
1072                             .checkedGet(5, TimeUnit.SECONDS).isPresent());
1073                 }
1074             }
1075         };
1076     }
1077
1078     @Test
1079     public void testChainedTransactionFailureWithSingleShard() throws Exception {
1080         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
1081             {
1082                 try (AbstractDataStore dataStore = setupAbstractDataStore(
1083                         testParameter, "testChainedTransactionFailureWithSingleShard", "cars-1")) {
1084
1085                     final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
1086                             ImmutableMap.<LogicalDatastoreType, DOMStore>builder()
1087                                     .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(),
1088                             MoreExecutors.directExecutor());
1089
1090                     final TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
1091                     final DOMTransactionChain txChain = broker.createTransactionChain(listener);
1092
1093                     final DOMDataTreeReadWriteTransaction writeTx = txChain.newReadWriteTransaction();
1094
1095                     writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH,
1096                             PeopleModel.emptyContainer());
1097
1098                     final ContainerNode invalidData = ImmutableContainerNodeBuilder.create()
1099                             .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME))
1100                             .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
1101
1102                     writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
1103
1104                     try {
1105                         writeTx.submit().checkedGet(5, TimeUnit.SECONDS);
1106                         fail("Expected TransactionCommitFailedException");
1107                     } catch (final TransactionCommitFailedException e) {
1108                         // Expected
1109                     }
1110
1111                     verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx),
1112                             any(Throwable.class));
1113
1114                     txChain.close();
1115                     broker.close();
1116                 }
1117             }
1118         };
1119     }
1120
1121     @Test
1122     public void testChainedTransactionFailureWithMultipleShards() throws Exception {
1123         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
1124             {
1125                 try (AbstractDataStore dataStore = setupAbstractDataStore(
1126                         testParameter, "testChainedTransactionFailureWithMultipleShards", "cars-1", "people-1")) {
1127
1128                     final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
1129                             ImmutableMap.<LogicalDatastoreType, DOMStore>builder()
1130                                     .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(),
1131                             MoreExecutors.directExecutor());
1132
1133                     final TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
1134                     final DOMTransactionChain txChain = broker.createTransactionChain(listener);
1135
1136                     final DOMDataTreeWriteTransaction writeTx = txChain.newReadWriteTransaction();
1137
1138                     writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH,
1139                             PeopleModel.emptyContainer());
1140
1141                     final ContainerNode invalidData = ImmutableContainerNodeBuilder.create()
1142                             .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME))
1143                             .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
1144
1145                     writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
1146
1147                     // Note that merge will validate the data and fail but put
1148                     // succeeds b/c deep validation is not
1149                     // done for put for performance reasons.
1150                     try {
1151                         writeTx.submit().checkedGet(5, TimeUnit.SECONDS);
1152                         fail("Expected TransactionCommitFailedException");
1153                     } catch (final TransactionCommitFailedException e) {
1154                         // Expected
1155                     }
1156
1157                     verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx),
1158                             any(Throwable.class));
1159
1160                     txChain.close();
1161                     broker.close();
1162                 }
1163             }
1164         };
1165     }
1166
1167     @Test
1168     public void testDataTreeChangeListenerRegistration() throws Exception {
1169         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
1170             {
1171                 try (AbstractDataStore dataStore = setupAbstractDataStore(
1172                         testParameter, "testDataTreeChangeListenerRegistration", "test-1")) {
1173
1174                     testWriteTransaction(dataStore, TestModel.TEST_PATH,
1175                             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1176
1177                     final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
1178
1179                     ListenerRegistration<MockDataTreeChangeListener> listenerReg = dataStore
1180                             .registerTreeChangeListener(TestModel.TEST_PATH, listener);
1181
1182                     assertNotNull("registerTreeChangeListener returned null", listenerReg);
1183
1184                     IntegrationTestKit.verifyShardState(dataStore, "test-1",
1185                         state -> assertEquals("getTreeChangeListenerActors", 1,
1186                                 state.getTreeChangeListenerActors().size()));
1187
1188                     // Wait for the initial notification
1189                     listener.waitForChangeEvents(TestModel.TEST_PATH);
1190                     listener.reset(2);
1191
1192                     // Write 2 updates.
1193                     testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
1194                             ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1195
1196                     YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1197                             .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
1198                     testWriteTransaction(dataStore, listPath,
1199                             ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
1200
1201                     // Wait for the 2 updates.
1202                     listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath);
1203                     listenerReg.close();
1204
1205                     IntegrationTestKit.verifyShardState(dataStore, "test-1",
1206                         state -> assertEquals("getTreeChangeListenerActors", 0,
1207                                 state.getTreeChangeListenerActors().size()));
1208
1209                     testWriteTransaction(dataStore,
1210                             YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1211                                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(),
1212                             ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2));
1213
1214                     listener.expectNoMoreChanges("Received unexpected change after close");
1215                 }
1216             }
1217         };
1218     }
1219
1220     @Test
1221     public void testRestoreFromDatastoreSnapshot() throws Exception {
1222         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
1223             {
1224                 final String name = "transactionIntegrationTest";
1225
1226                 final ContainerNode carsNode = CarsModel.newCarsNode(
1227                         CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", BigInteger.valueOf(20000L)),
1228                                 CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000L))));
1229
1230                 DataTree dataTree = new InMemoryDataTreeFactory().create(
1231                     DataTreeConfiguration.DEFAULT_OPERATIONAL, SchemaContextHelper.full());
1232                 AbstractShardTest.writeToStore(dataTree, CarsModel.BASE_PATH, carsNode);
1233                 NormalizedNode<?, ?> root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY);
1234
1235                 final Snapshot carsSnapshot = Snapshot.create(
1236                         new ShardSnapshotState(new MetadataShardDataTreeSnapshot(root)),
1237                         Collections.emptyList(), 2, 1, 2, 1, 1, "member-1", null);
1238
1239                 dataTree = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL,
1240                     SchemaContextHelper.full());
1241
1242                 final NormalizedNode<?, ?> peopleNode = PeopleModel.create();
1243                 AbstractShardTest.writeToStore(dataTree, PeopleModel.BASE_PATH, peopleNode);
1244
1245                 root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY);
1246
1247                 final Snapshot peopleSnapshot = Snapshot.create(
1248                         new ShardSnapshotState(new MetadataShardDataTreeSnapshot(root)),
1249                         Collections.emptyList(), 2, 1, 2, 1, 1, "member-1", null);
1250
1251                 restoreFromSnapshot = new DatastoreSnapshot(name, null, Arrays.asList(
1252                         new DatastoreSnapshot.ShardSnapshot("cars", carsSnapshot),
1253                         new DatastoreSnapshot.ShardSnapshot("people", peopleSnapshot)));
1254
1255                 try (AbstractDataStore dataStore = setupAbstractDataStore(
1256                         testParameter, name, "module-shards-member1.conf", true, "cars", "people")) {
1257
1258                     final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
1259
1260                     // two reads
1261                     Optional<NormalizedNode<?, ?>> optional = readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS);
1262                     assertEquals("isPresent", true, optional.isPresent());
1263                     assertEquals("Data node", carsNode, optional.get());
1264
1265                     optional = readTx.read(PeopleModel.BASE_PATH).get(5, TimeUnit.SECONDS);
1266                     assertEquals("isPresent", true, optional.isPresent());
1267                     assertEquals("Data node", peopleNode, optional.get());
1268                 }
1269             }
1270         };
1271     }
1272 }