Bump upstream SNAPSHOTS
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / AbstractDistributedDataStoreIntegrationTest.java
1 /*
2  * Copyright (c) 2014, 2017 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static org.awaitility.Awaitility.await;
11 import static org.hamcrest.CoreMatchers.instanceOf;
12 import static org.hamcrest.MatcherAssert.assertThat;
13 import static org.junit.Assert.assertEquals;
14 import static org.junit.Assert.assertFalse;
15 import static org.junit.Assert.assertNotNull;
16 import static org.junit.Assert.assertTrue;
17 import static org.junit.Assert.fail;
18 import static org.mockito.ArgumentMatchers.any;
19 import static org.mockito.ArgumentMatchers.eq;
20 import static org.mockito.Mockito.timeout;
21 import static org.mockito.Mockito.verify;
22
23 import akka.actor.ActorSystem;
24 import com.google.common.base.Throwables;
25 import com.google.common.collect.ImmutableMap;
26 import com.google.common.util.concurrent.FluentFuture;
27 import com.google.common.util.concurrent.ListenableFuture;
28 import com.google.common.util.concurrent.MoreExecutors;
29 import com.google.common.util.concurrent.Uninterruptibles;
30 import java.util.ArrayList;
31 import java.util.Arrays;
32 import java.util.Collection;
33 import java.util.Collections;
34 import java.util.List;
35 import java.util.Optional;
36 import java.util.concurrent.CountDownLatch;
37 import java.util.concurrent.ExecutionException;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.atomic.AtomicReference;
40 import org.junit.Test;
41 import org.junit.runners.Parameterized.Parameter;
42 import org.mockito.Mockito;
43 import org.opendaylight.controller.cluster.access.client.RequestTimeoutException;
44 import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker;
45 import org.opendaylight.controller.cluster.datastore.TestShard.RequestFrontendMetadata;
46 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
47 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
48 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
49 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
50 import org.opendaylight.controller.cluster.datastore.persisted.FrontendClientMetadata;
51 import org.opendaylight.controller.cluster.datastore.persisted.FrontendShardDataTreeSnapshotMetadata;
52 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
53 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
54 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
55 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
56 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
57 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
58 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
59 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
60 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
61 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
62 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction;
63 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
64 import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
65 import org.opendaylight.mdsal.dom.api.DOMTransactionChainClosedException;
66 import org.opendaylight.mdsal.dom.api.DOMTransactionChainListener;
67 import org.opendaylight.mdsal.dom.spi.store.DOMStore;
68 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
69 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
70 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
71 import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain;
72 import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
73 import org.opendaylight.yangtools.concepts.ListenerRegistration;
74 import org.opendaylight.yangtools.yang.common.Uint64;
75 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
76 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
77 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
78 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
79 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
80 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
81 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
82 import org.opendaylight.yangtools.yang.data.tree.api.DataTree;
83 import org.opendaylight.yangtools.yang.data.tree.api.DataTreeConfiguration;
84 import org.opendaylight.yangtools.yang.data.tree.impl.di.InMemoryDataTreeFactory;
85 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
86
87 public abstract class AbstractDistributedDataStoreIntegrationTest {
88
89     @Parameter
90     public Class<? extends AbstractDataStore> testParameter;
91
92     protected ActorSystem system;
93
94     protected final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder()
95             .shardHeartbeatIntervalInMillis(100);
96
97     protected ActorSystem getSystem() {
98         return system;
99     }
100
101     @Test
102     public void testWriteTransactionWithSingleShard() throws Exception {
103         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
104         try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
105             testParameter, "transactionIntegrationTest", "test-1")) {
106
107             testKit.testWriteTransaction(dataStore, TestModel.TEST_PATH,
108                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
109
110             testKit.testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
111                 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME)
112                 .withChild(ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 42))
113                 .build());
114         }
115     }
116
117     @Test
118     public void testWriteTransactionWithMultipleShards() throws Exception {
119         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
120         try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
121             testParameter, "testWriteTransactionWithMultipleShards", "cars-1", "people-1")) {
122
123             DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
124             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
125
126             writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
127             writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
128
129             testKit.doCommit(writeTx.ready());
130
131             writeTx = dataStore.newWriteOnlyTransaction();
132
133             writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
134             writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
135
136             testKit.doCommit(writeTx.ready());
137
138             writeTx = dataStore.newWriteOnlyTransaction();
139
140             final MapEntryNode car = CarsModel.newCarEntry("optima", Uint64.valueOf(20000));
141             final YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
142             writeTx.write(carPath, car);
143
144             final MapEntryNode person = PeopleModel.newPersonEntry("jack");
145             final YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
146             writeTx.write(personPath, person);
147
148             testKit.doCommit(writeTx.ready());
149
150             // Verify the data in the store
151             final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
152
153             Optional<NormalizedNode> optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
154             assertTrue("isPresent", optional.isPresent());
155             assertEquals("Data node", car, optional.get());
156
157             optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
158             assertTrue("isPresent", optional.isPresent());
159             assertEquals("Data node", person, optional.get());
160         }
161     }
162
163     @Test
164     public void testReadWriteTransactionWithSingleShard() throws Exception {
165         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
166         try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
167             testParameter, "testReadWriteTransactionWithSingleShard", "test-1")) {
168
169             // 1. Create a read-write Tx
170             final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
171             assertNotNull("newReadWriteTransaction returned null", readWriteTx);
172
173             // 2. Write some data
174             final YangInstanceIdentifier nodePath = TestModel.TEST_PATH;
175             final NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
176             readWriteTx.write(nodePath, nodeToWrite);
177
178             // 3. Read the data from Tx
179             final Boolean exists = readWriteTx.exists(nodePath).get(5, TimeUnit.SECONDS);
180             assertEquals("exists", Boolean.TRUE, exists);
181
182             Optional<NormalizedNode> optional = readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS);
183             assertTrue("isPresent", optional.isPresent());
184             assertEquals("Data node", nodeToWrite, optional.get());
185
186             // 4. Ready the Tx for commit
187             final DOMStoreThreePhaseCommitCohort cohort = readWriteTx.ready();
188
189             // 5. Commit the Tx
190             testKit.doCommit(cohort);
191
192             // 6. Verify the data in the store
193             final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
194
195             optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
196             assertTrue("isPresent", optional.isPresent());
197             assertEquals("Data node", nodeToWrite, optional.get());
198         }
199     }
200
201     @Test
202     public void testReadWriteTransactionWithMultipleShards() throws Exception {
203         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
204         try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
205             testParameter, "testReadWriteTransactionWithMultipleShards", "cars-1", "people-1")) {
206
207             DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
208             assertNotNull("newReadWriteTransaction returned null", readWriteTx);
209
210             readWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
211             readWriteTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
212
213             testKit.doCommit(readWriteTx.ready());
214
215             readWriteTx = dataStore.newReadWriteTransaction();
216
217             readWriteTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
218             readWriteTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
219
220             testKit.doCommit(readWriteTx.ready());
221
222             readWriteTx = dataStore.newReadWriteTransaction();
223
224             final MapEntryNode car = CarsModel.newCarEntry("optima", Uint64.valueOf(20000));
225             final YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
226             readWriteTx.write(carPath, car);
227
228             final MapEntryNode person = PeopleModel.newPersonEntry("jack");
229             final YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
230             readWriteTx.write(personPath, person);
231
232             final Boolean exists = readWriteTx.exists(carPath).get(5, TimeUnit.SECONDS);
233             assertEquals("exists", Boolean.TRUE, exists);
234
235             Optional<NormalizedNode> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
236             assertTrue("isPresent", optional.isPresent());
237             assertEquals("Data node", car, optional.get());
238
239             testKit.doCommit(readWriteTx.ready());
240
241             // Verify the data in the store
242             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
243
244             optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
245             assertTrue("isPresent", optional.isPresent());
246             assertEquals("Data node", car, optional.get());
247
248             optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
249             assertTrue("isPresent", optional.isPresent());
250             assertEquals("Data node", person, optional.get());
251         }
252     }
253
254     @Test
255     public void testSingleTransactionsWritesInQuickSuccession() throws Exception {
256         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
257         try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
258             testParameter, "testSingleTransactionsWritesInQuickSuccession", "cars-1")) {
259
260             final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
261
262             DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
263             writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
264             writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
265             testKit.doCommit(writeTx.ready());
266
267             int numCars = 5;
268             for (int i = 0; i < numCars; i++) {
269                 writeTx = txChain.newWriteOnlyTransaction();
270                 writeTx.write(CarsModel.newCarPath("car" + i), CarsModel.newCarEntry("car" + i, Uint64.valueOf(20000)));
271
272                 testKit.doCommit(writeTx.ready());
273
274                 try (var tx = txChain.newReadOnlyTransaction()) {
275                     tx.read(CarsModel.BASE_PATH).get();
276                 }
277             }
278
279             // wait to let the shard catch up with purged
280             await("transaction state propagation").atMost(5, TimeUnit.SECONDS)
281                 .pollInterval(500, TimeUnit.MILLISECONDS)
282                 .untilAsserted(() -> {
283                     // verify frontend metadata has no holes in purged transactions causing overtime memory leak
284                     final var localShard = dataStore.getActorUtils().findLocalShard("cars-1") .orElseThrow();
285                     FrontendShardDataTreeSnapshotMetadata frontendMetadata =
286                         (FrontendShardDataTreeSnapshotMetadata) dataStore.getActorUtils()
287                             .executeOperation(localShard, new RequestFrontendMetadata());
288
289                     final var clientMeta = frontendMetadata.getClients().get(0);
290                     if (dataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) {
291                         assertTellMetadata(clientMeta);
292                     } else {
293                         assertAskMetadata(clientMeta);
294                     }
295                 });
296
297             final var body = txChain.newReadOnlyTransaction().read(CarsModel.CAR_LIST_PATH)
298                 .get(5, TimeUnit.SECONDS)
299                 .orElseThrow()
300                 .body();
301             assertThat(body, instanceOf(Collection.class));
302             assertEquals("# cars", numCars, ((Collection<?>) body).size());
303         }
304     }
305
306     private static void assertAskMetadata(final FrontendClientMetadata clientMeta) {
307         // ask based should track no metadata
308         assertEquals(List.of(), clientMeta.getCurrentHistories());
309     }
310
311     private static void assertTellMetadata(final FrontendClientMetadata clientMeta) {
312         final var iterator = clientMeta.getCurrentHistories().iterator();
313         var metadata = iterator.next();
314         while (iterator.hasNext() && metadata.getHistoryId() != 1) {
315             metadata = iterator.next();
316         }
317         assertEquals("[[0..10]]", metadata.getPurgedTransactions().ranges().toString());
318     }
319
320     @SuppressWarnings("checkstyle:IllegalCatch")
321     private void testTransactionCommitFailureWithNoShardLeader(final boolean writeOnly, final String testName)
322             throws Exception {
323         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
324         final String shardName = "default";
325
326         // We don't want the shard to become the leader so prevent shard
327         // elections.
328         datastoreContextBuilder.customRaftPolicyImplementation(
329                 "org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy");
330
331         // The ShardManager uses the election timeout for FindPrimary so
332         // reset it low so it will timeout quickly.
333         datastoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(1)
334         .shardInitializationTimeout(200, TimeUnit.MILLISECONDS).frontendRequestTimeoutInSeconds(2);
335
336         try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(testParameter, testName, false, shardName)) {
337
338             final Object result = dataStore.getActorUtils().executeOperation(
339                 dataStore.getActorUtils().getShardManager(), new FindLocalShard(shardName, true));
340             assertTrue("Expected LocalShardFound. Actual: " + result, result instanceof LocalShardFound);
341
342             // Create the write Tx.
343             DOMStoreWriteTransaction writeTxToClose = null;
344             try {
345                 writeTxToClose = writeOnly ? dataStore.newWriteOnlyTransaction()
346                         : dataStore.newReadWriteTransaction();
347                 final DOMStoreWriteTransaction writeTx = writeTxToClose;
348                 assertNotNull("newReadWriteTransaction returned null", writeTx);
349
350                 // Do some modifications and ready the Tx on a separate
351                 // thread.
352                 final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
353                 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
354                 final CountDownLatch txReady = new CountDownLatch(1);
355                 final Thread txThread = new Thread(() -> {
356                     try {
357                         writeTx.write(TestModel.JUNK_PATH,
358                             ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
359
360                         txCohort.set(writeTx.ready());
361                     } catch (Exception e) {
362                         caughtEx.set(e);
363                     } finally {
364                         txReady.countDown();
365                     }
366                 });
367
368                 txThread.start();
369
370                 // Wait for the Tx operations to complete.
371                 boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
372                 if (caughtEx.get() != null) {
373                     throw caughtEx.get();
374                 }
375
376                 assertTrue("Tx ready", done);
377
378                 // Wait for the commit to complete. Since no shard
379                 // leader was elected in time, the Tx
380                 // should have timed out and throw an appropriate
381                 // exception cause.
382                 try {
383                     txCohort.get().canCommit().get(10, TimeUnit.SECONDS);
384                     fail("Expected NoShardLeaderException");
385                 } catch (final ExecutionException e) {
386                     final String msg = "Unexpected exception: "
387                             + Throwables.getStackTraceAsString(e.getCause());
388                     if (DistributedDataStore.class.isAssignableFrom(testParameter)) {
389                         assertTrue(Throwables.getRootCause(e) instanceof NoShardLeaderException);
390                     } else {
391                         assertTrue(msg, Throwables.getRootCause(e) instanceof RequestTimeoutException);
392                     }
393                 }
394             } finally {
395                 try {
396                     if (writeTxToClose != null) {
397                         writeTxToClose.close();
398                     }
399                 } catch (Exception e) {
400                     // FIXME TransactionProxy.close throws IllegalStateException:
401                     // Transaction is ready, it cannot be closed
402                 }
403             }
404         }
405     }
406
407     @Test
408     public void testWriteOnlyTransactionCommitFailureWithNoShardLeader() throws Exception {
409         datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
410         testTransactionCommitFailureWithNoShardLeader(true, "testWriteOnlyTransactionCommitFailureWithNoShardLeader");
411     }
412
413     @Test
414     public void testReadWriteTransactionCommitFailureWithNoShardLeader() throws Exception {
415         testTransactionCommitFailureWithNoShardLeader(false, "testReadWriteTransactionCommitFailureWithNoShardLeader");
416     }
417
418     @Test
419     public void testTransactionAbort() throws Exception {
420         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
421         try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
422             testParameter, "transactionAbortIntegrationTest", "test-1")) {
423
424             final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
425             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
426
427             writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
428
429             final DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
430
431             cohort.canCommit().get(5, TimeUnit.SECONDS);
432
433             cohort.abort().get(5, TimeUnit.SECONDS);
434
435             testKit.testWriteTransaction(dataStore, TestModel.TEST_PATH,
436                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
437         }
438     }
439
440     @Test
441     @SuppressWarnings("checkstyle:IllegalCatch")
442     public void testTransactionChainWithSingleShard() throws Exception {
443         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
444         try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
445             testParameter, "testTransactionChainWithSingleShard", "test-1")) {
446
447             // 1. Create a Tx chain and write-only Tx
448             final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
449
450             final DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
451             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
452
453             // 2. Write some data
454             final NormalizedNode testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
455             writeTx.write(TestModel.TEST_PATH, testNode);
456
457             // 3. Ready the Tx for commit
458             final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
459
460             // 4. Commit the Tx on another thread that first waits for
461             // the second read Tx.
462             final CountDownLatch continueCommit1 = new CountDownLatch(1);
463             final CountDownLatch commit1Done = new CountDownLatch(1);
464             final AtomicReference<Exception> commit1Error = new AtomicReference<>();
465             new Thread(() -> {
466                 try {
467                     continueCommit1.await();
468                     testKit.doCommit(cohort1);
469                 } catch (Exception e) {
470                     commit1Error.set(e);
471                 } finally {
472                     commit1Done.countDown();
473                 }
474             }).start();
475
476             // 5. Create a new read Tx from the chain to read and verify
477             // the data from the first
478             // Tx is visible after being readied.
479             DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction();
480             Optional<NormalizedNode> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
481             assertTrue("isPresent", optional.isPresent());
482             assertEquals("Data node", testNode, optional.get());
483
484             // 6. Create a new RW Tx from the chain, write more data,
485             // and ready it
486             final DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
487             final MapNode outerNode = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME)
488                     .withChild(ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 42))
489                     .build();
490             rwTx.write(TestModel.OUTER_LIST_PATH, outerNode);
491
492             final DOMStoreThreePhaseCommitCohort cohort2 = rwTx.ready();
493
494             // 7. Create a new read Tx from the chain to read the data
495             // from the last RW Tx to
496             // verify it is visible.
497             readTx = txChain.newReadWriteTransaction();
498             optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
499             assertTrue("isPresent", optional.isPresent());
500             assertEquals("Data node", outerNode, optional.get());
501
502             // 8. Wait for the 2 commits to complete and close the
503             // chain.
504             continueCommit1.countDown();
505             Uninterruptibles.awaitUninterruptibly(commit1Done, 5, TimeUnit.SECONDS);
506
507             if (commit1Error.get() != null) {
508                 throw commit1Error.get();
509             }
510
511             testKit.doCommit(cohort2);
512
513             txChain.close();
514
515             // 9. Create a new read Tx from the data store and verify
516             // committed data.
517             readTx = dataStore.newReadOnlyTransaction();
518             optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
519             assertTrue("isPresent", optional.isPresent());
520             assertEquals("Data node", outerNode, optional.get());
521         }
522     }
523
524     @Test
525     public void testTransactionChainWithMultipleShards() throws Exception {
526         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
527         try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
528             testParameter, "testTransactionChainWithMultipleShards", "cars-1", "people-1")) {
529
530             final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
531
532             DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
533             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
534
535             writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
536             writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
537
538             writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
539             writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
540
541             final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
542
543             final DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction();
544
545             final MapEntryNode car = CarsModel.newCarEntry("optima", Uint64.valueOf(20000));
546             final YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
547             readWriteTx.write(carPath, car);
548
549             final MapEntryNode person = PeopleModel.newPersonEntry("jack");
550             final YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
551             readWriteTx.merge(personPath, person);
552
553             Optional<NormalizedNode> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
554             assertTrue("isPresent", optional.isPresent());
555             assertEquals("Data node", car, optional.get());
556
557             optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS);
558             assertTrue("isPresent", optional.isPresent());
559             assertEquals("Data node", person, optional.get());
560
561             final DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready();
562
563             writeTx = txChain.newWriteOnlyTransaction();
564
565             writeTx.delete(carPath);
566
567             final DOMStoreThreePhaseCommitCohort cohort3 = writeTx.ready();
568
569             final ListenableFuture<Boolean> canCommit1 = cohort1.canCommit();
570             final ListenableFuture<Boolean> canCommit2 = cohort2.canCommit();
571
572             testKit.doCommit(canCommit1, cohort1);
573             testKit.doCommit(canCommit2, cohort2);
574             testKit.doCommit(cohort3);
575
576             txChain.close();
577
578             final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
579
580             optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
581             assertFalse("isPresent", optional.isPresent());
582
583             optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
584             assertTrue("isPresent", optional.isPresent());
585             assertEquals("Data node", person, optional.get());
586         }
587     }
588
589     @Test
590     public void testCreateChainedTransactionsInQuickSuccession() throws Exception {
591         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
592         try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
593             testParameter, "testCreateChainedTransactionsInQuickSuccession", "cars-1")) {
594
595             final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
596                 ImmutableMap.<LogicalDatastoreType, DOMStore>builder()
597                 .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(),
598                 MoreExecutors.directExecutor());
599
600             final DOMTransactionChainListener listener = Mockito.mock(DOMTransactionChainListener.class);
601             DOMTransactionChain txChain = broker.createTransactionChain(listener);
602
603             final List<ListenableFuture<?>> futures = new ArrayList<>();
604
605             final DOMDataTreeWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
606             writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, CarsModel.emptyContainer());
607             writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
608             futures.add(writeTx.commit());
609
610             int numCars = 100;
611             for (int i = 0; i < numCars; i++) {
612                 final DOMDataTreeReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
613
614                 rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.newCarPath("car" + i),
615                     CarsModel.newCarEntry("car" + i, Uint64.valueOf(20000)));
616
617                 futures.add(rwTx.commit());
618             }
619
620             for (final ListenableFuture<?> f : futures) {
621                 f.get(5, TimeUnit.SECONDS);
622             }
623
624             final Optional<NormalizedNode> optional = txChain.newReadOnlyTransaction()
625                     .read(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
626             assertTrue("isPresent", optional.isPresent());
627             assertEquals("# cars", numCars, ((Collection<?>) optional.get().body()).size());
628
629             txChain.close();
630
631             broker.close();
632         }
633     }
634
635     @Test
636     public void testCreateChainedTransactionAfterEmptyTxReadied() throws Exception {
637         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
638         try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
639             testParameter, "testCreateChainedTransactionAfterEmptyTxReadied", "test-1")) {
640
641             final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
642
643             final DOMStoreReadWriteTransaction rwTx1 = txChain.newReadWriteTransaction();
644
645             rwTx1.ready();
646
647             final DOMStoreReadWriteTransaction rwTx2 = txChain.newReadWriteTransaction();
648
649             final Optional<NormalizedNode> optional = rwTx2.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
650             assertFalse("isPresent", optional.isPresent());
651
652             txChain.close();
653         }
654     }
655
656     @Test
657     public void testCreateChainedTransactionWhenPreviousNotReady() throws Exception {
658         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
659         try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
660             testParameter, "testCreateChainedTransactionWhenPreviousNotReady", "test-1")) {
661
662             final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
663
664             final DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
665             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
666
667             writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
668
669             // Try to create another Tx of each type - each should fail
670             // b/c the previous Tx wasn't
671             // readied.
672             testKit.assertExceptionOnTxChainCreates(txChain, IllegalStateException.class);
673         }
674     }
675
676     @Test
677     public void testCreateChainedTransactionAfterClose() throws Exception {
678         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
679         try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
680             testParameter, "testCreateChainedTransactionAfterClose", "test-1")) {
681
682             final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
683             txChain.close();
684
685             // Try to create another Tx of each type - should fail b/c
686             // the previous Tx was closed.
687             testKit.assertExceptionOnTxChainCreates(txChain, DOMTransactionChainClosedException.class);
688         }
689     }
690
691     @Test
692     public void testChainWithReadOnlyTxAfterPreviousReady() throws Exception {
693         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
694         try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
695             testParameter, "testChainWithReadOnlyTxAfterPreviousReady", "test-1")) {
696
697             final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
698
699             // Create a write tx and submit.
700             final DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
701             writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
702             final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
703
704             // Create read-only tx's and issue a read.
705             FluentFuture<Optional<NormalizedNode>> readFuture1 = txChain
706                     .newReadOnlyTransaction().read(TestModel.TEST_PATH);
707
708             FluentFuture<Optional<NormalizedNode>> readFuture2 = txChain
709                     .newReadOnlyTransaction().read(TestModel.TEST_PATH);
710
711             // Create another write tx and issue the write.
712             DOMStoreWriteTransaction writeTx2 = txChain.newWriteOnlyTransaction();
713             writeTx2.write(TestModel.OUTER_LIST_PATH,
714                 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME)
715                 .withChild(ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 42))
716                 .build());
717
718             // Ensure the reads succeed.
719
720             assertTrue("isPresent", readFuture1.get(5, TimeUnit.SECONDS).isPresent());
721             assertTrue("isPresent", readFuture2.get(5, TimeUnit.SECONDS).isPresent());
722
723             // Ensure the writes succeed.
724             DOMStoreThreePhaseCommitCohort cohort2 = writeTx2.ready();
725
726             testKit.doCommit(cohort1);
727             testKit.doCommit(cohort2);
728
729             assertTrue("isPresent", txChain.newReadOnlyTransaction().read(TestModel.OUTER_LIST_PATH)
730                 .get(5, TimeUnit.SECONDS).isPresent());
731         }
732     }
733
734     @Test
735     public void testChainedTransactionFailureWithSingleShard() throws Exception {
736         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
737         try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
738             testParameter, "testChainedTransactionFailureWithSingleShard", "cars-1")) {
739
740             final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
741                 ImmutableMap.<LogicalDatastoreType, DOMStore>builder()
742                 .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(),
743                 MoreExecutors.directExecutor());
744
745             final DOMTransactionChainListener listener = Mockito.mock(DOMTransactionChainListener.class);
746             final DOMTransactionChain txChain = broker.createTransactionChain(listener);
747
748             final DOMDataTreeReadWriteTransaction writeTx = txChain.newReadWriteTransaction();
749
750             writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH,
751                 PeopleModel.emptyContainer());
752
753             final ContainerNode invalidData = ImmutableContainerNodeBuilder.create()
754                     .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME))
755                     .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
756
757             writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
758
759             try {
760                 writeTx.commit().get(5, TimeUnit.SECONDS);
761                 fail("Expected TransactionCommitFailedException");
762             } catch (final ExecutionException e) {
763                 // Expected
764             }
765
766             verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx),
767                 any(Throwable.class));
768
769             txChain.close();
770             broker.close();
771         }
772     }
773
774     @Test
775     public void testChainedTransactionFailureWithMultipleShards() throws Exception {
776         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
777         try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
778             testParameter, "testChainedTransactionFailureWithMultipleShards", "cars-1", "people-1")) {
779
780             final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
781                 ImmutableMap.<LogicalDatastoreType, DOMStore>builder()
782                 .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(),
783                 MoreExecutors.directExecutor());
784
785             final DOMTransactionChainListener listener = Mockito.mock(DOMTransactionChainListener.class);
786             final DOMTransactionChain txChain = broker.createTransactionChain(listener);
787
788             final DOMDataTreeWriteTransaction writeTx = txChain.newReadWriteTransaction();
789
790             writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH,
791                 PeopleModel.emptyContainer());
792
793             final ContainerNode invalidData = ImmutableContainerNodeBuilder.create()
794                     .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME))
795                     .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
796
797             writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
798
799             // Note that merge will validate the data and fail but put
800             // succeeds b/c deep validation is not
801             // done for put for performance reasons.
802             try {
803                 writeTx.commit().get(5, TimeUnit.SECONDS);
804                 fail("Expected TransactionCommitFailedException");
805             } catch (final ExecutionException e) {
806                 // Expected
807             }
808
809             verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx),
810                 any(Throwable.class));
811
812             txChain.close();
813             broker.close();
814         }
815     }
816
817     @Test
818     public void testDataTreeChangeListenerRegistration() throws Exception {
819         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
820         try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
821             testParameter, "testDataTreeChangeListenerRegistration", "test-1")) {
822
823             testKit.testWriteTransaction(dataStore, TestModel.TEST_PATH,
824                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
825
826             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
827
828             ListenerRegistration<MockDataTreeChangeListener> listenerReg = dataStore
829                     .registerTreeChangeListener(TestModel.TEST_PATH, listener);
830
831             assertNotNull("registerTreeChangeListener returned null", listenerReg);
832
833             IntegrationTestKit.verifyShardState(dataStore, "test-1",
834                 state -> assertEquals("getTreeChangeListenerActors", 1,
835                         state.getTreeChangeListenerActors().size()));
836
837             // Wait for the initial notification
838             listener.waitForChangeEvents(TestModel.TEST_PATH);
839             listener.reset(2);
840
841             // Write 2 updates.
842             testKit.testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
843                 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME)
844                 .withChild(ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 42))
845                 .build());
846
847             YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
848                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
849             testKit.testWriteTransaction(dataStore, listPath,
850                 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
851
852             // Wait for the 2 updates.
853             listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath);
854             listenerReg.close();
855
856             IntegrationTestKit.verifyShardState(dataStore, "test-1",
857                 state -> assertEquals("getTreeChangeListenerActors", 0,
858                     state.getTreeChangeListenerActors().size()));
859
860             testKit.testWriteTransaction(dataStore,
861                 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
862                 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(),
863                 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2));
864
865             listener.expectNoMoreChanges("Received unexpected change after close");
866         }
867     }
868
869     @Test
870     public void testRestoreFromDatastoreSnapshot() throws Exception {
871         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
872         final String name = "transactionIntegrationTest";
873
874         final ContainerNode carsNode = CarsModel.newCarsNode(
875             CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", Uint64.valueOf(20000)),
876                 CarsModel.newCarEntry("sportage", Uint64.valueOf(30000))));
877
878         DataTree dataTree = new InMemoryDataTreeFactory().create(
879             DataTreeConfiguration.DEFAULT_OPERATIONAL, SchemaContextHelper.full());
880         AbstractShardTest.writeToStore(dataTree, CarsModel.BASE_PATH, carsNode);
881         NormalizedNode root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.empty());
882
883         final Snapshot carsSnapshot = Snapshot.create(
884             new ShardSnapshotState(new MetadataShardDataTreeSnapshot(root)),
885             Collections.emptyList(), 2, 1, 2, 1, 1, "member-1", null);
886
887         dataTree = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL,
888             SchemaContextHelper.full());
889
890         final NormalizedNode peopleNode = PeopleModel.create();
891         AbstractShardTest.writeToStore(dataTree, PeopleModel.BASE_PATH, peopleNode);
892
893         root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.empty());
894
895         final Snapshot peopleSnapshot = Snapshot.create(
896             new ShardSnapshotState(new MetadataShardDataTreeSnapshot(root)),
897             Collections.emptyList(), 2, 1, 2, 1, 1, "member-1", null);
898
899         testKit.restoreFromSnapshot = new DatastoreSnapshot(name, null, Arrays.asList(
900             new DatastoreSnapshot.ShardSnapshot("cars", carsSnapshot),
901             new DatastoreSnapshot.ShardSnapshot("people", peopleSnapshot)));
902
903         try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
904             testParameter, name, "module-shards-member1.conf", true, "cars", "people")) {
905
906             final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
907
908             // two reads
909             Optional<NormalizedNode> optional = readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS);
910             assertTrue("isPresent", optional.isPresent());
911             assertEquals("Data node", carsNode, optional.get());
912
913             optional = readTx.read(PeopleModel.BASE_PATH).get(5, TimeUnit.SECONDS);
914             assertTrue("isPresent", optional.isPresent());
915             assertEquals("Data node", peopleNode, optional.get());
916         }
917     }
918
919     @Test
920     public void testSnapshotOnRootOverwrite() throws Exception {
921         if (!DistributedDataStore.class.isAssignableFrom(testParameter)) {
922             // FIXME: ClientBackedDatastore does not have stable indexes/term, the snapshot index seems to fluctuate
923             return;
924         }
925
926         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(),
927                 datastoreContextBuilder.snapshotOnRootOverwrite(true));
928         try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
929                 testParameter, "testRootOverwrite", "module-shards-default-cars-member1.conf",
930                 true, "cars", "default")) {
931
932             ContainerNode rootNode = ImmutableContainerNodeBuilder.create()
933                     .withNodeIdentifier(YangInstanceIdentifier.NodeIdentifier.create(SchemaContext.NAME))
934                     .withChild(CarsModel.create())
935                     .build();
936
937             testKit.testWriteTransaction(dataStore, YangInstanceIdentifier.empty(), rootNode);
938             IntegrationTestKit.verifyShardState(dataStore, "cars",
939                 state -> assertEquals(1, state.getSnapshotIndex()));
940
941             // root has been written expect snapshot at index 0
942             verifySnapshot("member-1-shard-cars-testRootOverwrite", 1, 1);
943
944             for (int i = 0; i < 10; i++) {
945                 testKit.testWriteTransaction(dataStore, CarsModel.newCarPath("car " + i),
946                     CarsModel.newCarEntry("car " + i, Uint64.ONE));
947             }
948
949             // fake snapshot causes the snapshotIndex to move
950             IntegrationTestKit.verifyShardState(dataStore, "cars",
951                 state -> assertEquals(10, state.getSnapshotIndex()));
952
953             // however the real snapshot still has not changed and was taken at index 0
954             verifySnapshot("member-1-shard-cars-testRootOverwrite", 1, 1);
955
956             // root overwrite so expect a snapshot
957             testKit.testWriteTransaction(dataStore, YangInstanceIdentifier.empty(), rootNode);
958
959             // this was a real snapshot so everything should be in it(1 + 10 + 1)
960             IntegrationTestKit.verifyShardState(dataStore, "cars",
961                 state -> assertEquals(12, state.getSnapshotIndex()));
962
963             verifySnapshot("member-1-shard-cars-testRootOverwrite", 12, 1);
964         }
965     }
966
967     private static void verifySnapshot(final String persistenceId, final long lastAppliedIndex,
968             final long lastAppliedTerm) {
969         await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
970                 List<Snapshot> snap = InMemorySnapshotStore.getSnapshots(persistenceId, Snapshot.class);
971                 assertEquals(1, snap.size());
972                 assertEquals(lastAppliedIndex, snap.get(0).getLastAppliedIndex());
973                 assertEquals(lastAppliedTerm, snap.get(0).getLastAppliedTerm());
974             }
975         );
976     }
977 }