CDS: Make AbstractDataBroker non-public
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / DistributedDataStoreIntegrationTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertTrue;
14 import static org.junit.Assert.fail;
15 import static org.mockito.Matchers.any;
16 import static org.mockito.Matchers.eq;
17 import static org.mockito.Mockito.timeout;
18 import static org.mockito.Mockito.verify;
19 import akka.actor.ActorSystem;
20 import akka.actor.Address;
21 import akka.actor.AddressFromURIString;
22 import akka.cluster.Cluster;
23 import akka.testkit.JavaTestKit;
24 import com.google.common.base.Optional;
25 import com.google.common.collect.ImmutableMap;
26 import com.google.common.util.concurrent.CheckedFuture;
27 import com.google.common.util.concurrent.ListenableFuture;
28 import com.google.common.util.concurrent.MoreExecutors;
29 import com.google.common.util.concurrent.Uninterruptibles;
30 import com.typesafe.config.ConfigFactory;
31 import java.io.IOException;
32 import java.math.BigInteger;
33 import java.util.ArrayList;
34 import java.util.Arrays;
35 import java.util.Collection;
36 import java.util.Collections;
37 import java.util.List;
38 import java.util.concurrent.CountDownLatch;
39 import java.util.concurrent.ExecutionException;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.atomic.AtomicReference;
42 import org.junit.AfterClass;
43 import org.junit.BeforeClass;
44 import org.junit.Test;
45 import org.mockito.Mockito;
46 import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker;
47 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
48 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
49 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
50 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
51 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
52 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
53 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
54 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
55 import org.opendaylight.controller.cluster.raft.Snapshot;
56 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
57 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
58 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
59 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
60 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
61 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
62 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
63 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
64 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
65 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
66 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
67 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
68 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
69 import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
70 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
71 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
72 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
73 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
74 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
75 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
76 import org.opendaylight.yangtools.concepts.ListenerRegistration;
77 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
78 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
79 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
80 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
81 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
82 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
83 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
84 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
85
86 public class DistributedDataStoreIntegrationTest {
87
88     private static ActorSystem system;
89
90     private final DatastoreContext.Builder datastoreContextBuilder =
91             DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100);
92
93     @BeforeClass
94     public static void setUpClass() throws IOException {
95         system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
96         Address member1Address = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
97         Cluster.get(system).join(member1Address);
98     }
99
100     @AfterClass
101     public static void tearDownClass() throws IOException {
102         JavaTestKit.shutdownActorSystem(system);
103         system = null;
104     }
105
106     protected ActorSystem getSystem() {
107         return system;
108     }
109
110     @Test
111     public void testWriteTransactionWithSingleShard() throws Exception{
112         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
113             DistributedDataStore dataStore =
114                     setupDistributedDataStore("transactionIntegrationTest", "test-1");
115
116             testWriteTransaction(dataStore, TestModel.TEST_PATH,
117                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
118
119             testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
120                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
121
122             cleanup(dataStore);
123         }};
124     }
125
126     @Test
127     public void testWriteTransactionWithMultipleShards() throws Exception{
128         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
129             DistributedDataStore dataStore =
130                     setupDistributedDataStore("testWriteTransactionWithMultipleShards", "cars-1", "people-1");
131
132             DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
133             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
134
135             writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
136             writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
137
138             doCommit(writeTx.ready());
139
140             writeTx = dataStore.newWriteOnlyTransaction();
141
142             writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
143             writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
144
145             doCommit(writeTx.ready());
146
147             writeTx = dataStore.newWriteOnlyTransaction();
148
149             MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
150             YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
151             writeTx.write(carPath, car);
152
153             MapEntryNode person = PeopleModel.newPersonEntry("jack");
154             YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
155             writeTx.write(personPath, person);
156
157             doCommit(writeTx.ready());
158
159             // Verify the data in the store
160
161             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
162
163             Optional<NormalizedNode<?, ?>> optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
164             assertEquals("isPresent", true, optional.isPresent());
165             assertEquals("Data node", car, optional.get());
166
167             optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
168             assertEquals("isPresent", true, optional.isPresent());
169             assertEquals("Data node", person, optional.get());
170
171             cleanup(dataStore);
172         }};
173     }
174
175     @Test
176     public void testReadWriteTransactionWithSingleShard() throws Exception{
177         System.setProperty("shard.persistent", "true");
178         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
179             DistributedDataStore dataStore =
180                     setupDistributedDataStore("testReadWriteTransactionWithSingleShard", "test-1");
181
182             // 1. Create a read-write Tx
183
184             DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
185             assertNotNull("newReadWriteTransaction returned null", readWriteTx);
186
187             // 2. Write some data
188
189             YangInstanceIdentifier nodePath = TestModel.TEST_PATH;
190             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
191             readWriteTx.write(nodePath, nodeToWrite );
192
193             // 3. Read the data from Tx
194
195             Boolean exists = readWriteTx.exists(nodePath).checkedGet(5, TimeUnit.SECONDS);
196             assertEquals("exists", true, exists);
197
198             Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS);
199             assertEquals("isPresent", true, optional.isPresent());
200             assertEquals("Data node", nodeToWrite, optional.get());
201
202             // 4. Ready the Tx for commit
203
204             DOMStoreThreePhaseCommitCohort cohort = readWriteTx.ready();
205
206             // 5. Commit the Tx
207
208             doCommit(cohort);
209
210             // 6. Verify the data in the store
211
212             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
213
214             optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
215             assertEquals("isPresent", true, optional.isPresent());
216             assertEquals("Data node", nodeToWrite, optional.get());
217
218             cleanup(dataStore);
219         }};
220     }
221
222     @Test
223     public void testReadWriteTransactionWithMultipleShards() throws Exception{
224         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
225             DistributedDataStore dataStore =
226                     setupDistributedDataStore("testReadWriteTransactionWithMultipleShards", "cars-1", "people-1");
227
228             DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
229             assertNotNull("newReadWriteTransaction returned null", readWriteTx);
230
231             readWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
232             readWriteTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
233
234             doCommit(readWriteTx.ready());
235
236             readWriteTx = dataStore.newReadWriteTransaction();
237
238             readWriteTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
239             readWriteTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
240
241             doCommit(readWriteTx.ready());
242
243             readWriteTx = dataStore.newReadWriteTransaction();
244
245             MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
246             YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
247             readWriteTx.write(carPath, car);
248
249             MapEntryNode person = PeopleModel.newPersonEntry("jack");
250             YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
251             readWriteTx.write(personPath, person);
252
253             Boolean exists = readWriteTx.exists(carPath).checkedGet(5, TimeUnit.SECONDS);
254             assertEquals("exists", true, exists);
255
256             Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
257             assertEquals("isPresent", true, optional.isPresent());
258             assertEquals("Data node", car, optional.get());
259
260             doCommit(readWriteTx.ready());
261
262             // Verify the data in the store
263
264             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
265
266             optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
267             assertEquals("isPresent", true, optional.isPresent());
268             assertEquals("Data node", car, optional.get());
269
270             optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
271             assertEquals("isPresent", true, optional.isPresent());
272             assertEquals("Data node", person, optional.get());
273
274             cleanup(dataStore);
275         }};
276     }
277
278     @Test
279     public void testSingleTransactionsWritesInQuickSuccession() throws Exception{
280         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
281             DistributedDataStore dataStore = setupDistributedDataStore(
282                     "testSingleTransactionsWritesInQuickSuccession", "cars-1");
283
284             DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
285
286             DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
287             writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
288             writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
289             doCommit(writeTx.ready());
290
291             writeTx = txChain.newWriteOnlyTransaction();
292
293             int nCars = 5;
294             for(int i = 0; i < nCars; i++) {
295                 writeTx.write(CarsModel.newCarPath("car" + i),
296                         CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
297             }
298
299             doCommit(writeTx.ready());
300
301             Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction().read(
302                     CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
303             assertEquals("isPresent", true, optional.isPresent());
304             assertEquals("# cars", nCars, ((Collection<?>)optional.get().getValue()).size());
305
306             cleanup(dataStore);
307         }};
308     }
309
310     private void testTransactionWritesWithShardNotInitiallyReady(final String testName,
311             final boolean writeOnly) throws Exception {
312         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
313             String shardName = "test-1";
314
315             // Setup the InMemoryJournal to block shard recovery to ensure the shard isn't
316             // initialized until we create and submit the write the Tx.
317             String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
318             CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
319             InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
320
321             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
322
323             // Create the write Tx
324
325             final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() :
326                     dataStore.newReadWriteTransaction();
327             assertNotNull("newReadWriteTransaction returned null", writeTx);
328
329             // Do some modification operations and ready the Tx on a separate thread.
330
331             final YangInstanceIdentifier listEntryPath = YangInstanceIdentifier.builder(
332                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME,
333                             TestModel.ID_QNAME, 1).build();
334
335             final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
336             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
337             final CountDownLatch txReady = new CountDownLatch(1);
338             Thread txThread = new Thread() {
339                 @Override
340                 public void run() {
341                     try {
342                         writeTx.write(TestModel.TEST_PATH,
343                                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
344
345                         writeTx.merge(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(
346                                 TestModel.OUTER_LIST_QNAME).build());
347
348                         writeTx.write(listEntryPath, ImmutableNodes.mapEntry(
349                                 TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
350
351                         writeTx.delete(listEntryPath);
352
353                         txCohort.set(writeTx.ready());
354                     } catch(Exception e) {
355                         caughtEx.set(e);
356                         return;
357                     } finally {
358                         txReady.countDown();
359                     }
360                 }
361             };
362
363             txThread.start();
364
365             // Wait for the Tx operations to complete.
366
367             boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
368             if(caughtEx.get() != null) {
369                 throw caughtEx.get();
370             }
371
372             assertEquals("Tx ready", true, done);
373
374             // At this point the Tx operations should be waiting for the shard to initialize so
375             // trigger the latch to let the shard recovery to continue.
376
377             blockRecoveryLatch.countDown();
378
379             // Wait for the Tx commit to complete.
380
381             doCommit(txCohort.get());
382
383             // Verify the data in the store
384
385             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
386
387             Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).
388                     get(5, TimeUnit.SECONDS);
389             assertEquals("isPresent", true, optional.isPresent());
390
391             optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
392             assertEquals("isPresent", true, optional.isPresent());
393
394             optional = readTx.read(listEntryPath).get(5, TimeUnit.SECONDS);
395             assertEquals("isPresent", false, optional.isPresent());
396
397             cleanup(dataStore);
398         }};
399     }
400
401     @Test
402     public void testWriteOnlyTransactionWithShardNotInitiallyReady() throws Exception {
403         datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
404         testTransactionWritesWithShardNotInitiallyReady("testWriteOnlyTransactionWithShardNotInitiallyReady", true);
405     }
406
407     @Test
408     public void testReadWriteTransactionWithShardNotInitiallyReady() throws Exception {
409         testTransactionWritesWithShardNotInitiallyReady("testReadWriteTransactionWithShardNotInitiallyReady", false);
410     }
411
412     @Test
413     public void testTransactionReadsWithShardNotInitiallyReady() throws Exception {
414         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
415             String testName = "testTransactionReadsWithShardNotInitiallyReady";
416             String shardName = "test-1";
417
418             // Setup the InMemoryJournal to block shard recovery to ensure the shard isn't
419             // initialized until we create the Tx.
420             String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
421             CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
422             InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
423
424             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
425
426             // Create the read-write Tx
427
428             final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
429             assertNotNull("newReadWriteTransaction returned null", readWriteTx);
430
431             // Do some reads on the Tx on a separate thread.
432
433             final AtomicReference<CheckedFuture<Boolean, ReadFailedException>> txExistsFuture =
434                     new AtomicReference<>();
435             final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
436                     txReadFuture = new AtomicReference<>();
437             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
438             final CountDownLatch txReadsDone = new CountDownLatch(1);
439             Thread txThread = new Thread() {
440                 @Override
441                 public void run() {
442                     try {
443                         readWriteTx.write(TestModel.TEST_PATH,
444                                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
445
446                         txExistsFuture.set(readWriteTx.exists(TestModel.TEST_PATH));
447
448                         txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
449                     } catch(Exception e) {
450                         caughtEx.set(e);
451                         return;
452                     } finally {
453                         txReadsDone.countDown();
454                     }
455                 }
456             };
457
458             txThread.start();
459
460             // Wait for the Tx operations to complete.
461
462             boolean done = Uninterruptibles.awaitUninterruptibly(txReadsDone, 5, TimeUnit.SECONDS);
463             if(caughtEx.get() != null) {
464                 throw caughtEx.get();
465             }
466
467             assertEquals("Tx reads done", true, done);
468
469             // At this point the Tx operations should be waiting for the shard to initialize so
470             // trigger the latch to let the shard recovery to continue.
471
472             blockRecoveryLatch.countDown();
473
474             // Wait for the reads to complete and verify.
475
476             assertEquals("exists", true, txExistsFuture.get().checkedGet(5, TimeUnit.SECONDS));
477             assertEquals("read", true, txReadFuture.get().checkedGet(5, TimeUnit.SECONDS).isPresent());
478
479             readWriteTx.close();
480
481             cleanup(dataStore);
482         }};
483     }
484
485     @Test(expected=NotInitializedException.class)
486     public void testTransactionCommitFailureWithShardNotInitialized() throws Throwable{
487         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
488             String testName = "testTransactionCommitFailureWithShardNotInitialized";
489             String shardName = "test-1";
490
491             // Set the shard initialization timeout low for the test.
492
493             datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
494
495             // Setup the InMemoryJournal to block shard recovery indefinitely.
496
497             String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
498             CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
499             InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
500
501             InMemoryJournal.addEntry(persistentID, 1, "Dummy data so akka will read from persistence");
502
503             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
504
505             // Create the write Tx
506
507             final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
508             assertNotNull("newReadWriteTransaction returned null", writeTx);
509
510             // Do some modifications and ready the Tx on a separate thread.
511
512             final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
513             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
514             final CountDownLatch txReady = new CountDownLatch(1);
515             Thread txThread = new Thread() {
516                 @Override
517                 public void run() {
518                     try {
519                         writeTx.write(TestModel.TEST_PATH,
520                                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
521
522                         txCohort.set(writeTx.ready());
523                     } catch(Exception e) {
524                         caughtEx.set(e);
525                         return;
526                     } finally {
527                         txReady.countDown();
528                     }
529                 }
530             };
531
532             txThread.start();
533
534             // Wait for the Tx operations to complete.
535
536             boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
537             if(caughtEx.get() != null) {
538                 throw caughtEx.get();
539             }
540
541             assertEquals("Tx ready", true, done);
542
543             // Wait for the commit to complete. Since the shard never initialized, the Tx should
544             // have timed out and throw an appropriate exception cause.
545
546             try {
547                 txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
548             } catch(ExecutionException e) {
549                 throw e.getCause();
550             } finally {
551                 blockRecoveryLatch.countDown();
552                 cleanup(dataStore);
553             }
554         }};
555     }
556
557     @Test(expected=NotInitializedException.class)
558     public void testTransactionReadFailureWithShardNotInitialized() throws Throwable{
559         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
560             String testName = "testTransactionReadFailureWithShardNotInitialized";
561             String shardName = "test-1";
562
563             // Set the shard initialization timeout low for the test.
564
565             datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
566
567             // Setup the InMemoryJournal to block shard recovery indefinitely.
568
569             String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
570             CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
571             InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
572
573             InMemoryJournal.addEntry(persistentID, 1, "Dummy data so akka will read from persistence");
574
575             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
576
577             // Create the read-write Tx
578
579             final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
580             assertNotNull("newReadWriteTransaction returned null", readWriteTx);
581
582             // Do a read on the Tx on a separate thread.
583
584             final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
585                     txReadFuture = new AtomicReference<>();
586             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
587             final CountDownLatch txReadDone = new CountDownLatch(1);
588             Thread txThread = new Thread() {
589                 @Override
590                 public void run() {
591                     try {
592                         readWriteTx.write(TestModel.TEST_PATH,
593                                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
594
595                         txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
596
597                         readWriteTx.close();
598                     } catch(Exception e) {
599                         caughtEx.set(e);
600                         return;
601                     } finally {
602                         txReadDone.countDown();
603                     }
604                 }
605             };
606
607             txThread.start();
608
609             // Wait for the Tx operations to complete.
610
611             boolean done = Uninterruptibles.awaitUninterruptibly(txReadDone, 5, TimeUnit.SECONDS);
612             if(caughtEx.get() != null) {
613                 throw caughtEx.get();
614             }
615
616             assertEquals("Tx read done", true, done);
617
618             // Wait for the read to complete. Since the shard never initialized, the Tx should
619             // have timed out and throw an appropriate exception cause.
620
621             try {
622                 txReadFuture.get().checkedGet(5, TimeUnit.SECONDS);
623             } catch(ReadFailedException e) {
624                 throw e.getCause();
625             } finally {
626                 blockRecoveryLatch.countDown();
627                 cleanup(dataStore);
628             }
629         }};
630     }
631
632     private void testTransactionCommitFailureWithNoShardLeader(final boolean writeOnly, final String testName) throws Throwable {
633         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
634             String shardName = "default";
635
636             // We don't want the shard to become the leader so prevent shard elections.
637             datastoreContextBuilder.customRaftPolicyImplementation(
638                     "org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy");
639
640             // The ShardManager uses the election timeout for FindPrimary so reset it low so it will timeout quickly.
641             datastoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(1).
642                     shardInitializationTimeout(200, TimeUnit.MILLISECONDS);
643
644             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
645
646             Object result = dataStore.getActorContext().executeOperation(dataStore.getActorContext().getShardManager(),
647                     new FindLocalShard(shardName, true));
648             assertTrue("Expected LocalShardFound. Actual: " + result, result instanceof LocalShardFound);
649
650             // Create the write Tx.
651
652             final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() :
653                 dataStore.newReadWriteTransaction();
654             assertNotNull("newReadWriteTransaction returned null", writeTx);
655
656             // Do some modifications and ready the Tx on a separate thread.
657
658             final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
659             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
660             final CountDownLatch txReady = new CountDownLatch(1);
661             Thread txThread = new Thread() {
662                 @Override
663                 public void run() {
664                     try {
665                         writeTx.write(TestModel.JUNK_PATH,
666                                 ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
667
668                         txCohort.set(writeTx.ready());
669                     } catch(Exception e) {
670                         caughtEx.set(e);
671                         return;
672                     } finally {
673                         txReady.countDown();
674                     }
675                 }
676             };
677
678             txThread.start();
679
680             // Wait for the Tx operations to complete.
681
682             boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
683             if(caughtEx.get() != null) {
684                 throw caughtEx.get();
685             }
686
687             assertEquals("Tx ready", true, done);
688
689             // Wait for the commit to complete. Since no shard leader was elected in time, the Tx
690             // should have timed out and throw an appropriate exception cause.
691
692             try {
693                 txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
694             } catch(ExecutionException e) {
695                 throw e.getCause();
696             } finally {
697                 cleanup(dataStore);
698             }
699         }};
700     }
701
702     @Test(expected=NoShardLeaderException.class)
703     public void testWriteOnlyTransactionCommitFailureWithNoShardLeader() throws Throwable {
704         datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
705         testTransactionCommitFailureWithNoShardLeader(true, "testWriteOnlyTransactionCommitFailureWithNoShardLeader");
706     }
707
708     @Test(expected=NoShardLeaderException.class)
709     public void testReadWriteTransactionCommitFailureWithNoShardLeader() throws Throwable {
710         testTransactionCommitFailureWithNoShardLeader(false, "testReadWriteTransactionCommitFailureWithNoShardLeader");
711     }
712
713     @Test
714     public void testTransactionAbort() throws Exception{
715         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
716             DistributedDataStore dataStore =
717                     setupDistributedDataStore("transactionAbortIntegrationTest", "test-1");
718
719             DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
720             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
721
722             writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
723
724             DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
725
726             cohort.canCommit().get(5, TimeUnit.SECONDS);
727
728             cohort.abort().get(5, TimeUnit.SECONDS);
729
730             testWriteTransaction(dataStore, TestModel.TEST_PATH,
731                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
732
733             cleanup(dataStore);
734         }};
735     }
736
737     @Test
738     public void testTransactionChainWithSingleShard() throws Exception{
739         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
740             DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChainWithSingleShard", "test-1");
741
742             // 1. Create a Tx chain and write-only Tx
743
744             DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
745
746             DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
747             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
748
749             // 2. Write some data
750
751             NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
752             writeTx.write(TestModel.TEST_PATH, testNode);
753
754             // 3. Ready the Tx for commit
755
756             final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
757
758             // 4. Commit the Tx on another thread that first waits for the second read Tx.
759
760             final CountDownLatch continueCommit1 = new CountDownLatch(1);
761             final CountDownLatch commit1Done = new CountDownLatch(1);
762             final AtomicReference<Exception> commit1Error = new AtomicReference<>();
763             new Thread() {
764                 @Override
765                 public void run() {
766                     try {
767                         continueCommit1.await();
768                         doCommit(cohort1);
769                     } catch (Exception e) {
770                         commit1Error.set(e);
771                     } finally {
772                         commit1Done.countDown();
773                     }
774                 }
775             }.start();
776
777             // 5. Create a new read Tx from the chain to read and verify the data from the first
778             // Tx is visible after being readied.
779
780             DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction();
781             Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
782             assertEquals("isPresent", true, optional.isPresent());
783             assertEquals("Data node", testNode, optional.get());
784
785             // 6. Create a new RW Tx from the chain, write more data, and ready it
786
787             DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
788             MapNode outerNode = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
789             rwTx.write(TestModel.OUTER_LIST_PATH, outerNode);
790
791             DOMStoreThreePhaseCommitCohort cohort2 = rwTx.ready();
792
793             // 7. Create a new read Tx from the chain to read the data from the last RW Tx to
794             // verify it is visible.
795
796             readTx = txChain.newReadWriteTransaction();
797             optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
798             assertEquals("isPresent", true, optional.isPresent());
799             assertEquals("Data node", outerNode, optional.get());
800
801             // 8. Wait for the 2 commits to complete and close the chain.
802
803             continueCommit1.countDown();
804             Uninterruptibles.awaitUninterruptibly(commit1Done, 5, TimeUnit.SECONDS);
805
806             if(commit1Error.get() != null) {
807                 throw commit1Error.get();
808             }
809
810             doCommit(cohort2);
811
812             txChain.close();
813
814             // 9. Create a new read Tx from the data store and verify committed data.
815
816             readTx = dataStore.newReadOnlyTransaction();
817             optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
818             assertEquals("isPresent", true, optional.isPresent());
819             assertEquals("Data node", outerNode, optional.get());
820
821             cleanup(dataStore);
822         }};
823     }
824
825     @Test
826     public void testTransactionChainWithMultipleShards() throws Exception{
827         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
828             DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChainWithMultipleShards",
829                     "cars-1", "people-1");
830
831             DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
832
833             DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
834             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
835
836             writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
837             writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
838
839             writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
840             writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
841
842             DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
843
844             DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction();
845
846             MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
847             YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
848             readWriteTx.write(carPath, car);
849
850             MapEntryNode person = PeopleModel.newPersonEntry("jack");
851             YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
852             readWriteTx.merge(personPath, person);
853
854             Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
855             assertEquals("isPresent", true, optional.isPresent());
856             assertEquals("Data node", car, optional.get());
857
858             optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS);
859             assertEquals("isPresent", true, optional.isPresent());
860             assertEquals("Data node", person, optional.get());
861
862             DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready();
863
864             writeTx = txChain.newWriteOnlyTransaction();
865
866             writeTx.delete(carPath);
867
868             DOMStoreThreePhaseCommitCohort cohort3 = writeTx.ready();
869
870             ListenableFuture<Boolean> canCommit1 = cohort1.canCommit();
871             ListenableFuture<Boolean> canCommit2 = cohort2.canCommit();
872
873             doCommit(canCommit1, cohort1);
874             doCommit(canCommit2, cohort2);
875             doCommit(cohort3);
876
877             txChain.close();
878
879             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
880
881             optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
882             assertEquals("isPresent", false, optional.isPresent());
883
884             optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
885             assertEquals("isPresent", true, optional.isPresent());
886             assertEquals("Data node", person, optional.get());
887
888             cleanup(dataStore);
889         }};
890     }
891
892     @Test
893     public void testCreateChainedTransactionsInQuickSuccession() throws Exception{
894         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
895             DistributedDataStore dataStore = setupDistributedDataStore(
896                     "testCreateChainedTransactionsInQuickSuccession", "cars-1");
897
898             ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
899                     ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
900                             LogicalDatastoreType.CONFIGURATION, dataStore).build(), MoreExecutors.directExecutor());
901
902             TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
903             DOMTransactionChain txChain = broker.createTransactionChain(listener);
904
905             List<CheckedFuture<Void, TransactionCommitFailedException>> futures = new ArrayList<>();
906
907             DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
908             writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, CarsModel.emptyContainer());
909             writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
910             futures.add(writeTx.submit());
911
912             int nCars = 100;
913             for(int i = 0; i < nCars; i++) {
914                 DOMDataReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
915
916                 rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.newCarPath("car" + i),
917                         CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
918
919                 futures.add(rwTx.submit());
920             }
921
922             for(CheckedFuture<Void, TransactionCommitFailedException> f: futures) {
923                 f.checkedGet();
924             }
925
926             Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction().read(
927                     LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
928             assertEquals("isPresent", true, optional.isPresent());
929             assertEquals("# cars", nCars, ((Collection<?>)optional.get().getValue()).size());
930
931             txChain.close();
932
933             broker.close();
934
935             cleanup(dataStore);
936         }};
937     }
938
939     @Test
940     public void testCreateChainedTransactionAfterEmptyTxReadied() throws Exception{
941         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
942             DistributedDataStore dataStore = setupDistributedDataStore(
943                     "testCreateChainedTransactionAfterEmptyTxReadied", "test-1");
944
945             DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
946
947             DOMStoreReadWriteTransaction rwTx1 = txChain.newReadWriteTransaction();
948
949             rwTx1.ready();
950
951             DOMStoreReadWriteTransaction rwTx2 = txChain.newReadWriteTransaction();
952
953             Optional<NormalizedNode<?, ?>> optional = rwTx2.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
954             assertEquals("isPresent", false, optional.isPresent());
955
956             txChain.close();
957
958             cleanup(dataStore);
959         }};
960     }
961
962     @Test
963     public void testCreateChainedTransactionWhenPreviousNotReady() throws Throwable {
964         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
965             DistributedDataStore dataStore = setupDistributedDataStore(
966                     "testCreateChainedTransactionWhenPreviousNotReady", "test-1");
967
968             final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
969
970             DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
971             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
972
973             writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
974
975             // Try to create another Tx of each type - each should fail b/c the previous Tx wasn't
976             // readied.
977
978             assertExceptionOnTxChainCreates(txChain, IllegalStateException.class);
979         }};
980     }
981
982     @Test
983     public void testCreateChainedTransactionAfterClose() throws Throwable {
984         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
985             DistributedDataStore dataStore = setupDistributedDataStore(
986                     "testCreateChainedTransactionAfterClose", "test-1");
987
988             DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
989
990             txChain.close();
991
992             // Try to create another Tx of each type - should fail b/c the previous Tx was closed.
993
994             assertExceptionOnTxChainCreates(txChain, TransactionChainClosedException.class);
995         }};
996     }
997
998     @Test
999     public void testChainWithReadOnlyTxAfterPreviousReady() throws Throwable {
1000         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
1001             DistributedDataStore dataStore = setupDistributedDataStore(
1002                     "testChainWithReadOnlyTxAfterPreviousReady", "test-1");
1003
1004             final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
1005
1006             // Create a write tx and submit.
1007
1008             DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
1009             writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1010             DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
1011
1012             // Create read-only tx's and issue a read.
1013
1014             CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readFuture1 =
1015                     txChain.newReadOnlyTransaction().read(TestModel.TEST_PATH);
1016
1017             CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readFuture2 =
1018                     txChain.newReadOnlyTransaction().read(TestModel.TEST_PATH);
1019
1020             // Create another write tx and issue the write.
1021
1022             DOMStoreWriteTransaction writeTx2 = txChain.newWriteOnlyTransaction();
1023             writeTx2.write(TestModel.OUTER_LIST_PATH,
1024                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1025
1026             // Ensure the reads succeed.
1027
1028             assertEquals("isPresent", true, readFuture1.checkedGet(5, TimeUnit.SECONDS).isPresent());
1029             assertEquals("isPresent", true, readFuture2.checkedGet(5, TimeUnit.SECONDS).isPresent());
1030
1031             // Ensure the writes succeed.
1032
1033             DOMStoreThreePhaseCommitCohort cohort2 = writeTx2.ready();
1034
1035             doCommit(cohort1);
1036             doCommit(cohort2);
1037
1038             assertEquals("isPresent", true, txChain.newReadOnlyTransaction().read(TestModel.OUTER_LIST_PATH).
1039                     checkedGet(5, TimeUnit.SECONDS).isPresent());
1040         }};
1041     }
1042
1043     @Test
1044     public void testChainedTransactionFailureWithSingleShard() throws Exception{
1045         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
1046             DistributedDataStore dataStore = setupDistributedDataStore(
1047                     "testChainedTransactionFailureWithSingleShard", "cars-1");
1048
1049             ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
1050                     ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
1051                             LogicalDatastoreType.CONFIGURATION, dataStore).build(), MoreExecutors.directExecutor());
1052
1053             TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
1054             DOMTransactionChain txChain = broker.createTransactionChain(listener);
1055
1056             DOMDataReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
1057
1058             ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
1059                     new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)).
1060                         withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
1061
1062             rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
1063
1064             try {
1065                 rwTx.submit().checkedGet(5, TimeUnit.SECONDS);
1066                 fail("Expected TransactionCommitFailedException");
1067             } catch (TransactionCommitFailedException e) {
1068                 // Expected
1069             }
1070
1071             verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(rwTx), any(Throwable.class));
1072
1073             txChain.close();
1074             broker.close();
1075             cleanup(dataStore);
1076         }};
1077     }
1078
1079     @Test
1080     public void testChainedTransactionFailureWithMultipleShards() throws Exception{
1081         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
1082             DistributedDataStore dataStore = setupDistributedDataStore(
1083                     "testChainedTransactionFailureWithMultipleShards", "cars-1", "people-1");
1084
1085             ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
1086                     ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
1087                             LogicalDatastoreType.CONFIGURATION, dataStore).build(), MoreExecutors.directExecutor());
1088
1089             TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
1090             DOMTransactionChain txChain = broker.createTransactionChain(listener);
1091
1092             DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
1093
1094             writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
1095
1096             ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
1097                     new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)).
1098                         withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
1099
1100             // Note that merge will validate the data and fail but put succeeds b/c deep validation is not
1101             // done for put for performance reasons.
1102             writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
1103
1104             try {
1105                 writeTx.submit().checkedGet(5, TimeUnit.SECONDS);
1106                 fail("Expected TransactionCommitFailedException");
1107             } catch (TransactionCommitFailedException e) {
1108                 // Expected
1109             }
1110
1111             verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx), any(Throwable.class));
1112
1113             txChain.close();
1114             broker.close();
1115             cleanup(dataStore);
1116         }};
1117     }
1118
1119     @Test
1120     public void testChangeListenerRegistration() throws Exception{
1121         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
1122             DistributedDataStore dataStore =
1123                     setupDistributedDataStore("testChangeListenerRegistration", "test-1");
1124
1125             testWriteTransaction(dataStore, TestModel.TEST_PATH,
1126                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1127
1128             MockDataChangeListener listener = new MockDataChangeListener(1);
1129
1130             ListenerRegistration<MockDataChangeListener>
1131                     listenerReg = dataStore.registerChangeListener(TestModel.TEST_PATH, listener,
1132                             DataChangeScope.SUBTREE);
1133
1134             assertNotNull("registerChangeListener returned null", listenerReg);
1135
1136             // Wait for the initial notification
1137
1138             listener.waitForChangeEvents(TestModel.TEST_PATH);
1139
1140             listener.reset(2);
1141
1142             // Write 2 updates.
1143
1144             testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
1145                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1146
1147             YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
1148                     nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
1149             testWriteTransaction(dataStore, listPath,
1150                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
1151
1152             // Wait for the 2 updates.
1153
1154             listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath);
1155
1156             listenerReg.close();
1157
1158             testWriteTransaction(dataStore, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
1159                     nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(),
1160                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2));
1161
1162             listener.expectNoMoreChanges("Received unexpected change after close");
1163
1164             cleanup(dataStore);
1165         }};
1166     }
1167
1168     @Test
1169     public void testRestoreFromDatastoreSnapshot() throws Exception{
1170         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
1171             String name = "transactionIntegrationTest";
1172
1173             ContainerNode carsNode = CarsModel.newCarsNode(CarsModel.newCarsMapNode(
1174                     CarsModel.newCarEntry("optima", BigInteger.valueOf(20000L)),
1175                     CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000L))));
1176
1177             ShardDataTree dataTree = new ShardDataTree(SchemaContextHelper.full(), TreeType.OPERATIONAL);
1178             AbstractShardTest.writeToStore(dataTree, CarsModel.BASE_PATH, carsNode);
1179             NormalizedNode<?, ?> root = AbstractShardTest.readStore(dataTree.getDataTree(),
1180                     YangInstanceIdentifier.builder().build());
1181
1182             Snapshot carsSnapshot = Snapshot.create(SerializationUtils.serializeNormalizedNode(root),
1183                     Collections.<ReplicatedLogEntry>emptyList(), 2, 1, 2, 1, 1, "member-1");
1184
1185             NormalizedNode<?, ?> peopleNode = PeopleModel.create();
1186             dataTree = new ShardDataTree(SchemaContextHelper.full(), TreeType.OPERATIONAL);
1187             AbstractShardTest.writeToStore(dataTree, PeopleModel.BASE_PATH, peopleNode);
1188             root = AbstractShardTest.readStore(dataTree.getDataTree(), YangInstanceIdentifier.builder().build());
1189
1190             Snapshot peopleSnapshot = Snapshot.create(SerializationUtils.serializeNormalizedNode(root),
1191                     Collections.<ReplicatedLogEntry>emptyList(), 2, 1, 2, 1, 1, "member-1");
1192
1193             restoreFromSnapshot = new DatastoreSnapshot(name, null, Arrays.asList(
1194                     new DatastoreSnapshot.ShardSnapshot("cars",
1195                             org.apache.commons.lang3.SerializationUtils.serialize(carsSnapshot)),
1196                     new DatastoreSnapshot.ShardSnapshot("people",
1197                             org.apache.commons.lang3.SerializationUtils.serialize(peopleSnapshot))));
1198
1199             DistributedDataStore dataStore = setupDistributedDataStore(name, "module-shards-member1.conf",
1200                     true, "cars", "people");
1201
1202             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
1203
1204             Optional<NormalizedNode<?, ?>> optional = readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS);
1205             assertEquals("isPresent", true, optional.isPresent());
1206             assertEquals("Data node", carsNode, optional.get());
1207
1208             optional = readTx.read(PeopleModel.BASE_PATH).get(5, TimeUnit.SECONDS);
1209             assertEquals("isPresent", true, optional.isPresent());
1210             assertEquals("Data node", peopleNode, optional.get());
1211
1212             cleanup(dataStore);
1213         }};
1214     }
1215 }