Bug 4564: Implement datastore restore from backup file
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / DistributedDataStoreIntegrationTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertTrue;
14 import static org.junit.Assert.fail;
15 import static org.mockito.Matchers.any;
16 import static org.mockito.Matchers.eq;
17 import static org.mockito.Mockito.timeout;
18 import static org.mockito.Mockito.verify;
19 import akka.actor.ActorSystem;
20 import akka.actor.Address;
21 import akka.actor.AddressFromURIString;
22 import akka.cluster.Cluster;
23 import akka.testkit.JavaTestKit;
24 import com.google.common.base.Optional;
25 import com.google.common.collect.ImmutableMap;
26 import com.google.common.util.concurrent.CheckedFuture;
27 import com.google.common.util.concurrent.ListenableFuture;
28 import com.google.common.util.concurrent.MoreExecutors;
29 import com.google.common.util.concurrent.Uninterruptibles;
30 import com.typesafe.config.ConfigFactory;
31 import java.io.IOException;
32 import java.math.BigInteger;
33 import java.util.ArrayList;
34 import java.util.Arrays;
35 import java.util.Collection;
36 import java.util.Collections;
37 import java.util.List;
38 import java.util.concurrent.CountDownLatch;
39 import java.util.concurrent.ExecutionException;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.atomic.AtomicReference;
42 import org.junit.AfterClass;
43 import org.junit.BeforeClass;
44 import org.junit.Test;
45 import org.mockito.Mockito;
46 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
47 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
48 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
49 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
50 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
51 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
52 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
53 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
54 import org.opendaylight.controller.cluster.raft.Snapshot;
55 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
56 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
57 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
58 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
59 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
60 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
61 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
62 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
63 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
64 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
65 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
66 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
67 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
68 import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
69 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
70 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
71 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
72 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
73 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
74 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
75 import org.opendaylight.yangtools.concepts.ListenerRegistration;
76 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
77 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
78 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
79 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
80 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
81 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
82 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
83
84 public class DistributedDataStoreIntegrationTest {
85
86     private static ActorSystem system;
87
88     private final DatastoreContext.Builder datastoreContextBuilder =
89             DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100);
90
91     @BeforeClass
92     public static void setUpClass() throws IOException {
93         system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
94         Address member1Address = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
95         Cluster.get(system).join(member1Address);
96     }
97
98     @AfterClass
99     public static void tearDownClass() throws IOException {
100         JavaTestKit.shutdownActorSystem(system);
101         system = null;
102     }
103
104     protected ActorSystem getSystem() {
105         return system;
106     }
107
108     @Test
109     public void testWriteTransactionWithSingleShard() throws Exception{
110         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
111             DistributedDataStore dataStore =
112                     setupDistributedDataStore("transactionIntegrationTest", "test-1");
113
114             testWriteTransaction(dataStore, TestModel.TEST_PATH,
115                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
116
117             testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
118                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
119
120             cleanup(dataStore);
121         }};
122     }
123
124     @Test
125     public void testWriteTransactionWithMultipleShards() throws Exception{
126         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
127             DistributedDataStore dataStore =
128                     setupDistributedDataStore("testWriteTransactionWithMultipleShards", "cars-1", "people-1");
129
130             DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
131             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
132
133             writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
134             writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
135
136             doCommit(writeTx.ready());
137
138             writeTx = dataStore.newWriteOnlyTransaction();
139
140             writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
141             writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
142
143             doCommit(writeTx.ready());
144
145             writeTx = dataStore.newWriteOnlyTransaction();
146
147             MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
148             YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
149             writeTx.write(carPath, car);
150
151             MapEntryNode person = PeopleModel.newPersonEntry("jack");
152             YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
153             writeTx.write(personPath, person);
154
155             doCommit(writeTx.ready());
156
157             // Verify the data in the store
158
159             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
160
161             Optional<NormalizedNode<?, ?>> optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
162             assertEquals("isPresent", true, optional.isPresent());
163             assertEquals("Data node", car, optional.get());
164
165             optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
166             assertEquals("isPresent", true, optional.isPresent());
167             assertEquals("Data node", person, optional.get());
168
169             cleanup(dataStore);
170         }};
171     }
172
173     @Test
174     public void testReadWriteTransactionWithSingleShard() throws Exception{
175         System.setProperty("shard.persistent", "true");
176         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
177             DistributedDataStore dataStore =
178                     setupDistributedDataStore("testReadWriteTransactionWithSingleShard", "test-1");
179
180             // 1. Create a read-write Tx
181
182             DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
183             assertNotNull("newReadWriteTransaction returned null", readWriteTx);
184
185             // 2. Write some data
186
187             YangInstanceIdentifier nodePath = TestModel.TEST_PATH;
188             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
189             readWriteTx.write(nodePath, nodeToWrite );
190
191             // 3. Read the data from Tx
192
193             Boolean exists = readWriteTx.exists(nodePath).checkedGet(5, TimeUnit.SECONDS);
194             assertEquals("exists", true, exists);
195
196             Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS);
197             assertEquals("isPresent", true, optional.isPresent());
198             assertEquals("Data node", nodeToWrite, optional.get());
199
200             // 4. Ready the Tx for commit
201
202             DOMStoreThreePhaseCommitCohort cohort = readWriteTx.ready();
203
204             // 5. Commit the Tx
205
206             doCommit(cohort);
207
208             // 6. Verify the data in the store
209
210             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
211
212             optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
213             assertEquals("isPresent", true, optional.isPresent());
214             assertEquals("Data node", nodeToWrite, optional.get());
215
216             cleanup(dataStore);
217         }};
218     }
219
220     @Test
221     public void testReadWriteTransactionWithMultipleShards() throws Exception{
222         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
223             DistributedDataStore dataStore =
224                     setupDistributedDataStore("testReadWriteTransactionWithMultipleShards", "cars-1", "people-1");
225
226             DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
227             assertNotNull("newReadWriteTransaction returned null", readWriteTx);
228
229             readWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
230             readWriteTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
231
232             doCommit(readWriteTx.ready());
233
234             readWriteTx = dataStore.newReadWriteTransaction();
235
236             readWriteTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
237             readWriteTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
238
239             doCommit(readWriteTx.ready());
240
241             readWriteTx = dataStore.newReadWriteTransaction();
242
243             MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
244             YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
245             readWriteTx.write(carPath, car);
246
247             MapEntryNode person = PeopleModel.newPersonEntry("jack");
248             YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
249             readWriteTx.write(personPath, person);
250
251             Boolean exists = readWriteTx.exists(carPath).checkedGet(5, TimeUnit.SECONDS);
252             assertEquals("exists", true, exists);
253
254             Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
255             assertEquals("isPresent", true, optional.isPresent());
256             assertEquals("Data node", car, optional.get());
257
258             doCommit(readWriteTx.ready());
259
260             // Verify the data in the store
261
262             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
263
264             optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
265             assertEquals("isPresent", true, optional.isPresent());
266             assertEquals("Data node", car, optional.get());
267
268             optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
269             assertEquals("isPresent", true, optional.isPresent());
270             assertEquals("Data node", person, optional.get());
271
272             cleanup(dataStore);
273         }};
274     }
275
276     @Test
277     public void testSingleTransactionsWritesInQuickSuccession() throws Exception{
278         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
279             DistributedDataStore dataStore = setupDistributedDataStore(
280                     "testSingleTransactionsWritesInQuickSuccession", "cars-1");
281
282             DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
283
284             DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
285             writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
286             writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
287             doCommit(writeTx.ready());
288
289             writeTx = txChain.newWriteOnlyTransaction();
290
291             int nCars = 5;
292             for(int i = 0; i < nCars; i++) {
293                 writeTx.write(CarsModel.newCarPath("car" + i),
294                         CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
295             }
296
297             doCommit(writeTx.ready());
298
299             Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction().read(
300                     CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
301             assertEquals("isPresent", true, optional.isPresent());
302             assertEquals("# cars", nCars, ((Collection<?>)optional.get().getValue()).size());
303
304             cleanup(dataStore);
305         }};
306     }
307
308     private void testTransactionWritesWithShardNotInitiallyReady(final String testName,
309             final boolean writeOnly) throws Exception {
310         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
311             String shardName = "test-1";
312
313             // Setup the InMemoryJournal to block shard recovery to ensure the shard isn't
314             // initialized until we create and submit the write the Tx.
315             String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
316             CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
317             InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
318
319             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
320
321             // Create the write Tx
322
323             final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() :
324                     dataStore.newReadWriteTransaction();
325             assertNotNull("newReadWriteTransaction returned null", writeTx);
326
327             // Do some modification operations and ready the Tx on a separate thread.
328
329             final YangInstanceIdentifier listEntryPath = YangInstanceIdentifier.builder(
330                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME,
331                             TestModel.ID_QNAME, 1).build();
332
333             final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
334             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
335             final CountDownLatch txReady = new CountDownLatch(1);
336             Thread txThread = new Thread() {
337                 @Override
338                 public void run() {
339                     try {
340                         writeTx.write(TestModel.TEST_PATH,
341                                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
342
343                         writeTx.merge(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(
344                                 TestModel.OUTER_LIST_QNAME).build());
345
346                         writeTx.write(listEntryPath, ImmutableNodes.mapEntry(
347                                 TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
348
349                         writeTx.delete(listEntryPath);
350
351                         txCohort.set(writeTx.ready());
352                     } catch(Exception e) {
353                         caughtEx.set(e);
354                         return;
355                     } finally {
356                         txReady.countDown();
357                     }
358                 }
359             };
360
361             txThread.start();
362
363             // Wait for the Tx operations to complete.
364
365             boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
366             if(caughtEx.get() != null) {
367                 throw caughtEx.get();
368             }
369
370             assertEquals("Tx ready", true, done);
371
372             // At this point the Tx operations should be waiting for the shard to initialize so
373             // trigger the latch to let the shard recovery to continue.
374
375             blockRecoveryLatch.countDown();
376
377             // Wait for the Tx commit to complete.
378
379             doCommit(txCohort.get());
380
381             // Verify the data in the store
382
383             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
384
385             Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).
386                     get(5, TimeUnit.SECONDS);
387             assertEquals("isPresent", true, optional.isPresent());
388
389             optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
390             assertEquals("isPresent", true, optional.isPresent());
391
392             optional = readTx.read(listEntryPath).get(5, TimeUnit.SECONDS);
393             assertEquals("isPresent", false, optional.isPresent());
394
395             cleanup(dataStore);
396         }};
397     }
398
399     @Test
400     public void testWriteOnlyTransactionWithShardNotInitiallyReady() throws Exception {
401         datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
402         testTransactionWritesWithShardNotInitiallyReady("testWriteOnlyTransactionWithShardNotInitiallyReady", true);
403     }
404
405     @Test
406     public void testReadWriteTransactionWithShardNotInitiallyReady() throws Exception {
407         testTransactionWritesWithShardNotInitiallyReady("testReadWriteTransactionWithShardNotInitiallyReady", false);
408     }
409
410     @Test
411     public void testTransactionReadsWithShardNotInitiallyReady() throws Exception {
412         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
413             String testName = "testTransactionReadsWithShardNotInitiallyReady";
414             String shardName = "test-1";
415
416             // Setup the InMemoryJournal to block shard recovery to ensure the shard isn't
417             // initialized until we create the Tx.
418             String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
419             CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
420             InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
421
422             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
423
424             // Create the read-write Tx
425
426             final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
427             assertNotNull("newReadWriteTransaction returned null", readWriteTx);
428
429             // Do some reads on the Tx on a separate thread.
430
431             final AtomicReference<CheckedFuture<Boolean, ReadFailedException>> txExistsFuture =
432                     new AtomicReference<>();
433             final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
434                     txReadFuture = new AtomicReference<>();
435             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
436             final CountDownLatch txReadsDone = new CountDownLatch(1);
437             Thread txThread = new Thread() {
438                 @Override
439                 public void run() {
440                     try {
441                         readWriteTx.write(TestModel.TEST_PATH,
442                                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
443
444                         txExistsFuture.set(readWriteTx.exists(TestModel.TEST_PATH));
445
446                         txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
447                     } catch(Exception e) {
448                         caughtEx.set(e);
449                         return;
450                     } finally {
451                         txReadsDone.countDown();
452                     }
453                 }
454             };
455
456             txThread.start();
457
458             // Wait for the Tx operations to complete.
459
460             boolean done = Uninterruptibles.awaitUninterruptibly(txReadsDone, 5, TimeUnit.SECONDS);
461             if(caughtEx.get() != null) {
462                 throw caughtEx.get();
463             }
464
465             assertEquals("Tx reads done", true, done);
466
467             // At this point the Tx operations should be waiting for the shard to initialize so
468             // trigger the latch to let the shard recovery to continue.
469
470             blockRecoveryLatch.countDown();
471
472             // Wait for the reads to complete and verify.
473
474             assertEquals("exists", true, txExistsFuture.get().checkedGet(5, TimeUnit.SECONDS));
475             assertEquals("read", true, txReadFuture.get().checkedGet(5, TimeUnit.SECONDS).isPresent());
476
477             readWriteTx.close();
478
479             cleanup(dataStore);
480         }};
481     }
482
483     @Test(expected=NotInitializedException.class)
484     public void testTransactionCommitFailureWithShardNotInitialized() throws Throwable{
485         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
486             String testName = "testTransactionCommitFailureWithShardNotInitialized";
487             String shardName = "test-1";
488
489             // Set the shard initialization timeout low for the test.
490
491             datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
492
493             // Setup the InMemoryJournal to block shard recovery indefinitely.
494
495             String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
496             CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
497             InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
498
499             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
500
501             // Create the write Tx
502
503             final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
504             assertNotNull("newReadWriteTransaction returned null", writeTx);
505
506             // Do some modifications and ready the Tx on a separate thread.
507
508             final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
509             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
510             final CountDownLatch txReady = new CountDownLatch(1);
511             Thread txThread = new Thread() {
512                 @Override
513                 public void run() {
514                     try {
515                         writeTx.write(TestModel.TEST_PATH,
516                                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
517
518                         txCohort.set(writeTx.ready());
519                     } catch(Exception e) {
520                         caughtEx.set(e);
521                         return;
522                     } finally {
523                         txReady.countDown();
524                     }
525                 }
526             };
527
528             txThread.start();
529
530             // Wait for the Tx operations to complete.
531
532             boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
533             if(caughtEx.get() != null) {
534                 throw caughtEx.get();
535             }
536
537             assertEquals("Tx ready", true, done);
538
539             // Wait for the commit to complete. Since the shard never initialized, the Tx should
540             // have timed out and throw an appropriate exception cause.
541
542             try {
543                 txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
544             } catch(ExecutionException e) {
545                 throw e.getCause();
546             } finally {
547                 blockRecoveryLatch.countDown();
548                 cleanup(dataStore);
549             }
550         }};
551     }
552
553     @Test(expected=NotInitializedException.class)
554     public void testTransactionReadFailureWithShardNotInitialized() throws Throwable{
555         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
556             String testName = "testTransactionReadFailureWithShardNotInitialized";
557             String shardName = "test-1";
558
559             // Set the shard initialization timeout low for the test.
560
561             datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
562
563             // Setup the InMemoryJournal to block shard recovery indefinitely.
564
565             String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
566             CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
567             InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
568
569             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
570
571             // Create the read-write Tx
572
573             final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
574             assertNotNull("newReadWriteTransaction returned null", readWriteTx);
575
576             // Do a read on the Tx on a separate thread.
577
578             final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
579                     txReadFuture = new AtomicReference<>();
580             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
581             final CountDownLatch txReadDone = new CountDownLatch(1);
582             Thread txThread = new Thread() {
583                 @Override
584                 public void run() {
585                     try {
586                         readWriteTx.write(TestModel.TEST_PATH,
587                                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
588
589                         txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
590
591                         readWriteTx.close();
592                     } catch(Exception e) {
593                         caughtEx.set(e);
594                         return;
595                     } finally {
596                         txReadDone.countDown();
597                     }
598                 }
599             };
600
601             txThread.start();
602
603             // Wait for the Tx operations to complete.
604
605             boolean done = Uninterruptibles.awaitUninterruptibly(txReadDone, 5, TimeUnit.SECONDS);
606             if(caughtEx.get() != null) {
607                 throw caughtEx.get();
608             }
609
610             assertEquals("Tx read done", true, done);
611
612             // Wait for the read to complete. Since the shard never initialized, the Tx should
613             // have timed out and throw an appropriate exception cause.
614
615             try {
616                 txReadFuture.get().checkedGet(5, TimeUnit.SECONDS);
617             } catch(ReadFailedException e) {
618                 throw e.getCause();
619             } finally {
620                 blockRecoveryLatch.countDown();
621                 cleanup(dataStore);
622             }
623         }};
624     }
625
626     private void testTransactionCommitFailureWithNoShardLeader(final boolean writeOnly, final String testName) throws Throwable {
627         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
628             String shardName = "default";
629
630             // We don't want the shard to become the leader so prevent shard elections.
631             datastoreContextBuilder.customRaftPolicyImplementation(
632                     "org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy");
633
634             // The ShardManager uses the election timeout for FindPrimary so reset it low so it will timeout quickly.
635             datastoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(1).
636                     shardInitializationTimeout(200, TimeUnit.MILLISECONDS);
637
638             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
639
640             Object result = dataStore.getActorContext().executeOperation(dataStore.getActorContext().getShardManager(),
641                     new FindLocalShard(shardName, true));
642             assertTrue("Expected LocalShardFound. Actual: " + result, result instanceof LocalShardFound);
643
644             // Create the write Tx.
645
646             final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() :
647                 dataStore.newReadWriteTransaction();
648             assertNotNull("newReadWriteTransaction returned null", writeTx);
649
650             // Do some modifications and ready the Tx on a separate thread.
651
652             final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
653             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
654             final CountDownLatch txReady = new CountDownLatch(1);
655             Thread txThread = new Thread() {
656                 @Override
657                 public void run() {
658                     try {
659                         writeTx.write(TestModel.JUNK_PATH,
660                                 ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
661
662                         txCohort.set(writeTx.ready());
663                     } catch(Exception e) {
664                         caughtEx.set(e);
665                         return;
666                     } finally {
667                         txReady.countDown();
668                     }
669                 }
670             };
671
672             txThread.start();
673
674             // Wait for the Tx operations to complete.
675
676             boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
677             if(caughtEx.get() != null) {
678                 throw caughtEx.get();
679             }
680
681             assertEquals("Tx ready", true, done);
682
683             // Wait for the commit to complete. Since no shard leader was elected in time, the Tx
684             // should have timed out and throw an appropriate exception cause.
685
686             try {
687                 txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
688             } catch(ExecutionException e) {
689                 throw e.getCause();
690             } finally {
691                 cleanup(dataStore);
692             }
693         }};
694     }
695
696     @Test(expected=NoShardLeaderException.class)
697     public void testWriteOnlyTransactionCommitFailureWithNoShardLeader() throws Throwable {
698         datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
699         testTransactionCommitFailureWithNoShardLeader(true, "testWriteOnlyTransactionCommitFailureWithNoShardLeader");
700     }
701
702     @Test(expected=NoShardLeaderException.class)
703     public void testReadWriteTransactionCommitFailureWithNoShardLeader() throws Throwable {
704         testTransactionCommitFailureWithNoShardLeader(false, "testReadWriteTransactionCommitFailureWithNoShardLeader");
705     }
706
707     @Test
708     public void testTransactionAbort() throws Exception{
709         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
710             DistributedDataStore dataStore =
711                     setupDistributedDataStore("transactionAbortIntegrationTest", "test-1");
712
713             DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
714             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
715
716             writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
717
718             DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
719
720             cohort.canCommit().get(5, TimeUnit.SECONDS);
721
722             cohort.abort().get(5, TimeUnit.SECONDS);
723
724             testWriteTransaction(dataStore, TestModel.TEST_PATH,
725                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
726
727             cleanup(dataStore);
728         }};
729     }
730
731     @Test
732     public void testTransactionChainWithSingleShard() throws Exception{
733         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
734             DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChainWithSingleShard", "test-1");
735
736             // 1. Create a Tx chain and write-only Tx
737
738             DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
739
740             DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
741             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
742
743             // 2. Write some data
744
745             NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
746             writeTx.write(TestModel.TEST_PATH, testNode);
747
748             // 3. Ready the Tx for commit
749
750             final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
751
752             // 4. Commit the Tx on another thread that first waits for the second read Tx.
753
754             final CountDownLatch continueCommit1 = new CountDownLatch(1);
755             final CountDownLatch commit1Done = new CountDownLatch(1);
756             final AtomicReference<Exception> commit1Error = new AtomicReference<>();
757             new Thread() {
758                 @Override
759                 public void run() {
760                     try {
761                         continueCommit1.await();
762                         doCommit(cohort1);
763                     } catch (Exception e) {
764                         commit1Error.set(e);
765                     } finally {
766                         commit1Done.countDown();
767                     }
768                 }
769             }.start();
770
771             // 5. Create a new read Tx from the chain to read and verify the data from the first
772             // Tx is visible after being readied.
773
774             DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction();
775             Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
776             assertEquals("isPresent", true, optional.isPresent());
777             assertEquals("Data node", testNode, optional.get());
778
779             // 6. Create a new RW Tx from the chain, write more data, and ready it
780
781             DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
782             MapNode outerNode = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
783             rwTx.write(TestModel.OUTER_LIST_PATH, outerNode);
784
785             DOMStoreThreePhaseCommitCohort cohort2 = rwTx.ready();
786
787             // 7. Create a new read Tx from the chain to read the data from the last RW Tx to
788             // verify it is visible.
789
790             readTx = txChain.newReadWriteTransaction();
791             optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
792             assertEquals("isPresent", true, optional.isPresent());
793             assertEquals("Data node", outerNode, optional.get());
794
795             // 8. Wait for the 2 commits to complete and close the chain.
796
797             continueCommit1.countDown();
798             Uninterruptibles.awaitUninterruptibly(commit1Done, 5, TimeUnit.SECONDS);
799
800             if(commit1Error.get() != null) {
801                 throw commit1Error.get();
802             }
803
804             doCommit(cohort2);
805
806             txChain.close();
807
808             // 9. Create a new read Tx from the data store and verify committed data.
809
810             readTx = dataStore.newReadOnlyTransaction();
811             optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
812             assertEquals("isPresent", true, optional.isPresent());
813             assertEquals("Data node", outerNode, optional.get());
814
815             cleanup(dataStore);
816         }};
817     }
818
819     @Test
820     public void testTransactionChainWithMultipleShards() throws Exception{
821         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
822             DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChainWithMultipleShards",
823                     "cars-1", "people-1");
824
825             DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
826
827             DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
828             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
829
830             writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
831             writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
832
833             writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
834             writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
835
836             DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
837
838             DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction();
839
840             MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
841             YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
842             readWriteTx.write(carPath, car);
843
844             MapEntryNode person = PeopleModel.newPersonEntry("jack");
845             YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
846             readWriteTx.merge(personPath, person);
847
848             Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
849             assertEquals("isPresent", true, optional.isPresent());
850             assertEquals("Data node", car, optional.get());
851
852             optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS);
853             assertEquals("isPresent", true, optional.isPresent());
854             assertEquals("Data node", person, optional.get());
855
856             DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready();
857
858             writeTx = txChain.newWriteOnlyTransaction();
859
860             writeTx.delete(carPath);
861
862             DOMStoreThreePhaseCommitCohort cohort3 = writeTx.ready();
863
864             ListenableFuture<Boolean> canCommit1 = cohort1.canCommit();
865             ListenableFuture<Boolean> canCommit2 = cohort2.canCommit();
866
867             doCommit(canCommit1, cohort1);
868             doCommit(canCommit2, cohort2);
869             doCommit(cohort3);
870
871             txChain.close();
872
873             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
874
875             optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
876             assertEquals("isPresent", false, optional.isPresent());
877
878             optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
879             assertEquals("isPresent", true, optional.isPresent());
880             assertEquals("Data node", person, optional.get());
881
882             cleanup(dataStore);
883         }};
884     }
885
886     @Test
887     public void testCreateChainedTransactionsInQuickSuccession() throws Exception{
888         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
889             DistributedDataStore dataStore = setupDistributedDataStore(
890                     "testCreateChainedTransactionsInQuickSuccession", "cars-1");
891
892             ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
893                     ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
894                             LogicalDatastoreType.CONFIGURATION, dataStore).build(), MoreExecutors.directExecutor());
895
896             TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
897             DOMTransactionChain txChain = broker.createTransactionChain(listener);
898
899             List<CheckedFuture<Void, TransactionCommitFailedException>> futures = new ArrayList<>();
900
901             DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
902             writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, CarsModel.emptyContainer());
903             writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
904             futures.add(writeTx.submit());
905
906             int nCars = 100;
907             for(int i = 0; i < nCars; i++) {
908                 DOMDataReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
909
910                 rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.newCarPath("car" + i),
911                         CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
912
913                 futures.add(rwTx.submit());
914             }
915
916             for(CheckedFuture<Void, TransactionCommitFailedException> f: futures) {
917                 f.checkedGet();
918             }
919
920             Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction().read(
921                     LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
922             assertEquals("isPresent", true, optional.isPresent());
923             assertEquals("# cars", nCars, ((Collection<?>)optional.get().getValue()).size());
924
925             txChain.close();
926
927             broker.close();
928
929             cleanup(dataStore);
930         }};
931     }
932
933     @Test
934     public void testCreateChainedTransactionAfterEmptyTxReadied() throws Exception{
935         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
936             DistributedDataStore dataStore = setupDistributedDataStore(
937                     "testCreateChainedTransactionAfterEmptyTxReadied", "test-1");
938
939             DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
940
941             DOMStoreReadWriteTransaction rwTx1 = txChain.newReadWriteTransaction();
942
943             rwTx1.ready();
944
945             DOMStoreReadWriteTransaction rwTx2 = txChain.newReadWriteTransaction();
946
947             Optional<NormalizedNode<?, ?>> optional = rwTx2.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
948             assertEquals("isPresent", false, optional.isPresent());
949
950             txChain.close();
951
952             cleanup(dataStore);
953         }};
954     }
955
956     @Test
957     public void testCreateChainedTransactionWhenPreviousNotReady() throws Throwable {
958         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
959             DistributedDataStore dataStore = setupDistributedDataStore(
960                     "testCreateChainedTransactionWhenPreviousNotReady", "test-1");
961
962             final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
963
964             DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
965             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
966
967             writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
968
969             // Try to create another Tx of each type - each should fail b/c the previous Tx wasn't
970             // readied.
971
972             assertExceptionOnTxChainCreates(txChain, IllegalStateException.class);
973         }};
974     }
975
976     @Test
977     public void testCreateChainedTransactionAfterClose() throws Throwable {
978         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
979             DistributedDataStore dataStore = setupDistributedDataStore(
980                     "testCreateChainedTransactionAfterClose", "test-1");
981
982             DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
983
984             txChain.close();
985
986             // Try to create another Tx of each type - should fail b/c the previous Tx was closed.
987
988             assertExceptionOnTxChainCreates(txChain, TransactionChainClosedException.class);
989         }};
990     }
991
992     @Test
993     public void testChainedTransactionFailureWithSingleShard() throws Exception{
994         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
995             DistributedDataStore dataStore = setupDistributedDataStore(
996                     "testChainedTransactionFailureWithSingleShard", "cars-1");
997
998             ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
999                     ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
1000                             LogicalDatastoreType.CONFIGURATION, dataStore).build(), MoreExecutors.directExecutor());
1001
1002             TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
1003             DOMTransactionChain txChain = broker.createTransactionChain(listener);
1004
1005             DOMDataReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
1006
1007             ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
1008                     new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)).
1009                         withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
1010
1011             rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
1012
1013             try {
1014                 rwTx.submit().checkedGet(5, TimeUnit.SECONDS);
1015                 fail("Expected TransactionCommitFailedException");
1016             } catch (TransactionCommitFailedException e) {
1017                 // Expected
1018             }
1019
1020             verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(rwTx), any(Throwable.class));
1021
1022             txChain.close();
1023             broker.close();
1024             cleanup(dataStore);
1025         }};
1026     }
1027
1028     @Test
1029     public void testChainedTransactionFailureWithMultipleShards() throws Exception{
1030         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
1031             DistributedDataStore dataStore = setupDistributedDataStore(
1032                     "testChainedTransactionFailureWithMultipleShards", "cars-1", "people-1");
1033
1034             ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
1035                     ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
1036                             LogicalDatastoreType.CONFIGURATION, dataStore).build(), MoreExecutors.directExecutor());
1037
1038             TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
1039             DOMTransactionChain txChain = broker.createTransactionChain(listener);
1040
1041             DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
1042
1043             writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
1044
1045             ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
1046                     new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)).
1047                         withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
1048
1049             // Note that merge will validate the data and fail but put succeeds b/c deep validation is not
1050             // done for put for performance reasons.
1051             writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
1052
1053             try {
1054                 writeTx.submit().checkedGet(5, TimeUnit.SECONDS);
1055                 fail("Expected TransactionCommitFailedException");
1056             } catch (TransactionCommitFailedException e) {
1057                 // Expected
1058             }
1059
1060             verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx), any(Throwable.class));
1061
1062             txChain.close();
1063             broker.close();
1064             cleanup(dataStore);
1065         }};
1066     }
1067
1068     @Test
1069     public void testChangeListenerRegistration() throws Exception{
1070         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
1071             DistributedDataStore dataStore =
1072                     setupDistributedDataStore("testChangeListenerRegistration", "test-1");
1073
1074             testWriteTransaction(dataStore, TestModel.TEST_PATH,
1075                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1076
1077             MockDataChangeListener listener = new MockDataChangeListener(1);
1078
1079             ListenerRegistration<MockDataChangeListener>
1080                     listenerReg = dataStore.registerChangeListener(TestModel.TEST_PATH, listener,
1081                             DataChangeScope.SUBTREE);
1082
1083             assertNotNull("registerChangeListener returned null", listenerReg);
1084
1085             // Wait for the initial notification
1086
1087             listener.waitForChangeEvents(TestModel.TEST_PATH);
1088
1089             listener.reset(2);
1090
1091             // Write 2 updates.
1092
1093             testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
1094                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1095
1096             YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
1097                     nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
1098             testWriteTransaction(dataStore, listPath,
1099                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
1100
1101             // Wait for the 2 updates.
1102
1103             listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath);
1104
1105             listenerReg.close();
1106
1107             testWriteTransaction(dataStore, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
1108                     nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(),
1109                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2));
1110
1111             listener.expectNoMoreChanges("Received unexpected change after close");
1112
1113             cleanup(dataStore);
1114         }};
1115     }
1116
1117     @Test
1118     public void testRestoreFromDatastoreSnapshot() throws Exception{
1119         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
1120             String name = "transactionIntegrationTest";
1121
1122             ContainerNode carsNode = CarsModel.newCarsNode(CarsModel.newCarsMapNode(
1123                     CarsModel.newCarEntry("optima", BigInteger.valueOf(20000L)),
1124                     CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000L))));
1125
1126             ShardDataTree dataTree = new ShardDataTree(SchemaContextHelper.full());
1127             AbstractShardTest.writeToStore(dataTree, CarsModel.BASE_PATH, carsNode);
1128             NormalizedNode<?, ?> root = AbstractShardTest.readStore(dataTree.getDataTree(),
1129                     YangInstanceIdentifier.builder().build());
1130
1131             Snapshot carsSnapshot = Snapshot.create(SerializationUtils.serializeNormalizedNode(root),
1132                     Collections.<ReplicatedLogEntry>emptyList(), 2, 1, 2, 1, 1, "member-1");
1133
1134             NormalizedNode<?, ?> peopleNode = PeopleModel.create();
1135             dataTree = new ShardDataTree(SchemaContextHelper.full());
1136             AbstractShardTest.writeToStore(dataTree, PeopleModel.BASE_PATH, peopleNode);
1137             root = AbstractShardTest.readStore(dataTree.getDataTree(), YangInstanceIdentifier.builder().build());
1138
1139             Snapshot peopleSnapshot = Snapshot.create(SerializationUtils.serializeNormalizedNode(root),
1140                     Collections.<ReplicatedLogEntry>emptyList(), 2, 1, 2, 1, 1, "member-1");
1141
1142             restoreFromSnapshot = new DatastoreSnapshot(name, null, Arrays.asList(
1143                     new DatastoreSnapshot.ShardSnapshot("cars",
1144                             org.apache.commons.lang3.SerializationUtils.serialize(carsSnapshot)),
1145                     new DatastoreSnapshot.ShardSnapshot("people",
1146                             org.apache.commons.lang3.SerializationUtils.serialize(peopleSnapshot))));
1147
1148             DistributedDataStore dataStore = setupDistributedDataStore(name, "module-shards-member1.conf",
1149                     true, "cars", "people");
1150
1151             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
1152
1153             Optional<NormalizedNode<?, ?>> optional = readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS);
1154             assertEquals("isPresent", true, optional.isPresent());
1155             assertEquals("Data node", carsNode, optional.get());
1156
1157             optional = readTx.read(PeopleModel.BASE_PATH).get(5, TimeUnit.SECONDS);
1158             assertEquals("isPresent", true, optional.isPresent());
1159             assertEquals("Data node", peopleNode, optional.get());
1160
1161             cleanup(dataStore);
1162         }};
1163     }
1164 }