Revert "Leader should always apply modifications as local"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / DistributedDataStoreRemotingIntegrationTest.java
1 /*
2  * Copyright (c) 2015, 2017 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static org.awaitility.Awaitility.await;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertTrue;
15 import static org.junit.Assert.fail;
16 import static org.mockito.ArgumentMatchers.any;
17 import static org.mockito.ArgumentMatchers.eq;
18 import static org.mockito.Mockito.timeout;
19 import static org.mockito.Mockito.verify;
20
21 import akka.actor.ActorRef;
22 import akka.actor.ActorSelection;
23 import akka.actor.ActorSystem;
24 import akka.actor.Address;
25 import akka.actor.AddressFromURIString;
26 import akka.cluster.Cluster;
27 import akka.cluster.Member;
28 import akka.dispatch.Futures;
29 import akka.pattern.Patterns;
30 import akka.testkit.javadsl.TestKit;
31 import com.google.common.base.Stopwatch;
32 import com.google.common.base.Throwables;
33 import com.google.common.collect.ImmutableMap;
34 import com.google.common.collect.Range;
35 import com.google.common.primitives.UnsignedLong;
36 import com.google.common.util.concurrent.ListenableFuture;
37 import com.google.common.util.concurrent.MoreExecutors;
38 import com.google.common.util.concurrent.Uninterruptibles;
39 import com.typesafe.config.ConfigFactory;
40 import java.util.Arrays;
41 import java.util.Collection;
42 import java.util.Collections;
43 import java.util.Iterator;
44 import java.util.LinkedList;
45 import java.util.List;
46 import java.util.Optional;
47 import java.util.Set;
48 import java.util.concurrent.ExecutionException;
49 import java.util.concurrent.TimeUnit;
50 import java.util.concurrent.atomic.AtomicLong;
51 import java.util.function.Supplier;
52 import org.junit.After;
53 import org.junit.Assume;
54 import org.junit.Before;
55 import org.junit.Ignore;
56 import org.junit.Test;
57 import org.junit.runner.RunWith;
58 import org.junit.runners.Parameterized;
59 import org.junit.runners.Parameterized.Parameter;
60 import org.junit.runners.Parameterized.Parameters;
61 import org.mockito.Mockito;
62 import org.mockito.stubbing.Answer;
63 import org.opendaylight.controller.cluster.access.client.RequestTimeoutException;
64 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
65 import org.opendaylight.controller.cluster.databroker.ClientBackedDataStore;
66 import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker;
67 import org.opendaylight.controller.cluster.databroker.TestClientBackedDataStore;
68 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
69 import org.opendaylight.controller.cluster.datastore.TestShard.RequestFrontendMetadata;
70 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
71 import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException;
72 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
73 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
74 import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree;
75 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
76 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
77 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
78 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
79 import org.opendaylight.controller.cluster.datastore.persisted.FrontendHistoryMetadata;
80 import org.opendaylight.controller.cluster.datastore.persisted.FrontendShardDataTreeSnapshotMetadata;
81 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
82 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
83 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
84 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
85 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
86 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
87 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
88 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
89 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
90 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
91 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
92 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
93 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
94 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
95 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
96 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
97 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
98 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
99 import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
100 import org.opendaylight.mdsal.dom.api.DOMTransactionChainListener;
101 import org.opendaylight.mdsal.dom.spi.store.DOMStore;
102 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
103 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
104 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
105 import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain;
106 import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
107 import org.opendaylight.yangtools.yang.common.Uint64;
108 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
109 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
110 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
111 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
112 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
113 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
114 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration;
115 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
116 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
117 import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder;
118 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
119 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
120 import scala.concurrent.Await;
121 import scala.concurrent.Future;
122 import scala.concurrent.duration.FiniteDuration;
123
124 /**
125  * End-to-end distributed data store tests that exercise remote shards and transactions.
126  *
127  * @author Thomas Pantelis
128  */
129 @RunWith(Parameterized.class)
130 public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
131
132     @Parameters(name = "{0}")
133     public static Collection<Object[]> data() {
134         return Arrays.asList(new Object[][] {
135                 { TestDistributedDataStore.class, 7}, { TestClientBackedDataStore.class, 12 }
136         });
137     }
138
    // Data store implementation class under test; first column of the rows returned by data().
    @Parameter(0)
    public Class<? extends AbstractDataStore> testParameter;
    // Second column of the rows returned by data(); handed to every IntegrationTestKit created by this class.
    // NOTE(review): the unit is not visible here, presumably seconds - confirm against IntegrationTestKit.
    @Parameter(1)
    public int commitTimeout;

    // Shard name sets passed to initDatastores().
    private static final String[] CARS_AND_PEOPLE = {"cars", "people"};
    private static final String[] CARS = {"cars"};

    // Akka addresses of the cluster members; setUp() joins every actor system to MEMBER_1_ADDRESS.
    private static final Address MEMBER_1_ADDRESS = AddressFromURIString.parse(
            "akka://cluster-test@127.0.0.1:2558");
    private static final Address MEMBER_2_ADDRESS = AddressFromURIString.parse(
            "akka://cluster-test@127.0.0.1:2559");

    // Module-shards configuration resources; the suffix names the members the file is written for.
    private static final String MODULE_SHARDS_CARS_ONLY_1_2 = "module-shards-cars-member-1-and-2.conf";
    private static final String MODULE_SHARDS_CARS_PEOPLE_1_2 = "module-shards-member1-and-2.conf";
    private static final String MODULE_SHARDS_CARS_PEOPLE_1_2_3 = "module-shards-member1-and-2-and-3.conf";
    private static final String MODULE_SHARDS_CARS_1_2_3 = "module-shards-cars-member-1-and-2-and-3.conf";

    // Actor systems created in setUp() from the "Member1", "Member2" and "Member3" configurations respectively.
    private ActorSystem leaderSystem;
    private ActorSystem followerSystem;
    private ActorSystem follower2System;

    // Context used for the leader-side data store: 100 ms heartbeat, election timeout factor 2.
    private final DatastoreContext.Builder leaderDatastoreContextBuilder =
            DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);

    // Context used for the follower-side data store: same heartbeat, election timeout factor 5 and the
    // DisableElectionsRaftPolicy installed as the custom raft policy.
    private final DatastoreContext.Builder followerDatastoreContextBuilder =
            DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5)
                .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
    // Two pre-allocated transaction identifiers for tests that need explicit ids.
    private final TransactionIdentifier tx1 = nextTransactionId();
    private final TransactionIdentifier tx2 = nextTransactionId();

    // Data stores and test kits populated by initDatastores(); they remain null if a test never calls it.
    private AbstractDataStore followerDistributedDataStore;
    private AbstractDataStore leaderDistributedDataStore;
    private IntegrationTestKit followerTestKit;
    private IntegrationTestKit leaderTestKit;
174
175     @Before
176     public void setUp() {
177         InMemoryJournal.clear();
178         InMemorySnapshotStore.clear();
179
180         leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
181         Cluster.get(leaderSystem).join(MEMBER_1_ADDRESS);
182
183         followerSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
184         Cluster.get(followerSystem).join(MEMBER_1_ADDRESS);
185
186         follower2System = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member3"));
187         Cluster.get(follower2System).join(MEMBER_1_ADDRESS);
188     }
189
190     @After
191     public void tearDown() {
192         if (followerDistributedDataStore != null) {
193             leaderDistributedDataStore.close();
194         }
195         if (leaderDistributedDataStore != null) {
196             leaderDistributedDataStore.close();
197         }
198
199         TestKit.shutdownActorSystem(leaderSystem);
200         TestKit.shutdownActorSystem(followerSystem);
201         TestKit.shutdownActorSystem(follower2System);
202
203         InMemoryJournal.clear();
204         InMemorySnapshotStore.clear();
205     }
206
    /**
     * Initializes the leader and follower data stores with only the "cars" shard, using the
     * {@link #MODULE_SHARDS_CARS_ONLY_1_2} configuration.
     *
     * @param type data store type name, forwarded unchanged to {@code initDatastores}
     * @throws Exception if data store setup fails
     */
    private void initDatastoresWithCars(final String type) throws Exception {
        initDatastores(type, MODULE_SHARDS_CARS_ONLY_1_2, CARS);
    }
210
    /**
     * Initializes the leader and follower data stores with the "cars" and "people" shards, using the
     * {@link #MODULE_SHARDS_CARS_PEOPLE_1_2} configuration.
     *
     * @param type data store type name, forwarded unchanged to {@code initDatastores}
     * @throws Exception if data store setup fails
     */
    private void initDatastoresWithCarsAndPeople(final String type) throws Exception {
        initDatastores(type, MODULE_SHARDS_CARS_PEOPLE_1_2, CARS_AND_PEOPLE);
    }
214
    /**
     * Creates the leader and follower test kits and data stores and waits until the cluster is usable.
     *
     * <p>The leader store runs on {@code leaderSystem} with the leader context builder, the follower store on
     * {@code followerSystem} with the follower context builder. Both use the parameterized data store class and
     * commit timeout.
     *
     * @param type data store type name passed to both data stores
     * @param moduleShardsConfig name of the module-shards configuration resource
     * @param shards names of the shards to wait on for leadership
     * @throws Exception if data store setup fails
     */
    private void initDatastores(final String type, final String moduleShardsConfig, final String[] shards)
            throws Exception {
        leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder, commitTimeout);

        leaderDistributedDataStore = leaderTestKit.setupAbstractDataStore(
                testParameter, type, moduleShardsConfig, false, shards);

        followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder, commitTimeout);
        followerDistributedDataStore = followerTestKit.setupAbstractDataStore(
                testParameter, type, moduleShardsConfig, false, shards);

        // Only the leader-side store is checked for shard leadership.
        leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorUtils(), shards);

        // Each side waits until it sees the other member up.
        leaderTestKit.waitForMembersUp("member-2");
        followerTestKit.waitForMembersUp("member-1");
    }
231
232     private static void verifyCars(final DOMStoreReadTransaction readTx, final MapEntryNode... entries)
233             throws Exception {
234         final Optional<NormalizedNode<?, ?>> optional = readTx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
235         assertTrue("isPresent", optional.isPresent());
236
237         final CollectionNodeBuilder<MapEntryNode, MapNode> listBuilder = ImmutableNodes.mapNodeBuilder(
238                 CarsModel.CAR_QNAME);
239         for (final NormalizedNode<?, ?> entry: entries) {
240             listBuilder.withChild((MapEntryNode) entry);
241         }
242
243         assertEquals("Car list node", listBuilder.build(), optional.get());
244     }
245
246     private static void verifyNode(final DOMStoreReadTransaction readTx, final YangInstanceIdentifier path,
247             final NormalizedNode<?, ?> expNode) throws Exception {
248         final Optional<NormalizedNode<?, ?>> optional = readTx.read(path).get(5, TimeUnit.SECONDS);
249         assertTrue("isPresent", optional.isPresent());
250         assertEquals("Data node", expNode, optional.get());
251     }
252
253     private static void verifyExists(final DOMStoreReadTransaction readTx, final YangInstanceIdentifier path)
254             throws Exception {
255         final Boolean exists = readTx.exists(path).get(5, TimeUnit.SECONDS);
256         assertEquals("exists", Boolean.TRUE, exists);
257     }
258
    /**
     * Writes and deletes cars through write-only transactions on the follower, checks that both follower and
     * leader see the result, then restarts member 2 on its own and verifies it recovers the data from persistence.
     */
    @Test
    public void testWriteTransactionWithSingleShard() throws Exception {
        final String testName = "testWriteTransactionWithSingleShard";
        initDatastoresWithCars(testName);

        // Name under which the follower's cars shard is looked up in the InMemoryJournal below.
        final String followerCarShardName = "member-2-shard-cars-" + testName;

        DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
        assertNotNull("newWriteOnlyTransaction returned null", writeTx);

        writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
        writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());

        final MapEntryNode car1 = CarsModel.newCarEntry("optima", Uint64.valueOf(20000));
        final YangInstanceIdentifier car1Path = CarsModel.newCarPath("optima");
        writeTx.merge(car1Path, car1);

        final MapEntryNode car2 = CarsModel.newCarEntry("sportage", Uint64.valueOf(25000));
        final YangInstanceIdentifier car2Path = CarsModel.newCarPath("sportage");
        writeTx.merge(car2Path, car2);

        followerTestKit.doCommit(writeTx.ready());

        // The committed cars must be readable from both data stores.
        verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car1, car2);

        verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2);

        // Test delete

        writeTx = followerDistributedDataStore.newWriteOnlyTransaction();

        writeTx.delete(car1Path);

        followerTestKit.doCommit(writeTx.ready());

        verifyExists(followerDistributedDataStore.newReadOnlyTransaction(), car2Path);

        verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car2);

        verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car2);

        // Re-instate the follower member 2 as a single-node to verify replication and recovery.

        // The following is a bit tricky. Before we reinstate the follower we need to ensure it has persisted and
        // applied all the log entries from the leader. Since we've verified the car data above we know that
        // all the transactions have been applied on the leader so we first read and capture its lastAppliedIndex.
        final AtomicLong leaderLastAppliedIndex = new AtomicLong();
        IntegrationTestKit.verifyShardState(leaderDistributedDataStore, CARS[0],
            state -> leaderLastAppliedIndex.set(state.getLastApplied()));

        // Now we need to make sure the follower has persisted the leader's lastAppliedIndex via ApplyJournalEntries.
        // However we don't know exactly how many ApplyJournalEntries messages there will be as it can differ between
        // the tell-based and ask-based front-ends. For ask-based there will be exactly 2 ApplyJournalEntries but
        // tell-based persists additional payloads which could be replicated and applied in a batch resulting in
        // either 2 or 3 ApplyJournalEntries. To handle this we read the follower's persisted ApplyJournalEntries
        // until we find the one that encompasses the leader's lastAppliedIndex.
        Stopwatch sw = Stopwatch.createStarted();
        boolean done = false;
        while (!done) {
            final List<ApplyJournalEntries> entries = InMemoryJournal.get(followerCarShardName,
                    ApplyJournalEntries.class);
            for (ApplyJournalEntries aje: entries) {
                if (aje.getToIndex() >= leaderLastAppliedIndex.get()) {
                    done = true;
                    break;
                }
            }

            // Give up with a failure once the wait has exceeded 5 seconds. Note this check and the sleep below
            // also run on the iteration that found the entry.
            assertTrue("Follower did not persist ApplyJournalEntries containing leader's lastAppliedIndex "
                    + leaderLastAppliedIndex + ". Entries persisted: " + entries, sw.elapsed(TimeUnit.SECONDS) <= 5);

            Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
        }

        // Stop both original members before bringing member 2 back up on its own.
        TestKit.shutdownActorSystem(leaderSystem, true);
        TestKit.shutdownActorSystem(followerSystem, true);

        final ActorSystem newSystem = newActorSystem("reinstated-member2", "Member2");

        // The reinstated store is built with the leader context builder and must still contain the surviving car.
        try (AbstractDataStore member2Datastore = new IntegrationTestKit(newSystem, leaderDatastoreContextBuilder,
                commitTimeout)
                .setupAbstractDataStore(testParameter, testName, "module-shards-member2", true, CARS)) {
            verifyCars(member2Datastore.newReadOnlyTransaction(), car2);
        }
    }
344
345     @Test
346     public void testSingleTransactionsWritesInQuickSuccession() throws Exception {
347         final String testName = "testWriteTransactionWithSingleShard";
348         initDatastoresWithCars(testName);
349
350         final DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain();
351
352         DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
353         writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
354         writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
355         followerTestKit.doCommit(writeTx.ready());
356
357         int numCars = 5;
358         for (int i = 0; i < numCars; i++) {
359             writeTx = txChain.newWriteOnlyTransaction();
360             writeTx.write(CarsModel.newCarPath("car" + i),
361                     CarsModel.newCarEntry("car" + i, Uint64.valueOf(20000)));
362
363             followerTestKit.doCommit(writeTx.ready());
364
365             DOMStoreReadTransaction domStoreReadTransaction = txChain.newReadOnlyTransaction();
366             domStoreReadTransaction.read(CarsModel.BASE_PATH).get();
367
368             domStoreReadTransaction.close();
369         }
370
371         // wait to let the shard catch up with purged
372         await("Range set leak test").atMost(5, TimeUnit.SECONDS)
373                 .pollInterval(500, TimeUnit.MILLISECONDS)
374                 .untilAsserted(() -> {
375                     Optional<ActorRef> localShard =
376                             leaderDistributedDataStore.getActorUtils().findLocalShard("cars");
377                     FrontendShardDataTreeSnapshotMetadata frontendMetadata =
378                             (FrontendShardDataTreeSnapshotMetadata) leaderDistributedDataStore.getActorUtils()
379                                     .executeOperation(localShard.get(), new RequestFrontendMetadata());
380
381                     if (leaderDistributedDataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) {
382                         Iterator<FrontendHistoryMetadata> iterator =
383                                 frontendMetadata.getClients().get(0).getCurrentHistories().iterator();
384                         FrontendHistoryMetadata metadata = iterator.next();
385                         while (iterator.hasNext() && metadata.getHistoryId() != 1) {
386                             metadata = iterator.next();
387                         }
388
389                         assertEquals(0, metadata.getClosedTransactions().size());
390                         assertEquals(Range.closedOpen(UnsignedLong.valueOf(0), UnsignedLong.valueOf(11)),
391                                 metadata.getPurgedTransactions().asRanges().iterator().next());
392                     } else {
393                         // ask based should track no metadata
394                         assertTrue(frontendMetadata.getClients().get(0).getCurrentHistories().isEmpty());
395                     }
396                 });
397
398         final Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction()
399                 .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
400         assertTrue("isPresent", optional.isPresent());
401         assertEquals("# cars", numCars, ((Collection<?>) optional.get().getValue()).size());
402     }
403
404     @Test
405     @Ignore("Flushes out tell based leak needs to be handled separately")
406     public void testCloseTransactionMetadataLeak() throws Exception {
407         // Ask based frontend seems to have some issues with back to back close
408         Assume.assumeTrue(testParameter.isAssignableFrom(TestClientBackedDataStore.class));
409
410         final String testName = "testWriteTransactionWithSingleShard";
411         initDatastoresWithCars(testName);
412
413         final DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain();
414
415         DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
416         writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
417         writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
418         followerTestKit.doCommit(writeTx.ready());
419
420         int numCars = 5;
421         for (int i = 0; i < numCars; i++) {
422             writeTx = txChain.newWriteOnlyTransaction();
423             writeTx.close();
424
425             DOMStoreReadTransaction domStoreReadTransaction = txChain.newReadOnlyTransaction();
426             domStoreReadTransaction.read(CarsModel.BASE_PATH).get();
427
428             domStoreReadTransaction.close();
429         }
430
431         writeTx = txChain.newWriteOnlyTransaction();
432         writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
433         writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
434         followerTestKit.doCommit(writeTx.ready());
435
436         // wait to let the shard catch up with purged
437         await("Close transaction purge leak test.").atMost(5, TimeUnit.SECONDS)
438                 .pollInterval(500, TimeUnit.MILLISECONDS)
439                 .untilAsserted(() -> {
440                     Optional<ActorRef> localShard =
441                             leaderDistributedDataStore.getActorUtils().findLocalShard("cars");
442                     FrontendShardDataTreeSnapshotMetadata frontendMetadata =
443                             (FrontendShardDataTreeSnapshotMetadata) leaderDistributedDataStore.getActorUtils()
444                                     .executeOperation(localShard.get(), new RequestFrontendMetadata());
445
446                     if (leaderDistributedDataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) {
447                         Iterator<FrontendHistoryMetadata> iterator =
448                                 frontendMetadata.getClients().get(0).getCurrentHistories().iterator();
449                         FrontendHistoryMetadata metadata = iterator.next();
450                         while (iterator.hasNext() && metadata.getHistoryId() != 1) {
451                             metadata = iterator.next();
452                         }
453
454                         Set<Range<UnsignedLong>> ranges = metadata.getPurgedTransactions().asRanges();
455
456                         assertEquals(0, metadata.getClosedTransactions().size());
457                         assertEquals(1, ranges.size());
458                     } else {
459                         // ask based should track no metadata
460                         assertTrue(frontendMetadata.getClients().get(0).getCurrentHistories().isEmpty());
461                     }
462                 });
463
464         final Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction()
465                 .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
466         assertTrue("isPresent", optional.isPresent());
467         assertEquals("# cars", numCars, ((Collection<?>) optional.get().getValue()).size());
468     }
469
470     @Test
471     public void testReadWriteTransactionWithSingleShard() throws Exception {
472         initDatastoresWithCars("testReadWriteTransactionWithSingleShard");
473
474         final DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
475         assertNotNull("newReadWriteTransaction returned null", rwTx);
476
477         rwTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
478         rwTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
479
480         final MapEntryNode car1 = CarsModel.newCarEntry("optima", Uint64.valueOf(20000));
481         rwTx.merge(CarsModel.newCarPath("optima"), car1);
482
483         verifyCars(rwTx, car1);
484
485         final MapEntryNode car2 = CarsModel.newCarEntry("sportage", Uint64.valueOf(25000));
486         final YangInstanceIdentifier car2Path = CarsModel.newCarPath("sportage");
487         rwTx.merge(car2Path, car2);
488
489         verifyExists(rwTx, car2Path);
490
491         followerTestKit.doCommit(rwTx.ready());
492
493         verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car1, car2);
494     }
495
496     @Test
497     public void testWriteTransactionWithMultipleShards() throws Exception {
498         initDatastoresWithCarsAndPeople("testWriteTransactionWithMultipleShards");
499
500         final DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
501         assertNotNull("newWriteOnlyTransaction returned null", writeTx);
502
503         final YangInstanceIdentifier carsPath = CarsModel.BASE_PATH;
504         final NormalizedNode<?, ?> carsNode = CarsModel.emptyContainer();
505         writeTx.write(carsPath, carsNode);
506
507         final YangInstanceIdentifier peoplePath = PeopleModel.BASE_PATH;
508         final NormalizedNode<?, ?> peopleNode = PeopleModel.emptyContainer();
509         writeTx.write(peoplePath, peopleNode);
510
511         followerTestKit.doCommit(writeTx.ready());
512
513         final DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
514
515         verifyNode(readTx, carsPath, carsNode);
516         verifyNode(readTx, peoplePath, peopleNode);
517     }
518
519     @Test
520     public void testReadWriteTransactionWithMultipleShards() throws Exception {
521         initDatastoresWithCarsAndPeople("testReadWriteTransactionWithMultipleShards");
522
523         final DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
524         assertNotNull("newReadWriteTransaction returned null", rwTx);
525
526         final YangInstanceIdentifier carsPath = CarsModel.BASE_PATH;
527         final NormalizedNode<?, ?> carsNode = CarsModel.emptyContainer();
528         rwTx.write(carsPath, carsNode);
529
530         final YangInstanceIdentifier peoplePath = PeopleModel.BASE_PATH;
531         final NormalizedNode<?, ?> peopleNode = PeopleModel.emptyContainer();
532         rwTx.write(peoplePath, peopleNode);
533
534         followerTestKit.doCommit(rwTx.ready());
535
536         final DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
537
538         verifyNode(readTx, carsPath, carsNode);
539         verifyNode(readTx, peoplePath, peopleNode);
540     }
541
542     @Test
543     public void testTransactionChainWithSingleShard() throws Exception {
544         initDatastoresWithCars("testTransactionChainWithSingleShard");
545
546         final DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain();
547
548         // Add the top-level cars container with write-only.
549
550         final DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
551         assertNotNull("newWriteOnlyTransaction returned null", writeTx);
552
553         writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
554
555         final DOMStoreThreePhaseCommitCohort writeTxReady = writeTx.ready();
556
557         // Verify the top-level cars container with read-only.
558
559         verifyNode(txChain.newReadOnlyTransaction(), CarsModel.BASE_PATH, CarsModel.emptyContainer());
560
561         // Perform car operations with read-write.
562
563         final DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
564
565         verifyNode(rwTx, CarsModel.BASE_PATH, CarsModel.emptyContainer());
566
567         rwTx.merge(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
568
569         final MapEntryNode car1 = CarsModel.newCarEntry("optima", Uint64.valueOf(20000));
570         final YangInstanceIdentifier car1Path = CarsModel.newCarPath("optima");
571         rwTx.write(car1Path, car1);
572
573         verifyExists(rwTx, car1Path);
574
575         verifyCars(rwTx, car1);
576
577         final MapEntryNode car2 = CarsModel.newCarEntry("sportage", Uint64.valueOf(25000));
578         rwTx.merge(CarsModel.newCarPath("sportage"), car2);
579
580         rwTx.delete(car1Path);
581
582         followerTestKit.doCommit(writeTxReady);
583
584         followerTestKit.doCommit(rwTx.ready());
585
586         txChain.close();
587
588         verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car2);
589     }
590
591     @Test
592     public void testTransactionChainWithMultipleShards() throws Exception {
593         initDatastoresWithCarsAndPeople("testTransactionChainWithMultipleShards");
594
595         final DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain();
596
597         DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
598         assertNotNull("newWriteOnlyTransaction returned null", writeTx);
599
600         writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
601         writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
602
603         writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
604         writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
605
606         followerTestKit.doCommit(writeTx.ready());
607
608         final DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction();
609
610         final MapEntryNode car = CarsModel.newCarEntry("optima", Uint64.valueOf(20000));
611         final YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
612         readWriteTx.write(carPath, car);
613
614         final MapEntryNode person = PeopleModel.newPersonEntry("jack");
615         final YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
616         readWriteTx.merge(personPath, person);
617
618         Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
619         assertTrue("isPresent", optional.isPresent());
620         assertEquals("Data node", car, optional.get());
621
622         optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS);
623         assertTrue("isPresent", optional.isPresent());
624         assertEquals("Data node", person, optional.get());
625
626         final DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready();
627
628         writeTx = txChain.newWriteOnlyTransaction();
629
630         writeTx.delete(personPath);
631
632         final DOMStoreThreePhaseCommitCohort cohort3 = writeTx.ready();
633
634         followerTestKit.doCommit(cohort2);
635         followerTestKit.doCommit(cohort3);
636
637         txChain.close();
638
639         final DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
640         verifyCars(readTx, car);
641
642         optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
643         assertFalse("isPresent", optional.isPresent());
644     }
645
646     @Test
647     public void testChainedTransactionFailureWithSingleShard() throws Exception {
648         initDatastoresWithCars("testChainedTransactionFailureWithSingleShard");
649
650         final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
651                 ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
652                         LogicalDatastoreType.CONFIGURATION, followerDistributedDataStore).build(),
653                         MoreExecutors.directExecutor());
654
655         final DOMTransactionChainListener listener = Mockito.mock(DOMTransactionChainListener.class);
656         final DOMTransactionChain txChain = broker.createTransactionChain(listener);
657
658         final DOMDataTreeWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
659
660         final ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
661                 new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME))
662                     .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
663
664         writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
665
666         try {
667             writeTx.commit().get(5, TimeUnit.SECONDS);
668             fail("Expected TransactionCommitFailedException");
669         } catch (final ExecutionException e) {
670             // Expected
671         }
672
673         verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx), any(Throwable.class));
674
675         txChain.close();
676         broker.close();
677     }
678
679     @Test
680     public void testChainedTransactionFailureWithMultipleShards() throws Exception {
681         initDatastoresWithCarsAndPeople("testChainedTransactionFailureWithMultipleShards");
682
683         final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
684                 ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
685                         LogicalDatastoreType.CONFIGURATION, followerDistributedDataStore).build(),
686                         MoreExecutors.directExecutor());
687
688         final DOMTransactionChainListener listener = Mockito.mock(DOMTransactionChainListener.class);
689         final DOMTransactionChain txChain = broker.createTransactionChain(listener);
690
691         final DOMDataTreeWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
692
693         writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
694
695         final ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
696                 new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME))
697                     .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
698
699         // Note that merge will validate the data and fail but put succeeds b/c deep validation is not
700         // done for put for performance reasons.
701         writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
702
703         try {
704             writeTx.commit().get(5, TimeUnit.SECONDS);
705             fail("Expected TransactionCommitFailedException");
706         } catch (final ExecutionException e) {
707             // Expected
708         }
709
710         verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx), any(Throwable.class));
711
712         txChain.close();
713         broker.close();
714     }
715
716     @Test
717     public void testSingleShardTransactionsWithLeaderChanges() throws Exception {
718         followerDatastoreContextBuilder.backendAlivenessTimerIntervalInSeconds(2);
719         final String testName = "testSingleShardTransactionsWithLeaderChanges";
720         initDatastoresWithCars(testName);
721
722         final String followerCarShardName = "member-2-shard-cars-" + testName;
723         InMemoryJournal.addWriteMessagesCompleteLatch(followerCarShardName, 1, ApplyJournalEntries.class);
724
725         // Write top-level car container from the follower so it uses a remote Tx.
726
727         DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
728
729         writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
730         writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
731
732         followerTestKit.doCommit(writeTx.ready());
733
734         InMemoryJournal.waitForWriteMessagesComplete(followerCarShardName);
735
736         // Switch the leader to the follower
737
738         sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder
739                 .shardElectionTimeoutFactor(1).customRaftPolicyImplementation(null));
740
741         TestKit.shutdownActorSystem(leaderSystem, true);
742         Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS);
743
744         followerTestKit.waitUntilNoLeader(followerDistributedDataStore.getActorUtils(), CARS);
745
746         leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
747         Cluster.get(leaderSystem).join(MEMBER_2_ADDRESS);
748
749         final DatastoreContext.Builder newMember1Builder = DatastoreContext.newBuilder()
750                 .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);
751         IntegrationTestKit newMember1TestKit = new IntegrationTestKit(leaderSystem, newMember1Builder, commitTimeout);
752
753         try (AbstractDataStore ds =
754                 newMember1TestKit.setupAbstractDataStore(
755                         testParameter, testName, MODULE_SHARDS_CARS_ONLY_1_2, false, CARS)) {
756
757             followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorUtils(), CARS);
758
759             // Write a car entry to the new leader - should switch to local Tx
760
761             writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
762
763             MapEntryNode car1 = CarsModel.newCarEntry("optima", Uint64.valueOf(20000));
764             YangInstanceIdentifier car1Path = CarsModel.newCarPath("optima");
765             writeTx.merge(car1Path, car1);
766
767             followerTestKit.doCommit(writeTx.ready());
768
769             verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car1);
770         }
771     }
772
773     @SuppressWarnings("unchecked")
774     @Test
775     public void testReadyLocalTransactionForwardedToLeader() throws Exception {
776         initDatastoresWithCars("testReadyLocalTransactionForwardedToLeader");
777         followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorUtils(), "cars");
778
779         final Optional<ActorRef> carsFollowerShard =
780                 followerDistributedDataStore.getActorUtils().findLocalShard("cars");
781         assertTrue("Cars follower shard found", carsFollowerShard.isPresent());
782
783         final DataTree dataTree = new InMemoryDataTreeFactory().create(
784             DataTreeConfiguration.DEFAULT_OPERATIONAL, SchemaContextHelper.full());
785
786         // Send a tx with immediate commit.
787
788         DataTreeModification modification = dataTree.takeSnapshot().newModification();
789         new WriteModification(CarsModel.BASE_PATH, CarsModel.emptyContainer()).apply(modification);
790         new MergeModification(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()).apply(modification);
791
792         final MapEntryNode car1 = CarsModel.newCarEntry("optima", Uint64.valueOf(20000));
793         new WriteModification(CarsModel.newCarPath("optima"), car1).apply(modification);
794         modification.ready();
795
796         ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(tx1 , modification, true, Optional.empty());
797
798         carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef());
799         Object resp = followerTestKit.expectMsgClass(Object.class);
800         if (resp instanceof akka.actor.Status.Failure) {
801             throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause());
802         }
803
804         assertEquals("Response type", CommitTransactionReply.class, resp.getClass());
805
806         verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1);
807
808         // Send another tx without immediate commit.
809
810         modification = dataTree.takeSnapshot().newModification();
811         MapEntryNode car2 = CarsModel.newCarEntry("sportage", Uint64.valueOf(30000));
812         new WriteModification(CarsModel.newCarPath("sportage"), car2).apply(modification);
813         modification.ready();
814
815         readyLocal = new ReadyLocalTransaction(tx2 , modification, false, Optional.empty());
816
817         carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef());
818         resp = followerTestKit.expectMsgClass(Object.class);
819         if (resp instanceof akka.actor.Status.Failure) {
820             throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause());
821         }
822
823         assertEquals("Response type", ReadyTransactionReply.class, resp.getClass());
824
825         final ActorSelection txActor = leaderDistributedDataStore.getActorUtils().actorSelection(
826                 ((ReadyTransactionReply)resp).getCohortPath());
827
828         final Supplier<Short> versionSupplier = Mockito.mock(Supplier.class);
829         Mockito.doReturn(DataStoreVersions.CURRENT_VERSION).when(versionSupplier).get();
830         ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy(
831                 leaderDistributedDataStore.getActorUtils(), Arrays.asList(
832                         new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor), versionSupplier)), tx2);
833         cohort.canCommit().get(5, TimeUnit.SECONDS);
834         cohort.preCommit().get(5, TimeUnit.SECONDS);
835         cohort.commit().get(5, TimeUnit.SECONDS);
836
837         verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2);
838     }
839
840     @SuppressWarnings("unchecked")
841     @Test
842     public void testForwardedReadyTransactionForwardedToLeader() throws Exception {
843         initDatastoresWithCars("testForwardedReadyTransactionForwardedToLeader");
844         followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorUtils(), "cars");
845
846         final Optional<ActorRef> carsFollowerShard =
847                 followerDistributedDataStore.getActorUtils().findLocalShard("cars");
848         assertTrue("Cars follower shard found", carsFollowerShard.isPresent());
849
850         carsFollowerShard.get().tell(GetShardDataTree.INSTANCE, followerTestKit.getRef());
851         final DataTree dataTree = followerTestKit.expectMsgClass(DataTree.class);
852
853         // Send a tx with immediate commit.
854
855         DataTreeModification modification = dataTree.takeSnapshot().newModification();
856         new WriteModification(CarsModel.BASE_PATH, CarsModel.emptyContainer()).apply(modification);
857         new MergeModification(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()).apply(modification);
858
859         final MapEntryNode car1 = CarsModel.newCarEntry("optima", Uint64.valueOf(20000));
860         new WriteModification(CarsModel.newCarPath("optima"), car1).apply(modification);
861
862         ForwardedReadyTransaction forwardedReady = new ForwardedReadyTransaction(tx1,
863                 DataStoreVersions.CURRENT_VERSION, new ReadWriteShardDataTreeTransaction(
864                         Mockito.mock(ShardDataTreeTransactionParent.class), tx1, modification), true,
865                 Optional.empty());
866
867         carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef());
868         Object resp = followerTestKit.expectMsgClass(Object.class);
869         if (resp instanceof akka.actor.Status.Failure) {
870             throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause());
871         }
872
873         assertEquals("Response type", CommitTransactionReply.class, resp.getClass());
874
875         verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1);
876
877         // Send another tx without immediate commit.
878
879         modification = dataTree.takeSnapshot().newModification();
880         MapEntryNode car2 = CarsModel.newCarEntry("sportage", Uint64.valueOf(30000));
881         new WriteModification(CarsModel.newCarPath("sportage"), car2).apply(modification);
882
883         forwardedReady = new ForwardedReadyTransaction(tx2,
884                 DataStoreVersions.CURRENT_VERSION, new ReadWriteShardDataTreeTransaction(
885                         Mockito.mock(ShardDataTreeTransactionParent.class), tx2, modification), false,
886                 Optional.empty());
887
888         carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef());
889         resp = followerTestKit.expectMsgClass(Object.class);
890         if (resp instanceof akka.actor.Status.Failure) {
891             throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause());
892         }
893
894         assertEquals("Response type", ReadyTransactionReply.class, resp.getClass());
895
896         ActorSelection txActor = leaderDistributedDataStore.getActorUtils().actorSelection(
897                 ((ReadyTransactionReply)resp).getCohortPath());
898
899         final Supplier<Short> versionSupplier = Mockito.mock(Supplier.class);
900         Mockito.doReturn(DataStoreVersions.CURRENT_VERSION).when(versionSupplier).get();
901         final ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy(
902                 leaderDistributedDataStore.getActorUtils(), Arrays.asList(
903                         new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor), versionSupplier)), tx2);
904         cohort.canCommit().get(5, TimeUnit.SECONDS);
905         cohort.preCommit().get(5, TimeUnit.SECONDS);
906         cohort.commit().get(5, TimeUnit.SECONDS);
907
908         verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2);
909     }
910
911     @Test
912     public void testTransactionForwardedToLeaderAfterRetry() throws Exception {
913         // FIXME: remove when test passes also for ClientBackedDataStore
914         Assume.assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter));
915         followerDatastoreContextBuilder.shardBatchedModificationCount(2);
916         leaderDatastoreContextBuilder.shardBatchedModificationCount(2);
917         initDatastoresWithCarsAndPeople("testTransactionForwardedToLeaderAfterRetry");
918
919         // Do an initial write to get the primary shard info cached.
920
921         final DOMStoreWriteTransaction initialWriteTx = followerDistributedDataStore.newWriteOnlyTransaction();
922         initialWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
923         initialWriteTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
924         followerTestKit.doCommit(initialWriteTx.ready());
925
926         // Wait for the commit to be replicated to the follower.
927
928         MemberNode.verifyRaftState(followerDistributedDataStore, "cars",
929             raftState -> assertEquals("getLastApplied", 1, raftState.getLastApplied()));
930
931         MemberNode.verifyRaftState(followerDistributedDataStore, "people",
932             raftState -> assertEquals("getLastApplied", 1, raftState.getLastApplied()));
933
934         // Prepare, ready and canCommit a WO tx that writes to 2 shards. This will become the current tx in
935         // the leader shard.
936
937         final DOMStoreWriteTransaction writeTx1 = followerDistributedDataStore.newWriteOnlyTransaction();
938         writeTx1.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
939         writeTx1.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
940         final DOMStoreThreePhaseCommitCohort writeTx1Cohort = writeTx1.ready();
941         final ListenableFuture<Boolean> writeTx1CanCommit = writeTx1Cohort.canCommit();
942         writeTx1CanCommit.get(5, TimeUnit.SECONDS);
943
944         // Prepare and ready another WO tx that writes to 2 shards but don't canCommit yet. This will be queued
945         // in the leader shard.
946
947         final DOMStoreWriteTransaction writeTx2 = followerDistributedDataStore.newWriteOnlyTransaction();
948         final LinkedList<MapEntryNode> cars = new LinkedList<>();
949         int carIndex = 1;
950         cars.add(CarsModel.newCarEntry("car" + carIndex, Uint64.valueOf(carIndex)));
951         writeTx2.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
952         carIndex++;
953         NormalizedNode<?, ?> people = ImmutableNodes.mapNodeBuilder(PeopleModel.PERSON_QNAME)
954                 .withChild(PeopleModel.newPersonEntry("Dude")).build();
955         writeTx2.write(PeopleModel.PERSON_LIST_PATH, people);
956         final DOMStoreThreePhaseCommitCohort writeTx2Cohort = writeTx2.ready();
957
958         // Prepare another WO that writes to a single shard and thus will be directly committed on ready. This
959         // tx writes 5 cars so 2 BatchedModidifications messages will be sent initially and cached in the
960         // leader shard (with shardBatchedModificationCount set to 2). The 3rd BatchedModidifications will be
961         // sent on ready.
962
963         final DOMStoreWriteTransaction writeTx3 = followerDistributedDataStore.newWriteOnlyTransaction();
964         for (int i = 1; i <= 5; i++, carIndex++) {
965             cars.add(CarsModel.newCarEntry("car" + carIndex, Uint64.valueOf(carIndex)));
966             writeTx3.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
967         }
968
969         // Prepare another WO that writes to a single shard. This will send a single BatchedModidifications
970         // message on ready.
971
972         final DOMStoreWriteTransaction writeTx4 = followerDistributedDataStore.newWriteOnlyTransaction();
973         cars.add(CarsModel.newCarEntry("car" + carIndex, Uint64.valueOf(carIndex)));
974         writeTx4.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
975         carIndex++;
976
977         // Prepare a RW tx that will create a tx actor and send a ForwardedReadyTransaciton message to the
978         // leader shard on ready.
979
980         final DOMStoreReadWriteTransaction readWriteTx = followerDistributedDataStore.newReadWriteTransaction();
981         cars.add(CarsModel.newCarEntry("car" + carIndex, Uint64.valueOf(carIndex)));
982         readWriteTx.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
983
984         IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars",
985             stats -> assertEquals("getReadWriteTransactionCount", 5, stats.getReadWriteTransactionCount()));
986
987         // Disable elections on the leader so it switches to follower.
988
989         sendDatastoreContextUpdate(leaderDistributedDataStore, leaderDatastoreContextBuilder
990                 .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName())
991                 .shardElectionTimeoutFactor(10));
992
993         leaderTestKit.waitUntilNoLeader(leaderDistributedDataStore.getActorUtils(), "cars");
994
995         // Submit all tx's - the messages should get queued for retry.
996
997         final ListenableFuture<Boolean> writeTx2CanCommit = writeTx2Cohort.canCommit();
998         final DOMStoreThreePhaseCommitCohort writeTx3Cohort = writeTx3.ready();
999         final DOMStoreThreePhaseCommitCohort writeTx4Cohort = writeTx4.ready();
1000         final DOMStoreThreePhaseCommitCohort rwTxCohort = readWriteTx.ready();
1001
1002         // Enable elections on the other follower so it becomes the leader, at which point the
1003         // tx's should get forwarded from the previous leader to the new leader to complete the commits.
1004
1005         sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder
1006                 .customRaftPolicyImplementation(null).shardElectionTimeoutFactor(1));
1007         IntegrationTestKit.findLocalShard(followerDistributedDataStore.getActorUtils(), "cars")
1008                 .tell(TimeoutNow.INSTANCE, ActorRef.noSender());
1009         IntegrationTestKit.findLocalShard(followerDistributedDataStore.getActorUtils(), "people")
1010                 .tell(TimeoutNow.INSTANCE, ActorRef.noSender());
1011
1012         followerTestKit.doCommit(writeTx1CanCommit, writeTx1Cohort);
1013         followerTestKit.doCommit(writeTx2CanCommit, writeTx2Cohort);
1014         followerTestKit.doCommit(writeTx3Cohort);
1015         followerTestKit.doCommit(writeTx4Cohort);
1016         followerTestKit.doCommit(rwTxCohort);
1017
1018         DOMStoreReadTransaction readTx = leaderDistributedDataStore.newReadOnlyTransaction();
1019         verifyCars(readTx, cars.toArray(new MapEntryNode[cars.size()]));
1020         verifyNode(readTx, PeopleModel.PERSON_LIST_PATH, people);
1021     }
1022
1023     @Test
1024     public void testLeadershipTransferOnShutdown() throws Exception {
1025         // FIXME: remove when test passes also for ClientBackedDataStore
1026         Assume.assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter));
1027         leaderDatastoreContextBuilder.shardBatchedModificationCount(1);
1028         followerDatastoreContextBuilder.shardElectionTimeoutFactor(10).customRaftPolicyImplementation(null);
1029         final String testName = "testLeadershipTransferOnShutdown";
1030         initDatastores(testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, CARS_AND_PEOPLE);
1031
1032         final IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System,
1033                 DatastoreContext.newBuilderFrom(followerDatastoreContextBuilder.build()).operationTimeoutInMillis(500),
1034                 commitTimeout);
1035         try (AbstractDataStore follower2DistributedDataStore = follower2TestKit.setupAbstractDataStore(
1036                 testParameter, testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, false)) {
1037
1038             followerTestKit.waitForMembersUp("member-3");
1039             follower2TestKit.waitForMembersUp("member-1", "member-2");
1040
1041             // Create and submit a couple tx's so they're pending.
1042
1043             DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
1044             writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
1045             writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
1046             writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
1047             final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
1048
1049             IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars",
1050                 stats -> assertEquals("getTxCohortCacheSize", 1, stats.getTxCohortCacheSize()));
1051
1052             writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
1053             final MapEntryNode car = CarsModel.newCarEntry("optima", Uint64.valueOf(20000));
1054             writeTx.write(CarsModel.newCarPath("optima"), car);
1055             final DOMStoreThreePhaseCommitCohort cohort2 = writeTx.ready();
1056
1057             IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars",
1058                 stats -> assertEquals("getTxCohortCacheSize", 2, stats.getTxCohortCacheSize()));
1059
1060             // Gracefully stop the leader via a Shutdown message.
1061
1062             sendDatastoreContextUpdate(leaderDistributedDataStore, leaderDatastoreContextBuilder
1063                 .shardElectionTimeoutFactor(100));
1064
1065             final FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
1066             final Future<ActorRef> future = leaderDistributedDataStore.getActorUtils().findLocalShardAsync("cars");
1067             final ActorRef leaderActor = Await.result(future, duration);
1068
1069             final Future<Boolean> stopFuture = Patterns.gracefulStop(leaderActor, duration, Shutdown.INSTANCE);
1070
1071             // Commit the 2 transactions. They should finish and succeed.
1072
1073             followerTestKit.doCommit(cohort1);
1074             followerTestKit.doCommit(cohort2);
1075
1076             // Wait for the leader actor stopped.
1077
1078             final Boolean stopped = Await.result(stopFuture, duration);
1079             assertEquals("Stopped", Boolean.TRUE, stopped);
1080
1081             // Verify leadership was transferred by reading the committed data from the other nodes.
1082
1083             verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car);
1084             verifyCars(follower2DistributedDataStore.newReadOnlyTransaction(), car);
1085         }
1086     }
1087
1088     @Test
1089     public void testTransactionWithIsolatedLeader() throws Exception {
1090         // FIXME: remove when test passes also for ClientBackedDataStore
1091         Assume.assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter));
1092         // Set the isolated leader check interval high so we can control the switch to IsolatedLeader.
1093         leaderDatastoreContextBuilder.shardIsolatedLeaderCheckIntervalInMillis(10000000);
1094         final String testName = "testTransactionWithIsolatedLeader";
1095         initDatastoresWithCars(testName);
1096
1097         // Tx that is submitted after the follower is stopped but before the leader transitions to IsolatedLeader.
1098         final DOMStoreWriteTransaction preIsolatedLeaderWriteTx = leaderDistributedDataStore.newWriteOnlyTransaction();
1099         preIsolatedLeaderWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
1100
1101         // Tx that is submitted after the leader transitions to IsolatedLeader.
1102         final DOMStoreWriteTransaction noShardLeaderWriteTx = leaderDistributedDataStore.newWriteOnlyTransaction();
1103         noShardLeaderWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
1104
1105         // Tx that is submitted after the follower is reinstated.
1106         final DOMStoreWriteTransaction successWriteTx = leaderDistributedDataStore.newWriteOnlyTransaction();
1107         successWriteTx.merge(CarsModel.BASE_PATH, CarsModel.emptyContainer());
1108
1109         // Stop the follower
1110         followerTestKit.watch(followerDistributedDataStore.getActorUtils().getShardManager());
1111         followerDistributedDataStore.close();
1112         followerTestKit.expectTerminated(followerDistributedDataStore.getActorUtils().getShardManager());
1113
1114         // Submit the preIsolatedLeaderWriteTx so it's pending
1115         final DOMStoreThreePhaseCommitCohort preIsolatedLeaderTxCohort = preIsolatedLeaderWriteTx.ready();
1116
1117         // Change the isolated leader check interval low so it changes to IsolatedLeader.
1118         sendDatastoreContextUpdate(leaderDistributedDataStore, leaderDatastoreContextBuilder
1119                 .shardIsolatedLeaderCheckIntervalInMillis(200));
1120
1121         MemberNode.verifyRaftState(leaderDistributedDataStore, "cars",
1122             raftState -> assertEquals("getRaftState", "IsolatedLeader", raftState.getRaftState()));
1123
1124         try {
1125             leaderTestKit.doCommit(noShardLeaderWriteTx.ready());
1126             fail("Expected NoShardLeaderException");
1127         } catch (final ExecutionException e) {
1128             assertEquals("getCause", NoShardLeaderException.class, Throwables.getRootCause(e).getClass());
1129         }
1130
1131         sendDatastoreContextUpdate(leaderDistributedDataStore, leaderDatastoreContextBuilder
1132                 .shardElectionTimeoutFactor(100));
1133
1134         final DOMStoreThreePhaseCommitCohort successTxCohort = successWriteTx.ready();
1135
1136         followerDistributedDataStore = followerTestKit.setupAbstractDataStore(
1137                 testParameter, testName, MODULE_SHARDS_CARS_ONLY_1_2, false, CARS);
1138
1139         leaderTestKit.doCommit(preIsolatedLeaderTxCohort);
1140         leaderTestKit.doCommit(successTxCohort);
1141     }
1142
1143     @Test
1144     public void testTransactionWithShardLeaderNotResponding() throws Exception {
1145         followerDatastoreContextBuilder.frontendRequestTimeoutInSeconds(2);
1146         followerDatastoreContextBuilder.shardElectionTimeoutFactor(50);
1147         initDatastoresWithCars("testTransactionWithShardLeaderNotResponding");
1148
1149         // Do an initial read to get the primary shard info cached.
1150
1151         final DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
1152         readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS);
1153
1154         // Shutdown the leader and try to create a new tx.
1155
1156         TestKit.shutdownActorSystem(leaderSystem, true);
1157
1158         followerDatastoreContextBuilder.operationTimeoutInMillis(50).shardElectionTimeoutFactor(1);
1159         sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder);
1160
1161         final DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
1162
1163         rwTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
1164
1165         try {
1166             followerTestKit.doCommit(rwTx.ready());
1167             fail("Exception expected");
1168         } catch (final ExecutionException e) {
1169             final String msg = "Unexpected exception: " + Throwables.getStackTraceAsString(e.getCause());
1170             if (DistributedDataStore.class.isAssignableFrom(testParameter)) {
1171                 assertTrue(msg, Throwables.getRootCause(e) instanceof NoShardLeaderException
1172                         || e.getCause() instanceof ShardLeaderNotRespondingException);
1173             } else {
1174                 assertTrue(msg, Throwables.getRootCause(e) instanceof RequestTimeoutException);
1175             }
1176         }
1177     }
1178
1179     @Test
1180     public void testTransactionWithCreateTxFailureDueToNoLeader() throws Exception {
1181         followerDatastoreContextBuilder.frontendRequestTimeoutInSeconds(2);
1182         initDatastoresWithCars("testTransactionWithCreateTxFailureDueToNoLeader");
1183
1184         // Do an initial read to get the primary shard info cached.
1185
1186         final DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
1187         readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS);
1188
1189         // Shutdown the leader and try to create a new tx.
1190
1191         TestKit.shutdownActorSystem(leaderSystem, true);
1192
1193         Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS);
1194
1195         Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
1196
1197         sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder
1198                 .operationTimeoutInMillis(10).shardElectionTimeoutFactor(1).customRaftPolicyImplementation(null));
1199
1200         final DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
1201
1202         rwTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
1203
1204         try {
1205             followerTestKit.doCommit(rwTx.ready());
1206             fail("Exception expected");
1207         } catch (final ExecutionException e) {
1208             final String msg = "Unexpected exception: " + Throwables.getStackTraceAsString(e.getCause());
1209             if (DistributedDataStore.class.isAssignableFrom(testParameter)) {
1210                 assertTrue(msg, Throwables.getRootCause(e) instanceof NoShardLeaderException);
1211             } else {
1212                 assertTrue(msg, Throwables.getRootCause(e) instanceof RequestTimeoutException);
1213             }
1214         }
1215     }
1216
1217     @Test
1218     public void testTransactionRetryWithInitialAskTimeoutExOnCreateTx() throws Exception {
1219         followerDatastoreContextBuilder.backendAlivenessTimerIntervalInSeconds(2);
1220         String testName = "testTransactionRetryWithInitialAskTimeoutExOnCreateTx";
1221         initDatastores(testName, MODULE_SHARDS_CARS_1_2_3, CARS);
1222
1223         final DatastoreContext.Builder follower2DatastoreContextBuilder = DatastoreContext.newBuilder()
1224                 .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(10);
1225         final IntegrationTestKit follower2TestKit = new IntegrationTestKit(
1226                 follower2System, follower2DatastoreContextBuilder, commitTimeout);
1227
1228         try (AbstractDataStore ds =
1229                 follower2TestKit.setupAbstractDataStore(
1230                         testParameter, testName, MODULE_SHARDS_CARS_1_2_3, false, CARS)) {
1231
1232             followerTestKit.waitForMembersUp("member-1", "member-3");
1233             follower2TestKit.waitForMembersUp("member-1", "member-2");
1234
1235             // Do an initial read to get the primary shard info cached.
1236
1237             final DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
1238             readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS);
1239
1240             // Shutdown the leader and try to create a new tx.
1241
1242             TestKit.shutdownActorSystem(leaderSystem, true);
1243
1244             Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS);
1245
1246             sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder
1247                 .operationTimeoutInMillis(500).shardElectionTimeoutFactor(5).customRaftPolicyImplementation(null));
1248
1249             final DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
1250
1251             rwTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
1252
1253             followerTestKit.doCommit(rwTx.ready());
1254         }
1255     }
1256
1257     @Test
1258     public void testSemiReachableCandidateNotDroppingLeader() throws Exception {
1259         final String testName = "testSemiReachableCandidateNotDroppingLeader";
1260         initDatastores(testName, MODULE_SHARDS_CARS_1_2_3, CARS);
1261
1262         final DatastoreContext.Builder follower2DatastoreContextBuilder = DatastoreContext.newBuilder()
1263                 .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(10);
1264         final IntegrationTestKit follower2TestKit = new IntegrationTestKit(
1265                 follower2System, follower2DatastoreContextBuilder, commitTimeout);
1266
1267         final AbstractDataStore ds2 =
1268                      follower2TestKit.setupAbstractDataStore(
1269                              testParameter, testName, MODULE_SHARDS_CARS_1_2_3, false, CARS);
1270
1271         followerTestKit.waitForMembersUp("member-1", "member-3");
1272         follower2TestKit.waitForMembersUp("member-1", "member-2");
1273
1274         TestKit.shutdownActorSystem(follower2System);
1275
1276         ActorRef cars = leaderDistributedDataStore.getActorUtils().findLocalShard("cars").get();
1277         OnDemandRaftState initialState = (OnDemandRaftState) leaderDistributedDataStore.getActorUtils()
1278                 .executeOperation(cars, GetOnDemandRaftState.INSTANCE);
1279
1280         Cluster leaderCluster = Cluster.get(leaderSystem);
1281         Cluster followerCluster = Cluster.get(followerSystem);
1282         Cluster follower2Cluster = Cluster.get(follower2System);
1283
1284         Member follower2Member = follower2Cluster.readView().self();
1285
1286         await().atMost(10, TimeUnit.SECONDS)
1287                 .until(() -> leaderCluster.readView().unreachableMembers().contains(follower2Member));
1288         await().atMost(10, TimeUnit.SECONDS)
1289                 .until(() -> followerCluster.readView().unreachableMembers().contains(follower2Member));
1290
1291         ActorRef followerCars = followerDistributedDataStore.getActorUtils().findLocalShard("cars").get();
1292
1293         // to simulate a follower not being able to receive messages, but still being able to send messages and becoming
1294         // candidate, we can just send a couple of RequestVotes to both leader and follower.
1295         cars.tell(new RequestVote(initialState.getCurrentTerm() + 1, "member-3-shard-cars", -1, -1), null);
1296         followerCars.tell(new RequestVote(initialState.getCurrentTerm() + 1, "member-3-shard-cars", -1, -1), null);
1297         cars.tell(new RequestVote(initialState.getCurrentTerm() + 3, "member-3-shard-cars", -1, -1), null);
1298         followerCars.tell(new RequestVote(initialState.getCurrentTerm() + 3, "member-3-shard-cars", -1, -1), null);
1299
1300         OnDemandRaftState stateAfter = (OnDemandRaftState) leaderDistributedDataStore.getActorUtils()
1301                 .executeOperation(cars, GetOnDemandRaftState.INSTANCE);
1302         OnDemandRaftState followerState = (OnDemandRaftState) followerDistributedDataStore.getActorUtils()
1303                 .executeOperation(cars, GetOnDemandRaftState.INSTANCE);
1304
1305         assertEquals(initialState.getCurrentTerm(), stateAfter.getCurrentTerm());
1306         assertEquals(initialState.getCurrentTerm(), followerState.getCurrentTerm());
1307
1308         ds2.close();
1309     }
1310
1311     @Test
1312     public void testInstallSnapshot() throws Exception {
1313         final String testName = "testInstallSnapshot";
1314         final String leaderCarShardName = "member-1-shard-cars-" + testName;
1315         final String followerCarShardName = "member-2-shard-cars-" + testName;
1316
1317         // Setup a saved snapshot on the leader. The follower will startup with no data and the leader should
1318         // install a snapshot to sync the follower.
1319
1320         DataTree tree = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_CONFIGURATION,
1321             SchemaContextHelper.full());
1322
1323         final ContainerNode carsNode = CarsModel.newCarsNode(
1324                 CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", Uint64.valueOf(20000))));
1325         AbstractShardTest.writeToStore(tree, CarsModel.BASE_PATH, carsNode);
1326
1327         final NormalizedNode<?, ?> snapshotRoot = AbstractShardTest.readStore(tree, YangInstanceIdentifier.empty());
1328         final Snapshot initialSnapshot = Snapshot.create(
1329                 new ShardSnapshotState(new MetadataShardDataTreeSnapshot(snapshotRoot)),
1330                 Collections.emptyList(), 5, 1, 5, 1, 1, null, null);
1331         InMemorySnapshotStore.addSnapshot(leaderCarShardName, initialSnapshot);
1332
1333         InMemorySnapshotStore.addSnapshotSavedLatch(leaderCarShardName);
1334         InMemorySnapshotStore.addSnapshotSavedLatch(followerCarShardName);
1335
1336         initDatastoresWithCars(testName);
1337
1338         final Optional<NormalizedNode<?, ?>> readOptional = leaderDistributedDataStore.newReadOnlyTransaction().read(
1339                 CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS);
1340         assertTrue("isPresent", readOptional.isPresent());
1341         assertEquals("Node", carsNode, readOptional.get());
1342
1343         verifySnapshot(InMemorySnapshotStore.waitForSavedSnapshot(leaderCarShardName, Snapshot.class),
1344                 initialSnapshot, snapshotRoot);
1345
1346         verifySnapshot(InMemorySnapshotStore.waitForSavedSnapshot(followerCarShardName, Snapshot.class),
1347                 initialSnapshot, snapshotRoot);
1348     }
1349
1350     @Test
1351     public void testReadWriteMessageSlicing() throws Exception {
1352         // The slicing is only implemented for tell-based protocol
1353         Assume.assumeTrue(ClientBackedDataStore.class.isAssignableFrom(testParameter));
1354
1355         leaderDatastoreContextBuilder.maximumMessageSliceSize(100);
1356         followerDatastoreContextBuilder.maximumMessageSliceSize(100);
1357         initDatastoresWithCars("testLargeReadReplySlicing");
1358
1359         final DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
1360
1361         final NormalizedNode<?, ?> carsNode = CarsModel.create();
1362         rwTx.write(CarsModel.BASE_PATH, carsNode);
1363
1364         verifyNode(rwTx, CarsModel.BASE_PATH, carsNode);
1365     }
1366
1367     private static void verifySnapshot(final Snapshot actual, final Snapshot expected,
1368                                        final NormalizedNode<?, ?> expRoot) {
1369         assertEquals("Snapshot getLastAppliedTerm", expected.getLastAppliedTerm(), actual.getLastAppliedTerm());
1370         assertEquals("Snapshot getLastAppliedIndex", expected.getLastAppliedIndex(), actual.getLastAppliedIndex());
1371         assertEquals("Snapshot getLastTerm", expected.getLastTerm(), actual.getLastTerm());
1372         assertEquals("Snapshot getLastIndex", expected.getLastIndex(), actual.getLastIndex());
1373         assertEquals("Snapshot state type", ShardSnapshotState.class, actual.getState().getClass());
1374         MetadataShardDataTreeSnapshot shardSnapshot =
1375                 (MetadataShardDataTreeSnapshot) ((ShardSnapshotState)actual.getState()).getSnapshot();
1376         assertEquals("Snapshot root node", expRoot, shardSnapshot.getRootNode().get());
1377     }
1378
1379     private static void sendDatastoreContextUpdate(final AbstractDataStore dataStore, final Builder builder) {
1380         final Builder newBuilder = DatastoreContext.newBuilderFrom(builder.build());
1381         final DatastoreContextFactory mockContextFactory = Mockito.mock(DatastoreContextFactory.class);
1382         final Answer<DatastoreContext> answer = invocation -> newBuilder.build();
1383         Mockito.doAnswer(answer).when(mockContextFactory).getBaseDatastoreContext();
1384         Mockito.doAnswer(answer).when(mockContextFactory).getShardDatastoreContext(Mockito.anyString());
1385         dataStore.onDatastoreContextUpdated(mockContextFactory);
1386     }
1387 }