Fix DistributedDataStoreIntegrationTest failure
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / DistributedDataStoreIntegrationTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertTrue;
14 import static org.junit.Assert.fail;
15 import static org.mockito.Matchers.any;
16 import static org.mockito.Matchers.eq;
17 import static org.mockito.Mockito.timeout;
18 import static org.mockito.Mockito.verify;
19 import akka.actor.ActorSystem;
20 import akka.actor.Address;
21 import akka.actor.AddressFromURIString;
22 import akka.cluster.Cluster;
23 import akka.testkit.JavaTestKit;
24 import com.google.common.base.Optional;
25 import com.google.common.collect.ImmutableMap;
26 import com.google.common.util.concurrent.CheckedFuture;
27 import com.google.common.util.concurrent.ListenableFuture;
28 import com.google.common.util.concurrent.MoreExecutors;
29 import com.google.common.util.concurrent.Uninterruptibles;
30 import com.typesafe.config.ConfigFactory;
31 import java.io.IOException;
32 import java.math.BigInteger;
33 import java.util.ArrayList;
34 import java.util.Collection;
35 import java.util.List;
36 import java.util.concurrent.CountDownLatch;
37 import java.util.concurrent.ExecutionException;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.atomic.AtomicReference;
40 import org.junit.AfterClass;
41 import org.junit.BeforeClass;
42 import org.junit.Test;
43 import org.mockito.Mockito;
44 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
45 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
46 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
47 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
48 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
49 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
50 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
51 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
52 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
53 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
54 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
55 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
56 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
57 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
58 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
59 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
60 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
61 import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
62 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
63 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
64 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
65 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
66 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
67 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
68 import org.opendaylight.yangtools.concepts.ListenerRegistration;
69 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
70 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
71 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
72 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
73 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
74 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
75 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
76
77 public class DistributedDataStoreIntegrationTest {
78
79     private static ActorSystem system;
80
81     private final DatastoreContext.Builder datastoreContextBuilder =
82             DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100);
83
84     @BeforeClass
85     public static void setUpClass() throws IOException {
86         system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
87         Address member1Address = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
88         Cluster.get(system).join(member1Address);
89     }
90
91     @AfterClass
92     public static void tearDownClass() throws IOException {
93         JavaTestKit.shutdownActorSystem(system);
94         system = null;
95     }
96
97     protected ActorSystem getSystem() {
98         return system;
99     }
100
101     @Test
102     public void testWriteTransactionWithSingleShard() throws Exception{
103         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
104             DistributedDataStore dataStore =
105                     setupDistributedDataStore("transactionIntegrationTest", "test-1");
106
107             testWriteTransaction(dataStore, TestModel.TEST_PATH,
108                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
109
110             testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
111                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
112
113             cleanup(dataStore);
114         }};
115     }
116
117     @Test
118     public void testWriteTransactionWithMultipleShards() throws Exception{
119         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
120             DistributedDataStore dataStore =
121                     setupDistributedDataStore("testWriteTransactionWithMultipleShards", "cars-1", "people-1");
122
123             DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
124             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
125
126             writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
127             writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
128
129             doCommit(writeTx.ready());
130
131             writeTx = dataStore.newWriteOnlyTransaction();
132
133             writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
134             writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
135
136             doCommit(writeTx.ready());
137
138             writeTx = dataStore.newWriteOnlyTransaction();
139
140             MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
141             YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
142             writeTx.write(carPath, car);
143
144             MapEntryNode person = PeopleModel.newPersonEntry("jack");
145             YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
146             writeTx.write(personPath, person);
147
148             doCommit(writeTx.ready());
149
150             // Verify the data in the store
151
152             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
153
154             Optional<NormalizedNode<?, ?>> optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
155             assertEquals("isPresent", true, optional.isPresent());
156             assertEquals("Data node", car, optional.get());
157
158             optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
159             assertEquals("isPresent", true, optional.isPresent());
160             assertEquals("Data node", person, optional.get());
161
162             cleanup(dataStore);
163         }};
164     }
165
166     @Test
167     public void testReadWriteTransactionWithSingleShard() throws Exception{
168         System.setProperty("shard.persistent", "true");
169         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
170             DistributedDataStore dataStore =
171                     setupDistributedDataStore("testReadWriteTransactionWithSingleShard", "test-1");
172
173             // 1. Create a read-write Tx
174
175             DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
176             assertNotNull("newReadWriteTransaction returned null", readWriteTx);
177
178             // 2. Write some data
179
180             YangInstanceIdentifier nodePath = TestModel.TEST_PATH;
181             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
182             readWriteTx.write(nodePath, nodeToWrite );
183
184             // 3. Read the data from Tx
185
186             Boolean exists = readWriteTx.exists(nodePath).checkedGet(5, TimeUnit.SECONDS);
187             assertEquals("exists", true, exists);
188
189             Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS);
190             assertEquals("isPresent", true, optional.isPresent());
191             assertEquals("Data node", nodeToWrite, optional.get());
192
193             // 4. Ready the Tx for commit
194
195             DOMStoreThreePhaseCommitCohort cohort = readWriteTx.ready();
196
197             // 5. Commit the Tx
198
199             doCommit(cohort);
200
201             // 6. Verify the data in the store
202
203             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
204
205             optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
206             assertEquals("isPresent", true, optional.isPresent());
207             assertEquals("Data node", nodeToWrite, optional.get());
208
209             cleanup(dataStore);
210         }};
211     }
212
213     @Test
214     public void testReadWriteTransactionWithMultipleShards() throws Exception{
215         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
216             DistributedDataStore dataStore =
217                     setupDistributedDataStore("testReadWriteTransactionWithMultipleShards", "cars-1", "people-1");
218
219             DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
220             assertNotNull("newReadWriteTransaction returned null", readWriteTx);
221
222             readWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
223             readWriteTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
224
225             doCommit(readWriteTx.ready());
226
227             readWriteTx = dataStore.newReadWriteTransaction();
228
229             readWriteTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
230             readWriteTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
231
232             doCommit(readWriteTx.ready());
233
234             readWriteTx = dataStore.newReadWriteTransaction();
235
236             MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
237             YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
238             readWriteTx.write(carPath, car);
239
240             MapEntryNode person = PeopleModel.newPersonEntry("jack");
241             YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
242             readWriteTx.write(personPath, person);
243
244             Boolean exists = readWriteTx.exists(carPath).checkedGet(5, TimeUnit.SECONDS);
245             assertEquals("exists", true, exists);
246
247             Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
248             assertEquals("isPresent", true, optional.isPresent());
249             assertEquals("Data node", car, optional.get());
250
251             doCommit(readWriteTx.ready());
252
253             // Verify the data in the store
254
255             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
256
257             optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
258             assertEquals("isPresent", true, optional.isPresent());
259             assertEquals("Data node", car, optional.get());
260
261             optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
262             assertEquals("isPresent", true, optional.isPresent());
263             assertEquals("Data node", person, optional.get());
264
265             cleanup(dataStore);
266         }};
267     }
268
269     @Test
270     public void testSingleTransactionsWritesInQuickSuccession() throws Exception{
271         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
272             DistributedDataStore dataStore = setupDistributedDataStore(
273                     "testSingleTransactionsWritesInQuickSuccession", "cars-1");
274
275             DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
276
277             DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
278             writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
279             writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
280             doCommit(writeTx.ready());
281
282             writeTx = txChain.newWriteOnlyTransaction();
283
284             int nCars = 5;
285             for(int i = 0; i < nCars; i++) {
286                 writeTx.write(CarsModel.newCarPath("car" + i),
287                         CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
288             }
289
290             doCommit(writeTx.ready());
291
292             Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction().read(
293                     CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
294             assertEquals("isPresent", true, optional.isPresent());
295             assertEquals("# cars", nCars, ((Collection<?>)optional.get().getValue()).size());
296
297             cleanup(dataStore);
298         }};
299     }
300
301     private void testTransactionWritesWithShardNotInitiallyReady(final String testName,
302             final boolean writeOnly) throws Exception {
303         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
304             String shardName = "test-1";
305
306             // Setup the InMemoryJournal to block shard recovery to ensure the shard isn't
307             // initialized until we create and submit the write the Tx.
308             String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
309             CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
310             InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
311
312             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
313
314             // Create the write Tx
315
316             final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() :
317                     dataStore.newReadWriteTransaction();
318             assertNotNull("newReadWriteTransaction returned null", writeTx);
319
320             // Do some modification operations and ready the Tx on a separate thread.
321
322             final YangInstanceIdentifier listEntryPath = YangInstanceIdentifier.builder(
323                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME,
324                             TestModel.ID_QNAME, 1).build();
325
326             final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
327             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
328             final CountDownLatch txReady = new CountDownLatch(1);
329             Thread txThread = new Thread() {
330                 @Override
331                 public void run() {
332                     try {
333                         writeTx.write(TestModel.TEST_PATH,
334                                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
335
336                         writeTx.merge(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(
337                                 TestModel.OUTER_LIST_QNAME).build());
338
339                         writeTx.write(listEntryPath, ImmutableNodes.mapEntry(
340                                 TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
341
342                         writeTx.delete(listEntryPath);
343
344                         txCohort.set(writeTx.ready());
345                     } catch(Exception e) {
346                         caughtEx.set(e);
347                         return;
348                     } finally {
349                         txReady.countDown();
350                     }
351                 }
352             };
353
354             txThread.start();
355
356             // Wait for the Tx operations to complete.
357
358             boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
359             if(caughtEx.get() != null) {
360                 throw caughtEx.get();
361             }
362
363             assertEquals("Tx ready", true, done);
364
365             // At this point the Tx operations should be waiting for the shard to initialize so
366             // trigger the latch to let the shard recovery to continue.
367
368             blockRecoveryLatch.countDown();
369
370             // Wait for the Tx commit to complete.
371
372             doCommit(txCohort.get());
373
374             // Verify the data in the store
375
376             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
377
378             Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).
379                     get(5, TimeUnit.SECONDS);
380             assertEquals("isPresent", true, optional.isPresent());
381
382             optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
383             assertEquals("isPresent", true, optional.isPresent());
384
385             optional = readTx.read(listEntryPath).get(5, TimeUnit.SECONDS);
386             assertEquals("isPresent", false, optional.isPresent());
387
388             cleanup(dataStore);
389         }};
390     }
391
392     @Test
393     public void testWriteOnlyTransactionWithShardNotInitiallyReady() throws Exception {
394         datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
395         testTransactionWritesWithShardNotInitiallyReady("testWriteOnlyTransactionWithShardNotInitiallyReady", true);
396     }
397
398     @Test
399     public void testReadWriteTransactionWithShardNotInitiallyReady() throws Exception {
400         testTransactionWritesWithShardNotInitiallyReady("testReadWriteTransactionWithShardNotInitiallyReady", false);
401     }
402
403     @Test
404     public void testTransactionReadsWithShardNotInitiallyReady() throws Exception {
405         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
406             String testName = "testTransactionReadsWithShardNotInitiallyReady";
407             String shardName = "test-1";
408
409             // Setup the InMemoryJournal to block shard recovery to ensure the shard isn't
410             // initialized until we create the Tx.
411             String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
412             CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
413             InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
414
415             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
416
417             // Create the read-write Tx
418
419             final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
420             assertNotNull("newReadWriteTransaction returned null", readWriteTx);
421
422             // Do some reads on the Tx on a separate thread.
423
424             final AtomicReference<CheckedFuture<Boolean, ReadFailedException>> txExistsFuture =
425                     new AtomicReference<>();
426             final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
427                     txReadFuture = new AtomicReference<>();
428             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
429             final CountDownLatch txReadsDone = new CountDownLatch(1);
430             Thread txThread = new Thread() {
431                 @Override
432                 public void run() {
433                     try {
434                         readWriteTx.write(TestModel.TEST_PATH,
435                                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
436
437                         txExistsFuture.set(readWriteTx.exists(TestModel.TEST_PATH));
438
439                         txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
440                     } catch(Exception e) {
441                         caughtEx.set(e);
442                         return;
443                     } finally {
444                         txReadsDone.countDown();
445                     }
446                 }
447             };
448
449             txThread.start();
450
451             // Wait for the Tx operations to complete.
452
453             boolean done = Uninterruptibles.awaitUninterruptibly(txReadsDone, 5, TimeUnit.SECONDS);
454             if(caughtEx.get() != null) {
455                 throw caughtEx.get();
456             }
457
458             assertEquals("Tx reads done", true, done);
459
460             // At this point the Tx operations should be waiting for the shard to initialize so
461             // trigger the latch to let the shard recovery to continue.
462
463             blockRecoveryLatch.countDown();
464
465             // Wait for the reads to complete and verify.
466
467             assertEquals("exists", true, txExistsFuture.get().checkedGet(5, TimeUnit.SECONDS));
468             assertEquals("read", true, txReadFuture.get().checkedGet(5, TimeUnit.SECONDS).isPresent());
469
470             readWriteTx.close();
471
472             cleanup(dataStore);
473         }};
474     }
475
476     @Test(expected=NotInitializedException.class)
477     public void testTransactionCommitFailureWithShardNotInitialized() throws Throwable{
478         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
479             String testName = "testTransactionCommitFailureWithShardNotInitialized";
480             String shardName = "test-1";
481
482             // Set the shard initialization timeout low for the test.
483
484             datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
485
486             // Setup the InMemoryJournal to block shard recovery indefinitely.
487
488             String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
489             CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
490             InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
491
492             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
493
494             // Create the write Tx
495
496             final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
497             assertNotNull("newReadWriteTransaction returned null", writeTx);
498
499             // Do some modifications and ready the Tx on a separate thread.
500
501             final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
502             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
503             final CountDownLatch txReady = new CountDownLatch(1);
504             Thread txThread = new Thread() {
505                 @Override
506                 public void run() {
507                     try {
508                         writeTx.write(TestModel.TEST_PATH,
509                                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
510
511                         txCohort.set(writeTx.ready());
512                     } catch(Exception e) {
513                         caughtEx.set(e);
514                         return;
515                     } finally {
516                         txReady.countDown();
517                     }
518                 }
519             };
520
521             txThread.start();
522
523             // Wait for the Tx operations to complete.
524
525             boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
526             if(caughtEx.get() != null) {
527                 throw caughtEx.get();
528             }
529
530             assertEquals("Tx ready", true, done);
531
532             // Wait for the commit to complete. Since the shard never initialized, the Tx should
533             // have timed out and throw an appropriate exception cause.
534
535             try {
536                 txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
537             } catch(ExecutionException e) {
538                 throw e.getCause();
539             } finally {
540                 blockRecoveryLatch.countDown();
541                 cleanup(dataStore);
542             }
543         }};
544     }
545
546     @Test(expected=NotInitializedException.class)
547     public void testTransactionReadFailureWithShardNotInitialized() throws Throwable{
548         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
549             String testName = "testTransactionReadFailureWithShardNotInitialized";
550             String shardName = "test-1";
551
552             // Set the shard initialization timeout low for the test.
553
554             datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
555
556             // Setup the InMemoryJournal to block shard recovery indefinitely.
557
558             String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
559             CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
560             InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
561
562             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
563
564             // Create the read-write Tx
565
566             final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
567             assertNotNull("newReadWriteTransaction returned null", readWriteTx);
568
569             // Do a read on the Tx on a separate thread.
570
571             final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
572                     txReadFuture = new AtomicReference<>();
573             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
574             final CountDownLatch txReadDone = new CountDownLatch(1);
575             Thread txThread = new Thread() {
576                 @Override
577                 public void run() {
578                     try {
579                         readWriteTx.write(TestModel.TEST_PATH,
580                                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
581
582                         txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
583
584                         readWriteTx.close();
585                     } catch(Exception e) {
586                         caughtEx.set(e);
587                         return;
588                     } finally {
589                         txReadDone.countDown();
590                     }
591                 }
592             };
593
594             txThread.start();
595
596             // Wait for the Tx operations to complete.
597
598             boolean done = Uninterruptibles.awaitUninterruptibly(txReadDone, 5, TimeUnit.SECONDS);
599             if(caughtEx.get() != null) {
600                 throw caughtEx.get();
601             }
602
603             assertEquals("Tx read done", true, done);
604
605             // Wait for the read to complete. Since the shard never initialized, the Tx should
606             // have timed out and throw an appropriate exception cause.
607
608             try {
609                 txReadFuture.get().checkedGet(5, TimeUnit.SECONDS);
610             } catch(ReadFailedException e) {
611                 throw e.getCause();
612             } finally {
613                 blockRecoveryLatch.countDown();
614                 cleanup(dataStore);
615             }
616         }};
617     }
618
619     private void testTransactionCommitFailureWithNoShardLeader(final boolean writeOnly, final String testName) throws Throwable {
620         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
621             String shardName = "default";
622
623             // We don't want the shard to become the leader so prevent shard elections.
624             datastoreContextBuilder.customRaftPolicyImplementation(
625                     "org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy");
626
627             // The ShardManager uses the election timeout for FindPrimary so reset it low so it will timeout quickly.
628             datastoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(1).
629                     shardInitializationTimeout(200, TimeUnit.MILLISECONDS);
630
631             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
632
633             Object result = dataStore.getActorContext().executeOperation(dataStore.getActorContext().getShardManager(),
634                     new FindLocalShard(shardName, true));
635             assertTrue("Expected LocalShardFound. Actual: " + result, result instanceof LocalShardFound);
636
637             // Create the write Tx.
638
639             final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() :
640                 dataStore.newReadWriteTransaction();
641             assertNotNull("newReadWriteTransaction returned null", writeTx);
642
643             // Do some modifications and ready the Tx on a separate thread.
644
645             final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
646             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
647             final CountDownLatch txReady = new CountDownLatch(1);
648             Thread txThread = new Thread() {
649                 @Override
650                 public void run() {
651                     try {
652                         writeTx.write(TestModel.JUNK_PATH,
653                                 ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
654
655                         txCohort.set(writeTx.ready());
656                     } catch(Exception e) {
657                         caughtEx.set(e);
658                         return;
659                     } finally {
660                         txReady.countDown();
661                     }
662                 }
663             };
664
665             txThread.start();
666
667             // Wait for the Tx operations to complete.
668
669             boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
670             if(caughtEx.get() != null) {
671                 throw caughtEx.get();
672             }
673
674             assertEquals("Tx ready", true, done);
675
676             // Wait for the commit to complete. Since no shard leader was elected in time, the Tx
677             // should have timed out and throw an appropriate exception cause.
678
679             try {
680                 txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
681             } catch(ExecutionException e) {
682                 throw e.getCause();
683             } finally {
684                 cleanup(dataStore);
685             }
686         }};
687     }
688
689     @Test(expected=NoShardLeaderException.class)
690     public void testWriteOnlyTransactionCommitFailureWithNoShardLeader() throws Throwable {
691         datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
692         testTransactionCommitFailureWithNoShardLeader(true, "testWriteOnlyTransactionCommitFailureWithNoShardLeader");
693     }
694
695     @Test(expected=NoShardLeaderException.class)
696     public void testReadWriteTransactionCommitFailureWithNoShardLeader() throws Throwable {
697         testTransactionCommitFailureWithNoShardLeader(false, "testReadWriteTransactionCommitFailureWithNoShardLeader");
698     }
699
700     @Test
701     public void testTransactionAbort() throws Exception{
702         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
703             DistributedDataStore dataStore =
704                     setupDistributedDataStore("transactionAbortIntegrationTest", "test-1");
705
706             DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
707             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
708
709             writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
710
711             DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
712
713             cohort.canCommit().get(5, TimeUnit.SECONDS);
714
715             cohort.abort().get(5, TimeUnit.SECONDS);
716
717             testWriteTransaction(dataStore, TestModel.TEST_PATH,
718                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
719
720             cleanup(dataStore);
721         }};
722     }
723
724     @Test
725     public void testTransactionChainWithSingleShard() throws Exception{
726         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
727             DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChainWithSingleShard", "test-1");
728
729             // 1. Create a Tx chain and write-only Tx
730
731             DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
732
733             DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
734             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
735
736             // 2. Write some data
737
738             NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
739             writeTx.write(TestModel.TEST_PATH, testNode);
740
741             // 3. Ready the Tx for commit
742
743             final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
744
745             // 4. Commit the Tx on another thread that first waits for the second read Tx.
746
747             final CountDownLatch continueCommit1 = new CountDownLatch(1);
748             final CountDownLatch commit1Done = new CountDownLatch(1);
749             final AtomicReference<Exception> commit1Error = new AtomicReference<>();
750             new Thread() {
751                 @Override
752                 public void run() {
753                     try {
754                         continueCommit1.await();
755                         doCommit(cohort1);
756                     } catch (Exception e) {
757                         commit1Error.set(e);
758                     } finally {
759                         commit1Done.countDown();
760                     }
761                 }
762             }.start();
763
764             // 5. Create a new read Tx from the chain to read and verify the data from the first
765             // Tx is visible after being readied.
766
767             DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction();
768             Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
769             assertEquals("isPresent", true, optional.isPresent());
770             assertEquals("Data node", testNode, optional.get());
771
772             // 6. Create a new RW Tx from the chain, write more data, and ready it
773
774             DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
775             MapNode outerNode = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
776             rwTx.write(TestModel.OUTER_LIST_PATH, outerNode);
777
778             DOMStoreThreePhaseCommitCohort cohort2 = rwTx.ready();
779
780             // 7. Create a new read Tx from the chain to read the data from the last RW Tx to
781             // verify it is visible.
782
783             readTx = txChain.newReadWriteTransaction();
784             optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
785             assertEquals("isPresent", true, optional.isPresent());
786             assertEquals("Data node", outerNode, optional.get());
787
788             // 8. Wait for the 2 commits to complete and close the chain.
789
790             continueCommit1.countDown();
791             Uninterruptibles.awaitUninterruptibly(commit1Done, 5, TimeUnit.SECONDS);
792
793             if(commit1Error.get() != null) {
794                 throw commit1Error.get();
795             }
796
797             doCommit(cohort2);
798
799             txChain.close();
800
801             // 9. Create a new read Tx from the data store and verify committed data.
802
803             readTx = dataStore.newReadOnlyTransaction();
804             optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
805             assertEquals("isPresent", true, optional.isPresent());
806             assertEquals("Data node", outerNode, optional.get());
807
808             cleanup(dataStore);
809         }};
810     }
811
812     @Test
813     public void testTransactionChainWithMultipleShards() throws Exception{
814         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
815             DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChainWithMultipleShards",
816                     "cars-1", "people-1");
817
818             DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
819
820             DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
821             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
822
823             writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
824             writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
825
826             writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
827             writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
828
829             DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
830
831             DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction();
832
833             MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
834             YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
835             readWriteTx.write(carPath, car);
836
837             MapEntryNode person = PeopleModel.newPersonEntry("jack");
838             YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
839             readWriteTx.merge(personPath, person);
840
841             Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
842             assertEquals("isPresent", true, optional.isPresent());
843             assertEquals("Data node", car, optional.get());
844
845             optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS);
846             assertEquals("isPresent", true, optional.isPresent());
847             assertEquals("Data node", person, optional.get());
848
849             DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready();
850
851             writeTx = txChain.newWriteOnlyTransaction();
852
853             writeTx.delete(carPath);
854
855             DOMStoreThreePhaseCommitCohort cohort3 = writeTx.ready();
856
857             ListenableFuture<Boolean> canCommit1 = cohort1.canCommit();
858             ListenableFuture<Boolean> canCommit2 = cohort2.canCommit();
859
860             doCommit(canCommit1, cohort1);
861             doCommit(canCommit2, cohort2);
862             doCommit(cohort3);
863
864             txChain.close();
865
866             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
867
868             optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
869             assertEquals("isPresent", false, optional.isPresent());
870
871             optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
872             assertEquals("isPresent", true, optional.isPresent());
873             assertEquals("Data node", person, optional.get());
874
875             cleanup(dataStore);
876         }};
877     }
878
879     @Test
880     public void testCreateChainedTransactionsInQuickSuccession() throws Exception{
881         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
882             DistributedDataStore dataStore = setupDistributedDataStore(
883                     "testCreateChainedTransactionsInQuickSuccession", "cars-1");
884
885             ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
886                     ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
887                             LogicalDatastoreType.CONFIGURATION, dataStore).build(), MoreExecutors.directExecutor());
888
889             TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
890             DOMTransactionChain txChain = broker.createTransactionChain(listener);
891
892             List<CheckedFuture<Void, TransactionCommitFailedException>> futures = new ArrayList<>();
893
894             DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
895             writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, CarsModel.emptyContainer());
896             writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
897             futures.add(writeTx.submit());
898
899             int nCars = 100;
900             for(int i = 0; i < nCars; i++) {
901                 DOMDataReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
902
903                 rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.newCarPath("car" + i),
904                         CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
905
906                 futures.add(rwTx.submit());
907             }
908
909             for(CheckedFuture<Void, TransactionCommitFailedException> f: futures) {
910                 f.checkedGet();
911             }
912
913             Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction().read(
914                     LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
915             assertEquals("isPresent", true, optional.isPresent());
916             assertEquals("# cars", nCars, ((Collection<?>)optional.get().getValue()).size());
917
918             txChain.close();
919
920             broker.close();
921
922             cleanup(dataStore);
923         }};
924     }
925
926     @Test
927     public void testCreateChainedTransactionAfterEmptyTxReadied() throws Exception{
928         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
929             DistributedDataStore dataStore = setupDistributedDataStore(
930                     "testCreateChainedTransactionAfterEmptyTxReadied", "test-1");
931
932             DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
933
934             DOMStoreReadWriteTransaction rwTx1 = txChain.newReadWriteTransaction();
935
936             rwTx1.ready();
937
938             DOMStoreReadWriteTransaction rwTx2 = txChain.newReadWriteTransaction();
939
940             Optional<NormalizedNode<?, ?>> optional = rwTx2.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
941             assertEquals("isPresent", false, optional.isPresent());
942
943             txChain.close();
944
945             cleanup(dataStore);
946         }};
947     }
948
949     @Test
950     public void testCreateChainedTransactionWhenPreviousNotReady() throws Throwable {
951         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
952             DistributedDataStore dataStore = setupDistributedDataStore(
953                     "testCreateChainedTransactionWhenPreviousNotReady", "test-1");
954
955             final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
956
957             DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
958             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
959
960             writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
961
962             // Try to create another Tx of each type - each should fail b/c the previous Tx wasn't
963             // readied.
964
965             assertExceptionOnTxChainCreates(txChain, IllegalStateException.class);
966         }};
967     }
968
969     @Test
970     public void testCreateChainedTransactionAfterClose() throws Throwable {
971         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
972             DistributedDataStore dataStore = setupDistributedDataStore(
973                     "testCreateChainedTransactionAfterClose", "test-1");
974
975             DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
976
977             txChain.close();
978
979             // Try to create another Tx of each type - should fail b/c the previous Tx was closed.
980
981             assertExceptionOnTxChainCreates(txChain, TransactionChainClosedException.class);
982         }};
983     }
984
985     @Test
986     public void testChainedTransactionFailureWithSingleShard() throws Exception{
987         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
988             DistributedDataStore dataStore = setupDistributedDataStore(
989                     "testChainedTransactionFailureWithSingleShard", "cars-1");
990
991             ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
992                     ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
993                             LogicalDatastoreType.CONFIGURATION, dataStore).build(), MoreExecutors.directExecutor());
994
995             TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
996             DOMTransactionChain txChain = broker.createTransactionChain(listener);
997
998             DOMDataReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
999
1000             ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
1001                     new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)).
1002                         withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
1003
1004             rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
1005
1006             try {
1007                 rwTx.submit().checkedGet(5, TimeUnit.SECONDS);
1008                 fail("Expected TransactionCommitFailedException");
1009             } catch (TransactionCommitFailedException e) {
1010                 // Expected
1011             }
1012
1013             verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(rwTx), any(Throwable.class));
1014
1015             txChain.close();
1016             broker.close();
1017             cleanup(dataStore);
1018         }};
1019     }
1020
1021     @Test
1022     public void testChainedTransactionFailureWithMultipleShards() throws Exception{
1023         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
1024             DistributedDataStore dataStore = setupDistributedDataStore(
1025                     "testChainedTransactionFailureWithMultipleShards", "cars-1", "people-1");
1026
1027             ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
1028                     ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
1029                             LogicalDatastoreType.CONFIGURATION, dataStore).build(), MoreExecutors.directExecutor());
1030
1031             TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
1032             DOMTransactionChain txChain = broker.createTransactionChain(listener);
1033
1034             DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
1035
1036             writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
1037
1038             ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
1039                     new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)).
1040                         withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
1041
1042             // Note that merge will validate the data and fail but put succeeds b/c deep validation is not
1043             // done for put for performance reasons.
1044             writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
1045
1046             try {
1047                 writeTx.submit().checkedGet(5, TimeUnit.SECONDS);
1048                 fail("Expected TransactionCommitFailedException");
1049             } catch (TransactionCommitFailedException e) {
1050                 // Expected
1051             }
1052
1053             verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx), any(Throwable.class));
1054
1055             txChain.close();
1056             broker.close();
1057             cleanup(dataStore);
1058         }};
1059     }
1060
1061     @Test
1062     public void testChangeListenerRegistration() throws Exception{
1063         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
1064             DistributedDataStore dataStore =
1065                     setupDistributedDataStore("testChangeListenerRegistration", "test-1");
1066
1067             testWriteTransaction(dataStore, TestModel.TEST_PATH,
1068                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1069
1070             MockDataChangeListener listener = new MockDataChangeListener(1);
1071
1072             ListenerRegistration<MockDataChangeListener>
1073                     listenerReg = dataStore.registerChangeListener(TestModel.TEST_PATH, listener,
1074                             DataChangeScope.SUBTREE);
1075
1076             assertNotNull("registerChangeListener returned null", listenerReg);
1077
1078             // Wait for the initial notification
1079
1080             listener.waitForChangeEvents(TestModel.TEST_PATH);
1081
1082             listener.reset(2);
1083
1084             // Write 2 updates.
1085
1086             testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
1087                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1088
1089             YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
1090                     nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
1091             testWriteTransaction(dataStore, listPath,
1092                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
1093
1094             // Wait for the 2 updates.
1095
1096             listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath);
1097
1098             listenerReg.close();
1099
1100             testWriteTransaction(dataStore, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
1101                     nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(),
1102                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2));
1103
1104             listener.expectNoMoreChanges("Received unexpected change after close");
1105
1106             cleanup(dataStore);
1107         }};
1108     }
1109 }