sal-distributed-datastore: use lambdas
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / DistributedDataStoreIntegrationTest.java
1 /*
2  * Copyright (c) 2014, 2017 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertTrue;
14 import static org.junit.Assert.fail;
15 import static org.mockito.Matchers.any;
16 import static org.mockito.Matchers.eq;
17 import static org.mockito.Mockito.timeout;
18 import static org.mockito.Mockito.verify;
19
20 import akka.actor.ActorSystem;
21 import akka.actor.Address;
22 import akka.actor.AddressFromURIString;
23 import akka.cluster.Cluster;
24 import akka.testkit.JavaTestKit;
25 import com.google.common.base.Optional;
26 import com.google.common.base.Throwables;
27 import com.google.common.collect.ImmutableMap;
28 import com.google.common.util.concurrent.CheckedFuture;
29 import com.google.common.util.concurrent.ListenableFuture;
30 import com.google.common.util.concurrent.MoreExecutors;
31 import com.google.common.util.concurrent.Uninterruptibles;
32 import com.typesafe.config.ConfigFactory;
33 import java.io.ByteArrayOutputStream;
34 import java.io.DataOutputStream;
35 import java.io.IOException;
36 import java.io.ObjectOutputStream;
37 import java.math.BigInteger;
38 import java.util.ArrayList;
39 import java.util.Arrays;
40 import java.util.Collection;
41 import java.util.Collections;
42 import java.util.List;
43 import java.util.concurrent.CountDownLatch;
44 import java.util.concurrent.ExecutionException;
45 import java.util.concurrent.TimeUnit;
46 import java.util.concurrent.atomic.AtomicReference;
47 import org.junit.After;
48 import org.junit.Before;
49 import org.junit.Test;
50 import org.junit.runner.RunWith;
51 import org.junit.runners.Parameterized;
52 import org.junit.runners.Parameterized.Parameter;
53 import org.junit.runners.Parameterized.Parameters;
54 import org.mockito.Mockito;
55 import org.opendaylight.controller.cluster.databroker.ClientBackedDataStore;
56 import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker;
57 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
58 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
59 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
60 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
61 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
62 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
63 import org.opendaylight.controller.cluster.datastore.persisted.PayloadVersion;
64 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
65 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
66 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
67 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
68 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
69 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
70 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
71 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
72 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
73 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
74 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
75 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
76 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
77 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
78 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
79 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
80 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
81 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
82 import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
83 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
84 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
85 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
86 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
87 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
88 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
89 import org.opendaylight.yangtools.concepts.ListenerRegistration;
90 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
91 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
92 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
93 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
94 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
95 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
96 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
97 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
98 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
99 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
100
101 @RunWith(Parameterized.class)
102 public class DistributedDataStoreIntegrationTest {
103
104     @Parameters(name = "{0}")
105     public static Collection<Object[]> data() {
106         return Arrays.asList(new Object[][] {
107                 { DistributedDataStore.class }, { ClientBackedDataStore.class }
108         });
109     }
110
111     @Parameter
112     public Class<? extends AbstractDataStore> testParameter;
113
114     private ActorSystem system;
115
116     private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder()
117             .shardHeartbeatIntervalInMillis(100);
118
119     @Before
120     public void setUp() throws IOException {
121         InMemorySnapshotStore.clear();
122         InMemoryJournal.clear();
123         system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
124         Address member1Address = AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558");
125         Cluster.get(system).join(member1Address);
126     }
127
128     @After
129     public void tearDown() throws IOException {
130         JavaTestKit.shutdownActorSystem(system, null, Boolean.TRUE);
131         system = null;
132     }
133
134     protected ActorSystem getSystem() {
135         return system;
136     }
137
138     @Test
139     public void testWriteTransactionWithSingleShard() throws Exception {
140         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
141             {
142                 try (AbstractDataStore dataStore = setupAbstractDataStore(
143                         testParameter, "transactionIntegrationTest", "test-1")) {
144
145                     testWriteTransaction(dataStore, TestModel.TEST_PATH,
146                             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
147
148                     testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
149                             ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
150                 }
151             }
152         };
153     }
154
155     @Test
156     public void testWriteTransactionWithMultipleShards() throws Exception {
157         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
158             {
159                 try (AbstractDataStore dataStore = setupAbstractDataStore(
160                         testParameter, "testWriteTransactionWithMultipleShards", "cars-1", "people-1")) {
161
162                     DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
163                     assertNotNull("newWriteOnlyTransaction returned null", writeTx);
164
165                     writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
166                     writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
167
168                     doCommit(writeTx.ready());
169
170                     writeTx = dataStore.newWriteOnlyTransaction();
171
172                     writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
173                     writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
174
175                     doCommit(writeTx.ready());
176
177                     writeTx = dataStore.newWriteOnlyTransaction();
178
179                     final MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
180                     final YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
181                     writeTx.write(carPath, car);
182
183                     final MapEntryNode person = PeopleModel.newPersonEntry("jack");
184                     final YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
185                     writeTx.write(personPath, person);
186
187                     doCommit(writeTx.ready());
188
189                     // Verify the data in the store
190                     final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
191
192                     Optional<NormalizedNode<?, ?>> optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
193                     assertEquals("isPresent", true, optional.isPresent());
194                     assertEquals("Data node", car, optional.get());
195
196                     optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
197                     assertEquals("isPresent", true, optional.isPresent());
198                     assertEquals("Data node", person, optional.get());
199                 }
200             }
201         };
202     }
203
204     @Test
205     public void testReadWriteTransactionWithSingleShard() throws Exception {
206         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
207             {
208                 try (AbstractDataStore dataStore = setupAbstractDataStore(
209                         testParameter, "testReadWriteTransactionWithSingleShard", "test-1")) {
210
211                     // 1. Create a read-write Tx
212                     final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
213                     assertNotNull("newReadWriteTransaction returned null", readWriteTx);
214
215                     // 2. Write some data
216                     final YangInstanceIdentifier nodePath = TestModel.TEST_PATH;
217                     final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
218                     readWriteTx.write(nodePath, nodeToWrite);
219
220                     // 3. Read the data from Tx
221                     final Boolean exists = readWriteTx.exists(nodePath).checkedGet(5, TimeUnit.SECONDS);
222                     assertEquals("exists", true, exists);
223
224                     Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS);
225                     assertEquals("isPresent", true, optional.isPresent());
226                     assertEquals("Data node", nodeToWrite, optional.get());
227
228                     // 4. Ready the Tx for commit
229                     final DOMStoreThreePhaseCommitCohort cohort = readWriteTx.ready();
230
231                     // 5. Commit the Tx
232                     doCommit(cohort);
233
234                     // 6. Verify the data in the store
235                     final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
236
237                     optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
238                     assertEquals("isPresent", true, optional.isPresent());
239                     assertEquals("Data node", nodeToWrite, optional.get());
240                 }
241             }
242         };
243     }
244
245     @Test
246     public void testReadWriteTransactionWithMultipleShards() throws Exception {
247         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
248             {
249                 try (AbstractDataStore dataStore = setupAbstractDataStore(
250                         testParameter, "testReadWriteTransactionWithMultipleShards", "cars-1", "people-1")) {
251
252                     DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
253                     assertNotNull("newReadWriteTransaction returned null", readWriteTx);
254
255                     readWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
256                     readWriteTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
257
258                     doCommit(readWriteTx.ready());
259
260                     readWriteTx = dataStore.newReadWriteTransaction();
261
262                     readWriteTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
263                     readWriteTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
264
265                     doCommit(readWriteTx.ready());
266
267                     readWriteTx = dataStore.newReadWriteTransaction();
268
269                     final MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
270                     final YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
271                     readWriteTx.write(carPath, car);
272
273                     final MapEntryNode person = PeopleModel.newPersonEntry("jack");
274                     final YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
275                     readWriteTx.write(personPath, person);
276
277                     final Boolean exists = readWriteTx.exists(carPath).checkedGet(5, TimeUnit.SECONDS);
278                     assertEquals("exists", true, exists);
279
280                     Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
281                     assertEquals("isPresent", true, optional.isPresent());
282                     assertEquals("Data node", car, optional.get());
283
284                     doCommit(readWriteTx.ready());
285
286                     // Verify the data in the store
287                     DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
288
289                     optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
290                     assertEquals("isPresent", true, optional.isPresent());
291                     assertEquals("Data node", car, optional.get());
292
293                     optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
294                     assertEquals("isPresent", true, optional.isPresent());
295                     assertEquals("Data node", person, optional.get());
296
297                 }
298             }
299         };
300     }
301
302     @Test
303     public void testSingleTransactionsWritesInQuickSuccession() throws Exception {
304         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
305             {
306                 try (AbstractDataStore dataStore = setupAbstractDataStore(
307                         testParameter, "testSingleTransactionsWritesInQuickSuccession", "cars-1")) {
308
309                     final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
310
311                     DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
312                     writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
313                     writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
314                     doCommit(writeTx.ready());
315
316                     writeTx = txChain.newWriteOnlyTransaction();
317
318                     int numCars = 5;
319                     for (int i = 0; i < numCars; i++) {
320                         writeTx.write(CarsModel.newCarPath("car" + i),
321                                 CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
322                     }
323
324                     doCommit(writeTx.ready());
325
326                     final Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction()
327                             .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
328                     assertEquals("isPresent", true, optional.isPresent());
329                     assertEquals("# cars", numCars, ((Collection<?>) optional.get().getValue()).size());
330                 }
331             }
332         };
333     }
334
335     @SuppressWarnings("checkstyle:IllegalCatch")
336     private void testTransactionWritesWithShardNotInitiallyReady(final String testName, final boolean writeOnly)
337             throws Exception {
338         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
339             {
340                 final String shardName = "test-1";
341
342                 // Setup the InMemoryJournal to block shard recovery to ensure
343                 // the shard isn't
344                 // initialized until we create and submit the write the Tx.
345                 final String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
346                 final CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
347                 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
348
349                 try (AbstractDataStore dataStore = setupAbstractDataStore(
350                         testParameter, testName, false, shardName)) {
351
352                     // Create the write Tx
353                     final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction()
354                             : dataStore.newReadWriteTransaction();
355                     assertNotNull("newReadWriteTransaction returned null", writeTx);
356
357                     // Do some modification operations and ready the Tx on a
358                     // separate thread.
359                     final YangInstanceIdentifier listEntryPath = YangInstanceIdentifier
360                             .builder(TestModel.OUTER_LIST_PATH)
361                             .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
362
363                     final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
364                     final AtomicReference<Exception> caughtEx = new AtomicReference<>();
365                     final CountDownLatch txReady = new CountDownLatch(1);
366                     final Thread txThread = new Thread(() -> {
367                         try {
368                             writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
369
370                             writeTx.merge(TestModel.OUTER_LIST_PATH,
371                                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
372
373                             writeTx.write(listEntryPath,
374                                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
375
376                             writeTx.delete(listEntryPath);
377
378                             txCohort.set(writeTx.ready());
379                         } catch (Exception e) {
380                             caughtEx.set(e);
381                         } finally {
382                             txReady.countDown();
383                         }
384                     });
385
386                     txThread.start();
387
388                     // Wait for the Tx operations to complete.
389                     final boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
390                     if (caughtEx.get() != null) {
391                         throw caughtEx.get();
392                     }
393
394                     assertEquals("Tx ready", true, done);
395
396                     // At this point the Tx operations should be waiting for the
397                     // shard to initialize so
398                     // trigger the latch to let the shard recovery to continue.
399                     blockRecoveryLatch.countDown();
400
401                     // Wait for the Tx commit to complete.
402                     doCommit(txCohort.get());
403
404                     // Verify the data in the store
405                     final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
406
407                     Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
408                     assertEquals("isPresent", true, optional.isPresent());
409
410                     optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
411                     assertEquals("isPresent", true, optional.isPresent());
412
413                     optional = readTx.read(listEntryPath).get(5, TimeUnit.SECONDS);
414                     assertEquals("isPresent", false, optional.isPresent());
415                 }
416             }
417         };
418     }
419
420     @Test
421     public void testWriteOnlyTransactionWithShardNotInitiallyReady() throws Exception {
422         datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
423         testTransactionWritesWithShardNotInitiallyReady("testWriteOnlyTransactionWithShardNotInitiallyReady", true);
424     }
425
426     @Test
427     public void testReadWriteTransactionWithShardNotInitiallyReady() throws Exception {
428         testTransactionWritesWithShardNotInitiallyReady("testReadWriteTransactionWithShardNotInitiallyReady", false);
429     }
430
431     @Test
432     @SuppressWarnings("checkstyle:IllegalCatch")
433     public void testTransactionReadsWithShardNotInitiallyReady() throws Exception {
434         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
435             {
436                 final String testName = "testTransactionReadsWithShardNotInitiallyReady";
437                 final String shardName = "test-1";
438
439                 // Setup the InMemoryJournal to block shard recovery to ensure
440                 // the shard isn't
441                 // initialized until we create the Tx.
442                 final String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
443                 final CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
444                 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
445
446                 try (AbstractDataStore dataStore = setupAbstractDataStore(
447                         testParameter, testName, false, shardName)) {
448
449                     // Create the read-write Tx
450                     final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
451                     assertNotNull("newReadWriteTransaction returned null", readWriteTx);
452
453                     // Do some reads on the Tx on a separate thread.
454                     final AtomicReference<CheckedFuture<Boolean, ReadFailedException>> txExistsFuture =
455                             new AtomicReference<>();
456                     final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
457                             txReadFuture = new AtomicReference<>();
458                     final AtomicReference<Exception> caughtEx = new AtomicReference<>();
459                     final CountDownLatch txReadsDone = new CountDownLatch(1);
460                     final Thread txThread = new Thread(() -> {
461                         try {
462                             readWriteTx.write(TestModel.TEST_PATH,
463                                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
464
465                             txExistsFuture.set(readWriteTx.exists(TestModel.TEST_PATH));
466
467                             txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
468                         } catch (Exception e) {
469                             caughtEx.set(e);
470                         } finally {
471                             txReadsDone.countDown();
472                         }
473                     });
474
475                     txThread.start();
476
477                     // Wait for the Tx operations to complete.
478                     boolean done = Uninterruptibles.awaitUninterruptibly(txReadsDone, 5, TimeUnit.SECONDS);
479                     if (caughtEx.get() != null) {
480                         throw caughtEx.get();
481                     }
482
483                     assertEquals("Tx reads done", true, done);
484
485                     // At this point the Tx operations should be waiting for the
486                     // shard to initialize so
487                     // trigger the latch to let the shard recovery to continue.
488                     blockRecoveryLatch.countDown();
489
490                     // Wait for the reads to complete and verify.
491                     assertEquals("exists", true, txExistsFuture.get().checkedGet(5, TimeUnit.SECONDS));
492                     assertEquals("read", true, txReadFuture.get().checkedGet(5, TimeUnit.SECONDS).isPresent());
493
494                     readWriteTx.close();
495                 }
496             }
497         };
498     }
499
500     @Test(expected = NotInitializedException.class)
501     @SuppressWarnings("checkstyle:IllegalCatch")
502     public void testTransactionCommitFailureWithShardNotInitialized() throws Exception {
503         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
504             {
505                 final String testName = "testTransactionCommitFailureWithShardNotInitialized";
506                 final String shardName = "test-1";
507
508                 // Set the shard initialization timeout low for the test.
509                 datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
510
511                 // Setup the InMemoryJournal to block shard recovery
512                 // indefinitely.
513                 final String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
514                 final CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
515                 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
516
517                 InMemoryJournal.addEntry(persistentID, 1, "Dummy data so akka will read from persistence");
518
519                 final AbstractDataStore dataStore =
520                         setupAbstractDataStore(testParameter, testName, false, shardName);
521
522                 // Create the write Tx
523                 final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
524                 assertNotNull("newReadWriteTransaction returned null", writeTx);
525
526                 // Do some modifications and ready the Tx on a separate
527                 // thread.
528                 final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
529                 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
530                 final CountDownLatch txReady = new CountDownLatch(1);
531                 final Thread txThread = new Thread(() -> {
532                     try {
533                         writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
534
535                         txCohort.set(writeTx.ready());
536                     } catch (Exception e) {
537                         caughtEx.set(e);
538                     } finally {
539                         txReady.countDown();
540                     }
541                 });
542
543                 txThread.start();
544
545                 // Wait for the Tx operations to complete.
546                 boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
547                 if (caughtEx.get() != null) {
548                     throw caughtEx.get();
549                 }
550
551                 assertEquals("Tx ready", true, done);
552
553                 // Wait for the commit to complete. Since the shard never
554                 // initialized, the Tx should
555                 // have timed out and throw an appropriate exception cause.
556                 try {
557                     txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
558                     fail("Expected NotInitializedException");
559                 } catch (final Exception e) {
560                     Throwables.propagate(Throwables.getRootCause(e));
561                 } finally {
562                     blockRecoveryLatch.countDown();
563                 }
564             }
565         };
566     }
567
568     @Test(expected = NotInitializedException.class)
569     @SuppressWarnings("checkstyle:IllegalCatch")
570     public void testTransactionReadFailureWithShardNotInitialized() throws Exception {
571         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
572             {
573                 final String testName = "testTransactionReadFailureWithShardNotInitialized";
574                 final String shardName = "test-1";
575
576                 // Set the shard initialization timeout low for the test.
577                 datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
578
579                 // Setup the InMemoryJournal to block shard recovery
580                 // indefinitely.
581                 final String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
582                 final CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
583                 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
584
585                 InMemoryJournal.addEntry(persistentID, 1, "Dummy data so akka will read from persistence");
586
587                 try (AbstractDataStore dataStore = setupAbstractDataStore(
588                         testParameter, testName, false, shardName)) {
589
590                     // Create the read-write Tx
591                     final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
592                     assertNotNull("newReadWriteTransaction returned null", readWriteTx);
593
594                     // Do a read on the Tx on a separate thread.
595                     final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
596                             txReadFuture = new AtomicReference<>();
597                     final AtomicReference<Exception> caughtEx = new AtomicReference<>();
598                     final CountDownLatch txReadDone = new CountDownLatch(1);
599                     final Thread txThread = new Thread(() -> {
600                         try {
601                             readWriteTx.write(TestModel.TEST_PATH,
602                                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
603
604                             txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
605
606                             readWriteTx.close();
607                         } catch (Exception e) {
608                             caughtEx.set(e);
609                         } finally {
610                             txReadDone.countDown();
611                         }
612                     });
613
614                     txThread.start();
615
616                     // Wait for the Tx operations to complete.
617                     boolean done = Uninterruptibles.awaitUninterruptibly(txReadDone, 5, TimeUnit.SECONDS);
618                     if (caughtEx.get() != null) {
619                         throw caughtEx.get();
620                     }
621
622                     assertEquals("Tx read done", true, done);
623
624                     // Wait for the read to complete. Since the shard never
625                     // initialized, the Tx should
626                     // have timed out and throw an appropriate exception cause.
627                     try {
628                         txReadFuture.get().checkedGet(5, TimeUnit.SECONDS);
629                         fail("Expected NotInitializedException");
630                     } catch (final ReadFailedException e) {
631                         Throwables.propagate(Throwables.getRootCause(e));
632                     } finally {
633                         blockRecoveryLatch.countDown();
634                     }
635                 }
636             }
637         };
638     }
639
640     @SuppressWarnings("checkstyle:IllegalCatch")
641     private void testTransactionCommitFailureWithNoShardLeader(final boolean writeOnly, final String testName)
642             throws Exception {
643         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
644             {
645                 final String shardName = "default";
646
647                 // We don't want the shard to become the leader so prevent shard
648                 // elections.
649                 datastoreContextBuilder.customRaftPolicyImplementation(
650                         "org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy");
651
652                 // The ShardManager uses the election timeout for FindPrimary so
653                 // reset it low so it will timeout quickly.
654                 datastoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(1)
655                         .shardInitializationTimeout(200, TimeUnit.MILLISECONDS);
656
657                 try (AbstractDataStore dataStore = setupAbstractDataStore(
658                         testParameter, testName, false, shardName)) {
659
660                     final Object result = dataStore.getActorContext().executeOperation(
661                             dataStore.getActorContext().getShardManager(), new FindLocalShard(shardName, true));
662                     assertTrue("Expected LocalShardFound. Actual: " + result, result instanceof LocalShardFound);
663
664                     // Create the write Tx.
665                     try (DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction()
666                             : dataStore.newReadWriteTransaction()) {
667                         assertNotNull("newReadWriteTransaction returned null", writeTx);
668
669                         // Do some modifications and ready the Tx on a separate
670                         // thread.
671                         final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
672                         final AtomicReference<Exception> caughtEx = new AtomicReference<>();
673                         final CountDownLatch txReady = new CountDownLatch(1);
674                         final Thread txThread = new Thread(() -> {
675                             try {
676                                 writeTx.write(TestModel.JUNK_PATH,
677                                         ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
678
679                                 txCohort.set(writeTx.ready());
680                             } catch (Exception e) {
681                                 caughtEx.set(e);
682                             } finally {
683                                 txReady.countDown();
684                             }
685                         });
686
687                         txThread.start();
688
689                         // Wait for the Tx operations to complete.
690                         boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
691                         if (caughtEx.get() != null) {
692                             throw caughtEx.get();
693                         }
694
695                         assertEquals("Tx ready", true, done);
696
697                         // Wait for the commit to complete. Since no shard
698                         // leader was elected in time, the Tx
699                         // should have timed out and throw an appropriate
700                         // exception cause.
701                         try {
702                             txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
703                             fail("Expected NoShardLeaderException");
704                         } catch (final ExecutionException e) {
705                             Throwables.propagate(Throwables.getRootCause(e));
706                         }
707                     }
708                 }
709             }
710         };
711     }
712
713     @Test(expected = NoShardLeaderException.class)
714     public void testWriteOnlyTransactionCommitFailureWithNoShardLeader() throws Exception {
715         datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
716         testTransactionCommitFailureWithNoShardLeader(true, "testWriteOnlyTransactionCommitFailureWithNoShardLeader");
717     }
718
719     @Test(expected = NoShardLeaderException.class)
720     public void testReadWriteTransactionCommitFailureWithNoShardLeader() throws Exception {
721         testTransactionCommitFailureWithNoShardLeader(false, "testReadWriteTransactionCommitFailureWithNoShardLeader");
722     }
723
724     @Test
725     public void testTransactionAbort() throws Exception {
726         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
727             {
728                 try (AbstractDataStore dataStore = setupAbstractDataStore(
729                         testParameter, "transactionAbortIntegrationTest", "test-1")) {
730
731                     final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
732                     assertNotNull("newWriteOnlyTransaction returned null", writeTx);
733
734                     writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
735
736                     final DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
737
738                     cohort.canCommit().get(5, TimeUnit.SECONDS);
739
740                     cohort.abort().get(5, TimeUnit.SECONDS);
741
742                     testWriteTransaction(dataStore, TestModel.TEST_PATH,
743                             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
744                 }
745             }
746         };
747     }
748
749     @Test
750     @SuppressWarnings("checkstyle:IllegalCatch")
751     public void testTransactionChainWithSingleShard() throws Exception {
752         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
753             {
754                 try (AbstractDataStore dataStore = setupAbstractDataStore(
755                         testParameter, "testTransactionChainWithSingleShard", "test-1")) {
756
757                     // 1. Create a Tx chain and write-only Tx
758                     final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
759
760                     final DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
761                     assertNotNull("newWriteOnlyTransaction returned null", writeTx);
762
763                     // 2. Write some data
764                     final NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
765                     writeTx.write(TestModel.TEST_PATH, testNode);
766
767                     // 3. Ready the Tx for commit
768                     final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
769
770                     // 4. Commit the Tx on another thread that first waits for
771                     // the second read Tx.
772                     final CountDownLatch continueCommit1 = new CountDownLatch(1);
773                     final CountDownLatch commit1Done = new CountDownLatch(1);
774                     final AtomicReference<Exception> commit1Error = new AtomicReference<>();
775                     new Thread(() -> {
776                         try {
777                             continueCommit1.await();
778                             doCommit(cohort1);
779                         } catch (Exception e) {
780                             commit1Error.set(e);
781                         } finally {
782                             commit1Done.countDown();
783                         }
784                     }).start();
785
786                     // 5. Create a new read Tx from the chain to read and verify
787                     // the data from the first
788                     // Tx is visible after being readied.
789                     DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction();
790                     Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
791                     assertEquals("isPresent", true, optional.isPresent());
792                     assertEquals("Data node", testNode, optional.get());
793
794                     // 6. Create a new RW Tx from the chain, write more data,
795                     // and ready it
796                     final DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
797                     final MapNode outerNode = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
798                     rwTx.write(TestModel.OUTER_LIST_PATH, outerNode);
799
800                     final DOMStoreThreePhaseCommitCohort cohort2 = rwTx.ready();
801
802                     // 7. Create a new read Tx from the chain to read the data
803                     // from the last RW Tx to
804                     // verify it is visible.
805                     readTx = txChain.newReadWriteTransaction();
806                     optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
807                     assertEquals("isPresent", true, optional.isPresent());
808                     assertEquals("Data node", outerNode, optional.get());
809
810                     // 8. Wait for the 2 commits to complete and close the
811                     // chain.
812                     continueCommit1.countDown();
813                     Uninterruptibles.awaitUninterruptibly(commit1Done, 5, TimeUnit.SECONDS);
814
815                     if (commit1Error.get() != null) {
816                         throw commit1Error.get();
817                     }
818
819                     doCommit(cohort2);
820
821                     txChain.close();
822
823                     // 9. Create a new read Tx from the data store and verify
824                     // committed data.
825                     readTx = dataStore.newReadOnlyTransaction();
826                     optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
827                     assertEquals("isPresent", true, optional.isPresent());
828                     assertEquals("Data node", outerNode, optional.get());
829                 }
830             }
831         };
832     }
833
834     @Test
835     public void testTransactionChainWithMultipleShards() throws Exception {
836         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
837             {
838                 try (AbstractDataStore dataStore = setupAbstractDataStore(
839                         testParameter, "testTransactionChainWithMultipleShards", "cars-1", "people-1")) {
840
841                     final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
842
843                     DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
844                     assertNotNull("newWriteOnlyTransaction returned null", writeTx);
845
846                     writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
847                     writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
848
849                     writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
850                     writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
851
852                     final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
853
854                     final DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction();
855
856                     final MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
857                     final YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
858                     readWriteTx.write(carPath, car);
859
860                     final MapEntryNode person = PeopleModel.newPersonEntry("jack");
861                     final YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
862                     readWriteTx.merge(personPath, person);
863
864                     Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
865                     assertEquals("isPresent", true, optional.isPresent());
866                     assertEquals("Data node", car, optional.get());
867
868                     optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS);
869                     assertEquals("isPresent", true, optional.isPresent());
870                     assertEquals("Data node", person, optional.get());
871
872                     final DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready();
873
874                     writeTx = txChain.newWriteOnlyTransaction();
875
876                     writeTx.delete(carPath);
877
878                     final DOMStoreThreePhaseCommitCohort cohort3 = writeTx.ready();
879
880                     final ListenableFuture<Boolean> canCommit1 = cohort1.canCommit();
881                     final ListenableFuture<Boolean> canCommit2 = cohort2.canCommit();
882
883                     doCommit(canCommit1, cohort1);
884                     doCommit(canCommit2, cohort2);
885                     doCommit(cohort3);
886
887                     txChain.close();
888
889                     final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
890
891                     optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
892                     assertEquals("isPresent", false, optional.isPresent());
893
894                     optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
895                     assertEquals("isPresent", true, optional.isPresent());
896                     assertEquals("Data node", person, optional.get());
897                 }
898             }
899         };
900     }
901
902     @Test
903     public void testCreateChainedTransactionsInQuickSuccession() throws Exception {
904         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
905             {
906                 try (AbstractDataStore dataStore = setupAbstractDataStore(
907                         testParameter, "testCreateChainedTransactionsInQuickSuccession", "cars-1")) {
908
909                     final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
910                             ImmutableMap.<LogicalDatastoreType, DOMStore>builder()
911                                     .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(),
912                             MoreExecutors.directExecutor());
913
914                     final TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
915                     DOMTransactionChain txChain = broker.createTransactionChain(listener);
916
917                     final List<CheckedFuture<Void, TransactionCommitFailedException>> futures = new ArrayList<>();
918
919                     final DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
920                     writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, CarsModel.emptyContainer());
921                     writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
922                     futures.add(writeTx.submit());
923
924                     int numCars = 100;
925                     for (int i = 0; i < numCars; i++) {
926                         final DOMDataReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
927
928                         rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.newCarPath("car" + i),
929                                 CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
930
931                         futures.add(rwTx.submit());
932                     }
933
934                     for (final CheckedFuture<Void, TransactionCommitFailedException> f : futures) {
935                         f.checkedGet();
936                     }
937
938                     final Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction()
939                             .read(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
940                     assertEquals("isPresent", true, optional.isPresent());
941                     assertEquals("# cars", numCars, ((Collection<?>) optional.get().getValue()).size());
942
943                     txChain.close();
944
945                     broker.close();
946                 }
947             }
948         };
949     }
950
951     @Test
952     public void testCreateChainedTransactionAfterEmptyTxReadied() throws Exception {
953         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
954             {
955                 try (AbstractDataStore dataStore = setupAbstractDataStore(
956                         testParameter, "testCreateChainedTransactionAfterEmptyTxReadied", "test-1")) {
957
958                     final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
959
960                     final DOMStoreReadWriteTransaction rwTx1 = txChain.newReadWriteTransaction();
961
962                     rwTx1.ready();
963
964                     final DOMStoreReadWriteTransaction rwTx2 = txChain.newReadWriteTransaction();
965
966                     final Optional<NormalizedNode<?, ?>> optional = rwTx2.read(TestModel.TEST_PATH).get(
967                             5, TimeUnit.SECONDS);
968                     assertEquals("isPresent", false, optional.isPresent());
969
970                     txChain.close();
971                 }
972             }
973         };
974     }
975
976     @Test
977     public void testCreateChainedTransactionWhenPreviousNotReady() throws Exception {
978         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
979             {
980                 try (AbstractDataStore dataStore = setupAbstractDataStore(
981                         testParameter, "testCreateChainedTransactionWhenPreviousNotReady", "test-1")) {
982
983                     final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
984
985                     final DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
986                     assertNotNull("newWriteOnlyTransaction returned null", writeTx);
987
988                     writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
989
990                     // Try to create another Tx of each type - each should fail
991                     // b/c the previous Tx wasn't
992                     // readied.
993                     assertExceptionOnTxChainCreates(txChain, IllegalStateException.class);
994                 }
995             }
996         };
997     }
998
999     @Test
1000     public void testCreateChainedTransactionAfterClose() throws Exception {
1001         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
1002             {
1003                 try (AbstractDataStore dataStore = setupAbstractDataStore(
1004                         testParameter, "testCreateChainedTransactionAfterClose", "test-1")) {
1005
1006                     final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
1007                     txChain.close();
1008
1009                     // Try to create another Tx of each type - should fail b/c
1010                     // the previous Tx was closed.
1011                     assertExceptionOnTxChainCreates(txChain, TransactionChainClosedException.class);
1012                 }
1013             }
1014         };
1015     }
1016
1017     @Test
1018     public void testChainWithReadOnlyTxAfterPreviousReady() throws Exception {
1019         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
1020             {
1021                 try (AbstractDataStore dataStore = setupAbstractDataStore(
1022                         testParameter, "testChainWithReadOnlyTxAfterPreviousReady", "test-1")) {
1023
1024                     final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
1025
1026                     // Create a write tx and submit.
1027                     final DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
1028                     writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1029                     final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
1030
1031                     // Create read-only tx's and issue a read.
1032                     CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readFuture1 = txChain
1033                             .newReadOnlyTransaction().read(TestModel.TEST_PATH);
1034
1035                     CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readFuture2 = txChain
1036                             .newReadOnlyTransaction().read(TestModel.TEST_PATH);
1037
1038                     // Create another write tx and issue the write.
1039                     DOMStoreWriteTransaction writeTx2 = txChain.newWriteOnlyTransaction();
1040                     writeTx2.write(TestModel.OUTER_LIST_PATH,
1041                             ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1042
1043                     // Ensure the reads succeed.
1044
1045                     assertEquals("isPresent", true, readFuture1.checkedGet(5, TimeUnit.SECONDS).isPresent());
1046                     assertEquals("isPresent", true, readFuture2.checkedGet(5, TimeUnit.SECONDS).isPresent());
1047
1048                     // Ensure the writes succeed.
1049                     DOMStoreThreePhaseCommitCohort cohort2 = writeTx2.ready();
1050
1051                     doCommit(cohort1);
1052                     doCommit(cohort2);
1053
1054                     assertEquals("isPresent", true, txChain.newReadOnlyTransaction().read(TestModel.OUTER_LIST_PATH)
1055                             .checkedGet(5, TimeUnit.SECONDS).isPresent());
1056                 }
1057             }
1058         };
1059     }
1060
1061     @Test
1062     public void testChainedTransactionFailureWithSingleShard() throws Exception {
1063         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
1064             {
1065                 try (AbstractDataStore dataStore = setupAbstractDataStore(
1066                         testParameter, "testChainedTransactionFailureWithSingleShard", "cars-1")) {
1067
1068                     final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
1069                             ImmutableMap.<LogicalDatastoreType, DOMStore>builder()
1070                                     .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(),
1071                             MoreExecutors.directExecutor());
1072
1073                     final TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
1074                     final DOMTransactionChain txChain = broker.createTransactionChain(listener);
1075
1076                     final DOMDataReadWriteTransaction writeTx = txChain.newReadWriteTransaction();
1077
1078                     writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH,
1079                             PeopleModel.emptyContainer());
1080
1081                     final ContainerNode invalidData = ImmutableContainerNodeBuilder.create()
1082                             .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME))
1083                             .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
1084
1085                     writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
1086
1087                     try {
1088                         writeTx.submit().checkedGet(5, TimeUnit.SECONDS);
1089                         fail("Expected TransactionCommitFailedException");
1090                     } catch (final TransactionCommitFailedException e) {
1091                         // Expected
1092                     }
1093
1094                     verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx),
1095                             any(Throwable.class));
1096
1097                     txChain.close();
1098                     broker.close();
1099                 }
1100             }
1101         };
1102     }
1103
1104     @Test
1105     public void testChainedTransactionFailureWithMultipleShards() throws Exception {
1106         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
1107             {
1108                 try (AbstractDataStore dataStore = setupAbstractDataStore(
1109                         testParameter, "testChainedTransactionFailureWithMultipleShards", "cars-1", "people-1")) {
1110
1111                     final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
1112                             ImmutableMap.<LogicalDatastoreType, DOMStore>builder()
1113                                     .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(),
1114                             MoreExecutors.directExecutor());
1115
1116                     final TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
1117                     final DOMTransactionChain txChain = broker.createTransactionChain(listener);
1118
1119                     final DOMDataReadWriteTransaction writeTx = txChain.newReadWriteTransaction();
1120
1121                     writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH,
1122                             PeopleModel.emptyContainer());
1123
1124                     final ContainerNode invalidData = ImmutableContainerNodeBuilder.create()
1125                             .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME))
1126                             .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
1127
1128                     writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
1129
1130                     // Note that merge will validate the data and fail but put
1131                     // succeeds b/c deep validation is not
1132                     // done for put for performance reasons.
1133                     try {
1134                         writeTx.submit().checkedGet(5, TimeUnit.SECONDS);
1135                         fail("Expected TransactionCommitFailedException");
1136                     } catch (final TransactionCommitFailedException e) {
1137                         // Expected
1138                     }
1139
1140                     verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx),
1141                             any(Throwable.class));
1142
1143                     txChain.close();
1144                     broker.close();
1145                 }
1146             }
1147         };
1148     }
1149
1150     @Test
1151     public void testChangeListenerRegistration() throws Exception {
1152         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
1153             {
1154                 try (AbstractDataStore dataStore = setupAbstractDataStore(
1155                         testParameter, "testChangeListenerRegistration", "test-1")) {
1156
1157                     testWriteTransaction(dataStore, TestModel.TEST_PATH,
1158                             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1159
1160                     final MockDataChangeListener listener = new MockDataChangeListener(1);
1161
1162                     final ListenerRegistration<MockDataChangeListener> listenerReg = dataStore
1163                             .registerChangeListener(TestModel.TEST_PATH, listener, DataChangeScope.SUBTREE);
1164
1165                     assertNotNull("registerChangeListener returned null", listenerReg);
1166
1167                     IntegrationTestKit.verifyShardState(dataStore, "test-1",
1168                         state -> assertEquals("getDataChangeListenerActors", 1,
1169                                 state.getDataChangeListenerActors().size()));
1170
1171                     // Wait for the initial notification
1172                     listener.waitForChangeEvents(TestModel.TEST_PATH);
1173                     listener.reset(2);
1174
1175                     // Write 2 updates.
1176                     testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
1177                             ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1178
1179                     YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1180                             .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
1181                     testWriteTransaction(dataStore, listPath,
1182                             ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
1183
1184                     // Wait for the 2 updates.
1185                     listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath);
1186                     listenerReg.close();
1187
1188                     IntegrationTestKit.verifyShardState(dataStore, "test-1",
1189                         state -> assertEquals("getDataChangeListenerActors", 0,
1190                                 state.getDataChangeListenerActors().size()));
1191
1192                     testWriteTransaction(dataStore,
1193                             YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1194                                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(),
1195                             ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2));
1196
1197                     listener.expectNoMoreChanges("Received unexpected change after close");
1198                 }
1199             }
1200         };
1201     }
1202
1203     @Test
1204     public void testDataTreeChangeListenerRegistration() throws Exception {
1205         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
1206             {
1207                 try (AbstractDataStore dataStore = setupAbstractDataStore(
1208                         testParameter, "testDataTreeChangeListenerRegistration", "test-1")) {
1209
1210                     testWriteTransaction(dataStore, TestModel.TEST_PATH,
1211                             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1212
1213                     final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
1214
1215                     ListenerRegistration<MockDataTreeChangeListener> listenerReg = dataStore
1216                             .registerTreeChangeListener(TestModel.TEST_PATH, listener);
1217
1218                     assertNotNull("registerTreeChangeListener returned null", listenerReg);
1219
1220                     IntegrationTestKit.verifyShardState(dataStore, "test-1",
1221                         state -> assertEquals("getTreeChangeListenerActors", 1,
1222                                 state.getTreeChangeListenerActors().size()));
1223
1224                     // Wait for the initial notification
1225                     listener.waitForChangeEvents(TestModel.TEST_PATH);
1226                     listener.reset(2);
1227
1228                     // Write 2 updates.
1229                     testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
1230                             ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1231
1232                     YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1233                             .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
1234                     testWriteTransaction(dataStore, listPath,
1235                             ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
1236
1237                     // Wait for the 2 updates.
1238                     listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath);
1239                     listenerReg.close();
1240
1241                     IntegrationTestKit.verifyShardState(dataStore, "test-1",
1242                         state -> assertEquals("getTreeChangeListenerActors", 0,
1243                                 state.getTreeChangeListenerActors().size()));
1244
1245                     testWriteTransaction(dataStore,
1246                             YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1247                                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(),
1248                             ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2));
1249
1250                     listener.expectNoMoreChanges("Received unexpected change after close");
1251                 }
1252             }
1253         };
1254     }
1255
1256     @Test
1257     public void testRestoreFromDatastoreSnapshot() throws Exception {
1258         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
1259             {
1260                 final String name = "transactionIntegrationTest";
1261
1262                 final ContainerNode carsNode = CarsModel.newCarsNode(
1263                         CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", BigInteger.valueOf(20000L)),
1264                                 CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000L))));
1265
1266                 DataTree dataTree = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
1267                 dataTree.setSchemaContext(SchemaContextHelper.full());
1268                 AbstractShardTest.writeToStore(dataTree, CarsModel.BASE_PATH, carsNode);
1269                 NormalizedNode<?, ?> root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY);
1270
1271                 final Snapshot carsSnapshot = Snapshot.create(
1272                         new ShardSnapshotState(new MetadataShardDataTreeSnapshot(root)),
1273                         Collections.emptyList(), 2, 1, 2, 1, 1, "member-1", null);
1274
1275                 dataTree = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
1276                 dataTree.setSchemaContext(SchemaContextHelper.full());
1277
1278                 final NormalizedNode<?, ?> peopleNode = PeopleModel.create();
1279                 AbstractShardTest.writeToStore(dataTree, PeopleModel.BASE_PATH, peopleNode);
1280
1281                 root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY);
1282
1283                 final Snapshot peopleSnapshot = Snapshot.create(
1284                         new ShardSnapshotState(new MetadataShardDataTreeSnapshot(root)),
1285                         Collections.emptyList(), 2, 1, 2, 1, 1, "member-1", null);
1286
1287                 restoreFromSnapshot = new DatastoreSnapshot(name, null, Arrays.asList(
1288                         new DatastoreSnapshot.ShardSnapshot("cars", carsSnapshot),
1289                         new DatastoreSnapshot.ShardSnapshot("people", peopleSnapshot)));
1290
1291                 try (AbstractDataStore dataStore = setupAbstractDataStore(
1292                         testParameter, name, "module-shards-member1.conf", true, "cars", "people")) {
1293
1294                     final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
1295
1296                     // two reads
1297                     Optional<NormalizedNode<?, ?>> optional = readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS);
1298                     assertEquals("isPresent", true, optional.isPresent());
1299                     assertEquals("Data node", carsNode, optional.get());
1300
1301                     optional = readTx.read(PeopleModel.BASE_PATH).get(5, TimeUnit.SECONDS);
1302                     assertEquals("isPresent", true, optional.isPresent());
1303                     assertEquals("Data node", peopleNode, optional.get());
1304                 }
1305             }
1306         };
1307     }
1308
1309     @Test
1310     @Deprecated
1311     public void testRecoveryFromPreCarbonSnapshot() throws Exception {
1312         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
1313             {
1314                 final String name = "testRecoveryFromPreCarbonSnapshot";
1315
1316                 final ContainerNode carsNode = CarsModel.newCarsNode(
1317                         CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", BigInteger.valueOf(20000L)),
1318                                 CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000L))));
1319
1320                 DataTree dataTree = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
1321                 dataTree.setSchemaContext(SchemaContextHelper.full());
1322                 AbstractShardTest.writeToStore(dataTree, CarsModel.BASE_PATH, carsNode);
1323                 NormalizedNode<?, ?> root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY);
1324
1325                 MetadataShardDataTreeSnapshot shardSnapshot = new MetadataShardDataTreeSnapshot(root);
1326                 final ByteArrayOutputStream bos = new ByteArrayOutputStream();
1327                 try (DataOutputStream dos = new DataOutputStream(bos)) {
1328                     PayloadVersion.BORON.writeTo(dos);
1329                     try (ObjectOutputStream oos = new ObjectOutputStream(dos)) {
1330                         oos.writeObject(shardSnapshot);
1331                     }
1332                 }
1333
1334                 final org.opendaylight.controller.cluster.raft.Snapshot snapshot =
1335                         org.opendaylight.controller.cluster.raft.Snapshot.create(bos.toByteArray(),
1336                                 Collections.emptyList(), 2, 1, 2, 1, 1, "member-1", null);
1337
1338                 InMemorySnapshotStore.addSnapshot("member-1-shard-cars-" + name, snapshot);
1339
1340                 try (AbstractDataStore dataStore = setupAbstractDataStore(
1341                         testParameter, name, "module-shards-member1.conf", true, "cars")) {
1342
1343                     DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
1344
1345                     Optional<NormalizedNode<?, ?>> optional = readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS);
1346                     assertEquals("isPresent", true, optional.isPresent());
1347                     assertEquals("Data node", carsNode, optional.get());
1348                 }
1349             }
1350         };
1351     }
1352 }