5bd67b4ed45ca95ea63a2da9e99292be9898faea
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / DistributedDataStoreIntegrationTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertTrue;
14 import static org.junit.Assert.fail;
15 import static org.mockito.Matchers.any;
16 import static org.mockito.Matchers.eq;
17 import static org.mockito.Mockito.timeout;
18 import static org.mockito.Mockito.verify;
19
20 import akka.actor.ActorSystem;
21 import akka.actor.Address;
22 import akka.actor.AddressFromURIString;
23 import akka.cluster.Cluster;
24 import akka.testkit.JavaTestKit;
25 import com.google.common.base.Optional;
26 import com.google.common.base.Throwables;
27 import com.google.common.collect.ImmutableMap;
28 import com.google.common.util.concurrent.CheckedFuture;
29 import com.google.common.util.concurrent.ListenableFuture;
30 import com.google.common.util.concurrent.MoreExecutors;
31 import com.google.common.util.concurrent.Uninterruptibles;
32 import com.typesafe.config.ConfigFactory;
33 import java.io.ByteArrayOutputStream;
34 import java.io.IOException;
35 import java.math.BigInteger;
36 import java.util.ArrayList;
37 import java.util.Arrays;
38 import java.util.Collection;
39 import java.util.Collections;
40 import java.util.List;
41 import java.util.concurrent.CountDownLatch;
42 import java.util.concurrent.ExecutionException;
43 import java.util.concurrent.TimeUnit;
44 import java.util.concurrent.atomic.AtomicReference;
45 import org.junit.AfterClass;
46 import org.junit.BeforeClass;
47 import org.junit.Test;
48 import org.mockito.Mockito;
49 import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker;
50 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
51 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
52 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
53 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
54 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
55 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
56 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
57 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
58 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
59 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
60 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
61 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
62 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
63 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
64 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
65 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
66 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
67 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
68 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
69 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
70 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
71 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
72 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
73 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
74 import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
75 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
76 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
77 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
78 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
79 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
80 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
81 import org.opendaylight.yangtools.concepts.ListenerRegistration;
82 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
83 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
84 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
85 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
86 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
87 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
88 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
89 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
90 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
91 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
92
93 public class DistributedDataStoreIntegrationTest {
94
95     private static ActorSystem system;
96
97     private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder()
98             .shardHeartbeatIntervalInMillis(100);
99
100     @BeforeClass
101     public static void setUpClass() throws IOException {
102         system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
103         Address member1Address = AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558");
104         Cluster.get(system).join(member1Address);
105     }
106
107     @AfterClass
108     public static void tearDownClass() throws IOException {
109         JavaTestKit.shutdownActorSystem(system);
110         system = null;
111     }
112
113     protected ActorSystem getSystem() {
114         return system;
115     }
116
117     @Test
118     public void testWriteTransactionWithSingleShard() throws Exception {
119         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
120             {
121                 try (AbstractDataStore dataStore = setupDistributedDataStore("transactionIntegrationTest",
122                         "test-1")) {
123
124                     testWriteTransaction(dataStore, TestModel.TEST_PATH,
125                             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
126
127                     testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
128                             ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
129                 }
130             }
131         };
132     }
133
134     @Test
135     public void testWriteTransactionWithMultipleShards() throws Exception {
136         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
137             {
138                 try (AbstractDataStore dataStore = setupDistributedDataStore(
139                         "testWriteTransactionWithMultipleShards", "cars-1", "people-1")) {
140
141                     DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
142                     assertNotNull("newWriteOnlyTransaction returned null", writeTx);
143
144                     writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
145                     writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
146
147                     doCommit(writeTx.ready());
148
149                     writeTx = dataStore.newWriteOnlyTransaction();
150
151                     writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
152                     writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
153
154                     doCommit(writeTx.ready());
155
156                     writeTx = dataStore.newWriteOnlyTransaction();
157
158                     MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
159                     YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
160                     writeTx.write(carPath, car);
161
162                     MapEntryNode person = PeopleModel.newPersonEntry("jack");
163                     YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
164                     writeTx.write(personPath, person);
165
166                     doCommit(writeTx.ready());
167
168                     // Verify the data in the store
169
170                     DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
171
172                     Optional<NormalizedNode<?, ?>> optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
173                     assertEquals("isPresent", true, optional.isPresent());
174                     assertEquals("Data node", car, optional.get());
175
176                     optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
177                     assertEquals("isPresent", true, optional.isPresent());
178                     assertEquals("Data node", person, optional.get());
179                 }
180             }
181         };
182     }
183
184     @Test
185     public void testReadWriteTransactionWithSingleShard() throws Exception {
186         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
187             {
188                 try (AbstractDataStore dataStore = setupDistributedDataStore(
189                         "testReadWriteTransactionWithSingleShard", "test-1")) {
190
191                     // 1. Create a read-write Tx
192
193                     DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
194                     assertNotNull("newReadWriteTransaction returned null", readWriteTx);
195
196                     // 2. Write some data
197
198                     YangInstanceIdentifier nodePath = TestModel.TEST_PATH;
199                     NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
200                     readWriteTx.write(nodePath, nodeToWrite);
201
202                     // 3. Read the data from Tx
203
204                     Boolean exists = readWriteTx.exists(nodePath).checkedGet(5, TimeUnit.SECONDS);
205                     assertEquals("exists", true, exists);
206
207                     Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS);
208                     assertEquals("isPresent", true, optional.isPresent());
209                     assertEquals("Data node", nodeToWrite, optional.get());
210
211                     // 4. Ready the Tx for commit
212
213                     DOMStoreThreePhaseCommitCohort cohort = readWriteTx.ready();
214
215                     // 5. Commit the Tx
216
217                     doCommit(cohort);
218
219                     // 6. Verify the data in the store
220
221                     DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
222
223                     optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
224                     assertEquals("isPresent", true, optional.isPresent());
225                     assertEquals("Data node", nodeToWrite, optional.get());
226                 }
227             }
228         };
229     }
230
231     @Test
232     public void testReadWriteTransactionWithMultipleShards() throws Exception {
233         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
234             {
235                 try (AbstractDataStore dataStore = setupDistributedDataStore(
236                         "testReadWriteTransactionWithMultipleShards", "cars-1", "people-1")) {
237
238                     DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
239                     assertNotNull("newReadWriteTransaction returned null", readWriteTx);
240
241                     readWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
242                     readWriteTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
243
244                     doCommit(readWriteTx.ready());
245
246                     readWriteTx = dataStore.newReadWriteTransaction();
247
248                     readWriteTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
249                     readWriteTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
250
251                     doCommit(readWriteTx.ready());
252
253                     readWriteTx = dataStore.newReadWriteTransaction();
254
255                     MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
256                     YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
257                     readWriteTx.write(carPath, car);
258
259                     MapEntryNode person = PeopleModel.newPersonEntry("jack");
260                     YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
261                     readWriteTx.write(personPath, person);
262
263                     Boolean exists = readWriteTx.exists(carPath).checkedGet(5, TimeUnit.SECONDS);
264                     assertEquals("exists", true, exists);
265
266                     Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
267                     assertEquals("isPresent", true, optional.isPresent());
268                     assertEquals("Data node", car, optional.get());
269
270                     doCommit(readWriteTx.ready());
271
272                     // Verify the data in the store
273
274                     DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
275
276                     optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
277                     assertEquals("isPresent", true, optional.isPresent());
278                     assertEquals("Data node", car, optional.get());
279
280                     optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
281                     assertEquals("isPresent", true, optional.isPresent());
282                     assertEquals("Data node", person, optional.get());
283
284                 }
285             }
286         };
287     }
288
289     @Test
290     public void testSingleTransactionsWritesInQuickSuccession() throws Exception {
291         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
292             {
293                 try (AbstractDataStore dataStore = setupDistributedDataStore(
294                         "testSingleTransactionsWritesInQuickSuccession", "cars-1")) {
295
296                     DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
297
298                     DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
299                     writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
300                     writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
301                     doCommit(writeTx.ready());
302
303                     writeTx = txChain.newWriteOnlyTransaction();
304
305                     int numCars = 5;
306                     for (int i = 0; i < numCars; i++) {
307                         writeTx.write(CarsModel.newCarPath("car" + i),
308                                 CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
309                     }
310
311                     doCommit(writeTx.ready());
312
313                     Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction()
314                             .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
315                     assertEquals("isPresent", true, optional.isPresent());
316                     assertEquals("# cars", numCars, ((Collection<?>) optional.get().getValue()).size());
317                 }
318             }
319         };
320     }
321
322     @SuppressWarnings("checkstyle:IllegalCatch")
323     private void testTransactionWritesWithShardNotInitiallyReady(final String testName, final boolean writeOnly)
324             throws Exception {
325         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
326             {
327                 String shardName = "test-1";
328
329                 // Setup the InMemoryJournal to block shard recovery to ensure
330                 // the shard isn't
331                 // initialized until we create and submit the write the Tx.
332                 String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
333                 CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
334                 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
335
336                 try (AbstractDataStore dataStore = setupDistributedDataStore(testName, false, shardName)) {
337
338                     // Create the write Tx
339
340                     final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction()
341                             : dataStore.newReadWriteTransaction();
342                     assertNotNull("newReadWriteTransaction returned null", writeTx);
343
344                     // Do some modification operations and ready the Tx on a
345                     // separate thread.
346
347                     final YangInstanceIdentifier listEntryPath = YangInstanceIdentifier
348                             .builder(TestModel.OUTER_LIST_PATH)
349                             .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
350
351                     final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
352                     final AtomicReference<Exception> caughtEx = new AtomicReference<>();
353                     final CountDownLatch txReady = new CountDownLatch(1);
354                     Thread txThread = new Thread() {
355                         @Override
356                         public void run() {
357                             try {
358                                 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
359
360                                 writeTx.merge(TestModel.OUTER_LIST_PATH,
361                                         ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
362
363                                 writeTx.write(listEntryPath,
364                                         ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
365
366                                 writeTx.delete(listEntryPath);
367
368                                 txCohort.set(writeTx.ready());
369                             } catch (Exception e) {
370                                 caughtEx.set(e);
371                                 return;
372                             } finally {
373                                 txReady.countDown();
374                             }
375                         }
376                     };
377
378                     txThread.start();
379
380                     // Wait for the Tx operations to complete.
381
382                     boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
383                     if (caughtEx.get() != null) {
384                         throw caughtEx.get();
385                     }
386
387                     assertEquals("Tx ready", true, done);
388
389                     // At this point the Tx operations should be waiting for the
390                     // shard to initialize so
391                     // trigger the latch to let the shard recovery to continue.
392
393                     blockRecoveryLatch.countDown();
394
395                     // Wait for the Tx commit to complete.
396
397                     doCommit(txCohort.get());
398
399                     // Verify the data in the store
400
401                     DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
402
403                     Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
404                     assertEquals("isPresent", true, optional.isPresent());
405
406                     optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
407                     assertEquals("isPresent", true, optional.isPresent());
408
409                     optional = readTx.read(listEntryPath).get(5, TimeUnit.SECONDS);
410                     assertEquals("isPresent", false, optional.isPresent());
411                 }
412             }
413         };
414     }
415
416     @Test
417     public void testWriteOnlyTransactionWithShardNotInitiallyReady() throws Exception {
418         datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
419         testTransactionWritesWithShardNotInitiallyReady("testWriteOnlyTransactionWithShardNotInitiallyReady", true);
420     }
421
422     @Test
423     public void testReadWriteTransactionWithShardNotInitiallyReady() throws Exception {
424         testTransactionWritesWithShardNotInitiallyReady("testReadWriteTransactionWithShardNotInitiallyReady", false);
425     }
426
427     @Test
428     @SuppressWarnings("checkstyle:IllegalCatch")
429     public void testTransactionReadsWithShardNotInitiallyReady() throws Exception {
430         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
431             {
432                 String testName = "testTransactionReadsWithShardNotInitiallyReady";
433                 String shardName = "test-1";
434
435                 // Setup the InMemoryJournal to block shard recovery to ensure
436                 // the shard isn't
437                 // initialized until we create the Tx.
438                 String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
439                 CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
440                 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
441
442                 try (AbstractDataStore dataStore = setupDistributedDataStore(testName, false, shardName)) {
443
444                     // Create the read-write Tx
445
446                     final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
447                     assertNotNull("newReadWriteTransaction returned null", readWriteTx);
448
449                     // Do some reads on the Tx on a separate thread.
450
451                     final AtomicReference<CheckedFuture<Boolean, ReadFailedException>> txExistsFuture =
452                             new AtomicReference<>();
453                     final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
454                             txReadFuture = new AtomicReference<>();
455                     final AtomicReference<Exception> caughtEx = new AtomicReference<>();
456                     final CountDownLatch txReadsDone = new CountDownLatch(1);
457                     Thread txThread = new Thread() {
458                         @Override
459                         public void run() {
460                             try {
461                                 readWriteTx.write(TestModel.TEST_PATH,
462                                         ImmutableNodes.containerNode(TestModel.TEST_QNAME));
463
464                                 txExistsFuture.set(readWriteTx.exists(TestModel.TEST_PATH));
465
466                                 txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
467                             } catch (Exception e) {
468                                 caughtEx.set(e);
469                                 return;
470                             } finally {
471                                 txReadsDone.countDown();
472                             }
473                         }
474                     };
475
476                     txThread.start();
477
478                     // Wait for the Tx operations to complete.
479
480                     boolean done = Uninterruptibles.awaitUninterruptibly(txReadsDone, 5, TimeUnit.SECONDS);
481                     if (caughtEx.get() != null) {
482                         throw caughtEx.get();
483                     }
484
485                     assertEquals("Tx reads done", true, done);
486
487                     // At this point the Tx operations should be waiting for the
488                     // shard to initialize so
489                     // trigger the latch to let the shard recovery to continue.
490
491                     blockRecoveryLatch.countDown();
492
493                     // Wait for the reads to complete and verify.
494
495                     assertEquals("exists", true, txExistsFuture.get().checkedGet(5, TimeUnit.SECONDS));
496                     assertEquals("read", true, txReadFuture.get().checkedGet(5, TimeUnit.SECONDS).isPresent());
497
498                     readWriteTx.close();
499                 }
500             }
501         };
502     }
503
504     @Test(expected = NotInitializedException.class)
505     @SuppressWarnings("checkstyle:IllegalCatch")
506     public void testTransactionCommitFailureWithShardNotInitialized() throws Exception {
507         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
508             {
509                 String testName = "testTransactionCommitFailureWithShardNotInitialized";
510                 String shardName = "test-1";
511
512                 // Set the shard initialization timeout low for the test.
513
514                 datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
515
516                 // Setup the InMemoryJournal to block shard recovery
517                 // indefinitely.
518
519                 String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
520                 CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
521                 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
522
523                 InMemoryJournal.addEntry(persistentID, 1, "Dummy data so akka will read from persistence");
524
525                 try (AbstractDataStore dataStore = setupDistributedDataStore(testName, false, shardName)) {
526
527                     // Create the write Tx
528
529                     final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
530                     assertNotNull("newReadWriteTransaction returned null", writeTx);
531
532                     // Do some modifications and ready the Tx on a separate
533                     // thread.
534
535                     final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
536                     final AtomicReference<Exception> caughtEx = new AtomicReference<>();
537                     final CountDownLatch txReady = new CountDownLatch(1);
538                     Thread txThread = new Thread() {
539                         @Override
540                         public void run() {
541                             try {
542                                 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
543
544                                 txCohort.set(writeTx.ready());
545                             } catch (Exception e) {
546                                 caughtEx.set(e);
547                                 return;
548                             } finally {
549                                 txReady.countDown();
550                             }
551                         }
552                     };
553
554                     txThread.start();
555
556                     // Wait for the Tx operations to complete.
557
558                     boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
559                     if (caughtEx.get() != null) {
560                         throw caughtEx.get();
561                     }
562
563                     assertEquals("Tx ready", true, done);
564
565                     // Wait for the commit to complete. Since the shard never
566                     // initialized, the Tx should
567                     // have timed out and throw an appropriate exception cause.
568
569                     try {
570                         txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
571                     } catch (ExecutionException e) {
572                         Throwables.propagateIfInstanceOf(e.getCause(), Exception.class);
573                         Throwables.propagate(e.getCause());
574                     } finally {
575                         blockRecoveryLatch.countDown();
576                     }
577                 }
578             }
579         };
580     }
581
582     @Test(expected = NotInitializedException.class)
583     @SuppressWarnings("checkstyle:IllegalCatch")
584     public void testTransactionReadFailureWithShardNotInitialized() throws Exception {
585         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
586             {
587                 String testName = "testTransactionReadFailureWithShardNotInitialized";
588                 String shardName = "test-1";
589
590                 // Set the shard initialization timeout low for the test.
591
592                 datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
593
594                 // Setup the InMemoryJournal to block shard recovery
595                 // indefinitely.
596
597                 String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
598                 CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
599                 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
600
601                 InMemoryJournal.addEntry(persistentID, 1, "Dummy data so akka will read from persistence");
602
603                 try (AbstractDataStore dataStore = setupDistributedDataStore(testName, false, shardName)) {
604
605                     // Create the read-write Tx
606
607                     final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
608                     assertNotNull("newReadWriteTransaction returned null", readWriteTx);
609
610                     // Do a read on the Tx on a separate thread.
611
612                     final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
613                             txReadFuture = new AtomicReference<>();
614                     final AtomicReference<Exception> caughtEx = new AtomicReference<>();
615                     final CountDownLatch txReadDone = new CountDownLatch(1);
616                     Thread txThread = new Thread() {
617                         @Override
618                         public void run() {
619                             try {
620                                 readWriteTx.write(TestModel.TEST_PATH,
621                                         ImmutableNodes.containerNode(TestModel.TEST_QNAME));
622
623                                 txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
624
625                                 readWriteTx.close();
626                             } catch (Exception e) {
627                                 caughtEx.set(e);
628                                 return;
629                             } finally {
630                                 txReadDone.countDown();
631                             }
632                         }
633                     };
634
635                     txThread.start();
636
637                     // Wait for the Tx operations to complete.
638
639                     boolean done = Uninterruptibles.awaitUninterruptibly(txReadDone, 5, TimeUnit.SECONDS);
640                     if (caughtEx.get() != null) {
641                         throw caughtEx.get();
642                     }
643
644                     assertEquals("Tx read done", true, done);
645
646                     // Wait for the read to complete. Since the shard never
647                     // initialized, the Tx should
648                     // have timed out and throw an appropriate exception cause.
649
650                     try {
651                         txReadFuture.get().checkedGet(5, TimeUnit.SECONDS);
652                     } catch (ReadFailedException e) {
653                         Throwables.propagateIfInstanceOf(e.getCause(), Exception.class);
654                         Throwables.propagate(e.getCause());
655                     } finally {
656                         blockRecoveryLatch.countDown();
657                     }
658                 }
659             }
660         };
661     }
662
663     @SuppressWarnings("checkstyle:IllegalCatch")
664     private void testTransactionCommitFailureWithNoShardLeader(final boolean writeOnly, final String testName)
665             throws Exception {
666         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
667             {
668                 String shardName = "default";
669
670                 // We don't want the shard to become the leader so prevent shard
671                 // elections.
672                 datastoreContextBuilder.customRaftPolicyImplementation(
673                         "org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy");
674
675                 // The ShardManager uses the election timeout for FindPrimary so
676                 // reset it low so it will timeout quickly.
677                 datastoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(1)
678                         .shardInitializationTimeout(200, TimeUnit.MILLISECONDS);
679
680                 try (AbstractDataStore dataStore = setupDistributedDataStore(testName, false, shardName)) {
681
682                     Object result = dataStore.getActorContext().executeOperation(
683                             dataStore.getActorContext().getShardManager(), new FindLocalShard(shardName, true));
684                     assertTrue("Expected LocalShardFound. Actual: " + result, result instanceof LocalShardFound);
685
686                     // Create the write Tx.
687
688                     try (DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction()
689                             : dataStore.newReadWriteTransaction()) {
690                         assertNotNull("newReadWriteTransaction returned null", writeTx);
691
692                         // Do some modifications and ready the Tx on a separate
693                         // thread.
694
695                         final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
696                         final AtomicReference<Exception> caughtEx = new AtomicReference<>();
697                         final CountDownLatch txReady = new CountDownLatch(1);
698                         Thread txThread = new Thread() {
699                             @Override
700                             public void run() {
701                                 try {
702                                     writeTx.write(TestModel.JUNK_PATH,
703                                             ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
704
705                                     txCohort.set(writeTx.ready());
706                                 } catch (Exception e) {
707                                     caughtEx.set(e);
708                                     return;
709                                 } finally {
710                                     txReady.countDown();
711                                 }
712                             }
713                         };
714
715                         txThread.start();
716
717                         // Wait for the Tx operations to complete.
718
719                         boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
720                         if (caughtEx.get() != null) {
721                             throw caughtEx.get();
722                         }
723
724                         assertEquals("Tx ready", true, done);
725
726                         // Wait for the commit to complete. Since no shard
727                         // leader was elected in time, the Tx
728                         // should have timed out and throw an appropriate
729                         // exception cause.
730
731                         try {
732                             txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
733                         } catch (ExecutionException e) {
734                             Throwables.propagateIfInstanceOf(e.getCause(), Exception.class);
735                             Throwables.propagate(e.getCause());
736                         }
737                     }
738                 }
739             }
740         };
741     }
742
743     @Test(expected = NoShardLeaderException.class)
744     public void testWriteOnlyTransactionCommitFailureWithNoShardLeader() throws Exception {
745         datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
746         testTransactionCommitFailureWithNoShardLeader(true, "testWriteOnlyTransactionCommitFailureWithNoShardLeader");
747     }
748
749     @Test(expected = NoShardLeaderException.class)
750     public void testReadWriteTransactionCommitFailureWithNoShardLeader() throws Exception {
751         testTransactionCommitFailureWithNoShardLeader(false, "testReadWriteTransactionCommitFailureWithNoShardLeader");
752     }
753
754     @Test
755     public void testTransactionAbort() throws Exception {
756         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
757             {
758                 try (AbstractDataStore dataStore = setupDistributedDataStore("transactionAbortIntegrationTest",
759                         "test-1")) {
760
761                     DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
762                     assertNotNull("newWriteOnlyTransaction returned null", writeTx);
763
764                     writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
765
766                     DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
767
768                     cohort.canCommit().get(5, TimeUnit.SECONDS);
769
770                     cohort.abort().get(5, TimeUnit.SECONDS);
771
772                     testWriteTransaction(dataStore, TestModel.TEST_PATH,
773                             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
774                 }
775             }
776         };
777     }
778
779     @Test
780     @SuppressWarnings("checkstyle:IllegalCatch")
781     public void testTransactionChainWithSingleShard() throws Exception {
782         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
783             {
784                 try (AbstractDataStore dataStore = setupDistributedDataStore("testTransactionChainWithSingleShard",
785                         "test-1")) {
786
787                     // 1. Create a Tx chain and write-only Tx
788
789                     DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
790
791                     DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
792                     assertNotNull("newWriteOnlyTransaction returned null", writeTx);
793
794                     // 2. Write some data
795
796                     NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
797                     writeTx.write(TestModel.TEST_PATH, testNode);
798
799                     // 3. Ready the Tx for commit
800
801                     final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
802
803                     // 4. Commit the Tx on another thread that first waits for
804                     // the second read Tx.
805
806                     final CountDownLatch continueCommit1 = new CountDownLatch(1);
807                     final CountDownLatch commit1Done = new CountDownLatch(1);
808                     final AtomicReference<Exception> commit1Error = new AtomicReference<>();
809                     new Thread() {
810                         @Override
811                         public void run() {
812                             try {
813                                 continueCommit1.await();
814                                 doCommit(cohort1);
815                             } catch (Exception e) {
816                                 commit1Error.set(e);
817                             } finally {
818                                 commit1Done.countDown();
819                             }
820                         }
821                     }.start();
822
823                     // 5. Create a new read Tx from the chain to read and verify
824                     // the data from the first
825                     // Tx is visible after being readied.
826
827                     DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction();
828                     Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
829                     assertEquals("isPresent", true, optional.isPresent());
830                     assertEquals("Data node", testNode, optional.get());
831
832                     // 6. Create a new RW Tx from the chain, write more data,
833                     // and ready it
834
835                     DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
836                     MapNode outerNode = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
837                     rwTx.write(TestModel.OUTER_LIST_PATH, outerNode);
838
839                     final DOMStoreThreePhaseCommitCohort cohort2 = rwTx.ready();
840
841                     // 7. Create a new read Tx from the chain to read the data
842                     // from the last RW Tx to
843                     // verify it is visible.
844
845                     readTx = txChain.newReadWriteTransaction();
846                     optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
847                     assertEquals("isPresent", true, optional.isPresent());
848                     assertEquals("Data node", outerNode, optional.get());
849
850                     // 8. Wait for the 2 commits to complete and close the
851                     // chain.
852
853                     continueCommit1.countDown();
854                     Uninterruptibles.awaitUninterruptibly(commit1Done, 5, TimeUnit.SECONDS);
855
856                     if (commit1Error.get() != null) {
857                         throw commit1Error.get();
858                     }
859
860                     doCommit(cohort2);
861
862                     txChain.close();
863
864                     // 9. Create a new read Tx from the data store and verify
865                     // committed data.
866
867                     readTx = dataStore.newReadOnlyTransaction();
868                     optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
869                     assertEquals("isPresent", true, optional.isPresent());
870                     assertEquals("Data node", outerNode, optional.get());
871                 }
872             }
873         };
874     }
875
876     @Test
877     public void testTransactionChainWithMultipleShards() throws Exception {
878         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
879             {
880                 try (AbstractDataStore dataStore = setupDistributedDataStore(
881                         "testTransactionChainWithMultipleShards", "cars-1", "people-1")) {
882
883                     DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
884
885                     DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
886                     assertNotNull("newWriteOnlyTransaction returned null", writeTx);
887
888                     writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
889                     writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
890
891                     writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
892                     writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
893
894                     final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
895
896                     DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction();
897
898                     MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
899                     YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
900                     readWriteTx.write(carPath, car);
901
902                     MapEntryNode person = PeopleModel.newPersonEntry("jack");
903                     YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
904                     readWriteTx.merge(personPath, person);
905
906                     Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
907                     assertEquals("isPresent", true, optional.isPresent());
908                     assertEquals("Data node", car, optional.get());
909
910                     optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS);
911                     assertEquals("isPresent", true, optional.isPresent());
912                     assertEquals("Data node", person, optional.get());
913
914                     DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready();
915
916                     writeTx = txChain.newWriteOnlyTransaction();
917
918                     writeTx.delete(carPath);
919
920                     DOMStoreThreePhaseCommitCohort cohort3 = writeTx.ready();
921
922                     ListenableFuture<Boolean> canCommit1 = cohort1.canCommit();
923                     ListenableFuture<Boolean> canCommit2 = cohort2.canCommit();
924
925                     doCommit(canCommit1, cohort1);
926                     doCommit(canCommit2, cohort2);
927                     doCommit(cohort3);
928
929                     txChain.close();
930
931                     DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
932
933                     optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
934                     assertEquals("isPresent", false, optional.isPresent());
935
936                     optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
937                     assertEquals("isPresent", true, optional.isPresent());
938                     assertEquals("Data node", person, optional.get());
939                 }
940             }
941         };
942     }
943
944     @Test
945     public void testCreateChainedTransactionsInQuickSuccession() throws Exception {
946         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
947             {
948                 try (AbstractDataStore dataStore = setupDistributedDataStore(
949                         "testCreateChainedTransactionsInQuickSuccession", "cars-1")) {
950
951                     ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
952                             ImmutableMap.<LogicalDatastoreType, DOMStore>builder()
953                                     .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(),
954                             MoreExecutors.directExecutor());
955
956                     TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
957                     DOMTransactionChain txChain = broker.createTransactionChain(listener);
958
959                     List<CheckedFuture<Void, TransactionCommitFailedException>> futures = new ArrayList<>();
960
961                     DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
962                     writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, CarsModel.emptyContainer());
963                     writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
964                     futures.add(writeTx.submit());
965
966                     int numCars = 100;
967                     for (int i = 0; i < numCars; i++) {
968                         DOMDataReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
969
970                         rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.newCarPath("car" + i),
971                                 CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
972
973                         futures.add(rwTx.submit());
974                     }
975
976                     for (CheckedFuture<Void, TransactionCommitFailedException> f : futures) {
977                         f.checkedGet();
978                     }
979
980                     Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction()
981                             .read(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
982                     assertEquals("isPresent", true, optional.isPresent());
983                     assertEquals("# cars", numCars, ((Collection<?>) optional.get().getValue()).size());
984
985                     txChain.close();
986
987                     broker.close();
988                 }
989             }
990         };
991     }
992
993     @Test
994     public void testCreateChainedTransactionAfterEmptyTxReadied() throws Exception {
995         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
996             {
997                 try (AbstractDataStore dataStore = setupDistributedDataStore(
998                         "testCreateChainedTransactionAfterEmptyTxReadied", "test-1")) {
999
1000                     DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
1001
1002                     DOMStoreReadWriteTransaction rwTx1 = txChain.newReadWriteTransaction();
1003
1004                     rwTx1.ready();
1005
1006                     DOMStoreReadWriteTransaction rwTx2 = txChain.newReadWriteTransaction();
1007
1008                     Optional<NormalizedNode<?, ?>> optional = rwTx2.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
1009                     assertEquals("isPresent", false, optional.isPresent());
1010
1011                     txChain.close();
1012                 }
1013             }
1014         };
1015     }
1016
1017     @Test
1018     public void testCreateChainedTransactionWhenPreviousNotReady() throws Exception {
1019         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
1020             {
1021                 try (AbstractDataStore dataStore = setupDistributedDataStore(
1022                         "testCreateChainedTransactionWhenPreviousNotReady", "test-1")) {
1023
1024                     final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
1025
1026                     DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
1027                     assertNotNull("newWriteOnlyTransaction returned null", writeTx);
1028
1029                     writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1030
1031                     // Try to create another Tx of each type - each should fail
1032                     // b/c the previous Tx wasn't
1033                     // readied.
1034
1035                     assertExceptionOnTxChainCreates(txChain, IllegalStateException.class);
1036                 }
1037             }
1038         };
1039     }
1040
1041     @Test
1042     public void testCreateChainedTransactionAfterClose() throws Exception {
1043         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
1044             {
1045                 try (AbstractDataStore dataStore = setupDistributedDataStore(
1046                         "testCreateChainedTransactionAfterClose", "test-1")) {
1047
1048                     DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
1049
1050                     txChain.close();
1051
1052                     // Try to create another Tx of each type - should fail b/c
1053                     // the previous Tx was closed.
1054
1055                     assertExceptionOnTxChainCreates(txChain, TransactionChainClosedException.class);
1056                 }
1057             }
1058         };
1059     }
1060
1061     @Test
1062     public void testChainWithReadOnlyTxAfterPreviousReady() throws Exception {
1063         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
1064             {
1065                 try (AbstractDataStore dataStore = setupDistributedDataStore(
1066                         "testChainWithReadOnlyTxAfterPreviousReady", "test-1")) {
1067
1068                     final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
1069
1070                     // Create a write tx and submit.
1071
1072                     DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
1073                     writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1074                     final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
1075
1076                     // Create read-only tx's and issue a read.
1077
1078                     CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readFuture1 = txChain
1079                             .newReadOnlyTransaction().read(TestModel.TEST_PATH);
1080
1081                     CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readFuture2 = txChain
1082                             .newReadOnlyTransaction().read(TestModel.TEST_PATH);
1083
1084                     // Create another write tx and issue the write.
1085
1086                     DOMStoreWriteTransaction writeTx2 = txChain.newWriteOnlyTransaction();
1087                     writeTx2.write(TestModel.OUTER_LIST_PATH,
1088                             ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1089
1090                     // Ensure the reads succeed.
1091
1092                     assertEquals("isPresent", true, readFuture1.checkedGet(5, TimeUnit.SECONDS).isPresent());
1093                     assertEquals("isPresent", true, readFuture2.checkedGet(5, TimeUnit.SECONDS).isPresent());
1094
1095                     // Ensure the writes succeed.
1096
1097                     DOMStoreThreePhaseCommitCohort cohort2 = writeTx2.ready();
1098
1099                     doCommit(cohort1);
1100                     doCommit(cohort2);
1101
1102                     assertEquals("isPresent", true, txChain.newReadOnlyTransaction().read(TestModel.OUTER_LIST_PATH)
1103                             .checkedGet(5, TimeUnit.SECONDS).isPresent());
1104                 }
1105             }
1106         };
1107     }
1108
1109     @Test
1110     public void testChainedTransactionFailureWithSingleShard() throws Exception {
1111         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
1112             {
1113                 try (AbstractDataStore dataStore = setupDistributedDataStore(
1114                         "testChainedTransactionFailureWithSingleShard", "cars-1")) {
1115
1116                     ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
1117                             ImmutableMap.<LogicalDatastoreType, DOMStore>builder()
1118                                     .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(),
1119                             MoreExecutors.directExecutor());
1120
1121                     TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
1122                     DOMTransactionChain txChain = broker.createTransactionChain(listener);
1123
1124                     DOMDataReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
1125
1126                     ContainerNode invalidData = ImmutableContainerNodeBuilder.create()
1127                             .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME))
1128                             .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
1129
1130                     rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
1131
1132                     try {
1133                         rwTx.submit().checkedGet(5, TimeUnit.SECONDS);
1134                         fail("Expected TransactionCommitFailedException");
1135                     } catch (TransactionCommitFailedException e) {
1136                         // Expected
1137                     }
1138
1139                     verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(rwTx),
1140                             any(Throwable.class));
1141
1142                     txChain.close();
1143                     broker.close();
1144                 }
1145             }
1146         };
1147     }
1148
1149     @Test
1150     public void testChainedTransactionFailureWithMultipleShards() throws Exception {
1151         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
1152             {
1153                 try (AbstractDataStore dataStore = setupDistributedDataStore(
1154                         "testChainedTransactionFailureWithMultipleShards", "cars-1", "people-1")) {
1155
1156                     ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
1157                             ImmutableMap.<LogicalDatastoreType, DOMStore>builder()
1158                                     .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(),
1159                             MoreExecutors.directExecutor());
1160
1161                     TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
1162                     DOMTransactionChain txChain = broker.createTransactionChain(listener);
1163
1164                     DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
1165
1166                     writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH,
1167                             PeopleModel.emptyContainer());
1168
1169                     ContainerNode invalidData = ImmutableContainerNodeBuilder.create()
1170                             .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME))
1171                             .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
1172
1173                     // Note that merge will validate the data and fail but put
1174                     // succeeds b/c deep validation is not
1175                     // done for put for performance reasons.
1176                     writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
1177
1178                     try {
1179                         writeTx.submit().checkedGet(5, TimeUnit.SECONDS);
1180                         fail("Expected TransactionCommitFailedException");
1181                     } catch (TransactionCommitFailedException e) {
1182                         // Expected
1183                     }
1184
1185                     verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx),
1186                             any(Throwable.class));
1187
1188                     txChain.close();
1189                     broker.close();
1190                 }
1191             }
1192         };
1193     }
1194
1195     @Test
1196     public void testChangeListenerRegistration() throws Exception {
1197         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
1198             {
1199                 try (AbstractDataStore dataStore = setupDistributedDataStore("testChangeListenerRegistration",
1200                         "test-1")) {
1201
1202                     testWriteTransaction(dataStore, TestModel.TEST_PATH,
1203                             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1204
1205                     MockDataChangeListener listener = new MockDataChangeListener(1);
1206
1207                     ListenerRegistration<MockDataChangeListener> listenerReg = dataStore
1208                             .registerChangeListener(TestModel.TEST_PATH, listener, DataChangeScope.SUBTREE);
1209
1210                     assertNotNull("registerChangeListener returned null", listenerReg);
1211
1212                     // Wait for the initial notification
1213
1214                     listener.waitForChangeEvents(TestModel.TEST_PATH);
1215
1216                     listener.reset(2);
1217
1218                     // Write 2 updates.
1219
1220                     testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
1221                             ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1222
1223                     YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1224                             .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
1225                     testWriteTransaction(dataStore, listPath,
1226                             ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
1227
1228                     // Wait for the 2 updates.
1229
1230                     listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath);
1231
1232                     listenerReg.close();
1233
1234                     testWriteTransaction(dataStore,
1235                             YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1236                                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(),
1237                             ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2));
1238
1239                     listener.expectNoMoreChanges("Received unexpected change after close");
1240                 }
1241             }
1242         };
1243     }
1244
1245     @Test
1246     public void testRestoreFromDatastoreSnapshot() throws Exception {
1247         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
1248             {
1249                 final String name = "transactionIntegrationTest";
1250
1251                 ContainerNode carsNode = CarsModel.newCarsNode(
1252                         CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", BigInteger.valueOf(20000L)),
1253                                 CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000L))));
1254
1255                 DataTree dataTree = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
1256                 dataTree.setSchemaContext(SchemaContextHelper.full());
1257                 AbstractShardTest.writeToStore(dataTree, CarsModel.BASE_PATH, carsNode);
1258                 NormalizedNode<?, ?> root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY);
1259
1260                 final Snapshot carsSnapshot = Snapshot.create(
1261                         new ShardSnapshotState(new MetadataShardDataTreeSnapshot(root)),
1262                         Collections.<ReplicatedLogEntry>emptyList(), 2, 1, 2, 1, 1, "member-1", null);
1263
1264                 NormalizedNode<?, ?> peopleNode = PeopleModel.create();
1265                 dataTree = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
1266                 dataTree.setSchemaContext(SchemaContextHelper.full());
1267                 AbstractShardTest.writeToStore(dataTree, PeopleModel.BASE_PATH, peopleNode);
1268                 root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY);
1269
1270                 Snapshot peopleSnapshot = Snapshot.create(
1271                         new ShardSnapshotState(new MetadataShardDataTreeSnapshot(root)),
1272                         Collections.<ReplicatedLogEntry>emptyList(), 2, 1, 2, 1, 1, "member-1", null);
1273
1274                 restoreFromSnapshot = new DatastoreSnapshot(name, null,
1275                         Arrays.asList(
1276                                 new DatastoreSnapshot.ShardSnapshot("cars",
1277                                         org.apache.commons.lang3.SerializationUtils.serialize(carsSnapshot)),
1278                                 new DatastoreSnapshot.ShardSnapshot("people",
1279                                         org.apache.commons.lang3.SerializationUtils.serialize(peopleSnapshot))));
1280
1281                 try (AbstractDataStore dataStore = setupDistributedDataStore(name, "module-shards-member1.conf",
1282                         true, "cars", "people")) {
1283
1284                     DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
1285
1286                     Optional<NormalizedNode<?, ?>> optional = readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS);
1287                     assertEquals("isPresent", true, optional.isPresent());
1288                     assertEquals("Data node", carsNode, optional.get());
1289
1290                     optional = readTx.read(PeopleModel.BASE_PATH).get(5, TimeUnit.SECONDS);
1291                     assertEquals("isPresent", true, optional.isPresent());
1292                     assertEquals("Data node", peopleNode, optional.get());
1293                 }
1294             }
1295         };
1296     }
1297
1298     @Test
1299     @Deprecated
1300     public void testRecoveryFromPreCarbonSnapshot() throws Exception {
1301         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
1302             {
1303                 final String name = "testRecoveryFromPreCarbonSnapshot";
1304
1305                 ContainerNode carsNode = CarsModel.newCarsNode(
1306                         CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", BigInteger.valueOf(20000L)),
1307                                 CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000L))));
1308
1309                 DataTree dataTree = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
1310                 dataTree.setSchemaContext(SchemaContextHelper.full());
1311                 AbstractShardTest.writeToStore(dataTree, CarsModel.BASE_PATH, carsNode);
1312                 NormalizedNode<?, ?> root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY);
1313
1314                 final ByteArrayOutputStream bos = new ByteArrayOutputStream();
1315                 new MetadataShardDataTreeSnapshot(root).serialize(bos);
1316                 final org.opendaylight.controller.cluster.raft.Snapshot snapshot =
1317                     org.opendaylight.controller.cluster.raft.Snapshot.create(bos.toByteArray(),
1318                             Collections.<ReplicatedLogEntry>emptyList(), 2, 1, 2, 1, 1, "member-1", null);
1319
1320                 InMemorySnapshotStore.addSnapshot("member-1-shard-cars-" + name, snapshot);
1321
1322                 try (AbstractDataStore dataStore = setupDistributedDataStore(name, "module-shards-member1.conf",
1323                         true, "cars")) {
1324
1325                     DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
1326
1327                     Optional<NormalizedNode<?, ?>> optional = readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS);
1328                     assertEquals("isPresent", true, optional.isPresent());
1329                     assertEquals("Data node", carsNode, optional.get());
1330                 }
1331             }
1332         };
1333     }
1334 }