47cc35937f89a7a172ccfef1eb0c70d6b94ba1b5
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / DistributedDataStoreIntegrationTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertTrue;
14 import static org.junit.Assert.fail;
15 import static org.mockito.Matchers.any;
16 import static org.mockito.Matchers.eq;
17 import static org.mockito.Mockito.timeout;
18 import static org.mockito.Mockito.verify;
19 import akka.actor.ActorSystem;
20 import akka.actor.Address;
21 import akka.actor.AddressFromURIString;
22 import akka.cluster.Cluster;
23 import akka.testkit.JavaTestKit;
24 import com.google.common.base.Optional;
25 import com.google.common.collect.ImmutableMap;
26 import com.google.common.util.concurrent.CheckedFuture;
27 import com.google.common.util.concurrent.ListenableFuture;
28 import com.google.common.util.concurrent.MoreExecutors;
29 import com.google.common.util.concurrent.Uninterruptibles;
30 import com.typesafe.config.ConfigFactory;
31 import java.io.IOException;
32 import java.math.BigInteger;
33 import java.util.ArrayList;
34 import java.util.Arrays;
35 import java.util.Collection;
36 import java.util.Collections;
37 import java.util.List;
38 import java.util.concurrent.CountDownLatch;
39 import java.util.concurrent.ExecutionException;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.atomic.AtomicReference;
42 import org.junit.AfterClass;
43 import org.junit.BeforeClass;
44 import org.junit.Test;
45 import org.mockito.Mockito;
46 import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker;
47 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
48 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
49 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
50 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
51 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
52 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
53 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
54 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
55 import org.opendaylight.controller.cluster.raft.Snapshot;
56 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
57 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
58 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
59 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
60 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
61 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
62 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
63 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
64 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
65 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
66 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
67 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
68 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
69 import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
70 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
71 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
72 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
73 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
74 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
75 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
76 import org.opendaylight.yangtools.concepts.ListenerRegistration;
77 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
78 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
79 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
80 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
81 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
82 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
83 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
84 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
85 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
86 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
87
88 public class DistributedDataStoreIntegrationTest {
89
90     private static ActorSystem system;
91
92     private final DatastoreContext.Builder datastoreContextBuilder =
93             DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100);
94
95     @BeforeClass
96     public static void setUpClass() throws IOException {
97         system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
98         Address member1Address = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
99         Cluster.get(system).join(member1Address);
100     }
101
102     @AfterClass
103     public static void tearDownClass() throws IOException {
104         JavaTestKit.shutdownActorSystem(system);
105         system = null;
106     }
107
108     protected ActorSystem getSystem() {
109         return system;
110     }
111
112     @Test
113     public void testWriteTransactionWithSingleShard() throws Exception{
114         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
115             try (DistributedDataStore dataStore =
116                     setupDistributedDataStore("transactionIntegrationTest", "test-1")) {
117
118                 testWriteTransaction(dataStore, TestModel.TEST_PATH,
119                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
120
121                 testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
122                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
123             }
124         }};
125     }
126
127     @Test
128     public void testWriteTransactionWithMultipleShards() throws Exception{
129         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
130             try (DistributedDataStore dataStore =
131                     setupDistributedDataStore("testWriteTransactionWithMultipleShards", "cars-1", "people-1")) {
132
133                 DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
134                 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
135
136                 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
137                 writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
138
139                 doCommit(writeTx.ready());
140
141                 writeTx = dataStore.newWriteOnlyTransaction();
142
143                 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
144                 writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
145
146                 doCommit(writeTx.ready());
147
148                 writeTx = dataStore.newWriteOnlyTransaction();
149
150                 MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
151                 YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
152                 writeTx.write(carPath, car);
153
154                 MapEntryNode person = PeopleModel.newPersonEntry("jack");
155                 YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
156                 writeTx.write(personPath, person);
157
158                 doCommit(writeTx.ready());
159
160                 // Verify the data in the store
161
162                 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
163
164                 Optional<NormalizedNode<?, ?>> optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
165                 assertEquals("isPresent", true, optional.isPresent());
166                 assertEquals("Data node", car, optional.get());
167
168                 optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
169                 assertEquals("isPresent", true, optional.isPresent());
170                 assertEquals("Data node", person, optional.get());
171             }
172         }};
173     }
174
175     @Test
176     public void testReadWriteTransactionWithSingleShard() throws Exception{
177         System.setProperty("shard.persistent", "true");
178         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
179             try (DistributedDataStore dataStore =
180                     setupDistributedDataStore("testReadWriteTransactionWithSingleShard", "test-1")) {
181
182                 // 1. Create a read-write Tx
183
184                 DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
185                 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
186
187                 // 2. Write some data
188
189                 YangInstanceIdentifier nodePath = TestModel.TEST_PATH;
190                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
191                 readWriteTx.write(nodePath, nodeToWrite );
192
193                 // 3. Read the data from Tx
194
195                 Boolean exists = readWriteTx.exists(nodePath).checkedGet(5, TimeUnit.SECONDS);
196                 assertEquals("exists", true, exists);
197
198                 Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS);
199                 assertEquals("isPresent", true, optional.isPresent());
200                 assertEquals("Data node", nodeToWrite, optional.get());
201
202                 // 4. Ready the Tx for commit
203
204                 DOMStoreThreePhaseCommitCohort cohort = readWriteTx.ready();
205
206                 // 5. Commit the Tx
207
208                 doCommit(cohort);
209
210                 // 6. Verify the data in the store
211
212                 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
213
214                 optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
215                 assertEquals("isPresent", true, optional.isPresent());
216                 assertEquals("Data node", nodeToWrite, optional.get());
217             }
218         }};
219     }
220
221     @Test
222     public void testReadWriteTransactionWithMultipleShards() throws Exception{
223         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
224             try (DistributedDataStore dataStore =
225                     setupDistributedDataStore("testReadWriteTransactionWithMultipleShards", "cars-1", "people-1")) {
226
227                 DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
228                 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
229
230                 readWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
231                 readWriteTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
232
233                 doCommit(readWriteTx.ready());
234
235                 readWriteTx = dataStore.newReadWriteTransaction();
236
237                 readWriteTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
238                 readWriteTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
239
240                 doCommit(readWriteTx.ready());
241
242                 readWriteTx = dataStore.newReadWriteTransaction();
243
244                 MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
245                 YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
246                 readWriteTx.write(carPath, car);
247
248                 MapEntryNode person = PeopleModel.newPersonEntry("jack");
249                 YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
250                 readWriteTx.write(personPath, person);
251
252                 Boolean exists = readWriteTx.exists(carPath).checkedGet(5, TimeUnit.SECONDS);
253                 assertEquals("exists", true, exists);
254
255                 Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
256                 assertEquals("isPresent", true, optional.isPresent());
257                 assertEquals("Data node", car, optional.get());
258
259                 doCommit(readWriteTx.ready());
260
261                 // Verify the data in the store
262
263                 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
264
265                 optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
266                 assertEquals("isPresent", true, optional.isPresent());
267                 assertEquals("Data node", car, optional.get());
268
269                 optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
270                 assertEquals("isPresent", true, optional.isPresent());
271                 assertEquals("Data node", person, optional.get());
272
273             }
274         }};
275     }
276
277     @Test
278     public void testSingleTransactionsWritesInQuickSuccession() throws Exception{
279         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
280             try (DistributedDataStore dataStore = setupDistributedDataStore(
281                     "testSingleTransactionsWritesInQuickSuccession", "cars-1")) {
282
283                 DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
284
285                 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
286                 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
287                 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
288                 doCommit(writeTx.ready());
289
290                 writeTx = txChain.newWriteOnlyTransaction();
291
292                 int nCars = 5;
293                 for(int i = 0; i < nCars; i++) {
294                     writeTx.write(CarsModel.newCarPath("car" + i),
295                         CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
296                 }
297
298                 doCommit(writeTx.ready());
299
300                 Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction().read(
301                     CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
302                 assertEquals("isPresent", true, optional.isPresent());
303                 assertEquals("# cars", nCars, ((Collection<?>)optional.get().getValue()).size());
304             }
305         }};
306     }
307
308     private void testTransactionWritesWithShardNotInitiallyReady(final String testName,
309             final boolean writeOnly) throws Exception {
310         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
311             String shardName = "test-1";
312
313             // Setup the InMemoryJournal to block shard recovery to ensure the shard isn't
314             // initialized until we create and submit the write the Tx.
315             String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
316             CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
317             InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
318
319             try (DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName)) {
320
321                 // Create the write Tx
322
323                 final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() :
324                     dataStore.newReadWriteTransaction();
325                 assertNotNull("newReadWriteTransaction returned null", writeTx);
326
327                 // Do some modification operations and ready the Tx on a separate thread.
328
329                 final YangInstanceIdentifier listEntryPath = YangInstanceIdentifier.builder(
330                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME,
331                         TestModel.ID_QNAME, 1).build();
332
333                 final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
334                 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
335                 final CountDownLatch txReady = new CountDownLatch(1);
336                 Thread txThread = new Thread() {
337                     @Override
338                     public void run() {
339                         try {
340                             writeTx.write(TestModel.TEST_PATH,
341                                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
342
343                             writeTx.merge(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(
344                                 TestModel.OUTER_LIST_QNAME).build());
345
346                             writeTx.write(listEntryPath, ImmutableNodes.mapEntry(
347                                 TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
348
349                             writeTx.delete(listEntryPath);
350
351                             txCohort.set(writeTx.ready());
352                         } catch(Exception e) {
353                             caughtEx.set(e);
354                             return;
355                         } finally {
356                             txReady.countDown();
357                         }
358                     }
359                 };
360
361                 txThread.start();
362
363                 // Wait for the Tx operations to complete.
364
365                 boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
366                 if(caughtEx.get() != null) {
367                     throw caughtEx.get();
368                 }
369
370                 assertEquals("Tx ready", true, done);
371
372                 // At this point the Tx operations should be waiting for the shard to initialize so
373                 // trigger the latch to let the shard recovery to continue.
374
375                 blockRecoveryLatch.countDown();
376
377                 // Wait for the Tx commit to complete.
378
379                 doCommit(txCohort.get());
380
381                 // Verify the data in the store
382
383                 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
384
385                 Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).
386                         get(5, TimeUnit.SECONDS);
387                 assertEquals("isPresent", true, optional.isPresent());
388
389                 optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
390                 assertEquals("isPresent", true, optional.isPresent());
391
392                 optional = readTx.read(listEntryPath).get(5, TimeUnit.SECONDS);
393                 assertEquals("isPresent", false, optional.isPresent());
394             }
395         }};
396     }
397
398     @Test
399     public void testWriteOnlyTransactionWithShardNotInitiallyReady() throws Exception {
400         datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
401         testTransactionWritesWithShardNotInitiallyReady("testWriteOnlyTransactionWithShardNotInitiallyReady", true);
402     }
403
404     @Test
405     public void testReadWriteTransactionWithShardNotInitiallyReady() throws Exception {
406         testTransactionWritesWithShardNotInitiallyReady("testReadWriteTransactionWithShardNotInitiallyReady", false);
407     }
408
409     @Test
410     public void testTransactionReadsWithShardNotInitiallyReady() throws Exception {
411         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
412             String testName = "testTransactionReadsWithShardNotInitiallyReady";
413             String shardName = "test-1";
414
415             // Setup the InMemoryJournal to block shard recovery to ensure the shard isn't
416             // initialized until we create the Tx.
417             String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
418             CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
419             InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
420
421             try (DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName)) {
422
423                 // Create the read-write Tx
424
425                 final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
426                 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
427
428                 // Do some reads on the Tx on a separate thread.
429
430                 final AtomicReference<CheckedFuture<Boolean, ReadFailedException>> txExistsFuture =
431                         new AtomicReference<>();
432                 final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
433                 txReadFuture = new AtomicReference<>();
434                 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
435                 final CountDownLatch txReadsDone = new CountDownLatch(1);
436                 Thread txThread = new Thread() {
437                     @Override
438                     public void run() {
439                         try {
440                             readWriteTx.write(TestModel.TEST_PATH,
441                                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
442
443                             txExistsFuture.set(readWriteTx.exists(TestModel.TEST_PATH));
444
445                             txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
446                         } catch(Exception e) {
447                             caughtEx.set(e);
448                             return;
449                         } finally {
450                             txReadsDone.countDown();
451                         }
452                     }
453                 };
454
455                 txThread.start();
456
457                 // Wait for the Tx operations to complete.
458
459                 boolean done = Uninterruptibles.awaitUninterruptibly(txReadsDone, 5, TimeUnit.SECONDS);
460                 if(caughtEx.get() != null) {
461                     throw caughtEx.get();
462                 }
463
464                 assertEquals("Tx reads done", true, done);
465
466                 // At this point the Tx operations should be waiting for the shard to initialize so
467                 // trigger the latch to let the shard recovery to continue.
468
469                 blockRecoveryLatch.countDown();
470
471                 // Wait for the reads to complete and verify.
472
473                 assertEquals("exists", true, txExistsFuture.get().checkedGet(5, TimeUnit.SECONDS));
474                 assertEquals("read", true, txReadFuture.get().checkedGet(5, TimeUnit.SECONDS).isPresent());
475
476                 readWriteTx.close();
477             }
478         }};
479     }
480
481     @Test(expected=NotInitializedException.class)
482     public void testTransactionCommitFailureWithShardNotInitialized() throws Throwable{
483         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
484             String testName = "testTransactionCommitFailureWithShardNotInitialized";
485             String shardName = "test-1";
486
487             // Set the shard initialization timeout low for the test.
488
489             datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
490
491             // Setup the InMemoryJournal to block shard recovery indefinitely.
492
493             String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
494             CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
495             InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
496
497             InMemoryJournal.addEntry(persistentID, 1, "Dummy data so akka will read from persistence");
498
499             try (DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName)) {
500
501                 // Create the write Tx
502
503                 final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
504                 assertNotNull("newReadWriteTransaction returned null", writeTx);
505
506                 // Do some modifications and ready the Tx on a separate thread.
507
508                 final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
509                 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
510                 final CountDownLatch txReady = new CountDownLatch(1);
511                 Thread txThread = new Thread() {
512                     @Override
513                     public void run() {
514                         try {
515                             writeTx.write(TestModel.TEST_PATH,
516                                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
517
518                             txCohort.set(writeTx.ready());
519                         } catch(Exception e) {
520                             caughtEx.set(e);
521                             return;
522                         } finally {
523                             txReady.countDown();
524                         }
525                     }
526                 };
527
528                 txThread.start();
529
530                 // Wait for the Tx operations to complete.
531
532                 boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
533                 if(caughtEx.get() != null) {
534                     throw caughtEx.get();
535                 }
536
537                 assertEquals("Tx ready", true, done);
538
539                 // Wait for the commit to complete. Since the shard never initialized, the Tx should
540                 // have timed out and throw an appropriate exception cause.
541
542                 try {
543                     txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
544                 } catch(ExecutionException e) {
545                     throw e.getCause();
546                 } finally {
547                     blockRecoveryLatch.countDown();
548                 }
549             }
550         }};
551     }
552
553     @Test(expected=NotInitializedException.class)
554     public void testTransactionReadFailureWithShardNotInitialized() throws Throwable{
555         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
556             String testName = "testTransactionReadFailureWithShardNotInitialized";
557             String shardName = "test-1";
558
559             // Set the shard initialization timeout low for the test.
560
561             datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
562
563             // Setup the InMemoryJournal to block shard recovery indefinitely.
564
565             String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
566             CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
567             InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
568
569             InMemoryJournal.addEntry(persistentID, 1, "Dummy data so akka will read from persistence");
570
571             try (DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName)) {
572
573                 // Create the read-write Tx
574
575                 final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
576                 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
577
578                 // Do a read on the Tx on a separate thread.
579
580                 final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
581                 txReadFuture = new AtomicReference<>();
582                 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
583                 final CountDownLatch txReadDone = new CountDownLatch(1);
584                 Thread txThread = new Thread() {
585                     @Override
586                     public void run() {
587                         try {
588                             readWriteTx.write(TestModel.TEST_PATH,
589                                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
590
591                             txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
592
593                             readWriteTx.close();
594                         } catch(Exception e) {
595                             caughtEx.set(e);
596                             return;
597                         } finally {
598                             txReadDone.countDown();
599                         }
600                     }
601                 };
602
603                 txThread.start();
604
605                 // Wait for the Tx operations to complete.
606
607                 boolean done = Uninterruptibles.awaitUninterruptibly(txReadDone, 5, TimeUnit.SECONDS);
608                 if(caughtEx.get() != null) {
609                     throw caughtEx.get();
610                 }
611
612                 assertEquals("Tx read done", true, done);
613
614                 // Wait for the read to complete. Since the shard never initialized, the Tx should
615                 // have timed out and throw an appropriate exception cause.
616
617                 try {
618                     txReadFuture.get().checkedGet(5, TimeUnit.SECONDS);
619                 } catch(ReadFailedException e) {
620                     throw e.getCause();
621                 } finally {
622                     blockRecoveryLatch.countDown();
623                 }
624             }
625         }};
626     }
627
628     private void testTransactionCommitFailureWithNoShardLeader(final boolean writeOnly, final String testName) throws Throwable {
629         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
630             String shardName = "default";
631
632             // We don't want the shard to become the leader so prevent shard elections.
633             datastoreContextBuilder.customRaftPolicyImplementation(
634                     "org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy");
635
636             // The ShardManager uses the election timeout for FindPrimary so reset it low so it will timeout quickly.
637             datastoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(1).
638                     shardInitializationTimeout(200, TimeUnit.MILLISECONDS);
639
640             try (DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName)) {
641
642                 Object result = dataStore.getActorContext().executeOperation(
643                     dataStore.getActorContext().getShardManager(), new FindLocalShard(shardName, true));
644                 assertTrue("Expected LocalShardFound. Actual: " + result, result instanceof LocalShardFound);
645
646                 // Create the write Tx.
647
648                 try (final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() :
649                     dataStore.newReadWriteTransaction()) {
650                     assertNotNull("newReadWriteTransaction returned null", writeTx);
651
652                     // Do some modifications and ready the Tx on a separate thread.
653
654                     final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
655                     final AtomicReference<Exception> caughtEx = new AtomicReference<>();
656                     final CountDownLatch txReady = new CountDownLatch(1);
657                     Thread txThread = new Thread() {
658                         @Override
659                         public void run() {
660                             try {
661                                 writeTx.write(TestModel.JUNK_PATH,
662                                     ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
663
664                                 txCohort.set(writeTx.ready());
665                             } catch(Exception e) {
666                                 caughtEx.set(e);
667                                 return;
668                             } finally {
669                                 txReady.countDown();
670                             }
671                         }
672                     };
673
674                     txThread.start();
675
676                     // Wait for the Tx operations to complete.
677
678                     boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
679                     if(caughtEx.get() != null) {
680                         throw caughtEx.get();
681                     }
682
683                     assertEquals("Tx ready", true, done);
684
685                     // Wait for the commit to complete. Since no shard leader was elected in time, the Tx
686                     // should have timed out and throw an appropriate exception cause.
687
688                     try {
689                         txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
690                     } catch(ExecutionException e) {
691                         throw e.getCause();
692                     }
693                 }
694             }
695         }};
696     }
697
698     @Test(expected=NoShardLeaderException.class)
699     public void testWriteOnlyTransactionCommitFailureWithNoShardLeader() throws Throwable {
700         datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
701         testTransactionCommitFailureWithNoShardLeader(true, "testWriteOnlyTransactionCommitFailureWithNoShardLeader");
702     }
703
704     @Test(expected=NoShardLeaderException.class)
705     public void testReadWriteTransactionCommitFailureWithNoShardLeader() throws Throwable {
706         testTransactionCommitFailureWithNoShardLeader(false, "testReadWriteTransactionCommitFailureWithNoShardLeader");
707     }
708
709     @Test
710     public void testTransactionAbort() throws Exception{
711         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
712             try (DistributedDataStore dataStore =
713                     setupDistributedDataStore("transactionAbortIntegrationTest", "test-1")) {
714
715                 DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
716                 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
717
718                 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
719
720                 DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
721
722                 cohort.canCommit().get(5, TimeUnit.SECONDS);
723
724                 cohort.abort().get(5, TimeUnit.SECONDS);
725
726                 testWriteTransaction(dataStore, TestModel.TEST_PATH,
727                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
728             }
729         }};
730     }
731
732     @Test
733     public void testTransactionChainWithSingleShard() throws Exception{
734         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
735             try (DistributedDataStore dataStore =
736                     setupDistributedDataStore("testTransactionChainWithSingleShard", "test-1")) {
737
738                 // 1. Create a Tx chain and write-only Tx
739
740                 DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
741
742                 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
743                 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
744
745                 // 2. Write some data
746
747                 NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
748                 writeTx.write(TestModel.TEST_PATH, testNode);
749
750                 // 3. Ready the Tx for commit
751
752                 final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
753
754                 // 4. Commit the Tx on another thread that first waits for the second read Tx.
755
756                 final CountDownLatch continueCommit1 = new CountDownLatch(1);
757                 final CountDownLatch commit1Done = new CountDownLatch(1);
758                 final AtomicReference<Exception> commit1Error = new AtomicReference<>();
759                 new Thread() {
760                     @Override
761                     public void run() {
762                         try {
763                             continueCommit1.await();
764                             doCommit(cohort1);
765                         } catch (Exception e) {
766                             commit1Error.set(e);
767                         } finally {
768                             commit1Done.countDown();
769                         }
770                     }
771                 }.start();
772
773                 // 5. Create a new read Tx from the chain to read and verify the data from the first
774                 // Tx is visible after being readied.
775
776                 DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction();
777                 Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
778                 assertEquals("isPresent", true, optional.isPresent());
779                 assertEquals("Data node", testNode, optional.get());
780
781                 // 6. Create a new RW Tx from the chain, write more data, and ready it
782
783                 DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
784                 MapNode outerNode = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
785                 rwTx.write(TestModel.OUTER_LIST_PATH, outerNode);
786
787                 DOMStoreThreePhaseCommitCohort cohort2 = rwTx.ready();
788
789                 // 7. Create a new read Tx from the chain to read the data from the last RW Tx to
790                 // verify it is visible.
791
792                 readTx = txChain.newReadWriteTransaction();
793                 optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
794                 assertEquals("isPresent", true, optional.isPresent());
795                 assertEquals("Data node", outerNode, optional.get());
796
797                 // 8. Wait for the 2 commits to complete and close the chain.
798
799                 continueCommit1.countDown();
800                 Uninterruptibles.awaitUninterruptibly(commit1Done, 5, TimeUnit.SECONDS);
801
802                 if(commit1Error.get() != null) {
803                     throw commit1Error.get();
804                 }
805
806                 doCommit(cohort2);
807
808                 txChain.close();
809
810                 // 9. Create a new read Tx from the data store and verify committed data.
811
812                 readTx = dataStore.newReadOnlyTransaction();
813                 optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
814                 assertEquals("isPresent", true, optional.isPresent());
815                 assertEquals("Data node", outerNode, optional.get());
816             }
817         }};
818     }
819
820     @Test
821     public void testTransactionChainWithMultipleShards() throws Exception{
822         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
823             try (DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChainWithMultipleShards",
824                     "cars-1", "people-1")) {
825
826                 DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
827
828                 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
829                 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
830
831                 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
832                 writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
833
834                 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
835                 writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
836
837                 DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
838
839                 DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction();
840
841                 MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
842                 YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
843                 readWriteTx.write(carPath, car);
844
845                 MapEntryNode person = PeopleModel.newPersonEntry("jack");
846                 YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
847                 readWriteTx.merge(personPath, person);
848
849                 Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
850                 assertEquals("isPresent", true, optional.isPresent());
851                 assertEquals("Data node", car, optional.get());
852
853                 optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS);
854                 assertEquals("isPresent", true, optional.isPresent());
855                 assertEquals("Data node", person, optional.get());
856
857                 DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready();
858
859                 writeTx = txChain.newWriteOnlyTransaction();
860
861                 writeTx.delete(carPath);
862
863                 DOMStoreThreePhaseCommitCohort cohort3 = writeTx.ready();
864
865                 ListenableFuture<Boolean> canCommit1 = cohort1.canCommit();
866                 ListenableFuture<Boolean> canCommit2 = cohort2.canCommit();
867
868                 doCommit(canCommit1, cohort1);
869                 doCommit(canCommit2, cohort2);
870                 doCommit(cohort3);
871
872                 txChain.close();
873
874                 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
875
876                 optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
877                 assertEquals("isPresent", false, optional.isPresent());
878
879                 optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
880                 assertEquals("isPresent", true, optional.isPresent());
881                 assertEquals("Data node", person, optional.get());
882             }
883         }};
884     }
885
886     @Test
887     public void testCreateChainedTransactionsInQuickSuccession() throws Exception{
888         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
889             try (DistributedDataStore dataStore = setupDistributedDataStore(
890                     "testCreateChainedTransactionsInQuickSuccession", "cars-1")) {
891
892                 ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
893                     ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
894                         LogicalDatastoreType.CONFIGURATION, dataStore).build(), MoreExecutors.directExecutor());
895
896                 TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
897                 DOMTransactionChain txChain = broker.createTransactionChain(listener);
898
899                 List<CheckedFuture<Void, TransactionCommitFailedException>> futures = new ArrayList<>();
900
901                 DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
902                 writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, CarsModel.emptyContainer());
903                 writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
904                 futures.add(writeTx.submit());
905
906                 int nCars = 100;
907                 for(int i = 0; i < nCars; i++) {
908                     DOMDataReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
909
910                     rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.newCarPath("car" + i),
911                         CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
912
913                     futures.add(rwTx.submit());
914                 }
915
916                 for(CheckedFuture<Void, TransactionCommitFailedException> f: futures) {
917                     f.checkedGet();
918                 }
919
920                 Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction().read(
921                     LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
922                 assertEquals("isPresent", true, optional.isPresent());
923                 assertEquals("# cars", nCars, ((Collection<?>)optional.get().getValue()).size());
924
925                 txChain.close();
926
927                 broker.close();
928             }
929         }};
930     }
931
932     @Test
933     public void testCreateChainedTransactionAfterEmptyTxReadied() throws Exception{
934         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
935             try (DistributedDataStore dataStore = setupDistributedDataStore(
936                     "testCreateChainedTransactionAfterEmptyTxReadied", "test-1")) {
937
938                 DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
939
940                 DOMStoreReadWriteTransaction rwTx1 = txChain.newReadWriteTransaction();
941
942                 rwTx1.ready();
943
944                 DOMStoreReadWriteTransaction rwTx2 = txChain.newReadWriteTransaction();
945
946                 Optional<NormalizedNode<?, ?>> optional = rwTx2.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
947                 assertEquals("isPresent", false, optional.isPresent());
948
949                 txChain.close();
950             }
951         }};
952     }
953
954     @Test
955     public void testCreateChainedTransactionWhenPreviousNotReady() throws Throwable {
956         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
957             try (DistributedDataStore dataStore = setupDistributedDataStore(
958                 "testCreateChainedTransactionWhenPreviousNotReady", "test-1")) {
959
960                 final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
961
962                 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
963                 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
964
965                 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
966
967                 // Try to create another Tx of each type - each should fail b/c the previous Tx wasn't
968                 // readied.
969
970                 assertExceptionOnTxChainCreates(txChain, IllegalStateException.class);
971             }
972         }};
973     }
974
975     @Test
976     public void testCreateChainedTransactionAfterClose() throws Throwable {
977         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
978             try (DistributedDataStore dataStore = setupDistributedDataStore(
979                 "testCreateChainedTransactionAfterClose", "test-1")) {
980
981                 DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
982
983                 txChain.close();
984
985                 // Try to create another Tx of each type - should fail b/c the previous Tx was closed.
986
987                 assertExceptionOnTxChainCreates(txChain, TransactionChainClosedException.class);
988             }
989         }};
990     }
991
992     @Test
993     public void testChainWithReadOnlyTxAfterPreviousReady() throws Throwable {
994         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
995             try (DistributedDataStore dataStore = setupDistributedDataStore(
996                 "testChainWithReadOnlyTxAfterPreviousReady", "test-1")) {
997
998                 final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
999
1000                 // Create a write tx and submit.
1001
1002                 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
1003                 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1004                 DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
1005
1006                 // Create read-only tx's and issue a read.
1007
1008                 CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readFuture1 =
1009                         txChain.newReadOnlyTransaction().read(TestModel.TEST_PATH);
1010
1011                 CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readFuture2 =
1012                         txChain.newReadOnlyTransaction().read(TestModel.TEST_PATH);
1013
1014                 // Create another write tx and issue the write.
1015
1016                 DOMStoreWriteTransaction writeTx2 = txChain.newWriteOnlyTransaction();
1017                 writeTx2.write(TestModel.OUTER_LIST_PATH,
1018                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1019
1020                 // Ensure the reads succeed.
1021
1022                 assertEquals("isPresent", true, readFuture1.checkedGet(5, TimeUnit.SECONDS).isPresent());
1023                 assertEquals("isPresent", true, readFuture2.checkedGet(5, TimeUnit.SECONDS).isPresent());
1024
1025                 // Ensure the writes succeed.
1026
1027                 DOMStoreThreePhaseCommitCohort cohort2 = writeTx2.ready();
1028
1029                 doCommit(cohort1);
1030                 doCommit(cohort2);
1031
1032                 assertEquals("isPresent", true, txChain.newReadOnlyTransaction().read(TestModel.OUTER_LIST_PATH).
1033                     checkedGet(5, TimeUnit.SECONDS).isPresent());
1034             }
1035         }};
1036     }
1037
1038     @Test
1039     public void testChainedTransactionFailureWithSingleShard() throws Exception{
1040         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
1041             try (DistributedDataStore dataStore = setupDistributedDataStore(
1042                     "testChainedTransactionFailureWithSingleShard", "cars-1")) {
1043
1044                 ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
1045                     ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
1046                         LogicalDatastoreType.CONFIGURATION, dataStore).build(), MoreExecutors.directExecutor());
1047
1048                 TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
1049                 DOMTransactionChain txChain = broker.createTransactionChain(listener);
1050
1051                 DOMDataReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
1052
1053                 ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
1054                     new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)).
1055                         withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
1056
1057                 rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
1058
1059                 try {
1060                     rwTx.submit().checkedGet(5, TimeUnit.SECONDS);
1061                     fail("Expected TransactionCommitFailedException");
1062                 } catch (TransactionCommitFailedException e) {
1063                     // Expected
1064                 }
1065
1066                 verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(rwTx), any(Throwable.class));
1067
1068                 txChain.close();
1069                 broker.close();
1070             }
1071         }};
1072     }
1073
1074     @Test
1075     public void testChainedTransactionFailureWithMultipleShards() throws Exception{
1076         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
1077             try (DistributedDataStore dataStore = setupDistributedDataStore(
1078                     "testChainedTransactionFailureWithMultipleShards", "cars-1", "people-1")) {
1079
1080                 ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
1081                     ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
1082                         LogicalDatastoreType.CONFIGURATION, dataStore).build(), MoreExecutors.directExecutor());
1083
1084                 TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
1085                 DOMTransactionChain txChain = broker.createTransactionChain(listener);
1086
1087                 DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
1088
1089                 writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
1090
1091                 ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
1092                     new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)).
1093                         withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
1094
1095                 // Note that merge will validate the data and fail but put succeeds b/c deep validation is not
1096                 // done for put for performance reasons.
1097                 writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
1098
1099                 try {
1100                     writeTx.submit().checkedGet(5, TimeUnit.SECONDS);
1101                     fail("Expected TransactionCommitFailedException");
1102                 } catch (TransactionCommitFailedException e) {
1103                     // Expected
1104                 }
1105
1106                 verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx), any(Throwable.class));
1107
1108                 txChain.close();
1109                 broker.close();
1110             }
1111         }};
1112     }
1113
1114     @Test
1115     public void testChangeListenerRegistration() throws Exception{
1116         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
1117             try (DistributedDataStore dataStore =
1118                     setupDistributedDataStore("testChangeListenerRegistration", "test-1")) {
1119
1120                 testWriteTransaction(dataStore, TestModel.TEST_PATH,
1121                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1122
1123                 MockDataChangeListener listener = new MockDataChangeListener(1);
1124
1125                 ListenerRegistration<MockDataChangeListener>
1126                 listenerReg = dataStore.registerChangeListener(TestModel.TEST_PATH, listener,
1127                     DataChangeScope.SUBTREE);
1128
1129                 assertNotNull("registerChangeListener returned null", listenerReg);
1130
1131                 // Wait for the initial notification
1132
1133                 listener.waitForChangeEvents(TestModel.TEST_PATH);
1134
1135                 listener.reset(2);
1136
1137                 // Write 2 updates.
1138
1139                 testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
1140                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1141
1142                 YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
1143                         nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
1144                 testWriteTransaction(dataStore, listPath,
1145                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
1146
1147                 // Wait for the 2 updates.
1148
1149                 listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath);
1150
1151                 listenerReg.close();
1152
1153                 testWriteTransaction(dataStore, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
1154                     nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(),
1155                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2));
1156
1157                 listener.expectNoMoreChanges("Received unexpected change after close");
1158             }
1159         }};
1160     }
1161
1162     @Test
1163     public void testRestoreFromDatastoreSnapshot() throws Exception {
1164         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
1165             String name = "transactionIntegrationTest";
1166
1167             ContainerNode carsNode = CarsModel.newCarsNode(CarsModel.newCarsMapNode(
1168                     CarsModel.newCarEntry("optima", BigInteger.valueOf(20000L)),
1169                     CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000L))));
1170
1171             DataTree dataTree = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
1172             dataTree.setSchemaContext(SchemaContextHelper.full());
1173             AbstractShardTest.writeToStore(dataTree, CarsModel.BASE_PATH, carsNode);
1174             NormalizedNode<?, ?> root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY);
1175
1176             Snapshot carsSnapshot = Snapshot.create(new MetadataShardDataTreeSnapshot(root).serialize(),
1177                     Collections.<ReplicatedLogEntry>emptyList(), 2, 1, 2, 1, 1, "member-1");
1178
1179             NormalizedNode<?, ?> peopleNode = PeopleModel.create();
1180             dataTree = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
1181             dataTree.setSchemaContext(SchemaContextHelper.full());
1182             AbstractShardTest.writeToStore(dataTree, PeopleModel.BASE_PATH, peopleNode);
1183             root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY);
1184
1185             Snapshot peopleSnapshot = Snapshot.create(new MetadataShardDataTreeSnapshot(root).serialize(),
1186                     Collections.<ReplicatedLogEntry>emptyList(), 2, 1, 2, 1, 1, "member-1");
1187
1188             restoreFromSnapshot = new DatastoreSnapshot(name, null, Arrays.asList(
1189                     new DatastoreSnapshot.ShardSnapshot("cars",
1190                             org.apache.commons.lang3.SerializationUtils.serialize(carsSnapshot)),
1191                     new DatastoreSnapshot.ShardSnapshot("people",
1192                             org.apache.commons.lang3.SerializationUtils.serialize(peopleSnapshot))));
1193
1194             try (DistributedDataStore dataStore = setupDistributedDataStore(name, "module-shards-member1.conf",
1195                     true, "cars", "people")) {
1196
1197                 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
1198
1199                 Optional<NormalizedNode<?, ?>> optional = readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS);
1200                 assertEquals("isPresent", true, optional.isPresent());
1201                 assertEquals("Data node", carsNode, optional.get());
1202
1203                 optional = readTx.read(PeopleModel.BASE_PATH).get(5, TimeUnit.SECONDS);
1204                 assertEquals("isPresent", true, optional.isPresent());
1205                 assertEquals("Data node", peopleNode, optional.get());
1206             }
1207         }};
1208     }
1209 }