BUG-2138: Create DistributedShardFrontend
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / DistributedDataStoreIntegrationTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertTrue;
14 import static org.junit.Assert.fail;
15 import static org.mockito.Matchers.any;
16 import static org.mockito.Matchers.eq;
17 import static org.mockito.Mockito.timeout;
18 import static org.mockito.Mockito.verify;
19
20 import akka.actor.ActorSystem;
21 import akka.actor.Address;
22 import akka.actor.AddressFromURIString;
23 import akka.cluster.Cluster;
24 import akka.testkit.JavaTestKit;
25 import com.google.common.base.Optional;
26 import com.google.common.base.Throwables;
27 import com.google.common.collect.ImmutableMap;
28 import com.google.common.util.concurrent.CheckedFuture;
29 import com.google.common.util.concurrent.ListenableFuture;
30 import com.google.common.util.concurrent.MoreExecutors;
31 import com.google.common.util.concurrent.Uninterruptibles;
32 import com.typesafe.config.ConfigFactory;
33 import java.io.ByteArrayOutputStream;
34 import java.io.DataOutputStream;
35 import java.io.IOException;
36 import java.io.ObjectOutputStream;
37 import java.math.BigInteger;
38 import java.util.ArrayList;
39 import java.util.Arrays;
40 import java.util.Collection;
41 import java.util.Collections;
42 import java.util.List;
43 import java.util.concurrent.CountDownLatch;
44 import java.util.concurrent.ExecutionException;
45 import java.util.concurrent.TimeUnit;
46 import java.util.concurrent.atomic.AtomicReference;
47 import org.junit.AfterClass;
48 import org.junit.BeforeClass;
49 import org.junit.Test;
50 import org.mockito.Mockito;
51 import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker;
52 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
53 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
54 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
55 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
56 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
57 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
58 import org.opendaylight.controller.cluster.datastore.persisted.PayloadVersion;
59 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
60 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
61 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
62 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
63 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
64 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
65 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
66 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
67 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
68 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
69 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
70 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
71 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
72 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
73 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
74 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
75 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
76 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
77 import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
78 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
79 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
80 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
81 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
82 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
83 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
84 import org.opendaylight.yangtools.concepts.ListenerRegistration;
85 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
86 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
87 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
88 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
89 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
90 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
91 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
92 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
93 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
94 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
95
96 public class DistributedDataStoreIntegrationTest {
97
98     private static ActorSystem system;
99
100     private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder()
101             .shardHeartbeatIntervalInMillis(100);
102
103     @BeforeClass
104     public static void setUpClass() throws IOException {
105         system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
106         Address member1Address = AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558");
107         Cluster.get(system).join(member1Address);
108     }
109
110     @AfterClass
111     public static void tearDownClass() throws IOException {
112         JavaTestKit.shutdownActorSystem(system);
113         system = null;
114     }
115
116     protected ActorSystem getSystem() {
117         return system;
118     }
119
120     @Test
121     public void testWriteTransactionWithSingleShard() throws Exception {
122         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
123             {
124                 try (AbstractDataStore dataStore = setupDistributedDataStore("transactionIntegrationTest",
125                         "test-1")) {
126
127                     testWriteTransaction(dataStore, TestModel.TEST_PATH,
128                             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
129
130                     testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
131                             ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
132                 }
133             }
134         };
135     }
136
137     @Test
138     public void testWriteTransactionWithMultipleShards() throws Exception {
139         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
140             {
141                 try (AbstractDataStore dataStore = setupDistributedDataStore(
142                         "testWriteTransactionWithMultipleShards", "cars-1", "people-1")) {
143
144                     DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
145                     assertNotNull("newWriteOnlyTransaction returned null", writeTx);
146
147                     writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
148                     writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
149
150                     doCommit(writeTx.ready());
151
152                     writeTx = dataStore.newWriteOnlyTransaction();
153
154                     writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
155                     writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
156
157                     doCommit(writeTx.ready());
158
159                     writeTx = dataStore.newWriteOnlyTransaction();
160
161                     MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
162                     YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
163                     writeTx.write(carPath, car);
164
165                     MapEntryNode person = PeopleModel.newPersonEntry("jack");
166                     YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
167                     writeTx.write(personPath, person);
168
169                     doCommit(writeTx.ready());
170
171                     // Verify the data in the store
172
173                     DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
174
175                     Optional<NormalizedNode<?, ?>> optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
176                     assertEquals("isPresent", true, optional.isPresent());
177                     assertEquals("Data node", car, optional.get());
178
179                     optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
180                     assertEquals("isPresent", true, optional.isPresent());
181                     assertEquals("Data node", person, optional.get());
182                 }
183             }
184         };
185     }
186
187     @Test
188     public void testReadWriteTransactionWithSingleShard() throws Exception {
189         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
190             {
191                 try (AbstractDataStore dataStore = setupDistributedDataStore(
192                         "testReadWriteTransactionWithSingleShard", "test-1")) {
193
194                     // 1. Create a read-write Tx
195
196                     DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
197                     assertNotNull("newReadWriteTransaction returned null", readWriteTx);
198
199                     // 2. Write some data
200
201                     YangInstanceIdentifier nodePath = TestModel.TEST_PATH;
202                     NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
203                     readWriteTx.write(nodePath, nodeToWrite);
204
205                     // 3. Read the data from Tx
206
207                     Boolean exists = readWriteTx.exists(nodePath).checkedGet(5, TimeUnit.SECONDS);
208                     assertEquals("exists", true, exists);
209
210                     Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS);
211                     assertEquals("isPresent", true, optional.isPresent());
212                     assertEquals("Data node", nodeToWrite, optional.get());
213
214                     // 4. Ready the Tx for commit
215
216                     DOMStoreThreePhaseCommitCohort cohort = readWriteTx.ready();
217
218                     // 5. Commit the Tx
219
220                     doCommit(cohort);
221
222                     // 6. Verify the data in the store
223
224                     DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
225
226                     optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
227                     assertEquals("isPresent", true, optional.isPresent());
228                     assertEquals("Data node", nodeToWrite, optional.get());
229                 }
230             }
231         };
232     }
233
234     @Test
235     public void testReadWriteTransactionWithMultipleShards() throws Exception {
236         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
237             {
238                 try (AbstractDataStore dataStore = setupDistributedDataStore(
239                         "testReadWriteTransactionWithMultipleShards", "cars-1", "people-1")) {
240
241                     DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
242                     assertNotNull("newReadWriteTransaction returned null", readWriteTx);
243
244                     readWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
245                     readWriteTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
246
247                     doCommit(readWriteTx.ready());
248
249                     readWriteTx = dataStore.newReadWriteTransaction();
250
251                     readWriteTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
252                     readWriteTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
253
254                     doCommit(readWriteTx.ready());
255
256                     readWriteTx = dataStore.newReadWriteTransaction();
257
258                     MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
259                     YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
260                     readWriteTx.write(carPath, car);
261
262                     MapEntryNode person = PeopleModel.newPersonEntry("jack");
263                     YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
264                     readWriteTx.write(personPath, person);
265
266                     Boolean exists = readWriteTx.exists(carPath).checkedGet(5, TimeUnit.SECONDS);
267                     assertEquals("exists", true, exists);
268
269                     Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
270                     assertEquals("isPresent", true, optional.isPresent());
271                     assertEquals("Data node", car, optional.get());
272
273                     doCommit(readWriteTx.ready());
274
275                     // Verify the data in the store
276
277                     DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
278
279                     optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
280                     assertEquals("isPresent", true, optional.isPresent());
281                     assertEquals("Data node", car, optional.get());
282
283                     optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
284                     assertEquals("isPresent", true, optional.isPresent());
285                     assertEquals("Data node", person, optional.get());
286
287                 }
288             }
289         };
290     }
291
292     @Test
293     public void testSingleTransactionsWritesInQuickSuccession() throws Exception {
294         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
295             {
296                 try (AbstractDataStore dataStore = setupDistributedDataStore(
297                         "testSingleTransactionsWritesInQuickSuccession", "cars-1")) {
298
299                     DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
300
301                     DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
302                     writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
303                     writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
304                     doCommit(writeTx.ready());
305
306                     writeTx = txChain.newWriteOnlyTransaction();
307
308                     int numCars = 5;
309                     for (int i = 0; i < numCars; i++) {
310                         writeTx.write(CarsModel.newCarPath("car" + i),
311                                 CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
312                     }
313
314                     doCommit(writeTx.ready());
315
316                     Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction()
317                             .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
318                     assertEquals("isPresent", true, optional.isPresent());
319                     assertEquals("# cars", numCars, ((Collection<?>) optional.get().getValue()).size());
320                 }
321             }
322         };
323     }
324
325     @SuppressWarnings("checkstyle:IllegalCatch")
326     private void testTransactionWritesWithShardNotInitiallyReady(final String testName, final boolean writeOnly)
327             throws Exception {
328         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
329             {
330                 String shardName = "test-1";
331
332                 // Setup the InMemoryJournal to block shard recovery to ensure
333                 // the shard isn't
334                 // initialized until we create and submit the write the Tx.
335                 String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
336                 CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
337                 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
338
339                 try (AbstractDataStore dataStore = setupDistributedDataStore(testName, false, shardName)) {
340
341                     // Create the write Tx
342
343                     final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction()
344                             : dataStore.newReadWriteTransaction();
345                     assertNotNull("newReadWriteTransaction returned null", writeTx);
346
347                     // Do some modification operations and ready the Tx on a
348                     // separate thread.
349
350                     final YangInstanceIdentifier listEntryPath = YangInstanceIdentifier
351                             .builder(TestModel.OUTER_LIST_PATH)
352                             .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
353
354                     final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
355                     final AtomicReference<Exception> caughtEx = new AtomicReference<>();
356                     final CountDownLatch txReady = new CountDownLatch(1);
357                     Thread txThread = new Thread() {
358                         @Override
359                         public void run() {
360                             try {
361                                 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
362
363                                 writeTx.merge(TestModel.OUTER_LIST_PATH,
364                                         ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
365
366                                 writeTx.write(listEntryPath,
367                                         ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
368
369                                 writeTx.delete(listEntryPath);
370
371                                 txCohort.set(writeTx.ready());
372                             } catch (Exception e) {
373                                 caughtEx.set(e);
374                                 return;
375                             } finally {
376                                 txReady.countDown();
377                             }
378                         }
379                     };
380
381                     txThread.start();
382
383                     // Wait for the Tx operations to complete.
384
385                     boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
386                     if (caughtEx.get() != null) {
387                         throw caughtEx.get();
388                     }
389
390                     assertEquals("Tx ready", true, done);
391
392                     // At this point the Tx operations should be waiting for the
393                     // shard to initialize so
394                     // trigger the latch to let the shard recovery to continue.
395
396                     blockRecoveryLatch.countDown();
397
398                     // Wait for the Tx commit to complete.
399
400                     doCommit(txCohort.get());
401
402                     // Verify the data in the store
403
404                     DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
405
406                     Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
407                     assertEquals("isPresent", true, optional.isPresent());
408
409                     optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
410                     assertEquals("isPresent", true, optional.isPresent());
411
412                     optional = readTx.read(listEntryPath).get(5, TimeUnit.SECONDS);
413                     assertEquals("isPresent", false, optional.isPresent());
414                 }
415             }
416         };
417     }
418
419     @Test
420     public void testWriteOnlyTransactionWithShardNotInitiallyReady() throws Exception {
421         datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
422         testTransactionWritesWithShardNotInitiallyReady("testWriteOnlyTransactionWithShardNotInitiallyReady", true);
423     }
424
425     @Test
426     public void testReadWriteTransactionWithShardNotInitiallyReady() throws Exception {
427         testTransactionWritesWithShardNotInitiallyReady("testReadWriteTransactionWithShardNotInitiallyReady", false);
428     }
429
430     @Test
431     @SuppressWarnings("checkstyle:IllegalCatch")
432     public void testTransactionReadsWithShardNotInitiallyReady() throws Exception {
433         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
434             {
435                 String testName = "testTransactionReadsWithShardNotInitiallyReady";
436                 String shardName = "test-1";
437
438                 // Setup the InMemoryJournal to block shard recovery to ensure
439                 // the shard isn't
440                 // initialized until we create the Tx.
441                 String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
442                 CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
443                 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
444
445                 try (AbstractDataStore dataStore = setupDistributedDataStore(testName, false, shardName)) {
446
447                     // Create the read-write Tx
448
449                     final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
450                     assertNotNull("newReadWriteTransaction returned null", readWriteTx);
451
452                     // Do some reads on the Tx on a separate thread.
453
454                     final AtomicReference<CheckedFuture<Boolean, ReadFailedException>> txExistsFuture =
455                             new AtomicReference<>();
456                     final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
457                             txReadFuture = new AtomicReference<>();
458                     final AtomicReference<Exception> caughtEx = new AtomicReference<>();
459                     final CountDownLatch txReadsDone = new CountDownLatch(1);
460                     Thread txThread = new Thread() {
461                         @Override
462                         public void run() {
463                             try {
464                                 readWriteTx.write(TestModel.TEST_PATH,
465                                         ImmutableNodes.containerNode(TestModel.TEST_QNAME));
466
467                                 txExistsFuture.set(readWriteTx.exists(TestModel.TEST_PATH));
468
469                                 txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
470                             } catch (Exception e) {
471                                 caughtEx.set(e);
472                                 return;
473                             } finally {
474                                 txReadsDone.countDown();
475                             }
476                         }
477                     };
478
479                     txThread.start();
480
481                     // Wait for the Tx operations to complete.
482
483                     boolean done = Uninterruptibles.awaitUninterruptibly(txReadsDone, 5, TimeUnit.SECONDS);
484                     if (caughtEx.get() != null) {
485                         throw caughtEx.get();
486                     }
487
488                     assertEquals("Tx reads done", true, done);
489
490                     // At this point the Tx operations should be waiting for the
491                     // shard to initialize so
492                     // trigger the latch to let the shard recovery to continue.
493
494                     blockRecoveryLatch.countDown();
495
496                     // Wait for the reads to complete and verify.
497
498                     assertEquals("exists", true, txExistsFuture.get().checkedGet(5, TimeUnit.SECONDS));
499                     assertEquals("read", true, txReadFuture.get().checkedGet(5, TimeUnit.SECONDS).isPresent());
500
501                     readWriteTx.close();
502                 }
503             }
504         };
505     }
506
507     @Test(expected = NotInitializedException.class)
508     @SuppressWarnings("checkstyle:IllegalCatch")
509     public void testTransactionCommitFailureWithShardNotInitialized() throws Exception {
510         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
511             {
512                 String testName = "testTransactionCommitFailureWithShardNotInitialized";
513                 String shardName = "test-1";
514
515                 // Set the shard initialization timeout low for the test.
516
517                 datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
518
519                 // Setup the InMemoryJournal to block shard recovery
520                 // indefinitely.
521
522                 String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
523                 CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
524                 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
525
526                 InMemoryJournal.addEntry(persistentID, 1, "Dummy data so akka will read from persistence");
527
528                 try (AbstractDataStore dataStore = setupDistributedDataStore(testName, false, shardName)) {
529
530                     // Create the write Tx
531
532                     final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
533                     assertNotNull("newReadWriteTransaction returned null", writeTx);
534
535                     // Do some modifications and ready the Tx on a separate
536                     // thread.
537
538                     final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
539                     final AtomicReference<Exception> caughtEx = new AtomicReference<>();
540                     final CountDownLatch txReady = new CountDownLatch(1);
541                     Thread txThread = new Thread() {
542                         @Override
543                         public void run() {
544                             try {
545                                 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
546
547                                 txCohort.set(writeTx.ready());
548                             } catch (Exception e) {
549                                 caughtEx.set(e);
550                                 return;
551                             } finally {
552                                 txReady.countDown();
553                             }
554                         }
555                     };
556
557                     txThread.start();
558
559                     // Wait for the Tx operations to complete.
560
561                     boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
562                     if (caughtEx.get() != null) {
563                         throw caughtEx.get();
564                     }
565
566                     assertEquals("Tx ready", true, done);
567
568                     // Wait for the commit to complete. Since the shard never
569                     // initialized, the Tx should
570                     // have timed out and throw an appropriate exception cause.
571
572                     try {
573                         txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
574                     } catch (ExecutionException e) {
575                         Throwables.propagateIfInstanceOf(e.getCause(), Exception.class);
576                         Throwables.propagate(e.getCause());
577                     } finally {
578                         blockRecoveryLatch.countDown();
579                     }
580                 }
581             }
582         };
583     }
584
585     @Test(expected = NotInitializedException.class)
586     @SuppressWarnings("checkstyle:IllegalCatch")
587     public void testTransactionReadFailureWithShardNotInitialized() throws Exception {
588         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
589             {
590                 String testName = "testTransactionReadFailureWithShardNotInitialized";
591                 String shardName = "test-1";
592
593                 // Set the shard initialization timeout low for the test.
594
595                 datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
596
597                 // Setup the InMemoryJournal to block shard recovery
598                 // indefinitely.
599
600                 String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
601                 CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
602                 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
603
604                 InMemoryJournal.addEntry(persistentID, 1, "Dummy data so akka will read from persistence");
605
606                 try (AbstractDataStore dataStore = setupDistributedDataStore(testName, false, shardName)) {
607
608                     // Create the read-write Tx
609
610                     final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
611                     assertNotNull("newReadWriteTransaction returned null", readWriteTx);
612
613                     // Do a read on the Tx on a separate thread.
614
615                     final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
616                             txReadFuture = new AtomicReference<>();
617                     final AtomicReference<Exception> caughtEx = new AtomicReference<>();
618                     final CountDownLatch txReadDone = new CountDownLatch(1);
619                     Thread txThread = new Thread() {
620                         @Override
621                         public void run() {
622                             try {
623                                 readWriteTx.write(TestModel.TEST_PATH,
624                                         ImmutableNodes.containerNode(TestModel.TEST_QNAME));
625
626                                 txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
627
628                                 readWriteTx.close();
629                             } catch (Exception e) {
630                                 caughtEx.set(e);
631                                 return;
632                             } finally {
633                                 txReadDone.countDown();
634                             }
635                         }
636                     };
637
638                     txThread.start();
639
640                     // Wait for the Tx operations to complete.
641
642                     boolean done = Uninterruptibles.awaitUninterruptibly(txReadDone, 5, TimeUnit.SECONDS);
643                     if (caughtEx.get() != null) {
644                         throw caughtEx.get();
645                     }
646
647                     assertEquals("Tx read done", true, done);
648
649                     // Wait for the read to complete. Since the shard never
650                     // initialized, the Tx should
651                     // have timed out and throw an appropriate exception cause.
652
653                     try {
654                         txReadFuture.get().checkedGet(5, TimeUnit.SECONDS);
655                     } catch (ReadFailedException e) {
656                         Throwables.propagateIfInstanceOf(e.getCause(), Exception.class);
657                         Throwables.propagate(e.getCause());
658                     } finally {
659                         blockRecoveryLatch.countDown();
660                     }
661                 }
662             }
663         };
664     }
665
666     @SuppressWarnings("checkstyle:IllegalCatch")
667     private void testTransactionCommitFailureWithNoShardLeader(final boolean writeOnly, final String testName)
668             throws Exception {
669         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
670             {
671                 String shardName = "default";
672
673                 // We don't want the shard to become the leader so prevent shard
674                 // elections.
675                 datastoreContextBuilder.customRaftPolicyImplementation(
676                         "org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy");
677
678                 // The ShardManager uses the election timeout for FindPrimary so
679                 // reset it low so it will timeout quickly.
680                 datastoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(1)
681                         .shardInitializationTimeout(200, TimeUnit.MILLISECONDS);
682
683                 try (AbstractDataStore dataStore = setupDistributedDataStore(testName, false, shardName)) {
684
685                     Object result = dataStore.getActorContext().executeOperation(
686                             dataStore.getActorContext().getShardManager(), new FindLocalShard(shardName, true));
687                     assertTrue("Expected LocalShardFound. Actual: " + result, result instanceof LocalShardFound);
688
689                     // Create the write Tx.
690
691                     try (DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction()
692                             : dataStore.newReadWriteTransaction()) {
693                         assertNotNull("newReadWriteTransaction returned null", writeTx);
694
695                         // Do some modifications and ready the Tx on a separate
696                         // thread.
697
698                         final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
699                         final AtomicReference<Exception> caughtEx = new AtomicReference<>();
700                         final CountDownLatch txReady = new CountDownLatch(1);
701                         Thread txThread = new Thread() {
702                             @Override
703                             public void run() {
704                                 try {
705                                     writeTx.write(TestModel.JUNK_PATH,
706                                             ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
707
708                                     txCohort.set(writeTx.ready());
709                                 } catch (Exception e) {
710                                     caughtEx.set(e);
711                                     return;
712                                 } finally {
713                                     txReady.countDown();
714                                 }
715                             }
716                         };
717
718                         txThread.start();
719
720                         // Wait for the Tx operations to complete.
721
722                         boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
723                         if (caughtEx.get() != null) {
724                             throw caughtEx.get();
725                         }
726
727                         assertEquals("Tx ready", true, done);
728
729                         // Wait for the commit to complete. Since no shard
730                         // leader was elected in time, the Tx
731                         // should have timed out and throw an appropriate
732                         // exception cause.
733
734                         try {
735                             txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
736                         } catch (ExecutionException e) {
737                             Throwables.propagateIfInstanceOf(e.getCause(), Exception.class);
738                             Throwables.propagate(e.getCause());
739                         }
740                     }
741                 }
742             }
743         };
744     }
745
746     @Test(expected = NoShardLeaderException.class)
747     public void testWriteOnlyTransactionCommitFailureWithNoShardLeader() throws Exception {
748         datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
749         testTransactionCommitFailureWithNoShardLeader(true, "testWriteOnlyTransactionCommitFailureWithNoShardLeader");
750     }
751
752     @Test(expected = NoShardLeaderException.class)
753     public void testReadWriteTransactionCommitFailureWithNoShardLeader() throws Exception {
754         testTransactionCommitFailureWithNoShardLeader(false, "testReadWriteTransactionCommitFailureWithNoShardLeader");
755     }
756
757     @Test
758     public void testTransactionAbort() throws Exception {
759         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
760             {
761                 try (AbstractDataStore dataStore = setupDistributedDataStore("transactionAbortIntegrationTest",
762                         "test-1")) {
763
764                     DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
765                     assertNotNull("newWriteOnlyTransaction returned null", writeTx);
766
767                     writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
768
769                     DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
770
771                     cohort.canCommit().get(5, TimeUnit.SECONDS);
772
773                     cohort.abort().get(5, TimeUnit.SECONDS);
774
775                     testWriteTransaction(dataStore, TestModel.TEST_PATH,
776                             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
777                 }
778             }
779         };
780     }
781
782     @Test
783     @SuppressWarnings("checkstyle:IllegalCatch")
784     public void testTransactionChainWithSingleShard() throws Exception {
785         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
786             {
787                 try (AbstractDataStore dataStore = setupDistributedDataStore("testTransactionChainWithSingleShard",
788                         "test-1")) {
789
790                     // 1. Create a Tx chain and write-only Tx
791
792                     DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
793
794                     DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
795                     assertNotNull("newWriteOnlyTransaction returned null", writeTx);
796
797                     // 2. Write some data
798
799                     NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
800                     writeTx.write(TestModel.TEST_PATH, testNode);
801
802                     // 3. Ready the Tx for commit
803
804                     final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
805
806                     // 4. Commit the Tx on another thread that first waits for
807                     // the second read Tx.
808
809                     final CountDownLatch continueCommit1 = new CountDownLatch(1);
810                     final CountDownLatch commit1Done = new CountDownLatch(1);
811                     final AtomicReference<Exception> commit1Error = new AtomicReference<>();
812                     new Thread() {
813                         @Override
814                         public void run() {
815                             try {
816                                 continueCommit1.await();
817                                 doCommit(cohort1);
818                             } catch (Exception e) {
819                                 commit1Error.set(e);
820                             } finally {
821                                 commit1Done.countDown();
822                             }
823                         }
824                     }.start();
825
826                     // 5. Create a new read Tx from the chain to read and verify
827                     // the data from the first
828                     // Tx is visible after being readied.
829
830                     DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction();
831                     Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
832                     assertEquals("isPresent", true, optional.isPresent());
833                     assertEquals("Data node", testNode, optional.get());
834
835                     // 6. Create a new RW Tx from the chain, write more data,
836                     // and ready it
837
838                     DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
839                     MapNode outerNode = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
840                     rwTx.write(TestModel.OUTER_LIST_PATH, outerNode);
841
842                     final DOMStoreThreePhaseCommitCohort cohort2 = rwTx.ready();
843
844                     // 7. Create a new read Tx from the chain to read the data
845                     // from the last RW Tx to
846                     // verify it is visible.
847
848                     readTx = txChain.newReadWriteTransaction();
849                     optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
850                     assertEquals("isPresent", true, optional.isPresent());
851                     assertEquals("Data node", outerNode, optional.get());
852
853                     // 8. Wait for the 2 commits to complete and close the
854                     // chain.
855
856                     continueCommit1.countDown();
857                     Uninterruptibles.awaitUninterruptibly(commit1Done, 5, TimeUnit.SECONDS);
858
859                     if (commit1Error.get() != null) {
860                         throw commit1Error.get();
861                     }
862
863                     doCommit(cohort2);
864
865                     txChain.close();
866
867                     // 9. Create a new read Tx from the data store and verify
868                     // committed data.
869
870                     readTx = dataStore.newReadOnlyTransaction();
871                     optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
872                     assertEquals("isPresent", true, optional.isPresent());
873                     assertEquals("Data node", outerNode, optional.get());
874                 }
875             }
876         };
877     }
878
879     @Test
880     public void testTransactionChainWithMultipleShards() throws Exception {
881         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
882             {
883                 try (AbstractDataStore dataStore = setupDistributedDataStore(
884                         "testTransactionChainWithMultipleShards", "cars-1", "people-1")) {
885
886                     DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
887
888                     DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
889                     assertNotNull("newWriteOnlyTransaction returned null", writeTx);
890
891                     writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
892                     writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
893
894                     writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
895                     writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
896
897                     final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
898
899                     DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction();
900
901                     MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
902                     YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
903                     readWriteTx.write(carPath, car);
904
905                     MapEntryNode person = PeopleModel.newPersonEntry("jack");
906                     YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
907                     readWriteTx.merge(personPath, person);
908
909                     Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
910                     assertEquals("isPresent", true, optional.isPresent());
911                     assertEquals("Data node", car, optional.get());
912
913                     optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS);
914                     assertEquals("isPresent", true, optional.isPresent());
915                     assertEquals("Data node", person, optional.get());
916
917                     DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready();
918
919                     writeTx = txChain.newWriteOnlyTransaction();
920
921                     writeTx.delete(carPath);
922
923                     DOMStoreThreePhaseCommitCohort cohort3 = writeTx.ready();
924
925                     ListenableFuture<Boolean> canCommit1 = cohort1.canCommit();
926                     ListenableFuture<Boolean> canCommit2 = cohort2.canCommit();
927
928                     doCommit(canCommit1, cohort1);
929                     doCommit(canCommit2, cohort2);
930                     doCommit(cohort3);
931
932                     txChain.close();
933
934                     DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
935
936                     optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
937                     assertEquals("isPresent", false, optional.isPresent());
938
939                     optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
940                     assertEquals("isPresent", true, optional.isPresent());
941                     assertEquals("Data node", person, optional.get());
942                 }
943             }
944         };
945     }
946
947     @Test
948     public void testCreateChainedTransactionsInQuickSuccession() throws Exception {
949         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
950             {
951                 try (AbstractDataStore dataStore = setupDistributedDataStore(
952                         "testCreateChainedTransactionsInQuickSuccession", "cars-1")) {
953
954                     ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
955                             ImmutableMap.<LogicalDatastoreType, DOMStore>builder()
956                                     .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(),
957                             MoreExecutors.directExecutor());
958
959                     TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
960                     DOMTransactionChain txChain = broker.createTransactionChain(listener);
961
962                     List<CheckedFuture<Void, TransactionCommitFailedException>> futures = new ArrayList<>();
963
964                     DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
965                     writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, CarsModel.emptyContainer());
966                     writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
967                     futures.add(writeTx.submit());
968
969                     int numCars = 100;
970                     for (int i = 0; i < numCars; i++) {
971                         DOMDataReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
972
973                         rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.newCarPath("car" + i),
974                                 CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
975
976                         futures.add(rwTx.submit());
977                     }
978
979                     for (CheckedFuture<Void, TransactionCommitFailedException> f : futures) {
980                         f.checkedGet();
981                     }
982
983                     Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction()
984                             .read(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
985                     assertEquals("isPresent", true, optional.isPresent());
986                     assertEquals("# cars", numCars, ((Collection<?>) optional.get().getValue()).size());
987
988                     txChain.close();
989
990                     broker.close();
991                 }
992             }
993         };
994     }
995
996     @Test
997     public void testCreateChainedTransactionAfterEmptyTxReadied() throws Exception {
998         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
999             {
1000                 try (AbstractDataStore dataStore = setupDistributedDataStore(
1001                         "testCreateChainedTransactionAfterEmptyTxReadied", "test-1")) {
1002
1003                     DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
1004
1005                     DOMStoreReadWriteTransaction rwTx1 = txChain.newReadWriteTransaction();
1006
1007                     rwTx1.ready();
1008
1009                     DOMStoreReadWriteTransaction rwTx2 = txChain.newReadWriteTransaction();
1010
1011                     Optional<NormalizedNode<?, ?>> optional = rwTx2.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
1012                     assertEquals("isPresent", false, optional.isPresent());
1013
1014                     txChain.close();
1015                 }
1016             }
1017         };
1018     }
1019
1020     @Test
1021     public void testCreateChainedTransactionWhenPreviousNotReady() throws Exception {
1022         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
1023             {
1024                 try (AbstractDataStore dataStore = setupDistributedDataStore(
1025                         "testCreateChainedTransactionWhenPreviousNotReady", "test-1")) {
1026
1027                     final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
1028
1029                     DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
1030                     assertNotNull("newWriteOnlyTransaction returned null", writeTx);
1031
1032                     writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1033
1034                     // Try to create another Tx of each type - each should fail
1035                     // b/c the previous Tx wasn't
1036                     // readied.
1037
1038                     assertExceptionOnTxChainCreates(txChain, IllegalStateException.class);
1039                 }
1040             }
1041         };
1042     }
1043
1044     @Test
1045     public void testCreateChainedTransactionAfterClose() throws Exception {
1046         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
1047             {
1048                 try (AbstractDataStore dataStore = setupDistributedDataStore(
1049                         "testCreateChainedTransactionAfterClose", "test-1")) {
1050
1051                     DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
1052
1053                     txChain.close();
1054
1055                     // Try to create another Tx of each type - should fail b/c
1056                     // the previous Tx was closed.
1057
1058                     assertExceptionOnTxChainCreates(txChain, TransactionChainClosedException.class);
1059                 }
1060             }
1061         };
1062     }
1063
1064     @Test
1065     public void testChainWithReadOnlyTxAfterPreviousReady() throws Exception {
1066         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
1067             {
1068                 try (AbstractDataStore dataStore = setupDistributedDataStore(
1069                         "testChainWithReadOnlyTxAfterPreviousReady", "test-1")) {
1070
1071                     final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
1072
1073                     // Create a write tx and submit.
1074
1075                     DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
1076                     writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1077                     final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
1078
1079                     // Create read-only tx's and issue a read.
1080
1081                     CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readFuture1 = txChain
1082                             .newReadOnlyTransaction().read(TestModel.TEST_PATH);
1083
1084                     CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readFuture2 = txChain
1085                             .newReadOnlyTransaction().read(TestModel.TEST_PATH);
1086
1087                     // Create another write tx and issue the write.
1088
1089                     DOMStoreWriteTransaction writeTx2 = txChain.newWriteOnlyTransaction();
1090                     writeTx2.write(TestModel.OUTER_LIST_PATH,
1091                             ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1092
1093                     // Ensure the reads succeed.
1094
1095                     assertEquals("isPresent", true, readFuture1.checkedGet(5, TimeUnit.SECONDS).isPresent());
1096                     assertEquals("isPresent", true, readFuture2.checkedGet(5, TimeUnit.SECONDS).isPresent());
1097
1098                     // Ensure the writes succeed.
1099
1100                     DOMStoreThreePhaseCommitCohort cohort2 = writeTx2.ready();
1101
1102                     doCommit(cohort1);
1103                     doCommit(cohort2);
1104
1105                     assertEquals("isPresent", true, txChain.newReadOnlyTransaction().read(TestModel.OUTER_LIST_PATH)
1106                             .checkedGet(5, TimeUnit.SECONDS).isPresent());
1107                 }
1108             }
1109         };
1110     }
1111
1112     @Test
1113     public void testChainedTransactionFailureWithSingleShard() throws Exception {
1114         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
1115             {
1116                 try (AbstractDataStore dataStore = setupDistributedDataStore(
1117                         "testChainedTransactionFailureWithSingleShard", "cars-1")) {
1118
1119                     ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
1120                             ImmutableMap.<LogicalDatastoreType, DOMStore>builder()
1121                                     .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(),
1122                             MoreExecutors.directExecutor());
1123
1124                     TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
1125                     DOMTransactionChain txChain = broker.createTransactionChain(listener);
1126
1127                     DOMDataReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
1128
1129                     ContainerNode invalidData = ImmutableContainerNodeBuilder.create()
1130                             .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME))
1131                             .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
1132
1133                     rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
1134
1135                     try {
1136                         rwTx.submit().checkedGet(5, TimeUnit.SECONDS);
1137                         fail("Expected TransactionCommitFailedException");
1138                     } catch (TransactionCommitFailedException e) {
1139                         // Expected
1140                     }
1141
1142                     verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(rwTx),
1143                             any(Throwable.class));
1144
1145                     txChain.close();
1146                     broker.close();
1147                 }
1148             }
1149         };
1150     }
1151
1152     @Test
1153     public void testChainedTransactionFailureWithMultipleShards() throws Exception {
1154         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
1155             {
1156                 try (AbstractDataStore dataStore = setupDistributedDataStore(
1157                         "testChainedTransactionFailureWithMultipleShards", "cars-1", "people-1")) {
1158
1159                     ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
1160                             ImmutableMap.<LogicalDatastoreType, DOMStore>builder()
1161                                     .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(),
1162                             MoreExecutors.directExecutor());
1163
1164                     TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
1165                     DOMTransactionChain txChain = broker.createTransactionChain(listener);
1166
1167                     DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
1168
1169                     writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH,
1170                             PeopleModel.emptyContainer());
1171
1172                     ContainerNode invalidData = ImmutableContainerNodeBuilder.create()
1173                             .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME))
1174                             .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
1175
1176                     // Note that merge will validate the data and fail but put
1177                     // succeeds b/c deep validation is not
1178                     // done for put for performance reasons.
1179                     writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
1180
1181                     try {
1182                         writeTx.submit().checkedGet(5, TimeUnit.SECONDS);
1183                         fail("Expected TransactionCommitFailedException");
1184                     } catch (TransactionCommitFailedException e) {
1185                         // Expected
1186                     }
1187
1188                     verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx),
1189                             any(Throwable.class));
1190
1191                     txChain.close();
1192                     broker.close();
1193                 }
1194             }
1195         };
1196     }
1197
1198     @Test
1199     public void testChangeListenerRegistration() throws Exception {
1200         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
1201             {
1202                 try (AbstractDataStore dataStore = setupDistributedDataStore("testChangeListenerRegistration",
1203                         "test-1")) {
1204
1205                     testWriteTransaction(dataStore, TestModel.TEST_PATH,
1206                             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1207
1208                     MockDataChangeListener listener = new MockDataChangeListener(1);
1209
1210                     ListenerRegistration<MockDataChangeListener> listenerReg = dataStore
1211                             .registerChangeListener(TestModel.TEST_PATH, listener, DataChangeScope.SUBTREE);
1212
1213                     assertNotNull("registerChangeListener returned null", listenerReg);
1214
1215                     // Wait for the initial notification
1216
1217                     listener.waitForChangeEvents(TestModel.TEST_PATH);
1218
1219                     listener.reset(2);
1220
1221                     // Write 2 updates.
1222
1223                     testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
1224                             ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1225
1226                     YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1227                             .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
1228                     testWriteTransaction(dataStore, listPath,
1229                             ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
1230
1231                     // Wait for the 2 updates.
1232
1233                     listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath);
1234
1235                     listenerReg.close();
1236
1237                     testWriteTransaction(dataStore,
1238                             YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1239                                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(),
1240                             ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2));
1241
1242                     listener.expectNoMoreChanges("Received unexpected change after close");
1243                 }
1244             }
1245         };
1246     }
1247
1248     @Test
1249     public void testRestoreFromDatastoreSnapshot() throws Exception {
1250         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
1251             {
1252                 final String name = "transactionIntegrationTest";
1253
1254                 ContainerNode carsNode = CarsModel.newCarsNode(
1255                         CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", BigInteger.valueOf(20000L)),
1256                                 CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000L))));
1257
1258                 DataTree dataTree = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
1259                 dataTree.setSchemaContext(SchemaContextHelper.full());
1260                 AbstractShardTest.writeToStore(dataTree, CarsModel.BASE_PATH, carsNode);
1261                 NormalizedNode<?, ?> root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY);
1262
1263                 final Snapshot carsSnapshot = Snapshot.create(
1264                         new ShardSnapshotState(new MetadataShardDataTreeSnapshot(root)),
1265                         Collections.<ReplicatedLogEntry>emptyList(), 2, 1, 2, 1, 1, "member-1", null);
1266
1267                 NormalizedNode<?, ?> peopleNode = PeopleModel.create();
1268                 dataTree = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
1269                 dataTree.setSchemaContext(SchemaContextHelper.full());
1270                 AbstractShardTest.writeToStore(dataTree, PeopleModel.BASE_PATH, peopleNode);
1271                 root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY);
1272
1273                 Snapshot peopleSnapshot = Snapshot.create(
1274                         new ShardSnapshotState(new MetadataShardDataTreeSnapshot(root)),
1275                         Collections.<ReplicatedLogEntry>emptyList(), 2, 1, 2, 1, 1, "member-1", null);
1276
1277                 restoreFromSnapshot = new DatastoreSnapshot(name, null, Arrays.asList(
1278                         new DatastoreSnapshot.ShardSnapshot("cars", carsSnapshot),
1279                         new DatastoreSnapshot.ShardSnapshot("people", peopleSnapshot)));
1280
1281                 try (AbstractDataStore dataStore = setupDistributedDataStore(name, "module-shards-member1.conf",
1282                         true, "cars", "people")) {
1283
1284                     DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
1285
1286                     Optional<NormalizedNode<?, ?>> optional = readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS);
1287                     assertEquals("isPresent", true, optional.isPresent());
1288                     assertEquals("Data node", carsNode, optional.get());
1289
1290                     optional = readTx.read(PeopleModel.BASE_PATH).get(5, TimeUnit.SECONDS);
1291                     assertEquals("isPresent", true, optional.isPresent());
1292                     assertEquals("Data node", peopleNode, optional.get());
1293                 }
1294             }
1295         };
1296     }
1297
1298     @Test
1299     @Deprecated
1300     public void testRecoveryFromPreCarbonSnapshot() throws Exception {
1301         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
1302             {
1303                 final String name = "testRecoveryFromPreCarbonSnapshot";
1304
1305                 ContainerNode carsNode = CarsModel.newCarsNode(
1306                         CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", BigInteger.valueOf(20000L)),
1307                                 CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000L))));
1308
1309                 DataTree dataTree = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
1310                 dataTree.setSchemaContext(SchemaContextHelper.full());
1311                 AbstractShardTest.writeToStore(dataTree, CarsModel.BASE_PATH, carsNode);
1312                 NormalizedNode<?, ?> root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY);
1313
1314                 MetadataShardDataTreeSnapshot shardSnapshot = new MetadataShardDataTreeSnapshot(root);
1315                 final ByteArrayOutputStream bos = new ByteArrayOutputStream();
1316                 try (final DataOutputStream dos = new DataOutputStream(bos)) {
1317                     PayloadVersion.BORON.writeTo(dos);
1318                     try (ObjectOutputStream oos = new ObjectOutputStream(dos)) {
1319                         oos.writeObject(shardSnapshot);
1320                     }
1321                 }
1322
1323                 final org.opendaylight.controller.cluster.raft.Snapshot snapshot =
1324                     org.opendaylight.controller.cluster.raft.Snapshot.create(bos.toByteArray(),
1325                             Collections.<ReplicatedLogEntry>emptyList(), 2, 1, 2, 1, 1, "member-1", null);
1326
1327                 InMemorySnapshotStore.addSnapshot("member-1-shard-cars-" + name, snapshot);
1328
1329                 try (AbstractDataStore dataStore = setupDistributedDataStore(name, "module-shards-member1.conf",
1330                         true, "cars")) {
1331
1332                     DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
1333
1334                     Optional<NormalizedNode<?, ?>> optional = readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS);
1335                     assertEquals("isPresent", true, optional.isPresent());
1336                     assertEquals("Data node", carsNode, optional.get());
1337                 }
1338             }
1339         };
1340     }
1341 }