Fix unit test CS warnings in sal-distributed-datastore
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / DistributedDataStoreIntegrationTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertTrue;
14 import static org.junit.Assert.fail;
15 import static org.mockito.Matchers.any;
16 import static org.mockito.Matchers.eq;
17 import static org.mockito.Mockito.timeout;
18 import static org.mockito.Mockito.verify;
19
20 import akka.actor.ActorSystem;
21 import akka.actor.Address;
22 import akka.actor.AddressFromURIString;
23 import akka.cluster.Cluster;
24 import akka.testkit.JavaTestKit;
25 import com.google.common.base.Optional;
26 import com.google.common.base.Throwables;
27 import com.google.common.collect.ImmutableMap;
28 import com.google.common.util.concurrent.CheckedFuture;
29 import com.google.common.util.concurrent.ListenableFuture;
30 import com.google.common.util.concurrent.MoreExecutors;
31 import com.google.common.util.concurrent.Uninterruptibles;
32 import com.typesafe.config.ConfigFactory;
33 import java.io.IOException;
34 import java.math.BigInteger;
35 import java.util.ArrayList;
36 import java.util.Arrays;
37 import java.util.Collection;
38 import java.util.Collections;
39 import java.util.List;
40 import java.util.concurrent.CountDownLatch;
41 import java.util.concurrent.ExecutionException;
42 import java.util.concurrent.TimeUnit;
43 import java.util.concurrent.atomic.AtomicReference;
44 import org.junit.AfterClass;
45 import org.junit.BeforeClass;
46 import org.junit.Test;
47 import org.mockito.Mockito;
48 import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker;
49 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
50 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
51 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
52 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
53 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
54 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
55 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
56 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
57 import org.opendaylight.controller.cluster.raft.Snapshot;
58 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
59 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
60 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
61 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
62 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
63 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
64 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
65 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
66 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
67 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
68 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
69 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
70 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
71 import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
72 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
73 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
74 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
75 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
76 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
77 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
78 import org.opendaylight.yangtools.concepts.ListenerRegistration;
79 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
80 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
81 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
82 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
83 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
84 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
85 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
86 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
87 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
88 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
89
90 public class DistributedDataStoreIntegrationTest {
91
92     private static ActorSystem system;
93
94     private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder()
95             .shardHeartbeatIntervalInMillis(100);
96
97     @BeforeClass
98     public static void setUpClass() throws IOException {
99         system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
100         Address member1Address = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
101         Cluster.get(system).join(member1Address);
102     }
103
104     @AfterClass
105     public static void tearDownClass() throws IOException {
106         JavaTestKit.shutdownActorSystem(system);
107         system = null;
108     }
109
110     protected ActorSystem getSystem() {
111         return system;
112     }
113
114     @Test
115     public void testWriteTransactionWithSingleShard() throws Exception {
116         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
117             {
118                 try (DistributedDataStore dataStore = setupDistributedDataStore("transactionIntegrationTest",
119                         "test-1")) {
120
121                     testWriteTransaction(dataStore, TestModel.TEST_PATH,
122                             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
123
124                     testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
125                             ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
126                 }
127             }
128         };
129     }
130
131     @Test
132     public void testWriteTransactionWithMultipleShards() throws Exception {
133         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
134             {
135                 try (DistributedDataStore dataStore = setupDistributedDataStore(
136                         "testWriteTransactionWithMultipleShards", "cars-1", "people-1")) {
137
138                     DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
139                     assertNotNull("newWriteOnlyTransaction returned null", writeTx);
140
141                     writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
142                     writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
143
144                     doCommit(writeTx.ready());
145
146                     writeTx = dataStore.newWriteOnlyTransaction();
147
148                     writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
149                     writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
150
151                     doCommit(writeTx.ready());
152
153                     writeTx = dataStore.newWriteOnlyTransaction();
154
155                     MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
156                     YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
157                     writeTx.write(carPath, car);
158
159                     MapEntryNode person = PeopleModel.newPersonEntry("jack");
160                     YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
161                     writeTx.write(personPath, person);
162
163                     doCommit(writeTx.ready());
164
165                     // Verify the data in the store
166
167                     DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
168
169                     Optional<NormalizedNode<?, ?>> optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
170                     assertEquals("isPresent", true, optional.isPresent());
171                     assertEquals("Data node", car, optional.get());
172
173                     optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
174                     assertEquals("isPresent", true, optional.isPresent());
175                     assertEquals("Data node", person, optional.get());
176                 }
177             }
178         };
179     }
180
181     @Test
182     public void testReadWriteTransactionWithSingleShard() throws Exception {
183         System.setProperty("shard.persistent", "true");
184         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
185             {
186                 try (DistributedDataStore dataStore = setupDistributedDataStore(
187                         "testReadWriteTransactionWithSingleShard", "test-1")) {
188
189                     // 1. Create a read-write Tx
190
191                     DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
192                     assertNotNull("newReadWriteTransaction returned null", readWriteTx);
193
194                     // 2. Write some data
195
196                     YangInstanceIdentifier nodePath = TestModel.TEST_PATH;
197                     NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
198                     readWriteTx.write(nodePath, nodeToWrite);
199
200                     // 3. Read the data from Tx
201
202                     Boolean exists = readWriteTx.exists(nodePath).checkedGet(5, TimeUnit.SECONDS);
203                     assertEquals("exists", true, exists);
204
205                     Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS);
206                     assertEquals("isPresent", true, optional.isPresent());
207                     assertEquals("Data node", nodeToWrite, optional.get());
208
209                     // 4. Ready the Tx for commit
210
211                     DOMStoreThreePhaseCommitCohort cohort = readWriteTx.ready();
212
213                     // 5. Commit the Tx
214
215                     doCommit(cohort);
216
217                     // 6. Verify the data in the store
218
219                     DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
220
221                     optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
222                     assertEquals("isPresent", true, optional.isPresent());
223                     assertEquals("Data node", nodeToWrite, optional.get());
224                 }
225             }
226         };
227     }
228
229     @Test
230     public void testReadWriteTransactionWithMultipleShards() throws Exception {
231         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
232             {
233                 try (DistributedDataStore dataStore = setupDistributedDataStore(
234                         "testReadWriteTransactionWithMultipleShards", "cars-1", "people-1")) {
235
236                     DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
237                     assertNotNull("newReadWriteTransaction returned null", readWriteTx);
238
239                     readWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
240                     readWriteTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
241
242                     doCommit(readWriteTx.ready());
243
244                     readWriteTx = dataStore.newReadWriteTransaction();
245
246                     readWriteTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
247                     readWriteTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
248
249                     doCommit(readWriteTx.ready());
250
251                     readWriteTx = dataStore.newReadWriteTransaction();
252
253                     MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
254                     YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
255                     readWriteTx.write(carPath, car);
256
257                     MapEntryNode person = PeopleModel.newPersonEntry("jack");
258                     YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
259                     readWriteTx.write(personPath, person);
260
261                     Boolean exists = readWriteTx.exists(carPath).checkedGet(5, TimeUnit.SECONDS);
262                     assertEquals("exists", true, exists);
263
264                     Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
265                     assertEquals("isPresent", true, optional.isPresent());
266                     assertEquals("Data node", car, optional.get());
267
268                     doCommit(readWriteTx.ready());
269
270                     // Verify the data in the store
271
272                     DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
273
274                     optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
275                     assertEquals("isPresent", true, optional.isPresent());
276                     assertEquals("Data node", car, optional.get());
277
278                     optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
279                     assertEquals("isPresent", true, optional.isPresent());
280                     assertEquals("Data node", person, optional.get());
281
282                 }
283             }
284         };
285     }
286
287     @Test
288     public void testSingleTransactionsWritesInQuickSuccession() throws Exception {
289         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
290             {
291                 try (DistributedDataStore dataStore = setupDistributedDataStore(
292                         "testSingleTransactionsWritesInQuickSuccession", "cars-1")) {
293
294                     DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
295
296                     DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
297                     writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
298                     writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
299                     doCommit(writeTx.ready());
300
301                     writeTx = txChain.newWriteOnlyTransaction();
302
303                     int numCars = 5;
304                     for (int i = 0; i < numCars; i++) {
305                         writeTx.write(CarsModel.newCarPath("car" + i),
306                                 CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
307                     }
308
309                     doCommit(writeTx.ready());
310
311                     Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction()
312                             .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
313                     assertEquals("isPresent", true, optional.isPresent());
314                     assertEquals("# cars", numCars, ((Collection<?>) optional.get().getValue()).size());
315                 }
316             }
317         };
318     }
319
320     @SuppressWarnings("checkstyle:IllegalCatch")
321     private void testTransactionWritesWithShardNotInitiallyReady(final String testName, final boolean writeOnly)
322             throws Exception {
323         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
324             {
325                 String shardName = "test-1";
326
327                 // Setup the InMemoryJournal to block shard recovery to ensure
328                 // the shard isn't
329                 // initialized until we create and submit the write the Tx.
330                 String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
331                 CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
332                 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
333
334                 try (DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName)) {
335
336                     // Create the write Tx
337
338                     final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction()
339                             : dataStore.newReadWriteTransaction();
340                     assertNotNull("newReadWriteTransaction returned null", writeTx);
341
342                     // Do some modification operations and ready the Tx on a
343                     // separate thread.
344
345                     final YangInstanceIdentifier listEntryPath = YangInstanceIdentifier
346                             .builder(TestModel.OUTER_LIST_PATH)
347                             .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
348
349                     final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
350                     final AtomicReference<Exception> caughtEx = new AtomicReference<>();
351                     final CountDownLatch txReady = new CountDownLatch(1);
352                     Thread txThread = new Thread() {
353                         @Override
354                         public void run() {
355                             try {
356                                 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
357
358                                 writeTx.merge(TestModel.OUTER_LIST_PATH,
359                                         ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
360
361                                 writeTx.write(listEntryPath,
362                                         ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
363
364                                 writeTx.delete(listEntryPath);
365
366                                 txCohort.set(writeTx.ready());
367                             } catch (Exception e) {
368                                 caughtEx.set(e);
369                                 return;
370                             } finally {
371                                 txReady.countDown();
372                             }
373                         }
374                     };
375
376                     txThread.start();
377
378                     // Wait for the Tx operations to complete.
379
380                     boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
381                     if (caughtEx.get() != null) {
382                         throw caughtEx.get();
383                     }
384
385                     assertEquals("Tx ready", true, done);
386
387                     // At this point the Tx operations should be waiting for the
388                     // shard to initialize so
389                     // trigger the latch to let the shard recovery to continue.
390
391                     blockRecoveryLatch.countDown();
392
393                     // Wait for the Tx commit to complete.
394
395                     doCommit(txCohort.get());
396
397                     // Verify the data in the store
398
399                     DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
400
401                     Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
402                     assertEquals("isPresent", true, optional.isPresent());
403
404                     optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
405                     assertEquals("isPresent", true, optional.isPresent());
406
407                     optional = readTx.read(listEntryPath).get(5, TimeUnit.SECONDS);
408                     assertEquals("isPresent", false, optional.isPresent());
409                 }
410             }
411         };
412     }
413
414     @Test
415     public void testWriteOnlyTransactionWithShardNotInitiallyReady() throws Exception {
416         datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
417         testTransactionWritesWithShardNotInitiallyReady("testWriteOnlyTransactionWithShardNotInitiallyReady", true);
418     }
419
420     @Test
421     public void testReadWriteTransactionWithShardNotInitiallyReady() throws Exception {
422         testTransactionWritesWithShardNotInitiallyReady("testReadWriteTransactionWithShardNotInitiallyReady", false);
423     }
424
425     @Test
426     @SuppressWarnings("checkstyle:IllegalCatch")
427     public void testTransactionReadsWithShardNotInitiallyReady() throws Exception {
428         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
429             {
430                 String testName = "testTransactionReadsWithShardNotInitiallyReady";
431                 String shardName = "test-1";
432
433                 // Setup the InMemoryJournal to block shard recovery to ensure
434                 // the shard isn't
435                 // initialized until we create the Tx.
436                 String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
437                 CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
438                 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
439
440                 try (DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName)) {
441
442                     // Create the read-write Tx
443
444                     final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
445                     assertNotNull("newReadWriteTransaction returned null", readWriteTx);
446
447                     // Do some reads on the Tx on a separate thread.
448
449                     final AtomicReference<CheckedFuture<Boolean, ReadFailedException>> txExistsFuture =
450                             new AtomicReference<>();
451                     final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
452                             txReadFuture = new AtomicReference<>();
453                     final AtomicReference<Exception> caughtEx = new AtomicReference<>();
454                     final CountDownLatch txReadsDone = new CountDownLatch(1);
455                     Thread txThread = new Thread() {
456                         @Override
457                         public void run() {
458                             try {
459                                 readWriteTx.write(TestModel.TEST_PATH,
460                                         ImmutableNodes.containerNode(TestModel.TEST_QNAME));
461
462                                 txExistsFuture.set(readWriteTx.exists(TestModel.TEST_PATH));
463
464                                 txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
465                             } catch (Exception e) {
466                                 caughtEx.set(e);
467                                 return;
468                             } finally {
469                                 txReadsDone.countDown();
470                             }
471                         }
472                     };
473
474                     txThread.start();
475
476                     // Wait for the Tx operations to complete.
477
478                     boolean done = Uninterruptibles.awaitUninterruptibly(txReadsDone, 5, TimeUnit.SECONDS);
479                     if (caughtEx.get() != null) {
480                         throw caughtEx.get();
481                     }
482
483                     assertEquals("Tx reads done", true, done);
484
485                     // At this point the Tx operations should be waiting for the
486                     // shard to initialize so
487                     // trigger the latch to let the shard recovery to continue.
488
489                     blockRecoveryLatch.countDown();
490
491                     // Wait for the reads to complete and verify.
492
493                     assertEquals("exists", true, txExistsFuture.get().checkedGet(5, TimeUnit.SECONDS));
494                     assertEquals("read", true, txReadFuture.get().checkedGet(5, TimeUnit.SECONDS).isPresent());
495
496                     readWriteTx.close();
497                 }
498             }
499         };
500     }
501
502     @Test(expected = NotInitializedException.class)
503     @SuppressWarnings("checkstyle:IllegalCatch")
504     public void testTransactionCommitFailureWithShardNotInitialized() throws Exception {
505         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
506             {
507                 String testName = "testTransactionCommitFailureWithShardNotInitialized";
508                 String shardName = "test-1";
509
510                 // Set the shard initialization timeout low for the test.
511
512                 datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
513
514                 // Setup the InMemoryJournal to block shard recovery
515                 // indefinitely.
516
517                 String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
518                 CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
519                 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
520
521                 InMemoryJournal.addEntry(persistentID, 1, "Dummy data so akka will read from persistence");
522
523                 try (DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName)) {
524
525                     // Create the write Tx
526
527                     final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
528                     assertNotNull("newReadWriteTransaction returned null", writeTx);
529
530                     // Do some modifications and ready the Tx on a separate
531                     // thread.
532
533                     final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
534                     final AtomicReference<Exception> caughtEx = new AtomicReference<>();
535                     final CountDownLatch txReady = new CountDownLatch(1);
536                     Thread txThread = new Thread() {
537                         @Override
538                         public void run() {
539                             try {
540                                 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
541
542                                 txCohort.set(writeTx.ready());
543                             } catch (Exception e) {
544                                 caughtEx.set(e);
545                                 return;
546                             } finally {
547                                 txReady.countDown();
548                             }
549                         }
550                     };
551
552                     txThread.start();
553
554                     // Wait for the Tx operations to complete.
555
556                     boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
557                     if (caughtEx.get() != null) {
558                         throw caughtEx.get();
559                     }
560
561                     assertEquals("Tx ready", true, done);
562
563                     // Wait for the commit to complete. Since the shard never
564                     // initialized, the Tx should
565                     // have timed out and throw an appropriate exception cause.
566
567                     try {
568                         txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
569                     } catch (ExecutionException e) {
570                         Throwables.propagateIfInstanceOf(e.getCause(), Exception.class);
571                         Throwables.propagate(e.getCause());
572                     } finally {
573                         blockRecoveryLatch.countDown();
574                     }
575                 }
576             }
577         };
578     }
579
580     @Test(expected = NotInitializedException.class)
581     @SuppressWarnings("checkstyle:IllegalCatch")
582     public void testTransactionReadFailureWithShardNotInitialized() throws Exception {
583         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
584             {
585                 String testName = "testTransactionReadFailureWithShardNotInitialized";
586                 String shardName = "test-1";
587
588                 // Set the shard initialization timeout low for the test.
589
590                 datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
591
592                 // Setup the InMemoryJournal to block shard recovery
593                 // indefinitely.
594
595                 String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
596                 CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
597                 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
598
599                 InMemoryJournal.addEntry(persistentID, 1, "Dummy data so akka will read from persistence");
600
601                 try (DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName)) {
602
603                     // Create the read-write Tx
604
605                     final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
606                     assertNotNull("newReadWriteTransaction returned null", readWriteTx);
607
608                     // Do a read on the Tx on a separate thread.
609
610                     final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
611                             txReadFuture = new AtomicReference<>();
612                     final AtomicReference<Exception> caughtEx = new AtomicReference<>();
613                     final CountDownLatch txReadDone = new CountDownLatch(1);
614                     Thread txThread = new Thread() {
615                         @Override
616                         public void run() {
617                             try {
618                                 readWriteTx.write(TestModel.TEST_PATH,
619                                         ImmutableNodes.containerNode(TestModel.TEST_QNAME));
620
621                                 txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
622
623                                 readWriteTx.close();
624                             } catch (Exception e) {
625                                 caughtEx.set(e);
626                                 return;
627                             } finally {
628                                 txReadDone.countDown();
629                             }
630                         }
631                     };
632
633                     txThread.start();
634
635                     // Wait for the Tx operations to complete.
636
637                     boolean done = Uninterruptibles.awaitUninterruptibly(txReadDone, 5, TimeUnit.SECONDS);
638                     if (caughtEx.get() != null) {
639                         throw caughtEx.get();
640                     }
641
642                     assertEquals("Tx read done", true, done);
643
644                     // Wait for the read to complete. Since the shard never
645                     // initialized, the Tx should
646                     // have timed out and throw an appropriate exception cause.
647
648                     try {
649                         txReadFuture.get().checkedGet(5, TimeUnit.SECONDS);
650                     } catch (ReadFailedException e) {
651                         Throwables.propagateIfInstanceOf(e.getCause(), Exception.class);
652                         Throwables.propagate(e.getCause());
653                     } finally {
654                         blockRecoveryLatch.countDown();
655                     }
656                 }
657             }
658         };
659     }
660
661     @SuppressWarnings("checkstyle:IllegalCatch")
662     private void testTransactionCommitFailureWithNoShardLeader(final boolean writeOnly, final String testName)
663             throws Exception {
664         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
665             {
666                 String shardName = "default";
667
668                 // We don't want the shard to become the leader so prevent shard
669                 // elections.
670                 datastoreContextBuilder.customRaftPolicyImplementation(
671                         "org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy");
672
673                 // The ShardManager uses the election timeout for FindPrimary so
674                 // reset it low so it will timeout quickly.
675                 datastoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(1)
676                         .shardInitializationTimeout(200, TimeUnit.MILLISECONDS);
677
678                 try (DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName)) {
679
680                     Object result = dataStore.getActorContext().executeOperation(
681                             dataStore.getActorContext().getShardManager(), new FindLocalShard(shardName, true));
682                     assertTrue("Expected LocalShardFound. Actual: " + result, result instanceof LocalShardFound);
683
684                     // Create the write Tx.
685
686                     try (final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction()
687                             : dataStore.newReadWriteTransaction()) {
688                         assertNotNull("newReadWriteTransaction returned null", writeTx);
689
690                         // Do some modifications and ready the Tx on a separate
691                         // thread.
692
693                         final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
694                         final AtomicReference<Exception> caughtEx = new AtomicReference<>();
695                         final CountDownLatch txReady = new CountDownLatch(1);
696                         Thread txThread = new Thread() {
697                             @Override
698                             public void run() {
699                                 try {
700                                     writeTx.write(TestModel.JUNK_PATH,
701                                             ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
702
703                                     txCohort.set(writeTx.ready());
704                                 } catch (Exception e) {
705                                     caughtEx.set(e);
706                                     return;
707                                 } finally {
708                                     txReady.countDown();
709                                 }
710                             }
711                         };
712
713                         txThread.start();
714
715                         // Wait for the Tx operations to complete.
716
717                         boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
718                         if (caughtEx.get() != null) {
719                             throw caughtEx.get();
720                         }
721
722                         assertEquals("Tx ready", true, done);
723
724                         // Wait for the commit to complete. Since no shard
725                         // leader was elected in time, the Tx
726                         // should have timed out and throw an appropriate
727                         // exception cause.
728
729                         try {
730                             txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
731                         } catch (ExecutionException e) {
732                             Throwables.propagateIfInstanceOf(e.getCause(), Exception.class);
733                             Throwables.propagate(e.getCause());
734                         }
735                     }
736                 }
737             }
738         };
739     }
740
741     @Test(expected = NoShardLeaderException.class)
742     public void testWriteOnlyTransactionCommitFailureWithNoShardLeader() throws Exception {
743         datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
744         testTransactionCommitFailureWithNoShardLeader(true, "testWriteOnlyTransactionCommitFailureWithNoShardLeader");
745     }
746
747     @Test(expected = NoShardLeaderException.class)
748     public void testReadWriteTransactionCommitFailureWithNoShardLeader() throws Exception {
749         testTransactionCommitFailureWithNoShardLeader(false, "testReadWriteTransactionCommitFailureWithNoShardLeader");
750     }
751
752     @Test
753     public void testTransactionAbort() throws Exception {
754         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
755             {
756                 try (DistributedDataStore dataStore = setupDistributedDataStore("transactionAbortIntegrationTest",
757                         "test-1")) {
758
759                     DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
760                     assertNotNull("newWriteOnlyTransaction returned null", writeTx);
761
762                     writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
763
764                     DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
765
766                     cohort.canCommit().get(5, TimeUnit.SECONDS);
767
768                     cohort.abort().get(5, TimeUnit.SECONDS);
769
770                     testWriteTransaction(dataStore, TestModel.TEST_PATH,
771                             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
772                 }
773             }
774         };
775     }
776
777     @Test
778     @SuppressWarnings("checkstyle:IllegalCatch")
779     public void testTransactionChainWithSingleShard() throws Exception {
780         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
781             {
782                 try (DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChainWithSingleShard",
783                         "test-1")) {
784
785                     // 1. Create a Tx chain and write-only Tx
786
787                     DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
788
789                     DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
790                     assertNotNull("newWriteOnlyTransaction returned null", writeTx);
791
792                     // 2. Write some data
793
794                     NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
795                     writeTx.write(TestModel.TEST_PATH, testNode);
796
797                     // 3. Ready the Tx for commit
798
799                     final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
800
801                     // 4. Commit the Tx on another thread that first waits for
802                     // the second read Tx.
803
804                     final CountDownLatch continueCommit1 = new CountDownLatch(1);
805                     final CountDownLatch commit1Done = new CountDownLatch(1);
806                     final AtomicReference<Exception> commit1Error = new AtomicReference<>();
807                     new Thread() {
808                         @Override
809                         public void run() {
810                             try {
811                                 continueCommit1.await();
812                                 doCommit(cohort1);
813                             } catch (Exception e) {
814                                 commit1Error.set(e);
815                             } finally {
816                                 commit1Done.countDown();
817                             }
818                         }
819                     }.start();
820
821                     // 5. Create a new read Tx from the chain to read and verify
822                     // the data from the first
823                     // Tx is visible after being readied.
824
825                     DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction();
826                     Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
827                     assertEquals("isPresent", true, optional.isPresent());
828                     assertEquals("Data node", testNode, optional.get());
829
830                     // 6. Create a new RW Tx from the chain, write more data,
831                     // and ready it
832
833                     DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
834                     MapNode outerNode = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
835                     rwTx.write(TestModel.OUTER_LIST_PATH, outerNode);
836
837                     final DOMStoreThreePhaseCommitCohort cohort2 = rwTx.ready();
838
839                     // 7. Create a new read Tx from the chain to read the data
840                     // from the last RW Tx to
841                     // verify it is visible.
842
843                     readTx = txChain.newReadWriteTransaction();
844                     optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
845                     assertEquals("isPresent", true, optional.isPresent());
846                     assertEquals("Data node", outerNode, optional.get());
847
848                     // 8. Wait for the 2 commits to complete and close the
849                     // chain.
850
851                     continueCommit1.countDown();
852                     Uninterruptibles.awaitUninterruptibly(commit1Done, 5, TimeUnit.SECONDS);
853
854                     if (commit1Error.get() != null) {
855                         throw commit1Error.get();
856                     }
857
858                     doCommit(cohort2);
859
860                     txChain.close();
861
862                     // 9. Create a new read Tx from the data store and verify
863                     // committed data.
864
865                     readTx = dataStore.newReadOnlyTransaction();
866                     optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
867                     assertEquals("isPresent", true, optional.isPresent());
868                     assertEquals("Data node", outerNode, optional.get());
869                 }
870             }
871         };
872     }
873
874     @Test
875     public void testTransactionChainWithMultipleShards() throws Exception {
876         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
877             {
878                 try (DistributedDataStore dataStore = setupDistributedDataStore(
879                         "testTransactionChainWithMultipleShards", "cars-1", "people-1")) {
880
881                     DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
882
883                     DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
884                     assertNotNull("newWriteOnlyTransaction returned null", writeTx);
885
886                     writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
887                     writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
888
889                     writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
890                     writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
891
892                     final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
893
894                     DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction();
895
896                     MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
897                     YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
898                     readWriteTx.write(carPath, car);
899
900                     MapEntryNode person = PeopleModel.newPersonEntry("jack");
901                     YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
902                     readWriteTx.merge(personPath, person);
903
904                     Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
905                     assertEquals("isPresent", true, optional.isPresent());
906                     assertEquals("Data node", car, optional.get());
907
908                     optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS);
909                     assertEquals("isPresent", true, optional.isPresent());
910                     assertEquals("Data node", person, optional.get());
911
912                     DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready();
913
914                     writeTx = txChain.newWriteOnlyTransaction();
915
916                     writeTx.delete(carPath);
917
918                     DOMStoreThreePhaseCommitCohort cohort3 = writeTx.ready();
919
920                     ListenableFuture<Boolean> canCommit1 = cohort1.canCommit();
921                     ListenableFuture<Boolean> canCommit2 = cohort2.canCommit();
922
923                     doCommit(canCommit1, cohort1);
924                     doCommit(canCommit2, cohort2);
925                     doCommit(cohort3);
926
927                     txChain.close();
928
929                     DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
930
931                     optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
932                     assertEquals("isPresent", false, optional.isPresent());
933
934                     optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
935                     assertEquals("isPresent", true, optional.isPresent());
936                     assertEquals("Data node", person, optional.get());
937                 }
938             }
939         };
940     }
941
942     @Test
943     public void testCreateChainedTransactionsInQuickSuccession() throws Exception {
944         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
945             {
946                 try (DistributedDataStore dataStore = setupDistributedDataStore(
947                         "testCreateChainedTransactionsInQuickSuccession", "cars-1")) {
948
949                     ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
950                             ImmutableMap.<LogicalDatastoreType, DOMStore>builder()
951                                     .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(),
952                             MoreExecutors.directExecutor());
953
954                     TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
955                     DOMTransactionChain txChain = broker.createTransactionChain(listener);
956
957                     List<CheckedFuture<Void, TransactionCommitFailedException>> futures = new ArrayList<>();
958
959                     DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
960                     writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, CarsModel.emptyContainer());
961                     writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
962                     futures.add(writeTx.submit());
963
964                     int numCars = 100;
965                     for (int i = 0; i < numCars; i++) {
966                         DOMDataReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
967
968                         rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.newCarPath("car" + i),
969                                 CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
970
971                         futures.add(rwTx.submit());
972                     }
973
974                     for (CheckedFuture<Void, TransactionCommitFailedException> f : futures) {
975                         f.checkedGet();
976                     }
977
978                     Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction()
979                             .read(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
980                     assertEquals("isPresent", true, optional.isPresent());
981                     assertEquals("# cars", numCars, ((Collection<?>) optional.get().getValue()).size());
982
983                     txChain.close();
984
985                     broker.close();
986                 }
987             }
988         };
989     }
990
991     @Test
992     public void testCreateChainedTransactionAfterEmptyTxReadied() throws Exception {
993         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
994             {
995                 try (DistributedDataStore dataStore = setupDistributedDataStore(
996                         "testCreateChainedTransactionAfterEmptyTxReadied", "test-1")) {
997
998                     DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
999
1000                     DOMStoreReadWriteTransaction rwTx1 = txChain.newReadWriteTransaction();
1001
1002                     rwTx1.ready();
1003
1004                     DOMStoreReadWriteTransaction rwTx2 = txChain.newReadWriteTransaction();
1005
1006                     Optional<NormalizedNode<?, ?>> optional = rwTx2.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
1007                     assertEquals("isPresent", false, optional.isPresent());
1008
1009                     txChain.close();
1010                 }
1011             }
1012         };
1013     }
1014
1015     @Test
1016     public void testCreateChainedTransactionWhenPreviousNotReady() throws Exception {
1017         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
1018             {
1019                 try (DistributedDataStore dataStore = setupDistributedDataStore(
1020                         "testCreateChainedTransactionWhenPreviousNotReady", "test-1")) {
1021
1022                     final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
1023
1024                     DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
1025                     assertNotNull("newWriteOnlyTransaction returned null", writeTx);
1026
1027                     writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1028
1029                     // Try to create another Tx of each type - each should fail
1030                     // b/c the previous Tx wasn't
1031                     // readied.
1032
1033                     assertExceptionOnTxChainCreates(txChain, IllegalStateException.class);
1034                 }
1035             }
1036         };
1037     }
1038
1039     @Test
1040     public void testCreateChainedTransactionAfterClose() throws Exception {
1041         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
1042             {
1043                 try (DistributedDataStore dataStore = setupDistributedDataStore(
1044                         "testCreateChainedTransactionAfterClose", "test-1")) {
1045
1046                     DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
1047
1048                     txChain.close();
1049
1050                     // Try to create another Tx of each type - should fail b/c
1051                     // the previous Tx was closed.
1052
1053                     assertExceptionOnTxChainCreates(txChain, TransactionChainClosedException.class);
1054                 }
1055             }
1056         };
1057     }
1058
1059     @Test
1060     public void testChainWithReadOnlyTxAfterPreviousReady() throws Exception {
1061         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
1062             {
1063                 try (DistributedDataStore dataStore = setupDistributedDataStore(
1064                         "testChainWithReadOnlyTxAfterPreviousReady", "test-1")) {
1065
1066                     final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
1067
1068                     // Create a write tx and submit.
1069
1070                     DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
1071                     writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1072                     final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
1073
1074                     // Create read-only tx's and issue a read.
1075
1076                     CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readFuture1 = txChain
1077                             .newReadOnlyTransaction().read(TestModel.TEST_PATH);
1078
1079                     CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readFuture2 = txChain
1080                             .newReadOnlyTransaction().read(TestModel.TEST_PATH);
1081
1082                     // Create another write tx and issue the write.
1083
1084                     DOMStoreWriteTransaction writeTx2 = txChain.newWriteOnlyTransaction();
1085                     writeTx2.write(TestModel.OUTER_LIST_PATH,
1086                             ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1087
1088                     // Ensure the reads succeed.
1089
1090                     assertEquals("isPresent", true, readFuture1.checkedGet(5, TimeUnit.SECONDS).isPresent());
1091                     assertEquals("isPresent", true, readFuture2.checkedGet(5, TimeUnit.SECONDS).isPresent());
1092
1093                     // Ensure the writes succeed.
1094
1095                     DOMStoreThreePhaseCommitCohort cohort2 = writeTx2.ready();
1096
1097                     doCommit(cohort1);
1098                     doCommit(cohort2);
1099
1100                     assertEquals("isPresent", true, txChain.newReadOnlyTransaction().read(TestModel.OUTER_LIST_PATH)
1101                             .checkedGet(5, TimeUnit.SECONDS).isPresent());
1102                 }
1103             }
1104         };
1105     }
1106
1107     @Test
1108     public void testChainedTransactionFailureWithSingleShard() throws Exception {
1109         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
1110             {
1111                 try (DistributedDataStore dataStore = setupDistributedDataStore(
1112                         "testChainedTransactionFailureWithSingleShard", "cars-1")) {
1113
1114                     ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
1115                             ImmutableMap.<LogicalDatastoreType, DOMStore>builder()
1116                                     .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(),
1117                             MoreExecutors.directExecutor());
1118
1119                     TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
1120                     DOMTransactionChain txChain = broker.createTransactionChain(listener);
1121
1122                     DOMDataReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
1123
1124                     ContainerNode invalidData = ImmutableContainerNodeBuilder.create()
1125                             .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME))
1126                             .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
1127
1128                     rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
1129
1130                     try {
1131                         rwTx.submit().checkedGet(5, TimeUnit.SECONDS);
1132                         fail("Expected TransactionCommitFailedException");
1133                     } catch (TransactionCommitFailedException e) {
1134                         // Expected
1135                     }
1136
1137                     verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(rwTx),
1138                             any(Throwable.class));
1139
1140                     txChain.close();
1141                     broker.close();
1142                 }
1143             }
1144         };
1145     }
1146
1147     @Test
1148     public void testChainedTransactionFailureWithMultipleShards() throws Exception {
1149         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
1150             {
1151                 try (DistributedDataStore dataStore = setupDistributedDataStore(
1152                         "testChainedTransactionFailureWithMultipleShards", "cars-1", "people-1")) {
1153
1154                     ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
1155                             ImmutableMap.<LogicalDatastoreType, DOMStore>builder()
1156                                     .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(),
1157                             MoreExecutors.directExecutor());
1158
1159                     TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
1160                     DOMTransactionChain txChain = broker.createTransactionChain(listener);
1161
1162                     DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
1163
1164                     writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH,
1165                             PeopleModel.emptyContainer());
1166
1167                     ContainerNode invalidData = ImmutableContainerNodeBuilder.create()
1168                             .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME))
1169                             .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
1170
1171                     // Note that merge will validate the data and fail but put
1172                     // succeeds b/c deep validation is not
1173                     // done for put for performance reasons.
1174                     writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
1175
1176                     try {
1177                         writeTx.submit().checkedGet(5, TimeUnit.SECONDS);
1178                         fail("Expected TransactionCommitFailedException");
1179                     } catch (TransactionCommitFailedException e) {
1180                         // Expected
1181                     }
1182
1183                     verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx),
1184                             any(Throwable.class));
1185
1186                     txChain.close();
1187                     broker.close();
1188                 }
1189             }
1190         };
1191     }
1192
1193     @Test
1194     public void testChangeListenerRegistration() throws Exception {
1195         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
1196             {
1197                 try (DistributedDataStore dataStore = setupDistributedDataStore("testChangeListenerRegistration",
1198                         "test-1")) {
1199
1200                     testWriteTransaction(dataStore, TestModel.TEST_PATH,
1201                             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1202
1203                     MockDataChangeListener listener = new MockDataChangeListener(1);
1204
1205                     ListenerRegistration<MockDataChangeListener> listenerReg = dataStore
1206                             .registerChangeListener(TestModel.TEST_PATH, listener, DataChangeScope.SUBTREE);
1207
1208                     assertNotNull("registerChangeListener returned null", listenerReg);
1209
1210                     // Wait for the initial notification
1211
1212                     listener.waitForChangeEvents(TestModel.TEST_PATH);
1213
1214                     listener.reset(2);
1215
1216                     // Write 2 updates.
1217
1218                     testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
1219                             ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1220
1221                     YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1222                             .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
1223                     testWriteTransaction(dataStore, listPath,
1224                             ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
1225
1226                     // Wait for the 2 updates.
1227
1228                     listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath);
1229
1230                     listenerReg.close();
1231
1232                     testWriteTransaction(dataStore,
1233                             YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1234                                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(),
1235                             ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2));
1236
1237                     listener.expectNoMoreChanges("Received unexpected change after close");
1238                 }
1239             }
1240         };
1241     }
1242
1243     @Test
1244     public void testRestoreFromDatastoreSnapshot() throws Exception {
1245         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
1246             {
1247                 final String name = "transactionIntegrationTest";
1248
1249                 ContainerNode carsNode = CarsModel.newCarsNode(
1250                         CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", BigInteger.valueOf(20000L)),
1251                                 CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000L))));
1252
1253                 DataTree dataTree = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
1254                 dataTree.setSchemaContext(SchemaContextHelper.full());
1255                 AbstractShardTest.writeToStore(dataTree, CarsModel.BASE_PATH, carsNode);
1256                 NormalizedNode<?, ?> root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY);
1257
1258                 final Snapshot carsSnapshot = Snapshot.create(new MetadataShardDataTreeSnapshot(root).serialize(),
1259                         Collections.<ReplicatedLogEntry>emptyList(), 2, 1, 2, 1, 1, "member-1");
1260
1261                 NormalizedNode<?, ?> peopleNode = PeopleModel.create();
1262                 dataTree = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
1263                 dataTree.setSchemaContext(SchemaContextHelper.full());
1264                 AbstractShardTest.writeToStore(dataTree, PeopleModel.BASE_PATH, peopleNode);
1265                 root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY);
1266
1267                 Snapshot peopleSnapshot = Snapshot.create(new MetadataShardDataTreeSnapshot(root).serialize(),
1268                         Collections.<ReplicatedLogEntry>emptyList(), 2, 1, 2, 1, 1, "member-1");
1269
1270                 restoreFromSnapshot = new DatastoreSnapshot(name, null,
1271                         Arrays.asList(
1272                                 new DatastoreSnapshot.ShardSnapshot("cars",
1273                                         org.apache.commons.lang3.SerializationUtils.serialize(carsSnapshot)),
1274                                 new DatastoreSnapshot.ShardSnapshot("people",
1275                                         org.apache.commons.lang3.SerializationUtils.serialize(peopleSnapshot))));
1276
1277                 try (DistributedDataStore dataStore = setupDistributedDataStore(name, "module-shards-member1.conf",
1278                         true, "cars", "people")) {
1279
1280                     DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
1281
1282                     Optional<NormalizedNode<?, ?>> optional = readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS);
1283                     assertEquals("isPresent", true, optional.isPresent());
1284                     assertEquals("Data node", carsNode, optional.get());
1285
1286                     optional = readTx.read(PeopleModel.BASE_PATH).get(5, TimeUnit.SECONDS);
1287                     assertEquals("isPresent", true, optional.isPresent());
1288                     assertEquals("Data node", peopleNode, optional.get());
1289                 }
1290             }
1291         };
1292     }
1293 }