Fix checkstyle problems not detected by the current version
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / DistributedDataStoreIntegrationTest.java
1 /*
2  * Copyright (c) 2014, 2017 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertTrue;
14 import static org.junit.Assert.fail;
15 import static org.mockito.Matchers.any;
16 import static org.mockito.Matchers.eq;
17 import static org.mockito.Mockito.timeout;
18 import static org.mockito.Mockito.verify;
19
20 import akka.actor.ActorSystem;
21 import akka.actor.Address;
22 import akka.actor.AddressFromURIString;
23 import akka.cluster.Cluster;
24 import akka.testkit.JavaTestKit;
25 import com.google.common.base.Optional;
26 import com.google.common.base.Throwables;
27 import com.google.common.collect.ImmutableMap;
28 import com.google.common.util.concurrent.CheckedFuture;
29 import com.google.common.util.concurrent.ListenableFuture;
30 import com.google.common.util.concurrent.MoreExecutors;
31 import com.google.common.util.concurrent.Uninterruptibles;
32 import com.typesafe.config.ConfigFactory;
33 import java.io.ByteArrayOutputStream;
34 import java.io.DataOutputStream;
35 import java.io.IOException;
36 import java.io.ObjectOutputStream;
37 import java.math.BigInteger;
38 import java.util.ArrayList;
39 import java.util.Arrays;
40 import java.util.Collection;
41 import java.util.Collections;
42 import java.util.List;
43 import java.util.concurrent.CountDownLatch;
44 import java.util.concurrent.ExecutionException;
45 import java.util.concurrent.TimeUnit;
46 import java.util.concurrent.atomic.AtomicReference;
47 import org.junit.After;
48 import org.junit.Before;
49 import org.junit.Test;
50 import org.junit.runner.RunWith;
51 import org.junit.runners.Parameterized;
52 import org.junit.runners.Parameterized.Parameter;
53 import org.junit.runners.Parameterized.Parameters;
54 import org.mockito.Mockito;
55 import org.opendaylight.controller.cluster.databroker.ClientBackedDataStore;
56 import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker;
57 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
58 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
59 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
60 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
61 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
62 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
63 import org.opendaylight.controller.cluster.datastore.persisted.PayloadVersion;
64 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
65 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
66 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
67 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
68 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
69 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
70 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
71 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
72 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
73 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
74 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
75 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
76 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
77 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
78 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
79 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
80 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
81 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
82 import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
83 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
84 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
85 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
86 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
87 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
88 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
89 import org.opendaylight.yangtools.concepts.ListenerRegistration;
90 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
91 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
92 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
93 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
94 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
95 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
96 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
97 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
98 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
99 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
100
101 @RunWith(Parameterized.class)
102 public class DistributedDataStoreIntegrationTest {
103
104     @Parameters(name = "{0}")
105     public static Collection<Object[]> data() {
106         return Arrays.asList(new Object[][] {
107                 { DistributedDataStore.class }, { ClientBackedDataStore.class }
108         });
109     }
110
111     @Parameter
112     public Class<? extends AbstractDataStore> testParameter;
113
114     private ActorSystem system;
115
116     private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder()
117             .shardHeartbeatIntervalInMillis(100);
118
119     @Before
120     public void setUp() throws IOException {
121         InMemorySnapshotStore.clear();
122         InMemoryJournal.clear();
123         system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
124         Address member1Address = AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558");
125         Cluster.get(system).join(member1Address);
126     }
127
128     @After
129     public void tearDown() throws IOException {
130         JavaTestKit.shutdownActorSystem(system, null, Boolean.TRUE);
131         system = null;
132     }
133
134     protected ActorSystem getSystem() {
135         return system;
136     }
137
138     @Test
139     public void testWriteTransactionWithSingleShard() throws Exception {
140         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
141             {
142                 try (AbstractDataStore dataStore = setupAbstractDataStore(
143                         testParameter, "transactionIntegrationTest", "test-1")) {
144
145                     testWriteTransaction(dataStore, TestModel.TEST_PATH,
146                             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
147
148                     testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
149                             ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
150                 }
151             }
152         };
153     }
154
155     @Test
156     public void testWriteTransactionWithMultipleShards() throws Exception {
157         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
158             {
159                 try (AbstractDataStore dataStore = setupAbstractDataStore(
160                         testParameter, "testWriteTransactionWithMultipleShards", "cars-1", "people-1")) {
161
162                     DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
163                     assertNotNull("newWriteOnlyTransaction returned null", writeTx);
164
165                     writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
166                     writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
167
168                     doCommit(writeTx.ready());
169
170                     writeTx = dataStore.newWriteOnlyTransaction();
171
172                     writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
173                     writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
174
175                     doCommit(writeTx.ready());
176
177                     writeTx = dataStore.newWriteOnlyTransaction();
178
179                     final MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
180                     final YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
181                     writeTx.write(carPath, car);
182
183                     final MapEntryNode person = PeopleModel.newPersonEntry("jack");
184                     final YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
185                     writeTx.write(personPath, person);
186
187                     doCommit(writeTx.ready());
188
189                     // Verify the data in the store
190                     final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
191
192                     Optional<NormalizedNode<?, ?>> optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
193                     assertEquals("isPresent", true, optional.isPresent());
194                     assertEquals("Data node", car, optional.get());
195
196                     optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
197                     assertEquals("isPresent", true, optional.isPresent());
198                     assertEquals("Data node", person, optional.get());
199                 }
200             }
201         };
202     }
203
204     @Test
205     public void testReadWriteTransactionWithSingleShard() throws Exception {
206         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
207             {
208                 try (AbstractDataStore dataStore = setupAbstractDataStore(
209                         testParameter, "testReadWriteTransactionWithSingleShard", "test-1")) {
210
211                     // 1. Create a read-write Tx
212                     final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
213                     assertNotNull("newReadWriteTransaction returned null", readWriteTx);
214
215                     // 2. Write some data
216                     final YangInstanceIdentifier nodePath = TestModel.TEST_PATH;
217                     final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
218                     readWriteTx.write(nodePath, nodeToWrite);
219
220                     // 3. Read the data from Tx
221                     final Boolean exists = readWriteTx.exists(nodePath).checkedGet(5, TimeUnit.SECONDS);
222                     assertEquals("exists", true, exists);
223
224                     Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS);
225                     assertEquals("isPresent", true, optional.isPresent());
226                     assertEquals("Data node", nodeToWrite, optional.get());
227
228                     // 4. Ready the Tx for commit
229                     final DOMStoreThreePhaseCommitCohort cohort = readWriteTx.ready();
230
231                     // 5. Commit the Tx
232                     doCommit(cohort);
233
234                     // 6. Verify the data in the store
235                     final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
236
237                     optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
238                     assertEquals("isPresent", true, optional.isPresent());
239                     assertEquals("Data node", nodeToWrite, optional.get());
240                 }
241             }
242         };
243     }
244
245     @Test
246     public void testReadWriteTransactionWithMultipleShards() throws Exception {
247         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
248             {
249                 try (AbstractDataStore dataStore = setupAbstractDataStore(
250                         testParameter, "testReadWriteTransactionWithMultipleShards", "cars-1", "people-1")) {
251
252                     DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
253                     assertNotNull("newReadWriteTransaction returned null", readWriteTx);
254
255                     readWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
256                     readWriteTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
257
258                     doCommit(readWriteTx.ready());
259
260                     readWriteTx = dataStore.newReadWriteTransaction();
261
262                     readWriteTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
263                     readWriteTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
264
265                     doCommit(readWriteTx.ready());
266
267                     readWriteTx = dataStore.newReadWriteTransaction();
268
269                     final MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
270                     final YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
271                     readWriteTx.write(carPath, car);
272
273                     final MapEntryNode person = PeopleModel.newPersonEntry("jack");
274                     final YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
275                     readWriteTx.write(personPath, person);
276
277                     final Boolean exists = readWriteTx.exists(carPath).checkedGet(5, TimeUnit.SECONDS);
278                     assertEquals("exists", true, exists);
279
280                     Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
281                     assertEquals("isPresent", true, optional.isPresent());
282                     assertEquals("Data node", car, optional.get());
283
284                     doCommit(readWriteTx.ready());
285
286                     // Verify the data in the store
287                     DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
288
289                     optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
290                     assertEquals("isPresent", true, optional.isPresent());
291                     assertEquals("Data node", car, optional.get());
292
293                     optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
294                     assertEquals("isPresent", true, optional.isPresent());
295                     assertEquals("Data node", person, optional.get());
296
297                 }
298             }
299         };
300     }
301
302     @Test
303     public void testSingleTransactionsWritesInQuickSuccession() throws Exception {
304         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
305             {
306                 try (AbstractDataStore dataStore = setupAbstractDataStore(
307                         testParameter, "testSingleTransactionsWritesInQuickSuccession", "cars-1")) {
308
309                     final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
310
311                     DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
312                     writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
313                     writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
314                     doCommit(writeTx.ready());
315
316                     writeTx = txChain.newWriteOnlyTransaction();
317
318                     int numCars = 5;
319                     for (int i = 0; i < numCars; i++) {
320                         writeTx.write(CarsModel.newCarPath("car" + i),
321                                 CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
322                     }
323
324                     doCommit(writeTx.ready());
325
326                     final Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction()
327                             .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
328                     assertEquals("isPresent", true, optional.isPresent());
329                     assertEquals("# cars", numCars, ((Collection<?>) optional.get().getValue()).size());
330                 }
331             }
332         };
333     }
334
335     @SuppressWarnings("checkstyle:IllegalCatch")
336     private void testTransactionWritesWithShardNotInitiallyReady(final String testName, final boolean writeOnly)
337             throws Exception {
338         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
339             {
340                 final String shardName = "test-1";
341
342                 // Setup the InMemoryJournal to block shard recovery to ensure
343                 // the shard isn't
344                 // initialized until we create and submit the write the Tx.
345                 final String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
346                 final CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
347                 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
348
349                 try (AbstractDataStore dataStore = setupAbstractDataStore(
350                         testParameter, testName, false, shardName)) {
351
352                     // Create the write Tx
353                     final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction()
354                             : dataStore.newReadWriteTransaction();
355                     assertNotNull("newReadWriteTransaction returned null", writeTx);
356
357                     // Do some modification operations and ready the Tx on a
358                     // separate thread.
359                     final YangInstanceIdentifier listEntryPath = YangInstanceIdentifier
360                             .builder(TestModel.OUTER_LIST_PATH)
361                             .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
362
363                     final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
364                     final AtomicReference<Exception> caughtEx = new AtomicReference<>();
365                     final CountDownLatch txReady = new CountDownLatch(1);
366                     final Thread txThread = new Thread() {
367                         @Override
368                         public void run() {
369                             try {
370                                 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
371
372                                 writeTx.merge(TestModel.OUTER_LIST_PATH,
373                                         ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
374
375                                 writeTx.write(listEntryPath,
376                                         ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
377
378                                 writeTx.delete(listEntryPath);
379
380                                 txCohort.set(writeTx.ready());
381                             } catch (Exception e) {
382                                 caughtEx.set(e);
383                             } finally {
384                                 txReady.countDown();
385                             }
386                         }
387                     };
388
389                     txThread.start();
390
391                     // Wait for the Tx operations to complete.
392                     final boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
393                     if (caughtEx.get() != null) {
394                         throw caughtEx.get();
395                     }
396
397                     assertEquals("Tx ready", true, done);
398
399                     // At this point the Tx operations should be waiting for the
400                     // shard to initialize so
401                     // trigger the latch to let the shard recovery to continue.
402                     blockRecoveryLatch.countDown();
403
404                     // Wait for the Tx commit to complete.
405                     doCommit(txCohort.get());
406
407                     // Verify the data in the store
408                     final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
409
410                     Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
411                     assertEquals("isPresent", true, optional.isPresent());
412
413                     optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
414                     assertEquals("isPresent", true, optional.isPresent());
415
416                     optional = readTx.read(listEntryPath).get(5, TimeUnit.SECONDS);
417                     assertEquals("isPresent", false, optional.isPresent());
418                 }
419             }
420         };
421     }
422
423     @Test
424     public void testWriteOnlyTransactionWithShardNotInitiallyReady() throws Exception {
425         datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
426         testTransactionWritesWithShardNotInitiallyReady("testWriteOnlyTransactionWithShardNotInitiallyReady", true);
427     }
428
429     @Test
430     public void testReadWriteTransactionWithShardNotInitiallyReady() throws Exception {
431         testTransactionWritesWithShardNotInitiallyReady("testReadWriteTransactionWithShardNotInitiallyReady", false);
432     }
433
434     @Test
435     @SuppressWarnings("checkstyle:IllegalCatch")
436     public void testTransactionReadsWithShardNotInitiallyReady() throws Exception {
437         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
438             {
439                 final String testName = "testTransactionReadsWithShardNotInitiallyReady";
440                 final String shardName = "test-1";
441
442                 // Setup the InMemoryJournal to block shard recovery to ensure
443                 // the shard isn't
444                 // initialized until we create the Tx.
445                 final String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
446                 final CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
447                 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
448
449                 try (AbstractDataStore dataStore = setupAbstractDataStore(
450                         testParameter, testName, false, shardName)) {
451
452                     // Create the read-write Tx
453                     final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
454                     assertNotNull("newReadWriteTransaction returned null", readWriteTx);
455
456                     // Do some reads on the Tx on a separate thread.
457                     final AtomicReference<CheckedFuture<Boolean, ReadFailedException>> txExistsFuture =
458                             new AtomicReference<>();
459                     final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
460                             txReadFuture = new AtomicReference<>();
461                     final AtomicReference<Exception> caughtEx = new AtomicReference<>();
462                     final CountDownLatch txReadsDone = new CountDownLatch(1);
463                     final Thread txThread = new Thread() {
464                         @Override
465                         public void run() {
466                             try {
467                                 readWriteTx.write(TestModel.TEST_PATH,
468                                         ImmutableNodes.containerNode(TestModel.TEST_QNAME));
469
470                                 txExistsFuture.set(readWriteTx.exists(TestModel.TEST_PATH));
471
472                                 txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
473                             } catch (Exception e) {
474                                 caughtEx.set(e);
475                             } finally {
476                                 txReadsDone.countDown();
477                             }
478                         }
479                     };
480
481                     txThread.start();
482
483                     // Wait for the Tx operations to complete.
484                     boolean done = Uninterruptibles.awaitUninterruptibly(txReadsDone, 5, TimeUnit.SECONDS);
485                     if (caughtEx.get() != null) {
486                         throw caughtEx.get();
487                     }
488
489                     assertEquals("Tx reads done", true, done);
490
491                     // At this point the Tx operations should be waiting for the
492                     // shard to initialize so
493                     // trigger the latch to let the shard recovery to continue.
494                     blockRecoveryLatch.countDown();
495
496                     // Wait for the reads to complete and verify.
497                     assertEquals("exists", true, txExistsFuture.get().checkedGet(5, TimeUnit.SECONDS));
498                     assertEquals("read", true, txReadFuture.get().checkedGet(5, TimeUnit.SECONDS).isPresent());
499
500                     readWriteTx.close();
501                 }
502             }
503         };
504     }
505
506     @Test(expected = NotInitializedException.class)
507     @SuppressWarnings("checkstyle:IllegalCatch")
508     public void testTransactionCommitFailureWithShardNotInitialized() throws Exception {
509         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
510             {
511                 final String testName = "testTransactionCommitFailureWithShardNotInitialized";
512                 final String shardName = "test-1";
513
514                 // Set the shard initialization timeout low for the test.
515                 datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
516
517                 // Setup the InMemoryJournal to block shard recovery
518                 // indefinitely.
519                 final String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
520                 final CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
521                 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
522
523                 InMemoryJournal.addEntry(persistentID, 1, "Dummy data so akka will read from persistence");
524
525                 final AbstractDataStore dataStore =
526                         setupAbstractDataStore(testParameter, testName, false, shardName);
527
528                 // Create the write Tx
529                 final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
530                 assertNotNull("newReadWriteTransaction returned null", writeTx);
531
532                 // Do some modifications and ready the Tx on a separate
533                 // thread.
534                 final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
535                 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
536                 final CountDownLatch txReady = new CountDownLatch(1);
537                 final Thread txThread = new Thread(() -> {
538                     try {
539                         writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
540
541                         txCohort.set(writeTx.ready());
542                     } catch (Exception e) {
543                         caughtEx.set(e);
544                     } finally {
545                         txReady.countDown();
546                     }
547                 });
548
549                 txThread.start();
550
551                 // Wait for the Tx operations to complete.
552                 boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
553                 if (caughtEx.get() != null) {
554                     throw caughtEx.get();
555                 }
556
557                 assertEquals("Tx ready", true, done);
558
559                 // Wait for the commit to complete. Since the shard never
560                 // initialized, the Tx should
561                 // have timed out and throw an appropriate exception cause.
562                 try {
563                     txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
564                     fail("Expected NotInitializedException");
565                 } catch (final Exception e) {
566                     Throwables.propagate(Throwables.getRootCause(e));
567                 } finally {
568                     blockRecoveryLatch.countDown();
569                 }
570             }
571         };
572     }
573
574     @Test(expected = NotInitializedException.class)
575     @SuppressWarnings("checkstyle:IllegalCatch")
576     public void testTransactionReadFailureWithShardNotInitialized() throws Exception {
577         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
578             {
579                 final String testName = "testTransactionReadFailureWithShardNotInitialized";
580                 final String shardName = "test-1";
581
582                 // Set the shard initialization timeout low for the test.
583                 datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
584
585                 // Setup the InMemoryJournal to block shard recovery
586                 // indefinitely.
587                 final String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
588                 final CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
589                 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
590
591                 InMemoryJournal.addEntry(persistentID, 1, "Dummy data so akka will read from persistence");
592
593                 try (AbstractDataStore dataStore = setupAbstractDataStore(
594                         testParameter, testName, false, shardName)) {
595
596                     // Create the read-write Tx
597                     final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
598                     assertNotNull("newReadWriteTransaction returned null", readWriteTx);
599
600                     // Do a read on the Tx on a separate thread.
601                     final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
602                             txReadFuture = new AtomicReference<>();
603                     final AtomicReference<Exception> caughtEx = new AtomicReference<>();
604                     final CountDownLatch txReadDone = new CountDownLatch(1);
605                     final Thread txThread = new Thread(() -> {
606                         try {
607                             readWriteTx.write(TestModel.TEST_PATH,
608                                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
609
610                             txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
611
612                             readWriteTx.close();
613                         } catch (Exception e) {
614                             caughtEx.set(e);
615                         } finally {
616                             txReadDone.countDown();
617                         }
618                     });
619
620                     txThread.start();
621
622                     // Wait for the Tx operations to complete.
623                     boolean done = Uninterruptibles.awaitUninterruptibly(txReadDone, 5, TimeUnit.SECONDS);
624                     if (caughtEx.get() != null) {
625                         throw caughtEx.get();
626                     }
627
628                     assertEquals("Tx read done", true, done);
629
630                     // Wait for the read to complete. Since the shard never
631                     // initialized, the Tx should
632                     // have timed out and throw an appropriate exception cause.
633                     try {
634                         txReadFuture.get().checkedGet(5, TimeUnit.SECONDS);
635                         fail("Expected NotInitializedException");
636                     } catch (final ReadFailedException e) {
637                         Throwables.propagate(Throwables.getRootCause(e));
638                     } finally {
639                         blockRecoveryLatch.countDown();
640                     }
641                 }
642             }
643         };
644     }
645
646     @SuppressWarnings("checkstyle:IllegalCatch")
647     private void testTransactionCommitFailureWithNoShardLeader(final boolean writeOnly, final String testName)
648             throws Exception {
649         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
650             {
651                 final String shardName = "default";
652
653                 // We don't want the shard to become the leader so prevent shard
654                 // elections.
655                 datastoreContextBuilder.customRaftPolicyImplementation(
656                         "org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy");
657
658                 // The ShardManager uses the election timeout for FindPrimary so
659                 // reset it low so it will timeout quickly.
660                 datastoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(1)
661                         .shardInitializationTimeout(200, TimeUnit.MILLISECONDS);
662
663                 try (AbstractDataStore dataStore = setupAbstractDataStore(
664                         testParameter, testName, false, shardName)) {
665
666                     final Object result = dataStore.getActorContext().executeOperation(
667                             dataStore.getActorContext().getShardManager(), new FindLocalShard(shardName, true));
668                     assertTrue("Expected LocalShardFound. Actual: " + result, result instanceof LocalShardFound);
669
670                     // Create the write Tx.
671                     try (DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction()
672                             : dataStore.newReadWriteTransaction()) {
673                         assertNotNull("newReadWriteTransaction returned null", writeTx);
674
675                         // Do some modifications and ready the Tx on a separate
676                         // thread.
677                         final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
678                         final AtomicReference<Exception> caughtEx = new AtomicReference<>();
679                         final CountDownLatch txReady = new CountDownLatch(1);
680                         final Thread txThread = new Thread(() -> {
681                             try {
682                                 writeTx.write(TestModel.JUNK_PATH,
683                                         ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
684
685                                 txCohort.set(writeTx.ready());
686                             } catch (Exception e) {
687                                 caughtEx.set(e);
688                             } finally {
689                                 txReady.countDown();
690                             }
691                         });
692
693                         txThread.start();
694
695                         // Wait for the Tx operations to complete.
696                         boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
697                         if (caughtEx.get() != null) {
698                             throw caughtEx.get();
699                         }
700
701                         assertEquals("Tx ready", true, done);
702
703                         // Wait for the commit to complete. Since no shard
704                         // leader was elected in time, the Tx
705                         // should have timed out and throw an appropriate
706                         // exception cause.
707                         try {
708                             txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
709                             fail("Expected NoShardLeaderException");
710                         } catch (final ExecutionException e) {
711                             Throwables.propagate(Throwables.getRootCause(e));
712                         }
713                     }
714                 }
715             }
716         };
717     }
718
719     @Test(expected = NoShardLeaderException.class)
720     public void testWriteOnlyTransactionCommitFailureWithNoShardLeader() throws Exception {
721         datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
722         testTransactionCommitFailureWithNoShardLeader(true, "testWriteOnlyTransactionCommitFailureWithNoShardLeader");
723     }
724
725     @Test(expected = NoShardLeaderException.class)
726     public void testReadWriteTransactionCommitFailureWithNoShardLeader() throws Exception {
727         testTransactionCommitFailureWithNoShardLeader(false, "testReadWriteTransactionCommitFailureWithNoShardLeader");
728     }
729
730     @Test
731     public void testTransactionAbort() throws Exception {
732         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
733             {
734                 try (AbstractDataStore dataStore = setupAbstractDataStore(
735                         testParameter, "transactionAbortIntegrationTest", "test-1")) {
736
737                     final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
738                     assertNotNull("newWriteOnlyTransaction returned null", writeTx);
739
740                     writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
741
742                     final DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
743
744                     cohort.canCommit().get(5, TimeUnit.SECONDS);
745
746                     cohort.abort().get(5, TimeUnit.SECONDS);
747
748                     testWriteTransaction(dataStore, TestModel.TEST_PATH,
749                             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
750                 }
751             }
752         };
753     }
754
755     @Test
756     @SuppressWarnings("checkstyle:IllegalCatch")
757     public void testTransactionChainWithSingleShard() throws Exception {
758         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
759             {
760                 try (AbstractDataStore dataStore = setupAbstractDataStore(
761                         testParameter, "testTransactionChainWithSingleShard", "test-1")) {
762
763                     // 1. Create a Tx chain and write-only Tx
764                     final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
765
766                     final DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
767                     assertNotNull("newWriteOnlyTransaction returned null", writeTx);
768
769                     // 2. Write some data
770                     final NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
771                     writeTx.write(TestModel.TEST_PATH, testNode);
772
773                     // 3. Ready the Tx for commit
774                     final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
775
776                     // 4. Commit the Tx on another thread that first waits for
777                     // the second read Tx.
778                     final CountDownLatch continueCommit1 = new CountDownLatch(1);
779                     final CountDownLatch commit1Done = new CountDownLatch(1);
780                     final AtomicReference<Exception> commit1Error = new AtomicReference<>();
781                     new Thread(() -> {
782                         try {
783                             continueCommit1.await();
784                             doCommit(cohort1);
785                         } catch (Exception e) {
786                             commit1Error.set(e);
787                         } finally {
788                             commit1Done.countDown();
789                         }
790                     }).start();
791
792                     // 5. Create a new read Tx from the chain to read and verify
793                     // the data from the first
794                     // Tx is visible after being readied.
795                     DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction();
796                     Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
797                     assertEquals("isPresent", true, optional.isPresent());
798                     assertEquals("Data node", testNode, optional.get());
799
800                     // 6. Create a new RW Tx from the chain, write more data,
801                     // and ready it
802                     final DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
803                     final MapNode outerNode = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
804                     rwTx.write(TestModel.OUTER_LIST_PATH, outerNode);
805
806                     final DOMStoreThreePhaseCommitCohort cohort2 = rwTx.ready();
807
808                     // 7. Create a new read Tx from the chain to read the data
809                     // from the last RW Tx to
810                     // verify it is visible.
811                     readTx = txChain.newReadWriteTransaction();
812                     optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
813                     assertEquals("isPresent", true, optional.isPresent());
814                     assertEquals("Data node", outerNode, optional.get());
815
816                     // 8. Wait for the 2 commits to complete and close the
817                     // chain.
818                     continueCommit1.countDown();
819                     Uninterruptibles.awaitUninterruptibly(commit1Done, 5, TimeUnit.SECONDS);
820
821                     if (commit1Error.get() != null) {
822                         throw commit1Error.get();
823                     }
824
825                     doCommit(cohort2);
826
827                     txChain.close();
828
829                     // 9. Create a new read Tx from the data store and verify
830                     // committed data.
831                     readTx = dataStore.newReadOnlyTransaction();
832                     optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
833                     assertEquals("isPresent", true, optional.isPresent());
834                     assertEquals("Data node", outerNode, optional.get());
835                 }
836             }
837         };
838     }
839
840     @Test
841     public void testTransactionChainWithMultipleShards() throws Exception {
842         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
843             {
844                 try (AbstractDataStore dataStore = setupAbstractDataStore(
845                         testParameter, "testTransactionChainWithMultipleShards", "cars-1", "people-1")) {
846
847                     final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
848
849                     DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
850                     assertNotNull("newWriteOnlyTransaction returned null", writeTx);
851
852                     writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
853                     writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
854
855                     writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
856                     writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
857
858                     final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
859
860                     final DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction();
861
862                     final MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
863                     final YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
864                     readWriteTx.write(carPath, car);
865
866                     final MapEntryNode person = PeopleModel.newPersonEntry("jack");
867                     final YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
868                     readWriteTx.merge(personPath, person);
869
870                     Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
871                     assertEquals("isPresent", true, optional.isPresent());
872                     assertEquals("Data node", car, optional.get());
873
874                     optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS);
875                     assertEquals("isPresent", true, optional.isPresent());
876                     assertEquals("Data node", person, optional.get());
877
878                     final DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready();
879
880                     writeTx = txChain.newWriteOnlyTransaction();
881
882                     writeTx.delete(carPath);
883
884                     final DOMStoreThreePhaseCommitCohort cohort3 = writeTx.ready();
885
886                     final ListenableFuture<Boolean> canCommit1 = cohort1.canCommit();
887                     final ListenableFuture<Boolean> canCommit2 = cohort2.canCommit();
888
889                     doCommit(canCommit1, cohort1);
890                     doCommit(canCommit2, cohort2);
891                     doCommit(cohort3);
892
893                     txChain.close();
894
895                     final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
896
897                     optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
898                     assertEquals("isPresent", false, optional.isPresent());
899
900                     optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
901                     assertEquals("isPresent", true, optional.isPresent());
902                     assertEquals("Data node", person, optional.get());
903                 }
904             }
905         };
906     }
907
908     @Test
909     public void testCreateChainedTransactionsInQuickSuccession() throws Exception {
910         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
911             {
912                 try (AbstractDataStore dataStore = setupAbstractDataStore(
913                         testParameter, "testCreateChainedTransactionsInQuickSuccession", "cars-1")) {
914
915                     final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
916                             ImmutableMap.<LogicalDatastoreType, DOMStore>builder()
917                                     .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(),
918                             MoreExecutors.directExecutor());
919
920                     final TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
921                     DOMTransactionChain txChain = broker.createTransactionChain(listener);
922
923                     final List<CheckedFuture<Void, TransactionCommitFailedException>> futures = new ArrayList<>();
924
925                     final DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
926                     writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, CarsModel.emptyContainer());
927                     writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
928                     futures.add(writeTx.submit());
929
930                     int numCars = 100;
931                     for (int i = 0; i < numCars; i++) {
932                         final DOMDataReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
933
934                         rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.newCarPath("car" + i),
935                                 CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
936
937                         futures.add(rwTx.submit());
938                     }
939
940                     for (final CheckedFuture<Void, TransactionCommitFailedException> f : futures) {
941                         f.checkedGet();
942                     }
943
944                     final Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction()
945                             .read(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
946                     assertEquals("isPresent", true, optional.isPresent());
947                     assertEquals("# cars", numCars, ((Collection<?>) optional.get().getValue()).size());
948
949                     txChain.close();
950
951                     broker.close();
952                 }
953             }
954         };
955     }
956
957     @Test
958     public void testCreateChainedTransactionAfterEmptyTxReadied() throws Exception {
959         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
960             {
961                 try (AbstractDataStore dataStore = setupAbstractDataStore(
962                         testParameter, "testCreateChainedTransactionAfterEmptyTxReadied", "test-1")) {
963
964                     final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
965
966                     final DOMStoreReadWriteTransaction rwTx1 = txChain.newReadWriteTransaction();
967
968                     rwTx1.ready();
969
970                     final DOMStoreReadWriteTransaction rwTx2 = txChain.newReadWriteTransaction();
971
972                     final Optional<NormalizedNode<?, ?>> optional = rwTx2.read(TestModel.TEST_PATH).get(
973                             5, TimeUnit.SECONDS);
974                     assertEquals("isPresent", false, optional.isPresent());
975
976                     txChain.close();
977                 }
978             }
979         };
980     }
981
982     @Test
983     public void testCreateChainedTransactionWhenPreviousNotReady() throws Exception {
984         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
985             {
986                 try (AbstractDataStore dataStore = setupAbstractDataStore(
987                         testParameter, "testCreateChainedTransactionWhenPreviousNotReady", "test-1")) {
988
989                     final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
990
991                     final DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
992                     assertNotNull("newWriteOnlyTransaction returned null", writeTx);
993
994                     writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
995
996                     // Try to create another Tx of each type - each should fail
997                     // b/c the previous Tx wasn't
998                     // readied.
999                     assertExceptionOnTxChainCreates(txChain, IllegalStateException.class);
1000                 }
1001             }
1002         };
1003     }
1004
1005     @Test
1006     public void testCreateChainedTransactionAfterClose() throws Exception {
1007         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
1008             {
1009                 try (AbstractDataStore dataStore = setupAbstractDataStore(
1010                         testParameter, "testCreateChainedTransactionAfterClose", "test-1")) {
1011
1012                     final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
1013                     txChain.close();
1014
1015                     // Try to create another Tx of each type - should fail b/c
1016                     // the previous Tx was closed.
1017                     assertExceptionOnTxChainCreates(txChain, TransactionChainClosedException.class);
1018                 }
1019             }
1020         };
1021     }
1022
1023     @Test
1024     public void testChainWithReadOnlyTxAfterPreviousReady() throws Exception {
1025         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
1026             {
1027                 try (AbstractDataStore dataStore = setupAbstractDataStore(
1028                         testParameter, "testChainWithReadOnlyTxAfterPreviousReady", "test-1")) {
1029
1030                     final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
1031
1032                     // Create a write tx and submit.
1033                     final DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
1034                     writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1035                     final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
1036
1037                     // Create read-only tx's and issue a read.
1038                     CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readFuture1 = txChain
1039                             .newReadOnlyTransaction().read(TestModel.TEST_PATH);
1040
1041                     CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readFuture2 = txChain
1042                             .newReadOnlyTransaction().read(TestModel.TEST_PATH);
1043
1044                     // Create another write tx and issue the write.
1045                     DOMStoreWriteTransaction writeTx2 = txChain.newWriteOnlyTransaction();
1046                     writeTx2.write(TestModel.OUTER_LIST_PATH,
1047                             ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1048
1049                     // Ensure the reads succeed.
1050
1051                     assertEquals("isPresent", true, readFuture1.checkedGet(5, TimeUnit.SECONDS).isPresent());
1052                     assertEquals("isPresent", true, readFuture2.checkedGet(5, TimeUnit.SECONDS).isPresent());
1053
1054                     // Ensure the writes succeed.
1055                     DOMStoreThreePhaseCommitCohort cohort2 = writeTx2.ready();
1056
1057                     doCommit(cohort1);
1058                     doCommit(cohort2);
1059
1060                     assertEquals("isPresent", true, txChain.newReadOnlyTransaction().read(TestModel.OUTER_LIST_PATH)
1061                             .checkedGet(5, TimeUnit.SECONDS).isPresent());
1062                 }
1063             }
1064         };
1065     }
1066
1067     @Test
1068     public void testChainedTransactionFailureWithSingleShard() throws Exception {
1069         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
1070             {
1071                 try (AbstractDataStore dataStore = setupAbstractDataStore(
1072                         testParameter, "testChainedTransactionFailureWithSingleShard", "cars-1")) {
1073
1074                     final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
1075                             ImmutableMap.<LogicalDatastoreType, DOMStore>builder()
1076                                     .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(),
1077                             MoreExecutors.directExecutor());
1078
1079                     final TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
1080                     final DOMTransactionChain txChain = broker.createTransactionChain(listener);
1081
1082                     final DOMDataReadWriteTransaction writeTx = txChain.newReadWriteTransaction();
1083
1084                     writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH,
1085                             PeopleModel.emptyContainer());
1086
1087                     final ContainerNode invalidData = ImmutableContainerNodeBuilder.create()
1088                             .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME))
1089                             .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
1090
1091                     writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
1092
1093                     try {
1094                         writeTx.submit().checkedGet(5, TimeUnit.SECONDS);
1095                         fail("Expected TransactionCommitFailedException");
1096                     } catch (final TransactionCommitFailedException e) {
1097                         // Expected
1098                     }
1099
1100                     verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx),
1101                             any(Throwable.class));
1102
1103                     txChain.close();
1104                     broker.close();
1105                 }
1106             }
1107         };
1108     }
1109
1110     @Test
1111     public void testChainedTransactionFailureWithMultipleShards() throws Exception {
1112         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
1113             {
1114                 try (AbstractDataStore dataStore = setupAbstractDataStore(
1115                         testParameter, "testChainedTransactionFailureWithMultipleShards", "cars-1", "people-1")) {
1116
1117                     final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
1118                             ImmutableMap.<LogicalDatastoreType, DOMStore>builder()
1119                                     .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(),
1120                             MoreExecutors.directExecutor());
1121
1122                     final TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
1123                     final DOMTransactionChain txChain = broker.createTransactionChain(listener);
1124
1125                     final DOMDataReadWriteTransaction writeTx = txChain.newReadWriteTransaction();
1126
1127                     writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH,
1128                             PeopleModel.emptyContainer());
1129
1130                     final ContainerNode invalidData = ImmutableContainerNodeBuilder.create()
1131                             .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME))
1132                             .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
1133
1134                     writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
1135
1136                     // Note that merge will validate the data and fail but put
1137                     // succeeds b/c deep validation is not
1138                     // done for put for performance reasons.
1139                     try {
1140                         writeTx.submit().checkedGet(5, TimeUnit.SECONDS);
1141                         fail("Expected TransactionCommitFailedException");
1142                     } catch (final TransactionCommitFailedException e) {
1143                         // Expected
1144                     }
1145
1146                     verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx),
1147                             any(Throwable.class));
1148
1149                     txChain.close();
1150                     broker.close();
1151                 }
1152             }
1153         };
1154     }
1155
1156     @Test
1157     public void testChangeListenerRegistration() throws Exception {
1158         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
1159             {
1160                 try (AbstractDataStore dataStore = setupAbstractDataStore(
1161                         testParameter, "testChangeListenerRegistration", "test-1")) {
1162
1163                     testWriteTransaction(dataStore, TestModel.TEST_PATH,
1164                             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1165
1166                     final MockDataChangeListener listener = new MockDataChangeListener(1);
1167
1168                     final ListenerRegistration<MockDataChangeListener> listenerReg = dataStore
1169                             .registerChangeListener(TestModel.TEST_PATH, listener, DataChangeScope.SUBTREE);
1170
1171                     assertNotNull("registerChangeListener returned null", listenerReg);
1172
1173                     IntegrationTestKit.verifyShardState(dataStore, "test-1",
1174                         state -> assertEquals("getDataChangeListenerActors", 1,
1175                                 state.getDataChangeListenerActors().size()));
1176
1177                     // Wait for the initial notification
1178                     listener.waitForChangeEvents(TestModel.TEST_PATH);
1179                     listener.reset(2);
1180
1181                     // Write 2 updates.
1182                     testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
1183                             ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1184
1185                     YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1186                             .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
1187                     testWriteTransaction(dataStore, listPath,
1188                             ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
1189
1190                     // Wait for the 2 updates.
1191                     listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath);
1192                     listenerReg.close();
1193
1194                     IntegrationTestKit.verifyShardState(dataStore, "test-1",
1195                         state -> assertEquals("getDataChangeListenerActors", 0,
1196                                 state.getDataChangeListenerActors().size()));
1197
1198                     testWriteTransaction(dataStore,
1199                             YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1200                                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(),
1201                             ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2));
1202
1203                     listener.expectNoMoreChanges("Received unexpected change after close");
1204                 }
1205             }
1206         };
1207     }
1208
1209     @Test
1210     public void testDataTreeChangeListenerRegistration() throws Exception {
1211         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
1212             {
1213                 try (AbstractDataStore dataStore = setupAbstractDataStore(
1214                         testParameter, "testDataTreeChangeListenerRegistration", "test-1")) {
1215
1216                     testWriteTransaction(dataStore, TestModel.TEST_PATH,
1217                             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1218
1219                     final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
1220
1221                     ListenerRegistration<MockDataTreeChangeListener> listenerReg = dataStore
1222                             .registerTreeChangeListener(TestModel.TEST_PATH, listener);
1223
1224                     assertNotNull("registerTreeChangeListener returned null", listenerReg);
1225
1226                     IntegrationTestKit.verifyShardState(dataStore, "test-1",
1227                         state -> assertEquals("getTreeChangeListenerActors", 1,
1228                                 state.getTreeChangeListenerActors().size()));
1229
1230                     // Wait for the initial notification
1231                     listener.waitForChangeEvents(TestModel.TEST_PATH);
1232                     listener.reset(2);
1233
1234                     // Write 2 updates.
1235                     testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
1236                             ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1237
1238                     YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1239                             .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
1240                     testWriteTransaction(dataStore, listPath,
1241                             ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
1242
1243                     // Wait for the 2 updates.
1244                     listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath);
1245                     listenerReg.close();
1246
1247                     IntegrationTestKit.verifyShardState(dataStore, "test-1",
1248                         state -> assertEquals("getTreeChangeListenerActors", 0,
1249                                 state.getTreeChangeListenerActors().size()));
1250
1251                     testWriteTransaction(dataStore,
1252                             YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1253                                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(),
1254                             ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2));
1255
1256                     listener.expectNoMoreChanges("Received unexpected change after close");
1257                 }
1258             }
1259         };
1260     }
1261
1262     @Test
1263     public void testRestoreFromDatastoreSnapshot() throws Exception {
1264         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
1265             {
1266                 final String name = "transactionIntegrationTest";
1267
1268                 final ContainerNode carsNode = CarsModel.newCarsNode(
1269                         CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", BigInteger.valueOf(20000L)),
1270                                 CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000L))));
1271
1272                 DataTree dataTree = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
1273                 dataTree.setSchemaContext(SchemaContextHelper.full());
1274                 AbstractShardTest.writeToStore(dataTree, CarsModel.BASE_PATH, carsNode);
1275                 NormalizedNode<?, ?> root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY);
1276
1277                 final Snapshot carsSnapshot = Snapshot.create(
1278                         new ShardSnapshotState(new MetadataShardDataTreeSnapshot(root)),
1279                         Collections.emptyList(), 2, 1, 2, 1, 1, "member-1", null);
1280
1281                 dataTree = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
1282                 dataTree.setSchemaContext(SchemaContextHelper.full());
1283
1284                 final NormalizedNode<?, ?> peopleNode = PeopleModel.create();
1285                 AbstractShardTest.writeToStore(dataTree, PeopleModel.BASE_PATH, peopleNode);
1286
1287                 root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY);
1288
1289                 final Snapshot peopleSnapshot = Snapshot.create(
1290                         new ShardSnapshotState(new MetadataShardDataTreeSnapshot(root)),
1291                         Collections.emptyList(), 2, 1, 2, 1, 1, "member-1", null);
1292
1293                 restoreFromSnapshot = new DatastoreSnapshot(name, null, Arrays.asList(
1294                         new DatastoreSnapshot.ShardSnapshot("cars", carsSnapshot),
1295                         new DatastoreSnapshot.ShardSnapshot("people", peopleSnapshot)));
1296
1297                 try (AbstractDataStore dataStore = setupAbstractDataStore(
1298                         testParameter, name, "module-shards-member1.conf", true, "cars", "people")) {
1299
1300                     final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
1301
1302                     // two reads
1303                     Optional<NormalizedNode<?, ?>> optional = readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS);
1304                     assertEquals("isPresent", true, optional.isPresent());
1305                     assertEquals("Data node", carsNode, optional.get());
1306
1307                     optional = readTx.read(PeopleModel.BASE_PATH).get(5, TimeUnit.SECONDS);
1308                     assertEquals("isPresent", true, optional.isPresent());
1309                     assertEquals("Data node", peopleNode, optional.get());
1310                 }
1311             }
1312         };
1313     }
1314
1315     @Test
1316     @Deprecated
1317     public void testRecoveryFromPreCarbonSnapshot() throws Exception {
1318         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
1319             {
1320                 final String name = "testRecoveryFromPreCarbonSnapshot";
1321
1322                 final ContainerNode carsNode = CarsModel.newCarsNode(
1323                         CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", BigInteger.valueOf(20000L)),
1324                                 CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000L))));
1325
1326                 DataTree dataTree = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
1327                 dataTree.setSchemaContext(SchemaContextHelper.full());
1328                 AbstractShardTest.writeToStore(dataTree, CarsModel.BASE_PATH, carsNode);
1329                 NormalizedNode<?, ?> root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY);
1330
1331                 MetadataShardDataTreeSnapshot shardSnapshot = new MetadataShardDataTreeSnapshot(root);
1332                 final ByteArrayOutputStream bos = new ByteArrayOutputStream();
1333                 try (DataOutputStream dos = new DataOutputStream(bos)) {
1334                     PayloadVersion.BORON.writeTo(dos);
1335                     try (ObjectOutputStream oos = new ObjectOutputStream(dos)) {
1336                         oos.writeObject(shardSnapshot);
1337                     }
1338                 }
1339
1340                 final org.opendaylight.controller.cluster.raft.Snapshot snapshot =
1341                         org.opendaylight.controller.cluster.raft.Snapshot.create(bos.toByteArray(),
1342                                 Collections.emptyList(), 2, 1, 2, 1, 1, "member-1", null);
1343
1344                 InMemorySnapshotStore.addSnapshot("member-1-shard-cars-" + name, snapshot);
1345
1346                 try (AbstractDataStore dataStore = setupAbstractDataStore(
1347                         testParameter, name, "module-shards-member1.conf", true, "cars")) {
1348
1349                     DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
1350
1351                     Optional<NormalizedNode<?, ?>> optional = readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS);
1352                     assertEquals("isPresent", true, optional.isPresent());
1353                     assertEquals("Data node", carsNode, optional.get());
1354                 }
1355             }
1356         };
1357     }
1358 }